Merge branch 'collectd-5.2' into collectd-5.3
authorFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:23:32 +0000 (11:23 +0200)
committerFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:23:32 +0000 (11:23 +0200)
1  2 
src/plugin.c
src/write_graphite.c

diff --combined src/plugin.c
@@@ -1,6 -1,6 +1,6 @@@
  /**
   * collectd - src/plugin.c
 - * Copyright (C) 2005-2011  Florian octo Forster
 + * Copyright (C) 2005-2013  Florian octo Forster
   *
   * This program is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License as published by the
   **/
  
  #include "collectd.h"
 -#include "utils_complain.h"
 -
 -#include <ltdl.h>
 -
 -#if HAVE_PTHREAD_H
 -# include <pthread.h>
 -#endif
 -
  #include "common.h"
  #include "plugin.h"
  #include "configfile.h"
 +#include "filter_chain.h"
  #include "utils_avltree.h"
 +#include "utils_cache.h"
 +#include "utils_complain.h"
  #include "utils_llist.h"
  #include "utils_heap.h"
 -#include "utils_cache.h"
 -#include "filter_chain.h"
 +#include "utils_time.h"
 +
 +#if HAVE_PTHREAD_H
 +# include <pthread.h>
 +#endif
 +
 +#include <ltdl.h>
  
  /*
   * Private structures
@@@ -63,21 -63,12 +63,21 @@@ struct read_func_
        char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
        int rf_type;
 -      struct timespec rf_interval;
 -      struct timespec rf_effective_interval;
 -      struct timespec rf_next_read;
 +      cdtime_t rf_interval;
 +      cdtime_t rf_effective_interval;
 +      cdtime_t rf_next_read;
  };
  typedef struct read_func_s read_func_t;
  
 +struct write_queue_s;
 +typedef struct write_queue_s write_queue_t;
 +struct write_queue_s
 +{
 +      value_list_t *vl;
 +      plugin_ctx_t ctx;
 +      write_queue_t *next;
 +};
 +
  /*
   * Private variables
   */
