Merge branch 'collectd-5.4' into collectd-5.5
[collectd.git] / src / amqp.c
index bf0f80f..aba4f01 100644 (file)
@@ -23,7 +23,7 @@
  *
  * Authors:
  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
- *   Florian Forster <octo at verplant.org>
+ *   Florian Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
@@ -80,6 +80,9 @@ struct camqp_config_s
     char   *exchange;
     char   *routing_key;
 
+    /* Number of seconds to wait before connection is retried */
+    int     connection_retry_delay;
+
     /* publish only */
     uint8_t delivery_mode;
     _Bool   store_rates;
@@ -93,6 +96,8 @@ struct camqp_config_s
     /* subscribe only */
     char   *exchange_type;
     char   *queue;
+    _Bool   queue_durable;
+    _Bool   queue_auto_delete;
 
     amqp_connection_state_t connection;
     pthread_mutex_t lock;
@@ -332,9 +337,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             ? amqp_cstring_bytes (conf->queue)
             : AMQP_EMPTY_BYTES,
             /* passive     = */ 0,
-            /* durable     = */ 0,
+            /* durable     = */ conf->queue_durable,
             /* exclusive   = */ 0,
-            /* auto_delete = */ 1,
+            /* auto_delete = */ conf->queue_auto_delete,
             /* arguments   = */ AMQP_EMPTY_TABLE);
     if (qd_ret == NULL)
     {
@@ -407,6 +412,8 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
+    static time_t last_connect_time = 0;
+
     amqp_rpc_reply_t reply;
     int status;
 #ifdef HAVE_AMQP_TCP_SOCKET
@@ -418,6 +425,19 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     if (conf->connection != NULL)
         return (0);
 
+    time_t now = time(NULL);
+    if (now < (last_connect_time + conf->connection_retry_delay))
+    {
+        DEBUG("amqp plugin: skipping connection retry, "
+            "ConnectionRetryDelay: %d", conf->connection_retry_delay);
+        return(1);
+    }
+    else
+    {
+        DEBUG ("amqp plugin: retrying connection");
+        last_connect_time = now;
+    }
+
     conf->connection = amqp_new_connection ();
     if (conf->connection == NULL)
     {
@@ -791,7 +811,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 {
     camqp_config_t *conf = user_data->data;
     char routing_key[6 * DATA_MAX_NAME_LEN];
-    char buffer[4096];
+    char buffer[8192];
     int status;
 
     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
@@ -924,6 +944,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     conf->password = NULL;
     conf->exchange = NULL;
     conf->routing_key = NULL;
+    conf->connection_retry_delay = 0;
+
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
@@ -935,6 +957,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
+    conf->queue_durable = 0;
+    conf->queue_auto_delete = 1;
     /* general */
     conf->connection = NULL;
     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
@@ -974,6 +998,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_string (child, &conf->exchange_type);
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
             status = cf_util_get_string (child, &conf->queue);
+        else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_durable);
+        else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_auto_delete);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
             status = cf_util_get_string (child, &conf->routing_key);
         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
@@ -1013,6 +1041,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             conf->escape_char = tmp_buff[0];
             sfree (tmp_buff);
         }
+        else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0)
+            status = cf_util_get_int (child, &conf->connection_retry_delay);
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);