From 0b309ea35fcd30a8ba14b5af2295e3f3b1da2a3f Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Wed, 28 Mar 2007 09:20:31 +0200 Subject: [PATCH] plugin.c: Implemented parallel reading of values. --- src/Makefile.am | 3 + src/collectd.conf.in | 8 ++- src/collectd.conf.pod | 7 ++ src/configfile.c | 21 ++++-- src/plugin.c | 188 +++++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 191 insertions(+), 36 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 44445fb8..5c43fb55 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -38,6 +38,9 @@ endif if BUILD_WITH_LIBRESOLV collectd_LDFLAGS += -lresolv endif +if BUILD_WITH_LIBPTHREAD +collectd_LDFLAGS += -lpthread +endif if BUILD_WITH_LIBKSTAT collectd_LDFLAGS += -lkstat endif diff --git a/src/collectd.conf.in b/src/collectd.conf.in index d1614b79..5221c56c 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -4,9 +4,11 @@ # http://collectd.org/ # -#BaseDir "/opt/collectd/var/lib/collectd" -#PIDFile "/opt/collectd/var/run/collectd.pid" -#PluginDir "/opt/collectd/lib/collectd" +#BaseDir "@prefix@/var/lib/@PACKAGE_NAME@" +#PIDFile "@prefix@/var/run/@PACKAGE_NAME@.pid" +#PluginDir "@prefix@/lib/@PACKAGE_NAME@" +#Interval 10 +#ReadThreads 5 @BUILD_MODULE_APACHE_TRUE@LoadPlugin apache @BUILD_MODULE_APCUPS_TRUE@LoadPlugin apcups diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 98612dce..6fc167b5 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -59,6 +59,13 @@ Configures the interval in which to query the read plugins. Obviously smaller values lead to a higher system load produces by collectd, while higher values lead to more coarse statistics. +=item B I + +Number of threads to start for reading plugins. The default value if B<5>, but +you may want to increase this if you have more than five plugins that take a +long time to read. Mostly those are plugin that do network-IO. Setting this to +a value higher than the number of plugins you've loaded is totally useless. + =back =head1 PLUGIN OPTIONS diff --git a/src/configfile.c b/src/configfile.c index eca3dfc0..1d3b8954 100644 --- a/src/configfile.c +++ b/src/configfile.c @@ -79,7 +79,8 @@ static cf_global_option_t cf_global_options[] = {"BaseDir", NULL, PKGLOCALSTATEDIR}, {"PIDFile", NULL, PIDFILE}, {"Hostname", NULL, NULL}, - {"Interval", NULL, "10"} + {"Interval", NULL, "10"}, + {"ReadThreads", NULL, "5"} }; static int cf_global_options_num = STATIC_ARRAY_LEN (cf_global_options); @@ -155,11 +156,18 @@ static int dispatch_global_option (const oconfig_item_t *ci) { if (ci->values_num != 1) return (-1); - if (ci->values[0].type != OCONFIG_TYPE_STRING) - return (-1); + if (ci->values[0].type == OCONFIG_TYPE_STRING) + return (global_option_set (ci->key, ci->values[0].value.string)); + else if (ci->values[0].type == OCONFIG_TYPE_NUMBER) + { + char tmp[128]; + snprintf (tmp, sizeof (tmp), "%lf", ci->values[0].value.number); + tmp[127] = '\0'; + return (global_option_set (ci->key, tmp)); + } - return (global_option_set (ci->key, ci->values[0].value.string)); -} + return (-1); +} /* int dispatch_global_option */ static int dispatch_value_plugindir (const oconfig_item_t *ci) { @@ -294,8 +302,7 @@ int global_option_set (const char *option, const char *value) if (i >= cf_global_options_num) return (-1); - if (cf_global_options[i].value != NULL) - free (cf_global_options[i].value); + sfree (cf_global_options[i].value); if (value != NULL) cf_global_options[i].value = strdup (value); diff --git a/src/plugin.c b/src/plugin.c index 60cd37f4..86ffbde7 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -23,6 +23,10 @@ #include +#if HAVE_PTHREAD_H +# include +#endif + #include "common.h" #include "plugin.h" #include "configfile.h" @@ -36,6 +40,7 @@ struct read_func_s int wait_time; int wait_left; int (*callback) (void); + enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read; }; typedef struct read_func_s read_func_t; @@ -51,6 +56,12 @@ static llist_t *list_log; static char *plugindir = NULL; +static int read_loop = 1; +static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *read_threads = NULL; +static int read_threads_num = 0; + /* * Static functions */ @@ -139,6 +150,128 @@ static int plugin_load_file (char *file) return (0); } +static void *plugin_read_thread (void *args) +{ + llentry_t *le; + read_func_t *rf; + int status; + int done; + + pthread_mutex_lock (&read_lock); + + while (read_loop != 0) + { + le = llist_head (list_read); + done = 0; + + while ((read_loop != 0) && (le != NULL)) + { + rf = (read_func_t *) le->value; + + if (rf->needs_read != TODO) + { + le = le->next; + continue; + } + + /* We will do this read function */ + rf->needs_read = ACTIVE; + + DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s", + (unsigned long int) pthread_self (), le->key); + pthread_mutex_unlock (&read_lock); + + status = rf->callback (); + done++; + + if (status != 0) + { + rf->wait_left = rf->wait_time; + rf->wait_time = rf->wait_time * 2; + if (rf->wait_time > 86400) + rf->wait_time = 86400; + + NOTICE ("read-function of plugin `%s' " + "failed. Will syspend it for %i " + "seconds.", le->key, rf->wait_left); + } + else + { + rf->wait_left = 0; + rf->wait_time = interval_g; + } + + pthread_mutex_lock (&read_lock); + + rf->needs_read = DONE; + le = le->next; + } /* while (le != NULL) */ + + if ((read_loop != 0) && (done == 0)) + { + DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Waiting on read_cond.", + (unsigned long int) pthread_self ()); + pthread_cond_wait (&read_cond, &read_lock); + } + } /* while (read_loop) */ + + pthread_mutex_unlock (&read_lock); + + pthread_exit (NULL); +} /* void *plugin_read_thread */ + +static void start_threads (int num) +{ + int i; + + if (read_threads != NULL) + return; + + read_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); + if (read_threads == NULL) + { + ERROR ("plugin: start_threads: calloc failed."); + return; + } + + read_threads_num = 0; + for (i = 0; i < num; i++) + { + if (pthread_create (read_threads + read_threads_num, NULL, + plugin_read_thread, NULL) == 0) + { + read_threads_num++; + } + else + { + ERROR ("plugin: start_threads: pthread_create failed."); + return; + } + } /* for (i) */ +} /* void start_threads */ + +static void stop_threads (void) +{ + int i; + + pthread_mutex_lock (&read_lock); + read_loop = 0; + DEBUG ("plugin: stop_threads: Signalling `read_cond'"); + pthread_cond_broadcast (&read_cond); + pthread_mutex_unlock (&read_lock); + + for (i = 0; i < read_threads_num; i++) + { + if (pthread_join (read_threads[i], NULL) != 0) + { + ERROR ("plugin: stop_threads: pthread_join failed."); + } + read_threads[i] = -1; + } + sfree (read_threads); + read_threads_num = 0; +} /* void stop_threads */ + /* * Public functions */ @@ -263,6 +396,7 @@ int plugin_register_read (const char *name, rf->wait_time = interval_g; rf->wait_left = 0; rf->callback = callback; + rf->needs_read = DONE; return (register_callback (&list_read, name, (void *) rf)); } /* int plugin_register_read */ @@ -344,6 +478,16 @@ void plugin_init_all (void) llentry_t *le; int status; + /* Start read-threads */ + if (list_read != NULL) + { + const char *rt; + int num; + rt = global_option_get ("ReadThreads"); + num = atoi (rt); + start_threads ((num > 0) ? num : 5); + } + if (list_init == NULL) return; @@ -370,44 +514,34 @@ void plugin_read_all (const int *loop) { llentry_t *le; read_func_t *rf; - int status; if (list_read == NULL) return; + pthread_mutex_lock (&read_lock); + le = llist_head (list_read); - while ((*loop == 0) && (le != NULL)) + while (le != NULL) { rf = (read_func_t *) le->value; + if (rf->needs_read != DONE) + continue; + if (rf->wait_left > 0) rf->wait_left -= interval_g; - if (rf->wait_left > 0) - { - le = le->next; - continue; - } - status = rf->callback (); - if (status != 0) - { - rf->wait_left = rf->wait_time; - rf->wait_time = rf->wait_time * 2; - if (rf->wait_time > 86400) - rf->wait_time = 86400; - - NOTICE ("read-function of plugin `%s' " - "failed. Will syspend it for %i " - "seconds.", le->key, rf->wait_left); - } - else + if (rf->wait_left <= 0) { - rf->wait_left = 0; - rf->wait_time = interval_g; + rf->needs_read = TODO; } le = le->next; - } /* while ((*loop == 0) && (le != NULL)) */ + } + + DEBUG ("plugin: plugin_read_all: Signalling `read_cond'"); + pthread_cond_broadcast (&read_cond); + pthread_mutex_unlock (&read_lock); } /* void plugin_read_all */ void plugin_shutdown_all (void) @@ -415,6 +549,8 @@ void plugin_shutdown_all (void) int (*callback) (void); llentry_t *le; + stop_threads (); + if (list_shutdown == NULL) return; @@ -446,9 +582,9 @@ int plugin_dispatch_values (const char *name, const value_list_t *vl) ds = (data_set_t *) le->value; - DEBUG ("time = %u; host = %s; " - "plugin = %s; plugin_instance = %s; " - "type = %s; type_instance = %s;", + DEBUG ("plugin: plugin_dispatch_values: time = %u; host = %s; " + "plugin = %s; plugin_instance = %s; type = %s; " + "type_instance = %s;", (unsigned int) vl->time, vl->host, vl->plugin, vl->plugin_instance, ds->type, vl->type_instance); -- 2.11.0