@@@ -104,22 -95,12 +104,22 @@@ static pthread_cond_t  read_cond = PTHR
  static pthread_t      *read_threads = NULL;
  static int             read_threads_num = 0;
  
 +static write_queue_t  *write_queue_head;
 +static write_queue_t  *write_queue_tail;
 +static _Bool           write_loop = 1;
 +static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
 +static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
 +static pthread_t      *write_threads = NULL;
 +static size_t          write_threads_num = 0;
 +
  static pthread_key_t   plugin_ctx_key;
  static _Bool           plugin_ctx_key_initialized = 0;
  
  /*
   * Static functions
   */
 +static int plugin_dispatch_values_internal (value_list_t *vl);
 +
  static const char *plugin_get_dir (void)
  {
        if (plugindir == NULL)
@@@ -360,6 -341,13 +360,6 @@@ static int plugin_load_file (char *file
        return (0);
  }
  
 -static _Bool timeout_reached(struct timespec timeout)
 -{
 -      struct timeval now;
 -      gettimeofday(&now, NULL);
 -      return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
 -}
 -
  static void *plugin_read_thread (void __attribute__((unused)) *args)
  {
        while (read_loop != 0)
                }
                pthread_mutex_unlock (&read_lock);
  
 -              if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
 +              if (rf->rf_interval == 0)
                {
                        /* this should not happen, because the interval is set
                         * for each plugin when loading it
                         * XXX: issue a warning? */
 -                      now = cdtime ();
 -
 -                      CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
 -
 +                      rf->rf_interval = plugin_get_interval ();
                        rf->rf_effective_interval = rf->rf_interval;
  
 -                      CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
 +                      rf->rf_next_read = cdtime ();
                }
  
                /* sleep until this entry is due,
                 * pthread_cond_timedwait returns. */
                rc = 0;
                while ((read_loop != 0)
 -                              && !timeout_reached(rf->rf_next_read)
 +                              && (cdtime () < rf->rf_next_read)
                                && rc == 0)
                {
 +                      struct timespec ts = { 0 };
 +
 +                      CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
 +
                        rc = pthread_cond_timedwait (&read_cond, &read_lock,
 -                              &rf->rf_next_read);
 +                              &ts);
                }
  
                /* Must hold `read_lock' when accessing `rf->rf_type'. */
                 * intervals in which it will be called. */
                if (status != 0)
                {
 -                      rf->rf_effective_interval.tv_sec *= 2;
 -                      rf->rf_effective_interval.tv_nsec *= 2;
 -                      NORMALIZE_TIMESPEC (rf->rf_effective_interval);
 -
 -                      if (rf->rf_effective_interval.tv_sec >= 86400)
 -                      {
 -                              rf->rf_effective_interval.tv_sec = 86400;
 -                              rf->rf_effective_interval.tv_nsec = 0;
 -                      }
 +                      rf->rf_effective_interval *= 2;
 +                      if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
 +                              rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
  
                        NOTICE ("read-function of plugin `%s' failed. "
 -                                      "Will suspend it for %i seconds.",
 +                                      "Will suspend it for %.3f seconds.",
                                        rf->rf_name,
 -                                      (int) rf->rf_effective_interval.tv_sec);
 +                                      CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
                }
                else
                {
                now = cdtime ();
  
                DEBUG ("plugin_read_thread: Effective interval of the "
 -                              "%s plugin is %i.%09i.",
 +                              "%s plugin is %.3f seconds.",
                                rf->rf_name,
 -                              (int) rf->rf_effective_interval.tv_sec,
 -                              (int) rf->rf_effective_interval.tv_nsec);
 +                              CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
  
                /* Calculate the next (absolute) time at which this function
                 * should be called. */
 -              rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec
 -                      + rf->rf_effective_interval.tv_sec;
 -              rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec
 -                      + rf->rf_effective_interval.tv_nsec;
 -              NORMALIZE_TIMESPEC (rf->rf_next_read);
 +              rf->rf_next_read += rf->rf_effective_interval;
  
                /* Check, if `rf_next_read' is in the past. */
 -              if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now)
 +              if (rf->rf_next_read < now)
                {
                        /* `rf_next_read' is in the past. Insert `now'
                         * so this value doesn't trail off into the
                         * past too much. */
 -                      CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
 +                      rf->rf_next_read = now;
                }
  
 -              DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
 +              DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.",
                                rf->rf_name,
 -                              (int) rf->rf_next_read.tv_sec,
 -                              (int) rf->rf_next_read.tv_nsec);
 +                              CDTIME_T_TO_DOUBLE (rf->rf_next_read));
  
                /* Re-insert this read function into the heap again. */
                c_heap_insert (read_heap, rf);
