X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Famqp.c;h=89f051e81960668c957abfe72239c542ed4681d2;hb=4d370741101aeb037ae52f3529a4a0869e0dc08a;hp=767a8776bbfb07ce2a26f932cf7fb5da4d22d047;hpb=affac33e83584e7538c358e3bd0a587d0c692bc3;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 767a8776..89f051e8 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -23,21 +23,34 @@ * * Authors: * Sebastien Pahl - * Florian Forster + * Florian Forster **/ #include "collectd.h" + #include "common.h" #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" #include "utils_format_graphite.h" -#include - #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_TCP_SOCKET +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 @@ -66,6 +79,9 @@ struct camqp_config_s char *exchange; char *routing_key; + /* Number of seconds to wait before connection is retried */ + int connection_retry_delay; + /* publish only */ uint8_t delivery_mode; _Bool store_rates; @@ -79,6 +95,8 @@ struct camqp_config_s /* subscribe only */ char *exchange_type; char *queue; + _Bool queue_durable; + _Bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -180,11 +198,11 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - sstrncpy (buffer, "Success", sizeof (buffer)); + sstrncpy (buffer, "Success", buffer_size); break; case AMQP_RESPONSE_NONE: - sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + sstrncpy (buffer, "Missing RPC reply type", buffer_size); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: @@ -196,7 +214,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (sstrerror (r.library_error, buffer, buffer_size)); #endif else - sstrncpy (buffer, "End of stream", sizeof (buffer)); + sstrncpy (buffer, "End of stream", buffer_size); break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -285,6 +303,10 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ /* type = */ amqp_cstring_bytes (conf->exchange_type), /* passive = */ 0, /* durable = */ 0, +#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000 + /* auto delete = */ 0, + /* internal = */ 0, +#endif /* arguments = */ argument_table); if ((ed_ret == NULL) && camqp_is_error (conf)) { @@ -314,9 +336,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ ? amqp_cstring_bytes (conf->queue) : AMQP_EMPTY_BYTES, /* passive = */ 0, - /* durable = */ 0, + /* durable = */ conf->queue_durable, /* exclusive = */ 0, - /* auto_delete = */ 1, + /* auto_delete = */ conf->queue_auto_delete, /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { @@ -389,13 +411,32 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { + static time_t last_connect_time = 0; + amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); + time_t now = time(NULL); + if (now < (last_connect_time + conf->connection_retry_delay)) + { + DEBUG("amqp plugin: skipping connection retry, " + "ConnectionRetryDelay: %d", conf->connection_retry_delay); + return(1); + } + else + { + DEBUG ("amqp plugin: retrying connection"); + last_connect_time = now; + } + conf->connection = amqp_new_connection (); if (conf->connection == NULL) { @@ -403,6 +444,33 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */ + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET() close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -415,6 +483,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -427,7 +496,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -440,7 +509,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -459,13 +528,11 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ static int camqp_shutdown (void) /* {{{ */ { - size_t i; - DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", subscriber_threads_num); subscriber_threads_running = 0; - for (i = 0; i < subscriber_threads_num; i++) + for (size_t i = 0; i < subscriber_threads_num; i++) { /* FIXME: Sending a signal is not very elegant here. Maybe find out how * to use a timeout in the thread and check for the variable in regular @@ -646,7 +713,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ continue; } - status = camqp_read_header (conf); + camqp_read_header (conf); amqp_maybe_release_buffers (conf->connection); } /* while (subscriber_threads_running) */ @@ -666,7 +733,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ if (tmp == NULL) { ERROR ("amqp plugin: realloc failed."); - camqp_config_free (conf); + sfree (subscriber_threads); return (ENOMEM); } subscriber_threads = tmp; @@ -680,7 +747,6 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ char errbuf[1024]; ERROR ("amqp plugin: pthread_create failed: %s", sstrerror (status, errbuf, sizeof (errbuf))); - camqp_config_free (conf); return (status); } @@ -696,17 +762,20 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer, const char *routing_key) { - amqp_basic_properties_t props; int status; status = camqp_connect (conf); if (status != 0) return (status); - memset (&props, 0, sizeof (props)); - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG - | AMQP_BASIC_DELIVERY_MODE_FLAG - | AMQP_BASIC_APP_ID_FLAG; + amqp_basic_properties_t props = { + ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG + | AMQP_BASIC_APP_ID_FLAG, + .delivery_mode = conf->delivery_mode, + .app_id = amqp_cstring_bytes("collectd") + }; + if (conf->format == CAMQP_FORMAT_COMMAND) props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) @@ -715,8 +784,6 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); - props.delivery_mode = conf->delivery_mode; - props.app_id = amqp_cstring_bytes("collectd"); status = amqp_basic_publish(conf->connection, /* channel = */ 1, @@ -741,21 +808,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { camqp_config_t *conf = user_data->data; char routing_key[6 * DATA_MAX_NAME_LEN]; - char buffer[4096]; + char buffer[8192]; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); - memset (buffer, 0, sizeof (buffer)); - if (conf->routing_key != NULL) { sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); } else { - size_t i; ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", vl->host, vl->plugin, vl->plugin_instance, @@ -763,7 +827,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ /* Switch slashes (the only character forbidden by collectd) and dots * (the separation character used by AMQP). */ - for (i = 0; routing_key[i] != 0; i++) + for (size_t i = 0; routing_key[i] != 0; i++) { if (routing_key[i] == '.') routing_key[i] = '/'; @@ -853,17 +917,15 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { camqp_config_t *conf; int status; - int i; - conf = malloc (sizeof (*conf)); + conf = calloc (1, sizeof (*conf)); if (conf == NULL) { - ERROR ("amqp plugin: malloc failed."); + ERROR ("amqp plugin: calloc failed."); return (ENOMEM); } /* Initialize "conf" {{{ */ - memset (conf, 0, sizeof (*conf)); conf->publish = publish; conf->name = NULL; conf->format = CAMQP_FORMAT_COMMAND; @@ -874,9 +936,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->password = NULL; conf->exchange = NULL; conf->routing_key = NULL; + conf->connection_retry_delay = 0; + /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -884,6 +949,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; + conf->queue_durable = 0; + conf->queue_auto_delete = 1; /* general */ conf->connection = NULL; pthread_mutex_init (&conf->lock, /* attr = */ NULL); @@ -896,7 +963,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ return (status); } - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -923,6 +990,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); + else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_durable); + else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_auto_delete); else if (strcasecmp ("RoutingKey", child->key) == 0) status = cf_util_get_string (child, &conf->routing_key); else if ((strcasecmp ("Persistent", child->key) == 0) && publish) @@ -942,6 +1013,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) @@ -956,6 +1033,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->escape_char = tmp_buff[0]; sfree (tmp_buff); } + else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0) + status = cf_util_get_int (child, &conf->connection_retry_delay); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); @@ -1017,9 +1096,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ static int camqp_config (oconfig_item_t *ci) /* {{{ */ { - int i; - - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i;