From fb14e1f8ff0d3b7dddde1baa4a7a5b44532d27a4 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Tue, 26 Feb 2008 18:12:07 +0100 Subject: [PATCH] collectd, plugin: Added support for "flush" callbacks. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit A flush callback may be provided to make it possible to flush internal caches (e.g. the rrdtool plugin's data cache) from outside the plugin. On SIGUSR1, flush callback is invoked for all plugins. As flushing large amounts of data might take some time a new thread is started to handle the request asynchronously. Thanks to Stefan Völkel for proposing this. Signed-off-by: Sebastian Harl Signed-off-by: Florian Forster --- src/collectd.c | 28 ++++++++++++++++++++++++++++ src/plugin.c | 29 +++++++++++++++++++++++++++++ src/plugin.h | 4 ++++ 3 files changed, 61 insertions(+) diff --git a/src/collectd.c b/src/collectd.c index 984ff757..4c9aafc2 100644 --- a/src/collectd.c +++ b/src/collectd.c @@ -27,6 +27,8 @@ #include #include +#include + #include "plugin.h" #include "configfile.h" @@ -41,6 +43,15 @@ kstat_ctl_t *kc; static int loop = 0; +static void *do_flush (void *arg) +{ + INFO ("Flushing all data."); + plugin_flush_all (-1); + INFO ("Finished flushing all data."); + pthread_exit (NULL); + return NULL; +} + static void sigIntHandler (int signal) { loop++; @@ -51,6 +62,18 @@ static void sigTermHandler (int signal) loop++; } +static void sigUsr1Handler (int signal) +{ + pthread_t thread; + pthread_attr_t attr; + + /* flushing the data might take a while, + * so it should be done asynchronously */ + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + pthread_create (&thread, &attr, do_flush, NULL); +} + static int init_hostname (void) { const char *str; @@ -367,6 +390,7 @@ int main (int argc, char **argv) { struct sigaction sigIntAction; struct sigaction sigTermAction; + struct sigaction sigUsr1Action; char *configfile = CONFIGFILE; int test_config = 0; const char *basedir; @@ -519,6 +543,10 @@ int main (int argc, char **argv) sigTermAction.sa_handler = sigTermHandler; sigaction (SIGTERM, &sigTermAction, NULL); + memset (&sigUsr1Action, '\0', sizeof (sigUsr1Action)); + sigUsr1Action.sa_handler = sigUsr1Handler; + sigaction (SIGUSR1, &sigUsr1Action, NULL); + /* * run the actual loops */ diff --git a/src/plugin.c b/src/plugin.c index 1dd6daf3..ca6193be 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -53,6 +53,7 @@ typedef struct read_func_s read_func_t; static llist_t *list_init; static llist_t *list_read; static llist_t *list_write; +static llist_t *list_flush; static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; @@ -433,6 +434,11 @@ int plugin_register_write (const char *name, return (register_callback (&list_write, name, (void *) callback)); } /* int plugin_register_write */ +int plugin_register_flush (const char *name, int (*callback) (const int)) +{ + return (register_callback (&list_flush, name, (void *) callback)); +} /* int plugin_register_flush */ + int plugin_register_shutdown (char *name, int (*callback) (void)) { @@ -527,6 +533,11 @@ int plugin_unregister_write (const char *name) return (plugin_unregister (list_write, name)); } +int plugin_unregister_flush (const char *name) +{ + return (plugin_unregister (list_flush, name)); +} + int plugin_unregister_shutdown (const char *name) { return (plugin_unregister (list_shutdown, name)); @@ -639,6 +650,24 @@ void plugin_read_all (void) pthread_mutex_unlock (&read_lock); } /* void plugin_read_all */ +void plugin_flush_all (int timeout) +{ + int (*callback) (int); + llentry_t *le; + + if (list_flush == NULL) + return; + + le = llist_head (list_flush); + while (le != NULL) + { + callback = (int (*) (int)) le->value; + le = le->next; + + (*callback) (timeout); + } +} /* void plugin_flush_all */ + void plugin_shutdown_all (void) { int (*callback) (void); diff --git a/src/plugin.h b/src/plugin.h index 25c745cb..aea0e4df 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -149,6 +149,7 @@ int plugin_load (const char *name); void plugin_init_all (void); void plugin_read_all (void); +void plugin_flush_all (int timeout); void plugin_shutdown_all (void); /* @@ -167,6 +168,8 @@ int plugin_register_read (const char *name, int (*callback) (void)); int plugin_register_write (const char *name, int (*callback) (const data_set_t *ds, const value_list_t *vl)); +int plugin_register_flush (const char *name, + int (*callback) (const int)); int plugin_register_shutdown (char *name, int (*callback) (void)); int plugin_register_data_set (const data_set_t *ds); @@ -180,6 +183,7 @@ int plugin_unregister_complex_config (const char *name); int plugin_unregister_init (const char *name); int plugin_unregister_read (const char *name); int plugin_unregister_write (const char *name); +int plugin_unregister_flush (const char *name); int plugin_unregister_shutdown (const char *name); int plugin_unregister_data_set (const char *name); int plugin_unregister_log (const char *name); -- 2.11.0