@@@ -574,244 -573,6 +574,244 @@@ static void stop_read_threads (void
        read_threads_num = 0;
  } /* void stop_read_threads */
  
 +static void plugin_value_list_free (value_list_t *vl) /* {{{ */
 +{
 +      if (vl == NULL)
 +              return;
 +
 +      meta_data_destroy (vl->meta);
 +      sfree (vl->values);
 +      sfree (vl);
 +} /* }}} void plugin_value_list_free */
 +
 +static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
 +{
 +      value_list_t *vl;
 +
 +      if (vl_orig == NULL)
 +              return (NULL);
 +
 +      vl = malloc (sizeof (*vl));
 +      if (vl == NULL)
 +              return (NULL);
 +      memcpy (vl, vl_orig, sizeof (*vl));
 +
 +      vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
 +      if (vl->values == NULL)
 +      {
 +              plugin_value_list_free (vl);
 +              return (NULL);
 +      }
 +      memcpy (vl->values, vl_orig->values,
 +                      vl_orig->values_len * sizeof (*vl->values));
 +
 +      vl->meta = meta_data_clone (vl->meta);
 +      if ((vl_orig->meta != NULL) && (vl->meta == NULL))
 +      {
 +              plugin_value_list_free (vl);
 +              return (NULL);
 +      }
 +
 +      if (vl->time == 0)
 +              vl->time = cdtime ();
 +
 +      /* Fill in the interval from the thread context, if it is zero. */
 +      if (vl->interval == 0)
 +      {
 +              plugin_ctx_t ctx = plugin_get_ctx ();
 +
 +              if (ctx.interval != 0)
 +                      vl->interval = ctx.interval;
 +              else
 +              {
 +                      char name[6 * DATA_MAX_NAME_LEN];
 +                      FORMAT_VL (name, sizeof (name), vl);
 +                      ERROR ("plugin_value_list_clone: Unable to determine "
 +                                      "interval from context for "
 +                                      "value list \"%s\". "
 +                                      "This indicates a broken plugin. "
 +                                      "Please report this problem to the "
 +                                      "collectd mailing list or at "
 +                                      "<http://collectd.org/bugs/>.", name);
 +                      vl->interval = cf_get_default_interval ();
 +              }
 +      }
 +
 +      return (vl);
 +} /* }}} value_list_t *plugin_value_list_clone */
 +
 +static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
 +{
 +      write_queue_t *q;
 +
 +      q = malloc (sizeof (*q));
 +      if (q == NULL)
 +              return (ENOMEM);
 +      q->next = NULL;
 +
 +      q->vl = plugin_value_list_clone (vl);
 +      if (q->vl == NULL)
 +      {
 +              sfree (q);
 +              return (ENOMEM);
 +      }
 +
 +      /* Store context of caller (read plugin); otherwise, it would not be
 +       * available to the write plugins when actually dispatching the
 +       * value-list later on. */
 +      q->ctx = plugin_get_ctx ();
 +
 +      pthread_mutex_lock (&write_lock);
 +
 +      if (write_queue_tail == NULL)
 +      {
 +              write_queue_head = q;
 +              write_queue_tail = q;
 +      }
 +      else
 +      {
 +              write_queue_tail->next = q;
 +              write_queue_tail = q;
 +      }
 +
 +      pthread_cond_signal (&write_cond);
 +      pthread_mutex_unlock (&write_lock);
 +
 +      return (0);
 +} /* }}} int plugin_write_enqueue */
 +
 +static value_list_t *plugin_write_dequeue (void) /* {{{ */
 +{
 +      write_queue_t *q;
 +      value_list_t *vl;
 +
 +      pthread_mutex_lock (&write_lock);
 +
 +      while (write_loop && (write_queue_head == NULL))
 +              pthread_cond_wait (&write_cond, &write_lock);
 +
 +      if (write_queue_head == NULL)
 +      {
 +              pthread_mutex_unlock (&write_lock);
 +              return (NULL);
 +      }
 +
 +      q = write_queue_head;
 +      write_queue_head = q->next;
 +      if (write_queue_head == NULL)
 +              write_queue_tail = NULL;
 +
 +      pthread_mutex_unlock (&write_lock);
 +
 +      (void) plugin_set_ctx (q->ctx);
 +
 +      vl = q->vl;
 +      sfree (q);
 +      return (vl);
 +} /* }}} value_list_t *plugin_write_dequeue */
 +
 +static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
 +{
 +      while (write_loop)
 +      {
 +              value_list_t *vl = plugin_write_dequeue ();
 +              if (vl == NULL)
 +                      continue;
 +
 +              plugin_dispatch_values_internal (vl);
 +
 +              plugin_value_list_free (vl);
 +      }
 +
 +      pthread_exit (NULL);
 +      return ((void *) 0);
 +} /* }}} void *plugin_write_thread */
 +
 +static void start_write_threads (size_t num) /* {{{ */
 +{
 +      size_t i;
 +
 +      if (write_threads != NULL)
 +              return;
 +
 +      write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
 +      if (write_threads == NULL)
 +      {
 +              ERROR ("plugin: start_write_threads: calloc failed.");
 +              return;
 +      }
 +
 +      write_threads_num = 0;
 +      for (i = 0; i < num; i++)
 +      {
 +              int status;
 +
 +              status = pthread_create (write_threads + write_threads_num,
 +                              /* attr = */ NULL,
 +                              plugin_write_thread,
 +                              /* arg = */ NULL);
 +              if (status != 0)
 +              {
 +                      char errbuf[1024];
 +                      ERROR ("plugin: start_write_threads: pthread_create failed "
 +                                      "with status %i (%s).", status,
 +                                      sstrerror (status, errbuf, sizeof (errbuf)));
 +                      return;
 +              }
 +
 +              write_threads_num++;
 +      } /* for (i) */
 +} /* }}} void start_write_threads */
 +
 +static void stop_write_threads (void) /* {{{ */
 +{
 +      write_queue_t *q;
 +      int i;
 +
 +      if (write_threads == NULL)
 +              return;
 +
 +      INFO ("collectd: Stopping %zu write threads.", write_threads_num);
 +
 +      pthread_mutex_lock (&write_lock);
 +      write_loop = 0;
 +      DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
 +      pthread_cond_broadcast (&write_cond);
 +      pthread_mutex_unlock (&write_lock);
 +
 +      for (i = 0; i < write_threads_num; i++)
 +      {
 +              if (pthread_join (write_threads[i], NULL) != 0)
 +              {
 +                      ERROR ("plugin: stop_write_threads: pthread_join failed.");
 +              }
 +              write_threads[i] = (pthread_t) 0;
 +      }
 +      sfree (write_threads);
 +      write_threads_num = 0;
 +
 +      pthread_mutex_lock (&write_lock);
 +      i = 0;
 +      for (q = write_queue_head; q != NULL; )
 +      {
 +              write_queue_t *q1 = q;
 +              plugin_value_list_free (q->vl);
 +              q = q->next;
 +              sfree (q1);
 +              i++;
 +      }
 +      write_queue_head = NULL;
 +      write_queue_tail = NULL;
 +      pthread_mutex_unlock (&write_lock);
 +
 +      if (i > 0)
 +      {
 +              WARNING ("plugin: %i value list%s left after shutting down "
 +                              "the write threads.",
 +                              i, (i == 1) ? " was" : "s were");
 +      }
 +} /* }}} void stop_write_threads */
 +
  /*
   * Public functions
   */
