From e94f5c95909f895a80457baf543e03b6d6aff5ee Mon Sep 17 00:00:00 2001 From: =?utf8?q?Manuel=20Luis=20Sanmart=C3=ADn=20Rozada?= Date: Mon, 8 Jun 2015 00:49:20 +0200 Subject: [PATCH] Support for call the flush callback at regular intervals using the read plugin callback. --- src/collectd.conf.pod | 9 ++++ src/daemon/configfile.c | 29 ++++++++--- src/daemon/plugin.c | 127 ++++++++++++++++++++++++++++++++++++++++++++++-- src/daemon/plugin.h | 2 + 4 files changed, 158 insertions(+), 9 deletions(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 1884914f..05ae8f36 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -130,6 +130,15 @@ Sets a plugin-specific interval for collecting metrics. This overrides the global B setting. If a plugin provides own support for specifying an interval, that setting will take precedence. +=item B I + +Specifies the the interval, in seconds, to call the flush callback if it's +defined in this plugin. By default, this is disabled + +=item B I + +Specifies the value of the timeout argument of the flush callback. + =back =item B B|B diff --git a/src/daemon/configfile.c b/src/daemon/configfile.c index 02fd96f6..dde16ca7 100644 --- a/src/daemon/configfile.c +++ b/src/daemon/configfile.c @@ -287,12 +287,29 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) /* default to the global interval set before loading this plugin */ memset (&ctx, 0, sizeof (ctx)); ctx.interval = cf_get_default_interval (); + ctx.flush_interval = 0; + ctx.flush_timeout = 0; - for (i = 0; i < ci->children_num; ++i) { - if (strcasecmp("Globals", ci->children[i].key) == 0) - cf_util_get_flag (ci->children + i, &flags, PLUGIN_FLAGS_GLOBAL); - else if (strcasecmp ("Interval", ci->children[i].key) == 0) { - if (cf_util_get_cdtime (ci->children + i, &ctx.interval) != 0) { + for (i = 0; i < ci->children_num; ++i) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Globals", child->key) == 0) + cf_util_get_flag (child, &flags, PLUGIN_FLAGS_GLOBAL); + else if (strcasecmp ("Interval", child->key) == 0) { + if (cf_util_get_cdtime (child, &ctx.interval) != 0) { + /* cf_util_get_cdtime will log an error */ + continue; + } + } + else if (strcasecmp ("FlushInterval", child->key) == 0) { + if (cf_util_get_cdtime (child, &ctx.flush_interval) != 0) { + /* cf_util_get_cdtime will log an error */ + continue; + } + } + else if (strcasecmp ("FlushTimeout", child->key) == 0) { + if (cf_util_get_cdtime (child, &ctx.flush_timeout) != 0) { /* cf_util_get_cdtime will log an error */ continue; } @@ -300,7 +317,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) else { WARNING("Ignoring unknown LoadPlugin option \"%s\" " "for plugin \"%s\"", - ci->children[i].key, ci->values[0].value.string); + child->key, ci->values[0].value.string); } } diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index b3cb97f6..0ef6273b 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -80,6 +80,12 @@ struct write_queue_s write_queue_t *next; }; +struct flush_callback_s { + char *name; + cdtime_t timeout; +}; +typedef struct flush_callback_s flush_callback_t; + /* * Private variables */ @@ -1295,11 +1301,99 @@ int plugin_register_write (const char *name, (void *) callback, ud)); } /* int plugin_register_write */ +static int plugin_flush_timeout_callback (user_data_t *ud) +{ + flush_callback_t *cb = ud->data; + + return plugin_flush (cb->name, cb->timeout, /* identifier = */ NULL); +} /* static int plugin_flush_callback */ + +static void plugin_flush_timeout_callback_free (void *data) +{ + flush_callback_t *cb = data; + + if (cb == NULL) return; + + sfree(cb->name); + sfree(cb); +} /* static void plugin_flush_callback_free */ + int plugin_register_flush (const char *name, plugin_flush_cb callback, user_data_t *ud) { - return (create_register_callback (&list_flush, name, - (void *) callback, ud)); + int status; + plugin_ctx_t ctx = plugin_get_ctx (); + + status = create_register_callback (&list_flush, name, + (void *) callback, ud); + if (status != 0) + return status; + + if (ctx.flush_interval != 0) + { + char *flush_prefix = "flush/"; + size_t prefix_size; + char *flush_name; + size_t name_size; + user_data_t ud; + flush_callback_t *cb; + + prefix_size = strlen(flush_prefix); + name_size = strlen(name); + + flush_name = (char *) malloc (sizeof (char) * + (name_size + prefix_size + 1)); + if (flush_name == NULL) + { + ERROR ("plugin_register_flush: malloc failed."); + plugin_unregister (list_flush, name); + return (-1); + } + + sstrncpy (flush_name, flush_prefix, prefix_size + 1); + sstrncpy (flush_name + prefix_size, name, name_size + 1); + + cb = (flush_callback_t *)malloc(sizeof(flush_callback_t)); + if (cb == NULL) + { + ERROR ("plugin_register_flush: malloc failed."); + sfree(flush_name); + plugin_unregister (list_flush, name); + return (-1); + } + + cb->name = strdup (name); + if (cb->name == NULL) + { + ERROR ("plugin_register_flush: strdup failed."); + sfree(cb); + sfree(flush_name); + plugin_unregister (list_flush, name); + return (-1); + } + cb->timeout = ctx.flush_timeout; + + ud.data = cb; + ud.free_func = plugin_flush_timeout_callback_free; + + status = plugin_register_complex_read ( + /* group = */ "flush", + /* name = */ flush_name, + /* callback = */ plugin_flush_timeout_callback, + /* interval = */ ctx.flush_interval, + /* user data = */ &ud); + + sfree(flush_name); + if (status != 0) + { + sfree(cb->name); + sfree(cb); + plugin_unregister (list_flush, name); + return status; + } + } + + return 0; } /* int plugin_register_flush */ int plugin_register_missing (const char *name, @@ -1518,7 +1612,34 @@ int plugin_unregister_write (const char *name) int plugin_unregister_flush (const char *name) { - return (plugin_unregister (list_flush, name)); + plugin_ctx_t ctx = plugin_get_ctx (); + + if (ctx.flush_interval != 0) + { + char *flush_prefix = "flush/"; + size_t prefix_size; + char *flush_name; + size_t name_size; + + prefix_size = strlen(flush_prefix); + name_size = strlen(name); + + flush_name = (char *) malloc (sizeof (char) * + (name_size + prefix_size + 1)); + if (flush_name == NULL) + { + ERROR ("plugin_unregister_flush: malloc failed."); + return (-1); + } + + sstrncpy (flush_name, flush_prefix, prefix_size + 1); + sstrncpy (flush_name + prefix_size, name, name_size + 1); + + plugin_unregister_read(flush_name); + sfree(flush_name); + } + + return plugin_unregister (list_flush, name); } int plugin_unregister_missing (const char *name) diff --git a/src/daemon/plugin.h b/src/daemon/plugin.h index 2e20da4c..daea4fc9 100644 --- a/src/daemon/plugin.h +++ b/src/daemon/plugin.h @@ -177,6 +177,8 @@ typedef struct user_data_s user_data_t; struct plugin_ctx_s { cdtime_t interval; + cdtime_t flush_interval; + cdtime_t flush_timeout; }; typedef struct plugin_ctx_s plugin_ctx_t; -- 2.11.0