/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2011 Florian octo Forster
+ * Copyright (C) 2005-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
};
typedef struct read_func_s read_func_t;
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+ value_list_t *vl;
+ write_queue_t *next;
+};
+
/*
* Private variables
*/
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static write_queue_t *write_queue_head;
+static write_queue_t *write_queue_tail;
+static _Bool write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t *write_threads = NULL;
+static size_t write_threads_num = 0;
+
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
/*
* Static functions
*/
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
static const char *plugin_get_dir (void)
{
if (plugindir == NULL)
read_threads_num = 0;
} /* void stop_read_threads */
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+ if (vl == NULL)
+ return;
+
+ meta_data_destroy (vl->meta);
+ sfree (vl->values);
+ sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+ value_list_t *vl;
+
+ if (vl_orig == NULL)
+ return (NULL);
+
+ vl = malloc (sizeof (*vl));
+ if (vl == NULL)
+ return (NULL);
+ memcpy (vl, vl_orig, sizeof (*vl));
+
+ vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+ if (vl->values == NULL)
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+ memcpy (vl->values, vl_orig->values,
+ vl_orig->values_len * sizeof (*vl->values));
+
+ vl->meta = meta_data_clone (vl->meta);
+ if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+
+ return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+ write_queue_t *q;
+
+ q = malloc (sizeof (*q));
+ if (q == NULL)
+ return (ENOMEM);
+ q->next = NULL;
+
+ q->vl = plugin_value_list_clone (vl);
+ if (q->vl == NULL)
+ {
+ sfree (q);
+ return (ENOMEM);
+ }
+
+ pthread_mutex_lock (&write_lock);
+
+ if (write_queue_tail == NULL)
+ {
+ write_queue_head = q;
+ write_queue_tail = q;
+ }
+ else
+ {
+ write_queue_tail->next = q;
+ write_queue_tail = q;
+ }
+
+ pthread_cond_signal (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+ write_queue_t *q;
+ value_list_t *vl;
+
+ pthread_mutex_lock (&write_lock);
+
+ while (write_loop && (write_queue_head == NULL))
+ pthread_cond_wait (&write_cond, &write_lock);
+
+ if (write_queue_head == NULL)
+ {
+ pthread_mutex_unlock (&write_lock);
+ return (NULL);
+ }
+
+ q = write_queue_head;
+ write_queue_head = q->next;
+ if (write_queue_head == NULL)
+ write_queue_tail = NULL;
+
+ pthread_mutex_unlock (&write_lock);
+
+ vl = q->vl;
+ sfree (q);
+ return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+ while (write_loop)
+ {
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
+ continue;
+
+ plugin_dispatch_values_internal (vl);
+
+ plugin_value_list_free (vl);
+ }
+
+ pthread_exit (NULL);
+ return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+ size_t i;
+
+ if (write_threads != NULL)
+ return;
+
+ write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (write_threads == NULL)
+ {
+ ERROR ("plugin: start_write_threads: calloc failed.");
+ return;
+ }
+
+ write_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ int status;
+
+ status = pthread_create (write_threads + write_threads_num,
+ /* attr = */ NULL,
+ plugin_write_thread,
+ /* arg = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin: start_write_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ write_threads_num++;
+ } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+ write_queue_t *q;
+ int i;
+
+ if (write_threads == NULL)
+ return;
+
+ INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+ pthread_mutex_lock (&write_lock);
+ write_loop = 0;
+ DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+ pthread_cond_broadcast (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ for (i = 0; i < write_threads_num; i++)
+ {
+ if (pthread_join (write_threads[i], NULL) != 0)
+ {
+ ERROR ("plugin: stop_write_threads: pthread_join failed.");
+ }
+ write_threads[i] = (pthread_t) 0;
+ }
+ sfree (write_threads);
+ write_threads_num = 0;
+
+ pthread_mutex_lock (&write_lock);
+ i = 0;
+ for (q = write_queue_head; q != NULL; q = q->next)
+ {
+ plugin_value_list_free (q->vl);
+ sfree (q);
+ i++;
+ }
+ write_queue_head = NULL;
+ write_queue_tail = NULL;
+ pthread_mutex_unlock (&write_lock);
+
+ if (i > 0)
+ {
+ WARNING ("plugin: %i value list%s left after shutting down "
+ "the write threads.",
+ i, (i == 1) ? " was" : "s were");
+ }
+} /* }}} void stop_write_threads */
+
/*
* Public functions
*/
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ {
+ char const *tmp = global_option_get ("WriteThreads");
+ int num = atoi (tmp);
+
+ if (num < 1)
+ num = 5;
+
+ start_write_threads ((size_t) num);
+ }
if ((list_init == NULL) && (read_heap == NULL))
return;
plugin_set_ctx (old_ctx);
}
+ stop_write_threads ();
+
/* Write plugins which use the `user_data' pointer usually need the
* same data available to the flush callback. If this is the case, set
* the free_function to NULL when registering the flush callback and to
return (0);
} /* int }}} plugin_dispatch_missing */
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
{
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
}
return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
{
- value_list_t vl_copy;
- int status;
-
- if (vl == NULL)
- return EINVAL;
-
- memcpy (&vl_copy, vl, sizeof (vl_copy));
-
- /* Write callbacks must not change the values and meta pointers, so we can
- * savely skip copying those and make this more efficient. */
- if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
- return (plugin_dispatch_values (&vl_copy));
-
- /* Set pointers to NULL, just to be on the save side. */
- vl_copy.values = NULL;
- vl_copy.meta = NULL;
-
- vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
- if (vl_copy.values == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: malloc failed.");
- return (ENOMEM);
- }
- memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
- if (vl->meta != NULL)
- {
- vl_copy.meta = meta_data_clone (vl->meta);
- if (vl_copy.meta == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
- free (vl_copy.values);
- return (ENOMEM);
- }
- } /* if (vl->meta) */
+ int status;
- status = plugin_dispatch_values (&vl_copy);
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return (status);
+ }
- meta_data_destroy (vl_copy.meta);
- free (vl_copy.values);
+ return (0);
+}
- return (status);
+int plugin_dispatch_values_secure (const value_list_t *vl)
+{
+ return (plugin_dispatch_values (vl));
} /* int plugin_dispatch_values_secure */
int plugin_dispatch_notification (const notification_t *notif)