@@@ -843,6 -604,8 +843,6 @@@ int plugin_load (const char *type, uint
        struct dirent *de;
        int status;
  
 -      DEBUG ("type = %s", type);
 -
        dir = plugin_get_dir ();
        ret = 1;
  
        status = ssnprintf (typename, sizeof (typename), "%s.so", type);
        if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
 -              WARNING ("snprintf: truncated: `%s.so'", type);
 +              WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
                return (-1);
        }
        typename_len = strlen (typename);
        if ((dh = opendir (dir)) == NULL)
        {
                char errbuf[1024];
 -              ERROR ("opendir (%s): %s", dir,
 +              ERROR ("plugin_load: opendir (%s) failed: %s", dir,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
                                "%s/%s", dir, de->d_name);
                if ((status < 0) || ((size_t) status >= sizeof (filename)))
                {
 -                      WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
 +                      WARNING ("plugin_load: Filename too long: \"%s/%s\"",
 +                                      dir, de->d_name);
                        continue;
                }
  
                if (lstat (filename, &statbuf) == -1)
                {
                        char errbuf[1024];
 -                      WARNING ("stat %s: %s", filename,
 +                      WARNING ("plugin_load: stat (\"%s\") failed: %s",
 +                                      filename,
                                        sstrerror (errno, errbuf, sizeof (errbuf)));
                        continue;
                }
                else if (!S_ISREG (statbuf.st_mode))
                {
                        /* don't follow symlinks */
 -                      WARNING ("stat %s: not a regular file", filename);
 +                      WARNING ("plugin_load: %s is not a regular file.",
 +                                      filename);
                        continue;
                }
  
 -              if (plugin_load_file (filename, flags) == 0)
 +              status = plugin_load_file (filename, flags);
 +              if (status == 0)
                {
                        /* success */
                        ret = 0;
                }
                else
                {
 -                      fprintf (stderr, "Unable to load plugin %s.\n", type);
 +                      ERROR ("plugin_load: Load plugin \"%s\" failed with "
 +                                      "status %i.", type, status);
                }
        }
  
        closedir (dh);
  
 -      if (filename[0] == '\0')
 -              fprintf (stderr, "Could not find plugin %s.\n", type);
 +      if (filename[0] == 0)
 +              ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
 +                              type, dir);
  
        return (ret);
  }
@@@ -949,9 -706,13 +949,9 @@@ static int plugin_compare_read_func (co
        rf0 = arg0;
        rf1 = arg1;
  
 -      if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec)
 +      if (rf0->rf_next_read < rf1->rf_next_read)
                return (-1);
 -      else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec)
 -              return (1);
 -      else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec)
 -              return (-1);
 -      else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec)
 +      else if (rf0->rf_next_read > rf1->rf_next_read)
                return (1);
        else
                return (0);
