amqp plugin: Use the content type to determine how to decode received messages.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 15:06:11 +0000 (17:06 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 15:06:11 +0000 (17:06 +0200)
The "Format" config option is now only valid in "Publish" blocks.

src/amqp.c

index 6be483e..7b9f41b 100644 (file)
@@ -58,7 +58,6 @@ struct camqp_config_s
 {
     _Bool   publish;
     char   *name;
-    int     format;
 
     char   *host;
     int     port;
@@ -72,6 +71,7 @@ struct camqp_config_s
     /* publish only */
     uint8_t delivery_mode;
     _Bool   store_rates;
+    int     format;
 
     /* subscribe only */
     char   *exchange_type;
@@ -420,7 +420,7 @@ static int shutdown (void) /* {{{ */
  * Subscribing code
  */
 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
-        size_t body_size)
+        size_t body_size, const char *content_type)
 {
     char body[body_size + 1];
     char *body_ptr;
@@ -464,7 +464,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
         received += frame.payload.body_fragment.len;
     } /* while (received < body_size) */
 
-    if (conf->format == CAMQP_FORMAT_COMMAND)
+    if (strcasecmp ("text/collectd", content_type) == 0)
     {
         status = handle_putval (stderr, body);
         if (status != 0)
@@ -472,7 +472,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
                     status);
         return (status);
     }
-    else if (conf->format == CAMQP_FORMAT_JSON)
+    else if (strcasecmp ("application/json", content_type) == 0)
     {
         ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
                 "been implemented yet. FIXME!");
@@ -480,8 +480,8 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
     }
     else
     {
-        ERROR ("amqp plugin: camqp_read_body: Unknown format option (%i).",
-                conf->format);
+        ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+                content_type);
         return (EINVAL);
     }
 
@@ -493,6 +493,8 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */
 {
     int status;
     amqp_frame_t frame;
+    amqp_basic_properties_t *properties;
+    char *content_type;
 
     status = amqp_simple_wait_frame (conf->connection, &frame);
     if (status < 0)
@@ -512,7 +514,20 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */
         return (-1);
     }
 
-    return (camqp_read_body (conf, frame.payload.properties.body_size));
+    properties = frame.payload.properties.decoded;
+    content_type = camqp_bytes_cstring (&properties->content_type);
+    if (content_type == NULL)
+    {
+        ERROR ("amqp plugin: Unable to determine content type.");
+        return (-1);
+    }
+
+    status = camqp_read_body (conf,
+            (size_t) frame.payload.properties.body_size,
+            content_type);
+
+    sfree (content_type);
+    return (status);
 } /* }}} int camqp_read_header */
 
 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
@@ -616,7 +631,12 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
         | AMQP_BASIC_DELIVERY_MODE_FLAG
         | AMQP_BASIC_APP_ID_FLAG;
-    props.content_type = amqp_cstring_bytes("application/json");
+    if (conf->format == CAMQP_FORMAT_COMMAND)
+        props.content_type = amqp_cstring_bytes("text/collectd");
+    else if (conf->format == CAMQP_FORMAT_JSON)
+        props.content_type = amqp_cstring_bytes("application/json");
+    else
+        assert (23 == 42);
     props.delivery_mode = conf->delivery_mode;
     props.app_id = amqp_cstring_bytes("collectd");
 
@@ -784,9 +804,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     {
         oconfig_item_t *child = ci->children + i;
 
-        if (strcasecmp ("Format", child->key) == 0)
-            status = camqp_config_set_format (child, conf);
-        else if (strcasecmp ("Host", child->key) == 0)
+        if (strcasecmp ("Host", child->key) == 0)
             status = cf_util_get_string (child, &conf->host);
         else if (strcasecmp ("Port", child->key) == 0)
         {
@@ -822,6 +840,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         }
         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
             status = cf_util_get_boolean (child, &conf->store_rates);
+        else if ((strcasecmp ("Format", child->key) == 0) && publish)
+            status = camqp_config_set_format (child, conf);
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);