From 2193369303b99a35fe09d14632d66c00e20755c6 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 6 Aug 2010 15:01:40 +0200 Subject: [PATCH] amqp plugin: Implement the "Format" config option. By default, the publishing code now creates "command" (i.e. PUTVAL) output. For now this is easier to parse, so use this for the subscribing code. (For now, anyways. I guess JSON will come later, too.) --- src/Makefile.am | 4 +- src/amqp.c | 120 ++++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 94 insertions(+), 30 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 222d9168..d4d6a1ba 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -125,7 +125,9 @@ CLEANFILES = if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la -amqp_la_SOURCES = amqp.c utils_format_json.c utils_format_json.h +amqp_la_SOURCES = amqp.c \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_format_json.c utils_format_json.h amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) diff --git a/src/amqp.c b/src/amqp.c index eb547950..84bcc066 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -35,6 +35,7 @@ #include "collectd.h" #include "common.h" #include "plugin.h" +#include "utils_cmd_putval.h" #include "utils_format_json.h" #include @@ -45,6 +46,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 + #define CAMQP_CHANNEL 1 /* @@ -54,6 +58,7 @@ struct camqp_config_s { _Bool publish; char *name; + int format; char *host; int port; @@ -392,6 +397,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_connect */ +static int shutdown (void) /* {{{ */ +{ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + + return (0); +} /* }}} int shutdown */ + +/* + * Subscribing code + */ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ size_t body_size) { @@ -552,6 +585,9 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_subscribe_init */ +/* + * Publishing code + */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer) { @@ -604,9 +640,27 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ bfree = sizeof (buffer); bfill = 0; - format_json_initialize (buffer, &bfill, &bfree); - format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); - format_json_finalize (buffer, &bfill, &bfree); + if (conf->format == CAMQP_FORMAT_COMMAND) + { + status = create_putval (buffer, sizeof (buffer), ds, vl); + if (status != 0) + { + ERROR ("amqp plugin: create_putval failed with status %i.", + status); + return (status); + } + } + else if (conf->format == CAMQP_FORMAT_JSON) + { + format_json_initialize (buffer, &bfill, &bfree); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); + format_json_finalize (buffer, &bfill, &bfree); + } + else + { + ERROR ("amqp plugin: Invalid format (%i).", conf->format); + return (-1); + } pthread_mutex_lock (&conf->lock); status = camqp_write_locked (conf, buffer); @@ -615,6 +669,36 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ return (status); } /* }}} int camqp_write */ +/* + * Config handling + */ +static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ + camqp_config_t *conf) +{ + char *string; + int status; + + string = NULL; + status = cf_util_get_string (ci, &string); + if (status != 0) + return (status); + + assert (string != NULL); + if (strcasecmp ("Command", string) == 0) + conf->format = CAMQP_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + conf->format = CAMQP_FORMAT_JSON; + else + { + WARNING ("amqp plugin: Invalid format string: %s", + string); + } + + free (string); + + return (0); +} /* }}} int config_set_string */ + static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ _Bool publish) { @@ -633,6 +717,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ memset (conf, 0, sizeof (*conf)); conf->publish = publish; conf->name = NULL; + conf->format = CAMQP_FORMAT_COMMAND; conf->host = NULL; conf->port = 5672; conf->vhost = NULL; @@ -662,7 +747,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Host", child->key) == 0) + if (strcasecmp ("Format", child->key) == 0) + status = camqp_config_set_format (child, conf); + else if (strcasecmp ("Host", child->key) == 0) status = cf_util_get_string (child, &conf->host); else if (strcasecmp ("Port", child->key) == 0) { @@ -776,31 +863,6 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ return (0); } /* }}} int camqp_config */ -static int shutdown (void) /* {{{ */ -{ - size_t i; - - DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", - subscriber_threads_num); - - subscriber_threads_running = 0; - for (i = 0; i < subscriber_threads_num; i++) - { - /* FIXME: Sending a signal is not very elegant here. Maybe find out how - * to use a timeout in the thread and check for the variable in regular - * intervals. */ - pthread_kill (subscriber_threads[i], SIGTERM); - pthread_join (subscriber_threads[i], /* retval = */ NULL); - } - - subscriber_threads_num = 0; - sfree (subscriber_threads); - - DEBUG ("amqp plugin: All subscriber threads exited."); - - return (0); -} /* }}} int shutdown */ - void module_register (void) { plugin_register_complex_config ("amqp", camqp_config); -- 2.11.0