@@@ -965,8 -726,8 +965,8 @@@ static int plugin_insert_read (read_fun
        int status;
        llentry_t *le;
  
 -      cdtime_t now = cdtime ();
 -      CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
 +      rf->rf_next_read = cdtime ();
 +      rf->rf_effective_interval = rf->rf_interval;
  
        pthread_mutex_lock (&read_lock);
  
        return (0);
  } /* int plugin_insert_read */
  
 -static int read_cb_wrapper (user_data_t *ud)
 -{
 -      int (*callback) (void);
 -
 -      if (ud == NULL)
 -              return -1;
 -
 -      callback = ud->data;
 -      return callback();
 -} /* int read_cb_wrapper */
 -
  int plugin_register_read (const char *name,
                int (*callback) (void))
  {
        read_func_t *rf;
 -      plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
  
 -      if (ctx.interval != 0) {
 -              /* If ctx.interval is not zero (== use the plugin or global
 -               * interval), we need to use the "complex" read callback,
 -               * because only that allows to specify a different interval.
 -               * Wrap the callback using read_cb_wrapper(). */
 -              struct timespec interval;
 -              user_data_t user_data;
 -
 -              user_data.data = callback;
 -              user_data.free_func = NULL;
 -
 -              CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
 -              return plugin_register_complex_read (/* group = */ NULL,
 -                              name, read_cb_wrapper, &interval, &user_data);
 -      }
 -
 -      DEBUG ("plugin_register_read: default_interval = %.3f",
 -                      CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
 -
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
 -      rf->rf_ctx = ctx;
 +      rf->rf_ctx = plugin_get_ctx ();
        rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
 -      rf->rf_interval.tv_sec = 0;
 -      rf->rf_interval.tv_nsec = 0;
 -      rf->rf_effective_interval = rf->rf_interval;
 +      rf->rf_interval = plugin_get_interval ();
  
        status = plugin_insert_read (rf);
        if (status != 0)
@@@ -1065,6 -859,7 +1065,6 @@@ int plugin_register_complex_read (cons
                user_data_t *user_data)
  {
        read_func_t *rf;
 -      plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
  
        rf = malloc (sizeof (*rf));
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_COMPLEX;
        if (interval != NULL)
 -      {
 -              rf->rf_interval = *interval;
 -      }
 -      else if (ctx.interval != 0)
 -      {
 -              CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
 -      }
 -      rf->rf_effective_interval = rf->rf_interval;
 -
 -      DEBUG ("plugin_register_read: interval = %i.%09i",
 -                      (int) rf->rf_interval.tv_sec,
 -                      (int) rf->rf_interval.tv_nsec);
 +              rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
 +      else
 +              rf->rf_interval = plugin_get_interval ();
  
        /* Set user data */
        if (user_data == NULL)
                rf->rf_udata = *user_data;
        }
  
 -      rf->rf_ctx = ctx;
 +      rf->rf_ctx = plugin_get_ctx ();
  
        status = plugin_insert_read (rf);
        if (status != 0)
@@@ -1135,6 -939,27 +1135,27 @@@ int plugin_register_shutdown (const cha
                                (void *) callback, /* user_data = */ NULL));
  } /* int plugin_register_shutdown */
  
