{
_Bool publish;
char *name;
- int format;
char *host;
int port;
/* publish only */
uint8_t delivery_mode;
_Bool store_rates;
+ int format;
/* subscribe only */
char *exchange_type;
* Subscribing code
*/
static int camqp_read_body (camqp_config_t *conf, /* {{{ */
- size_t body_size)
+ size_t body_size, const char *content_type)
{
char body[body_size + 1];
char *body_ptr;
received += frame.payload.body_fragment.len;
} /* while (received < body_size) */
- if (conf->format == CAMQP_FORMAT_COMMAND)
+ if (strcasecmp ("text/collectd", content_type) == 0)
{
status = handle_putval (stderr, body);
if (status != 0)
status);
return (status);
}
- else if (conf->format == CAMQP_FORMAT_JSON)
+ else if (strcasecmp ("application/json", content_type) == 0)
{
ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
"been implemented yet. FIXME!");
}
else
{
- ERROR ("amqp plugin: camqp_read_body: Unknown format option (%i).",
- conf->format);
+ ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+ content_type);
return (EINVAL);
}
{
int status;
amqp_frame_t frame;
+ amqp_basic_properties_t *properties;
+ char *content_type;
status = amqp_simple_wait_frame (conf->connection, &frame);
if (status < 0)
return (-1);
}
- return (camqp_read_body (conf, frame.payload.properties.body_size));
+ properties = frame.payload.properties.decoded;
+ content_type = camqp_bytes_cstring (&properties->content_type);
+ if (content_type == NULL)
+ {
+ ERROR ("amqp plugin: Unable to determine content type.");
+ return (-1);
+ }
+
+ status = camqp_read_body (conf,
+ (size_t) frame.payload.properties.body_size,
+ content_type);
+
+ sfree (content_type);
+ return (status);
} /* }}} int camqp_read_header */
static void *camqp_subscribe_thread (void *user_data) /* {{{ */
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
| AMQP_BASIC_DELIVERY_MODE_FLAG
| AMQP_BASIC_APP_ID_FLAG;
- props.content_type = amqp_cstring_bytes("application/json");
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ props.content_type = amqp_cstring_bytes("text/collectd");
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ props.content_type = amqp_cstring_bytes("application/json");
+ else
+ assert (23 == 42);
props.delivery_mode = conf->delivery_mode;
props.app_id = amqp_cstring_bytes("collectd");
{
oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Format", child->key) == 0)
- status = camqp_config_set_format (child, conf);
- else if (strcasecmp ("Host", child->key) == 0)
+ if (strcasecmp ("Host", child->key) == 0)
status = cf_util_get_string (child, &conf->host);
else if (strcasecmp ("Port", child->key) == 0)
{
}
else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
status = cf_util_get_boolean (child, &conf->store_rates);
+ else if ((strcasecmp ("Format", child->key) == 0) && publish)
+ status = camqp_config_set_format (child, conf);
else
WARNING ("amqp plugin: Ignoring unknown "
"configuration option \"%s\".", child->key);