amqp plugin: Implement the "Format" config option.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 13:01:40 +0000 (15:01 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 13:03:08 +0000 (15:03 +0200)
By default, the publishing code now creates "command" (i.e. PUTVAL) output.
For now this is easier to parse, so use this for the subscribing code.
(For now, anyways. I guess JSON will come later, too.)

src/Makefile.am
src/amqp.c

index 222d916..d4d6a1b 100644 (file)
@@ -125,7 +125,9 @@ CLEANFILES =
 
 if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
-amqp_la_SOURCES = amqp.c utils_format_json.c utils_format_json.h
+amqp_la_SOURCES = amqp.c \
+                 utils_cmd_putval.c utils_cmd_putval.h \
+                 utils_format_json.c utils_format_json.h
 amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
 amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
 amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
index eb54795..84bcc06 100644 (file)
@@ -35,6 +35,7 @@
 #include "collectd.h"
 #include "common.h"
 #include "plugin.h"
+#include "utils_cmd_putval.h"
 #include "utils_format_json.h"
 
 #include <amqp.h>
@@ -45,6 +46,9 @@
 #define CAMQP_DM_VOLATILE   1
 #define CAMQP_DM_PERSISTENT 2
 
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON    2
+
 #define CAMQP_CHANNEL 1
 
 /*
@@ -54,6 +58,7 @@ struct camqp_config_s
 {
     _Bool   publish;
     char   *name;
+    int     format;
 
     char   *host;
     int     port;
@@ -392,6 +397,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     return (0);
 } /* }}} int camqp_connect */
 
+static int shutdown (void) /* {{{ */
+{
+    size_t i;
+
+    DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+            subscriber_threads_num);
+
+    subscriber_threads_running = 0;
+    for (i = 0; i < subscriber_threads_num; i++)
+    {
+        /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+         * to use a timeout in the thread and check for the variable in regular
+         * intervals. */
+        pthread_kill (subscriber_threads[i], SIGTERM);
+        pthread_join (subscriber_threads[i], /* retval = */ NULL);
+    }
+
+    subscriber_threads_num = 0;
+    sfree (subscriber_threads);
+
+    DEBUG ("amqp plugin: All subscriber threads exited.");
+
+    return (0);
+} /* }}} int shutdown */
+
+/*
+ * Subscribing code
+ */
 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
         size_t body_size)
 {
@@ -552,6 +585,9 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     return (0);
 } /* }}} int camqp_subscribe_init */
 
+/*
+ * Publishing code
+ */
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         const char *buffer)
 {
@@ -604,9 +640,27 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     bfree = sizeof (buffer);
     bfill = 0;
 
-    format_json_initialize (buffer, &bfill, &bfree);
-    format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
-    format_json_finalize (buffer, &bfill, &bfree);
+    if (conf->format == CAMQP_FORMAT_COMMAND)
+    {
+        status = create_putval (buffer, sizeof (buffer), ds, vl);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: create_putval failed with status %i.",
+                    status);
+            return (status);
+        }
+    }
+    else if (conf->format == CAMQP_FORMAT_JSON)
+    {
+        format_json_initialize (buffer, &bfill, &bfree);
+        format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+        format_json_finalize (buffer, &bfill, &bfree);
+    }
+    else
+    {
+        ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+        return (-1);
+    }
 
     pthread_mutex_lock (&conf->lock);
     status = camqp_write_locked (conf, buffer);
@@ -615,6 +669,36 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     return (status);
 } /* }}} int camqp_write */
 
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+        camqp_config_t *conf)
+{
+    char *string;
+    int status;
+
+    string = NULL;
+    status = cf_util_get_string (ci, &string);
+    if (status != 0)
+        return (status);
+
+    assert (string != NULL);
+    if (strcasecmp ("Command", string) == 0)
+        conf->format = CAMQP_FORMAT_COMMAND;
+    else if (strcasecmp ("JSON", string) == 0)
+        conf->format = CAMQP_FORMAT_JSON;
+    else
+    {
+        WARNING ("amqp plugin: Invalid format string: %s",
+                string);
+    }
+
+    free (string);
+
+    return (0);
+} /* }}} int config_set_string */
+
 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         _Bool publish)
 {
@@ -633,6 +717,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     memset (conf, 0, sizeof (*conf));
     conf->publish = publish;
     conf->name = NULL;
+    conf->format = CAMQP_FORMAT_COMMAND;
     conf->host = NULL;
     conf->port = 5672;
     conf->vhost = NULL;
@@ -662,7 +747,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     {
         oconfig_item_t *child = ci->children + i;
 
-        if (strcasecmp ("Host", child->key) == 0)
+        if (strcasecmp ("Format", child->key) == 0)
+            status = camqp_config_set_format (child, conf);
+        else if (strcasecmp ("Host", child->key) == 0)
             status = cf_util_get_string (child, &conf->host);
         else if (strcasecmp ("Port", child->key) == 0)
         {
@@ -776,31 +863,6 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */
     return (0);
 } /* }}} int camqp_config */
 
-static int shutdown (void) /* {{{ */
-{
-    size_t i;
-
-    DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
-            subscriber_threads_num);
-
-    subscriber_threads_running = 0;
-    for (i = 0; i < subscriber_threads_num; i++)
-    {
-        /* FIXME: Sending a signal is not very elegant here. Maybe find out how
-         * to use a timeout in the thread and check for the variable in regular
-         * intervals. */
-        pthread_kill (subscriber_threads[i], SIGTERM);
-        pthread_join (subscriber_threads[i], /* retval = */ NULL);
-    }
-
-    subscriber_threads_num = 0;
-    sfree (subscriber_threads);
-
-    DEBUG ("amqp plugin: All subscriber threads exited.");
-
-    return (0);
-} /* }}} int shutdown */
-
 void module_register (void)
 {
     plugin_register_complex_config ("amqp", camqp_config);