From a5d42f913d160a25e921b94008285a5da7009680 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 6 Aug 2010 17:06:11 +0200 Subject: [PATCH] amqp plugin: Use the content type to determine how to decode received messages. The "Format" config option is now only valid in "Publish" blocks. --- src/amqp.c | 42 +++++++++++++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index 6be483e2..7b9f41b2 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -58,7 +58,6 @@ struct camqp_config_s { _Bool publish; char *name; - int format; char *host; int port; @@ -72,6 +71,7 @@ struct camqp_config_s /* publish only */ uint8_t delivery_mode; _Bool store_rates; + int format; /* subscribe only */ char *exchange_type; @@ -420,7 +420,7 @@ static int shutdown (void) /* {{{ */ * Subscribing code */ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ - size_t body_size) + size_t body_size, const char *content_type) { char body[body_size + 1]; char *body_ptr; @@ -464,7 +464,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ received += frame.payload.body_fragment.len; } /* while (received < body_size) */ - if (conf->format == CAMQP_FORMAT_COMMAND) + if (strcasecmp ("text/collectd", content_type) == 0) { status = handle_putval (stderr, body); if (status != 0) @@ -472,7 +472,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ status); return (status); } - else if (conf->format == CAMQP_FORMAT_JSON) + else if (strcasecmp ("application/json", content_type) == 0) { ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not " "been implemented yet. FIXME!"); @@ -480,8 +480,8 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ } else { - ERROR ("amqp plugin: camqp_read_body: Unknown format option (%i).", - conf->format); + ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".", + content_type); return (EINVAL); } @@ -493,6 +493,8 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ { int status; amqp_frame_t frame; + amqp_basic_properties_t *properties; + char *content_type; status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) @@ -512,7 +514,20 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ return (-1); } - return (camqp_read_body (conf, frame.payload.properties.body_size)); + properties = frame.payload.properties.decoded; + content_type = camqp_bytes_cstring (&properties->content_type); + if (content_type == NULL) + { + ERROR ("amqp plugin: Unable to determine content type."); + return (-1); + } + + status = camqp_read_body (conf, + (size_t) frame.payload.properties.body_size, + content_type); + + sfree (content_type); + return (status); } /* }}} int camqp_read_header */ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ @@ -616,7 +631,12 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_APP_ID_FLAG; - props.content_type = amqp_cstring_bytes("application/json"); + if (conf->format == CAMQP_FORMAT_COMMAND) + props.content_type = amqp_cstring_bytes("text/collectd"); + else if (conf->format == CAMQP_FORMAT_JSON) + props.content_type = amqp_cstring_bytes("application/json"); + else + assert (23 == 42); props.delivery_mode = conf->delivery_mode; props.app_id = amqp_cstring_bytes("collectd"); @@ -784,9 +804,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Format", child->key) == 0) - status = camqp_config_set_format (child, conf); - else if (strcasecmp ("Host", child->key) == 0) + if (strcasecmp ("Host", child->key) == 0) status = cf_util_get_string (child, &conf->host); else if (strcasecmp ("Port", child->key) == 0) { @@ -822,6 +840,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) status = cf_util_get_boolean (child, &conf->store_rates); + else if ((strcasecmp ("Format", child->key) == 0) && publish) + status = camqp_config_set_format (child, conf); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); -- 2.11.0