From 4e0e66e22701ed7c4bba5c80b551e4db6c85182d Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 5 Aug 2010 11:18:53 +0200 Subject: [PATCH] amqp plugin: Implement publishing to multiple brokers. --- src/amqp.c | 386 ++++++++++++++++++++++++++++++++------------------- src/collectd.conf.in | 20 +-- 2 files changed, 252 insertions(+), 154 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index 748ff9f1..a72cec36 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -42,8 +42,37 @@ /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ -#define AMQP_DM_VOLATILE 1 -#define AMQP_DM_PERSISTENT 2 +#define CAMQP_DM_VOLATILE 1 +#define CAMQP_DM_PERSISTENT 2 + +#define CAMQP_CHANNEL 1 + +/* + * Data types + */ +struct camqp_config_s +{ + _Bool publish; + char *name; + + char *host; + int port; + char *vhost; + char *user; + char *password; + + char *exchange; + char *exchange_type; + char *queue; + char *routingkey; + uint8_t delivery_mode; + + _Bool store_rates; + + amqp_connection_state_t connection; + pthread_mutex_t lock; +}; +typedef struct camqp_config_s camqp_config_t; /* * Global variables @@ -55,164 +84,118 @@ static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; static const char *def_routingkey = "collectd"; -static char *conf_host = NULL; -static char *conf_vhost = NULL; -static char *conf_user = NULL; -static char *conf_password = NULL; -static char *conf_exchange = NULL; -static char *conf_routingkey = NULL; -static int conf_port = 5672; -static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE; -static _Bool conf_store_rates = 0; - -#define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f) - -static amqp_connection_state_t amqp_conn = NULL; -static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER; - -static const char *config_keys[] = -{ - "Host", - "Port", - "VHost", - "User", - "Password", - "Exchange", - "RoutingKey", - "Persistent", - "StoreRates" -}; -static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); +#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f) /* * Functions */ -static int config_set(char **var, const char *value) +static void camqp_close_connection (camqp_config_t *conf) /* {{{ */ { - sfree(*var); - if ((*var = strdup(value)) == NULL) - return (1); - return (0); -} /* int config_set */ + int sockfd; + + if ((conf == NULL) || (conf->connection == NULL)) + return; -static int config(const char *key, const char *value) + sockfd = amqp_get_sockfd (conf->connection); + amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); + close (sockfd); + conf->connection = NULL; +} /* }}} void camqp_close_connection */ + +static void camqp_config_free (void *ptr) /* {{{ */ { - if (strcasecmp(key, "host") == 0) - return (config_set(&conf_host, value)); - else if(strcasecmp(key, "port") == 0) - { - int tmp; + camqp_config_t *conf = ptr; - tmp = service_name_to_port_number (value); - if (tmp <= 0) - { - ERROR ("AMQP plugin: Cannot parse `%s' as a " - "service name (port number).", value); - return (1); - } + if (conf == NULL) + return; - conf_port = tmp; - return (0); - } - else if (strcasecmp(key, "vhost") == 0) - return (config_set(&conf_vhost, value)); - else if (strcasecmp(key, "user") == 0) - return (config_set(&conf_user, value)); - else if (strcasecmp(key, "password") == 0) - return (config_set(&conf_password, value)); - else if (strcasecmp(key, "exchange") == 0) - return (config_set(&conf_exchange, value)); - else if (strcasecmp(key, "routingkey") == 0) - return (config_set(&conf_routingkey, value)); - else if (strcasecmp ("Persistent", key) == 0) - { - if (IS_TRUE (value)) - conf_delivery_mode = AMQP_DM_PERSISTENT; - else - conf_delivery_mode = AMQP_DM_VOLATILE; - return (0); - } - else if (strcasecmp ("StoreRates", key) == 0) - { - if (IS_TRUE (value)) - conf_store_rates = 1; - else - conf_store_rates = 0; - return (0); - } - return (-1); -} /* int config */ + camqp_close_connection (conf); + + sfree (conf->name); + sfree (conf->host); + sfree (conf->vhost); + sfree (conf->user); + sfree (conf->password); + sfree (conf->exchange); + sfree (conf->exchange_type); + sfree (conf->queue); + sfree (conf->routingkey); -static int amqp_connect (void) + sfree (conf); +} /* }}} void camqp_config_free */ + +static int amqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; int sockfd; int status; - if (amqp_conn != NULL) + if (conf->connection != NULL) return (0); - amqp_conn = amqp_new_connection (); - if (amqp_conn == NULL) + conf->connection = amqp_new_connection (); + if (conf->connection == NULL) { ERROR ("amqp plugin: amqp_new_connection failed."); return (ENOMEM); } - sockfd = amqp_open_socket (CONF(host), conf_port); + sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { char errbuf[1024]; status = (-1) * sockfd; ERROR ("amqp plugin: amqp_open_socket failed: %s", sstrerror (status, errbuf, sizeof (errbuf))); - amqp_destroy_connection(amqp_conn); - amqp_conn = NULL; + amqp_destroy_connection (conf->connection); + conf->connection = NULL; return (status); } + amqp_set_sockfd (conf->connection, sockfd); - amqp_set_sockfd (amqp_conn, sockfd); - - reply = amqp_login (amqp_conn, CONF(vhost), + reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, /* frame max = */ 131072, /* heartbeat = */ 0, /* authentication = */ AMQP_SASL_METHOD_PLAIN, - CONF(user), CONF(password)); + CONF(conf, user), CONF(conf, password)); if (reply.reply_type != AMQP_RESPONSE_NORMAL) { ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", - CONF(vhost), CONF(user)); - amqp_destroy_connection (amqp_conn); + CONF(conf, vhost), CONF(conf, user)); + amqp_destroy_connection (conf->connection); close (sockfd); - amqp_conn = NULL; + conf->connection = NULL; return (1); } - amqp_channel_open (amqp_conn, /* channel = */ 1); + amqp_channel_open (conf->connection, /* channel = */ 1); /* FIXME: Is checking "reply.reply_type" really correct here? How does * it get set? --octo */ if (reply.reply_type != AMQP_RESPONSE_NORMAL) { ERROR ("amqp plugin: amqp_channel_open failed."); - amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(amqp_conn); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); close(sockfd); - amqp_conn = NULL; + conf->connection = NULL; return (1); } INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " - "on %s:%i.", CONF(vhost), CONF(host), conf_port); + "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); return (0); -} /* int amqp_connect */ +} /* }}} int amqp_connect */ -static int amqp_write_locked (const char *buffer) +static int amqp_write_locked (camqp_config_t *conf, /* {{{ */ + const char *buffer) { amqp_basic_properties_t props; int status; - status = amqp_connect (); + status = amqp_connect (conf); if (status != 0) return (status); @@ -221,44 +204,37 @@ static int amqp_write_locked (const char *buffer) | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_APP_ID_FLAG; props.content_type = amqp_cstring_bytes("application/json"); - props.delivery_mode = conf_delivery_mode; + props.delivery_mode = conf->delivery_mode; props.app_id = amqp_cstring_bytes("collectd"); - status = amqp_basic_publish(amqp_conn, + status = amqp_basic_publish(conf->connection, /* channel = */ 1, - amqp_cstring_bytes(CONF(exchange)), - amqp_cstring_bytes(CONF(routingkey)), + amqp_cstring_bytes(CONF(conf, exchange)), + amqp_cstring_bytes(CONF(conf, routingkey)), /* mandatory = */ 0, /* immediate = */ 0, &props, amqp_cstring_bytes(buffer)); if (status != 0) { - int sockfd; - ERROR ("amqp plugin: amqp_basic_publish failed with status %i.", status); - - sockfd = amqp_get_sockfd (amqp_conn); - amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection (amqp_conn); - close (sockfd); - amqp_conn = NULL; + camqp_close_connection (conf); } return (status); -} /* int amqp_write_locked */ +} /* }}} int amqp_write_locked */ -static int amqp_write (const data_set_t *ds, const value_list_t *vl, - __attribute__((unused)) user_data_t *user_data) +static int amqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) { + camqp_config_t *conf = user_data->data; char buffer[4096]; size_t bfree; size_t bfill; int status; - if ((ds == NULL) || (vl == NULL)) + if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); memset (buffer, 0, sizeof (buffer)); @@ -266,47 +242,167 @@ static int amqp_write (const data_set_t *ds, const value_list_t *vl, bfill = 0; format_json_initialize (buffer, &bfill, &bfree); - format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); - pthread_mutex_lock (&amqp_conn_lock); - status = amqp_write_locked (buffer); - pthread_mutex_unlock (&amqp_conn_lock); + pthread_mutex_lock (&conf->lock); + status = amqp_write_locked (conf, buffer); + pthread_mutex_unlock (&conf->lock); return (status); -} /* int amqp_write */ +} /* }}} int amqp_write */ -static int shutdown (void) +static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ + _Bool publish) { - pthread_mutex_lock (&amqp_conn_lock); - if (amqp_conn != NULL) + camqp_config_t *conf; + int status; + int i; + + conf = malloc (sizeof (*conf)); + if (conf == NULL) { - int sockfd; + ERROR ("amqp plugin: malloc failed."); + return (ENOMEM); + } - sockfd = amqp_get_sockfd (amqp_conn); - amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection (amqp_conn); - close(sockfd); - amqp_conn = NULL; + /* Initialize "conf" {{{ */ + memset (conf, 0, sizeof (*conf)); + conf->publish = publish; + conf->name = NULL; + conf->host = NULL; + conf->port = 5672; + conf->vhost = NULL; + conf->user = NULL; + conf->password = NULL; + conf->exchange = NULL; + conf->exchange_type = NULL; + conf->queue = NULL; + conf->routingkey = NULL; + conf->delivery_mode = CAMQP_DM_VOLATILE; + conf->store_rates = 0; + conf->connection = NULL; + pthread_mutex_init (&conf->lock, /* attr = */ NULL); + /* }}} */ + + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + sfree (conf); + return (status); } - pthread_mutex_unlock (&amqp_conn_lock); - sfree(conf_host); - sfree(conf_vhost); - sfree(conf_user); - sfree(conf_password); - sfree(conf_exchange); - sfree(conf_routingkey); + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (ci, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) + { + status = cf_util_get_port_number (child); + if (status > 0) + { + conf->port = status; + status = 0; + } + } + else if (strcasecmp ("VHost", child->key) == 0) + status = cf_util_get_string (ci, &conf->vhost); + else if (strcasecmp ("User", child->key) == 0) + status = cf_util_get_string (ci, &conf->user); + else if (strcasecmp ("Password", child->key) == 0) + status = cf_util_get_string (ci, &conf->password); + else if (strcasecmp ("Exchange", child->key) == 0) + status = cf_util_get_string (ci, &conf->exchange); + else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish) + status = cf_util_get_string (ci, &conf->exchange_type); + else if ((strcasecmp ("Queue", child->key) == 0) && !publish) + status = cf_util_get_string (ci, &conf->queue); + else if (strcasecmp ("RoutingKey", child->key) == 0) + status = cf_util_get_string (ci, &conf->routingkey); + else if (strcasecmp ("Persistent", child->key) == 0) + { + _Bool tmp = 0; + status = cf_util_get_boolean (ci, &tmp); + if (tmp) + conf->delivery_mode = CAMQP_DM_PERSISTENT; + else + conf->delivery_mode = CAMQP_DM_VOLATILE; + } + else if (strcasecmp ("StoreRates", child->key) == 0) + status = cf_util_get_boolean (ci, &conf->store_rates); + else + WARNING ("amqp plugin: Ignoring unknown " + "configuration option \"%s\".", child->key); + + if (status != 0) + break; + } /* for (i = 0; i < ci->children_num; i++) */ + + if ((status == 0) && !publish && (conf->exchange == NULL)) + { + if (conf->routingkey != NULL) + WARNING ("amqp plugin: The option \"RoutingKey\" was given " + "without the \"Exchange\" option. It will be ignored."); + + if (conf->exchange_type != NULL) + WARNING ("amqp plugin: The option \"ExchangeType\" was given " + "without the \"Exchange\" option. It will be ignored."); + } + + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + + if (publish) + { + char cbname[128]; + user_data_t ud = { conf, camqp_config_free }; + + ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name); + + status = plugin_register_write (cbname, amqp_write, &ud); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } + + return (0); +} /* }}} int camqp_config_connection */ + +static int camqp_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Publish", child->key) == 0) + camqp_config_connection (child, /* publish = */ 1); + else + WARNING ("amqp plugin: Ignoring unknown config option \"%s\".", + child->key); + } /* for (ci->children_num) */ + + return (0); +} /* }}} int camqp_config */ + +static int shutdown (void) /* {{{ */ +{ + /* FIXME: Set a global shutdown variable here. */ return (0); -} /* int shutdown */ +} /* }}} int shutdown */ void module_register (void) { - plugin_register_config ("amqp", config, config_keys, config_keys_num); - plugin_register_write ("amqp", amqp_write, NULL); + plugin_register_complex_config ("amqp", camqp_config); plugin_register_shutdown ("amqp", shutdown); } /* void module_register */ -/* vim: set sw=4 sts=4 et : */ +/* vim: set sw=4 sts=4 et fdm=marker : */ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index e07920b6..31a113a0 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -150,15 +150,17 @@ ############################################################################## # -# Host "localhost" -# Port "5672" -# VHost "/" -# User "collectd" -# Password "aiwaeZ0y" -# Exchange "amq.direct" -# RoutingKey "routing_key" -# Persistent false -# StoreRates false +# +# Host "localhost" +# Port "5672" +# VHost "/" +# User "guest" +# Password "guest" +# Exchange "amq.fanout" +# RoutingKey "collectd" +# Persistent false +# StoreRates false +# # # -- 2.11.0