char *password;
char *exchange;
- char *routingkey;
+ char *routing_key;
/* publish only */
uint8_t delivery_mode;
static const char *def_user = "guest";
static const char *def_password = "guest";
static const char *def_exchange = "amq.fanout";
-static const char *def_routingkey = "collectd";
static pthread_t *subscriber_threads = NULL;
static size_t subscriber_threads_num = 0;
sfree (conf->exchange);
sfree (conf->exchange_type);
sfree (conf->queue);
- sfree (conf->routingkey);
+ sfree (conf->routing_key);
sfree (conf);
} /* }}} void camqp_config_free */
}
}
- DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
- conf->queue, conf->exchange, CONF (conf, routingkey));
-
assert (conf->queue != NULL);
qb_ret = amqp_queue_bind (conf->connection,
/* channel = */ CAMQP_CHANNEL,
/* queue = */ amqp_cstring_bytes (conf->queue),
/* exchange = */ amqp_cstring_bytes (conf->exchange),
-#if 1
- /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
-#else
- /* routing_key = */ AMQP_EMPTY_BYTES,
-#endif
+ /* routing_key = */ (conf->routing_key != NULL)
+ ? amqp_cstring_bytes (conf->routing_key)
+ : AMQP_EMPTY_BYTES,
/* arguments = */ AMQP_EMPTY_TABLE);
if ((qb_ret == NULL) && camqp_is_error (conf))
{
* Publishing code
*/
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
- const char *buffer)
+ const char *buffer, const char *routing_key)
{
amqp_basic_properties_t props;
int status;
status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
amqp_cstring_bytes(CONF(conf, exchange)),
- amqp_cstring_bytes(CONF(conf, routingkey)),
+ amqp_cstring_bytes (routing_key),
/* mandatory = */ 0,
/* immediate = */ 0,
&props,
user_data_t *user_data)
{
camqp_config_t *conf = user_data->data;
+ char routing_key[6 * DATA_MAX_NAME_LEN];
char buffer[4096];
- size_t bfree;
- size_t bfill;
int status;
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
memset (buffer, 0, sizeof (buffer));
- bfree = sizeof (buffer);
- bfill = 0;
+
+ if (conf->routing_key != NULL)
+ {
+ sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+ }
+ else
+ {
+ size_t i;
+ ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host,
+ vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+
+ /* Switch slashes (the only character forbidden by collectd) and dots
+ * (the separation character used by AMQP). */
+ for (i = 0; routing_key[i] != 0; i++)
+ {
+ if (routing_key[i] == '.')
+ routing_key[i] = '/';
+ else if (routing_key[i] == '/')
+ routing_key[i] = '.';
+ }
+ }
if (conf->format == CAMQP_FORMAT_COMMAND)
{
}
else if (conf->format == CAMQP_FORMAT_JSON)
{
+ size_t bfree = sizeof (buffer);
+ size_t bfill = 0;
+
format_json_initialize (buffer, &bfill, &bfree);
format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
format_json_finalize (buffer, &bfill, &bfree);
}
pthread_mutex_lock (&conf->lock);
- status = camqp_write_locked (conf, buffer);
+ status = camqp_write_locked (conf, buffer, routing_key);
pthread_mutex_unlock (&conf->lock);
return (status);
conf->user = NULL;
conf->password = NULL;
conf->exchange = NULL;
- conf->routingkey = NULL;
+ conf->routing_key = NULL;
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;
conf->store_rates = 0;
else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
status = cf_util_get_string (child, &conf->queue);
else if (strcasecmp ("RoutingKey", child->key) == 0)
- status = cf_util_get_string (child, &conf->routingkey);
+ status = cf_util_get_string (child, &conf->routing_key);
else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
{
_Bool tmp = 0;
if ((status == 0) && !publish && (conf->exchange == NULL))
{
- if (conf->routingkey != NULL)
+ if (conf->routing_key != NULL)
WARNING ("amqp plugin: The option \"RoutingKey\" was given "
"without the \"Exchange\" option. It will be ignored.");