+ static void plugin_free_data_sets (void)
+ {
+       void *key;
+       void *value;
+       if (data_sets == NULL)
+               return;
+       while (c_avl_pick (data_sets, &key, &value) == 0)
+       {
+               data_set_t *ds = value;
+               /* key is a pointer to ds->type */
+               sfree (ds->ds);
+               sfree (ds);
+       }
+       c_avl_destroy (data_sets);
+       data_sets = NULL;
+ } /* void plugin_free_data_sets */
  int plugin_register_data_set (const data_set_t *ds)
  {
        data_set_t *ds_copy;
@@@ -1365,15 -1190,6 +1386,15 @@@ void plugin_init_all (void
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
  
 +      {
 +              char const *tmp = global_option_get ("WriteThreads");
 +              int num = atoi (tmp);
 +
 +              if (num < 1)
 +                      num = 5;
 +
 +              start_write_threads ((size_t) num);
 +      }
  
        if ((list_init == NULL) && (read_heap == NULL))
                return;
@@@ -1643,8 -1459,6 +1664,8 @@@ void plugin_shutdown_all (void
                plugin_set_ctx (old_ctx);
        }
  
 +      stop_write_threads ();
 +
        /* Write plugins which use the `user_data' pointer usually need the
         * same data available to the flush callback. If this is the case, set
         * the free_function to NULL when registering the flush callback and to
        destroy_all_callbacks (&list_notification);
        destroy_all_callbacks (&list_shutdown);
        destroy_all_callbacks (&list_log);
+       plugin_free_data_sets ();
  } /* void plugin_shutdown_all */
  
  int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
    return (0);
  } /* int }}} plugin_dispatch_missing */
  
 -int plugin_dispatch_values (value_list_t *vl)
 +static int plugin_dispatch_values_internal (value_list_t *vl)
  {
        int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
                return (-1);
        }
  
 -      if (vl->time == 0)
 -              vl->time = cdtime ();
 -
 -      if (vl->interval <= 0)
 -      {
 -              plugin_ctx_t ctx = plugin_get_ctx ();
 -
 -              if (ctx.interval != 0)
 -                      vl->interval = ctx.interval;
 -              else
 -              {
 -                      char name[6 * DATA_MAX_NAME_LEN];
 -                      FORMAT_VL (name, sizeof (name), vl);
 -                      ERROR ("plugin_dispatch_values: Unable to determine "
 -                                      "interval from context for "
 -                                      "value list \"%s\". "
 -                                      "This indicates a broken plugin. "
 -                                      "Please report this problem to the "
 -                                      "collectd mailing list or at "
 -                                      "<http://collectd.org/bugs/>.", name);
 -                      vl->interval = cf_get_default_interval ();
 -              }
 -      }
 +      /* Assured by plugin_value_list_clone(). The time is determined at
 +       * _enqueue_ time. */
 +      assert (vl->time != 0);
 +      assert (vl->interval != 0);
  
        DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
        }
  
        return (0);
 -} /* int plugin_dispatch_values */
 +} /* int plugin_dispatch_values_internal */
  
 -int plugin_dispatch_values_secure (const value_list_t *vl)
 +int plugin_dispatch_values (value_list_t const *vl)
  {
 -  value_list_t vl_copy;
 -  int status;
 -
 -  if (vl == NULL)
 -    return EINVAL;
 -
 -  memcpy (&vl_copy, vl, sizeof (vl_copy));
 -
 -  /* Write callbacks must not change the values and meta pointers, so we can
 -   * savely skip copying those and make this more efficient. */
 -  if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
 -    return (plugin_dispatch_values (&vl_copy));
 -
 -  /* Set pointers to NULL, just to be on the save side. */
 -  vl_copy.values = NULL;
 -  vl_copy.meta = NULL;
 -
 -  vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
 -  if (vl_copy.values == NULL)
 -  {
 -    ERROR ("plugin_dispatch_values_secure: malloc failed.");
 -    return (ENOMEM);
 -  }
 -  memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
 -
 -  if (vl->meta != NULL)
 -  {
 -    vl_copy.meta = meta_data_clone (vl->meta);
 -    if (vl_copy.meta == NULL)
 -    {
 -      ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
 -      free (vl_copy.values);
 -      return (ENOMEM);
 -    }
 -  } /* if (vl->meta) */
 -
 -  status = plugin_dispatch_values (&vl_copy);
 +      int status;
  
 -  meta_data_destroy (vl_copy.meta);
 -  free (vl_copy.values);
 +      status = plugin_write_enqueue (vl);
 +      if (status != 0)
 +      {
 +              char errbuf[1024];
 +              ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
 +                              "with status %i (%s).", status,
 +                              sstrerror (status, errbuf, sizeof (errbuf)));
 +              return (status);
 +      }
  
 -  return (status);
 -} /* int plugin_dispatch_values_secure */
 +      return (0);
 +}
  
  int plugin_dispatch_notification (const notification_t *notif)
  {
@@@ -2016,12 -1880,6 +2039,12 @@@ const data_set_t *plugin_get_ds (const 
  {
        data_set_t *ds;
  
 +      if (data_sets == NULL)
 +      {
 +              ERROR ("plugin_get_ds: No data sets are defined yet.");
 +              return (NULL);
 +      }
 +
        if (c_avl_get (data_sets, name, (void *) &ds) != 0)
        {
                DEBUG ("No such dataset registered: %s", name);
diff --combined src/write_graphite.c
@@@ -80,8 -80,6 +80,8 @@@ struct wg_callbac
  {
      int      sock_fd;
  
 +    char    *name;
 +
      char    *node;
      char    *service;
      char    *prefix;
@@@ -222,7 -220,6 +222,6 @@@ static int wg_callback_init (struct wg_
                  "write_graphite plugin: Connecting to %s:%s failed. "
                  "The last error was: %s", node, service,
                  sstrerror (errno, errbuf, sizeof (errbuf)));
-         close (cb->sock_fd);
          return (-1);
      }
      else
@@@ -250,10 -247,12 +249,13 @@@ static void wg_callback_free (void *dat
  
      wg_flush_nolock (/* timeout = */ 0, cb);
  
-     close(cb->sock_fd);
-     cb->sock_fd = -1;
+     if (cb->sock_fd >= 0)
+     {
+         close (cb->sock_fd);
+         cb->sock_fd = -1;
+     }
  
 +    sfree(cb->name);
      sfree(cb->node);
      sfree(cb->service);
      sfree(cb->prefix);
@@@ -367,12 -366,9 +369,9 @@@ static int wg_write_messages (const dat
          return (status);
  
      /* Send the message to graphite */
-     wg_send_message (buffer, cb);
-     if (status != 0)
-     {
-         /* An error message has already been printed. */
+     status = wg_send_message (buffer, cb);
+     if (status != 0) /* error message has been printed already. */
          return (status);
-     }
  
      return (0);
  } /* int wg_write_messages */
@@@ -424,7 -420,7 +423,7 @@@ static int config_set_char (char *dest
      return (0);
  }
  
 -static int wg_config_carbon (oconfig_item_t *ci)
 +static int wg_config_node (oconfig_item_t *ci)
  {
      struct wg_callback *cb;
      user_data_t user_data;
      }
      memset (cb, 0, sizeof (*cb));
      cb->sock_fd = -1;
 +    cb->name = NULL;
      cb->node = NULL;
      cb->service = NULL;
      cb->prefix = NULL;
      cb->escape_char = WG_DEFAULT_ESCAPE;
      cb->format_flags = GRAPHITE_STORE_RATES;
  
 +    /* FIXME: Legacy configuration syntax. */
 +    if (strcasecmp ("Carbon", ci->key) != 0)
 +    {
 +        int status = cf_util_get_string (ci, &cb->name);
 +        if (status != 0)
 +        {
 +            wg_callback_free (cb);
 +            return (status);
 +        }
 +    }
 +
      pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
      C_COMPLAIN_INIT (&cb->init_complaint);
  
          }
      }
  
 -    ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s",
 -            cb->node != NULL ? cb->node : WG_DEFAULT_NODE,
 -            cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE);
 +    /* FIXME: Legacy configuration syntax. */
 +    if (cb->name == NULL)
 +        ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s",
 +                cb->node != NULL ? cb->node : WG_DEFAULT_NODE,
 +                cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE);
 +    else
 +        ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s",
 +                cb->name);
  
      memset (&user_data, 0, sizeof (user_data));
      user_data.data = cb;
@@@ -519,11 -498,8 +518,11 @@@ static int wg_config (oconfig_item_t *c
      {
          oconfig_item_t *child = ci->children + i;
  
 -        if (strcasecmp ("Carbon", child->key) == 0)
 -            wg_config_carbon (child);
 +        if (strcasecmp ("Node", child->key) == 0)
 +            wg_config_node (child);
 +        /* FIXME: Remove this legacy mode in version 6. */
 +        else if (strcasecmp ("Carbon", child->key) == 0)
 +            wg_config_node (child);
          else
          {
              ERROR ("write_graphite plugin: Invalid configuration "