/* default to the global interval set before loading this plugin */
memset (&ctx, 0, sizeof (ctx));
ctx.interval = cf_get_default_interval ();
+ ctx.flush_interval = 0;
+ ctx.flush_timeout = 0;
- for (i = 0; i < ci->children_num; ++i) {
- if (strcasecmp("Globals", ci->children[i].key) == 0)
- cf_util_get_flag (ci->children + i, &flags, PLUGIN_FLAGS_GLOBAL);
- else if (strcasecmp ("Interval", ci->children[i].key) == 0) {
- if (cf_util_get_cdtime (ci->children + i, &ctx.interval) != 0) {
+ for (i = 0; i < ci->children_num; ++i)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Globals", child->key) == 0)
+ cf_util_get_flag (child, &flags, PLUGIN_FLAGS_GLOBAL);
+ else if (strcasecmp ("Interval", child->key) == 0) {
+ if (cf_util_get_cdtime (child, &ctx.interval) != 0) {
+ /* cf_util_get_cdtime will log an error */
+ continue;
+ }
+ }
+ else if (strcasecmp ("FlushInterval", child->key) == 0) {
+ if (cf_util_get_cdtime (child, &ctx.flush_interval) != 0) {
+ /* cf_util_get_cdtime will log an error */
+ continue;
+ }
+ }
+ else if (strcasecmp ("FlushTimeout", child->key) == 0) {
+ if (cf_util_get_cdtime (child, &ctx.flush_timeout) != 0) {
/* cf_util_get_cdtime will log an error */
continue;
}
else {
WARNING("Ignoring unknown LoadPlugin option \"%s\" "
"for plugin \"%s\"",
- ci->children[i].key, ci->values[0].value.string);
+ child->key, ci->values[0].value.string);
}
}
write_queue_t *next;
};
+struct flush_callback_s {
+ char *name;
+ cdtime_t timeout;
+};
+typedef struct flush_callback_s flush_callback_t;
+
/*
* Private variables
*/
(void *) callback, ud));
} /* int plugin_register_write */
+static int plugin_flush_timeout_callback (user_data_t *ud)
+{
+ flush_callback_t *cb = ud->data;
+
+ return plugin_flush (cb->name, cb->timeout, /* identifier = */ NULL);
+} /* static int plugin_flush_callback */
+
+static void plugin_flush_timeout_callback_free (void *data)
+{
+ flush_callback_t *cb = data;
+
+ if (cb == NULL) return;
+
+ sfree(cb->name);
+ sfree(cb);
+} /* static void plugin_flush_callback_free */
+
int plugin_register_flush (const char *name,
plugin_flush_cb callback, user_data_t *ud)
{
- return (create_register_callback (&list_flush, name,
- (void *) callback, ud));
+ int status;
+ plugin_ctx_t ctx = plugin_get_ctx ();
+
+ status = create_register_callback (&list_flush, name,
+ (void *) callback, ud);
+ if (status != 0)
+ return status;
+
+ if (ctx.flush_interval != 0)
+ {
+ char *flush_prefix = "flush/";
+ size_t prefix_size;
+ char *flush_name;
+ size_t name_size;
+ user_data_t ud;
+ flush_callback_t *cb;
+
+ prefix_size = strlen(flush_prefix);
+ name_size = strlen(name);
+
+ flush_name = (char *) malloc (sizeof (char) *
+ (name_size + prefix_size + 1));
+ if (flush_name == NULL)
+ {
+ ERROR ("plugin_register_flush: malloc failed.");
+ plugin_unregister (list_flush, name);
+ return (-1);
+ }
+
+ sstrncpy (flush_name, flush_prefix, prefix_size + 1);
+ sstrncpy (flush_name + prefix_size, name, name_size + 1);
+
+ cb = (flush_callback_t *)malloc(sizeof(flush_callback_t));
+ if (cb == NULL)
+ {
+ ERROR ("plugin_register_flush: malloc failed.");
+ sfree(flush_name);
+ plugin_unregister (list_flush, name);
+ return (-1);
+ }
+
+ cb->name = strdup (name);
+ if (cb->name == NULL)
+ {
+ ERROR ("plugin_register_flush: strdup failed.");
+ sfree(cb);
+ sfree(flush_name);
+ plugin_unregister (list_flush, name);
+ return (-1);
+ }
+ cb->timeout = ctx.flush_timeout;
+
+ ud.data = cb;
+ ud.free_func = plugin_flush_timeout_callback_free;
+
+ status = plugin_register_complex_read (
+ /* group = */ "flush",
+ /* name = */ flush_name,
+ /* callback = */ plugin_flush_timeout_callback,
+ /* interval = */ ctx.flush_interval,
+ /* user data = */ &ud);
+
+ sfree(flush_name);
+ if (status != 0)
+ {
+ sfree(cb->name);
+ sfree(cb);
+ plugin_unregister (list_flush, name);
+ return status;
+ }
+ }
+
+ return 0;
} /* int plugin_register_flush */
int plugin_register_missing (const char *name,
int plugin_unregister_flush (const char *name)
{
- return (plugin_unregister (list_flush, name));
+ plugin_ctx_t ctx = plugin_get_ctx ();
+
+ if (ctx.flush_interval != 0)
+ {
+ char *flush_prefix = "flush/";
+ size_t prefix_size;
+ char *flush_name;
+ size_t name_size;
+
+ prefix_size = strlen(flush_prefix);
+ name_size = strlen(name);
+
+ flush_name = (char *) malloc (sizeof (char) *
+ (name_size + prefix_size + 1));
+ if (flush_name == NULL)
+ {
+ ERROR ("plugin_unregister_flush: malloc failed.");
+ return (-1);
+ }
+
+ sstrncpy (flush_name, flush_prefix, prefix_size + 1);
+ sstrncpy (flush_name + prefix_size, name, name_size + 1);
+
+ plugin_unregister_read(flush_name);
+ sfree(flush_name);
+ }
+
+ return plugin_unregister (list_flush, name);
}
int plugin_unregister_missing (const char *name)