amqp plugin: Chose (hopefully sane) default values for all config options.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 5 Aug 2010 07:32:47 +0000 (09:32 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 5 Aug 2010 07:32:47 +0000 (09:32 +0200)
src/amqp.c

index 1f61080..748ff9f 100644 (file)
 /*
  * Global variables
  */
-static int   port       = 5672;
-static char *host       = NULL;
-static char *vhost      = NULL;
-static char *user       = NULL;
-static char *password   = NULL;
-static char *exchange   = NULL;
-static char *routingkey = NULL;
-static uint8_t delivery_mode = AMQP_DM_VOLATILE;
-static _Bool store_rates = 0;
+static const char *def_host       = "localhost";
+static const char *def_vhost      = "/";
+static const char *def_user       = "guest";
+static const char *def_password   = "guest";
+static const char *def_exchange   = "amq.fanout";
+static const char *def_routingkey = "collectd";
+
+static char *conf_host       = NULL;
+static char *conf_vhost      = NULL;
+static char *conf_user       = NULL;
+static char *conf_password   = NULL;
+static char *conf_exchange   = NULL;
+static char *conf_routingkey = NULL;
+static int   conf_port       = 5672;
+static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
+static _Bool conf_store_rates = 0;
+
+#define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
 
 static amqp_connection_state_t amqp_conn = NULL;
 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -89,7 +98,7 @@ static int config_set(char **var, const char *value)
 static int config(const char *key, const char *value)
 {
     if (strcasecmp(key, "host") == 0)
-        return (config_set(&host, value));
+        return (config_set(&conf_host, value));
     else if(strcasecmp(key, "port") == 0)
     {
         int tmp;
@@ -102,33 +111,33 @@ static int config(const char *key, const char *value)
             return (1);
         }
 
-        port = tmp;
+        conf_port = tmp;
         return (0);
     }
     else if (strcasecmp(key, "vhost") == 0)
-        return (config_set(&vhost, value));
+        return (config_set(&conf_vhost, value));
     else if (strcasecmp(key, "user") == 0)
-        return (config_set(&user, value));
+        return (config_set(&conf_user, value));
     else if (strcasecmp(key, "password") == 0)
-        return (config_set(&password, value));
+        return (config_set(&conf_password, value));
     else if (strcasecmp(key, "exchange") == 0)
-        return (config_set(&exchange, value));
+        return (config_set(&conf_exchange, value));
     else if (strcasecmp(key, "routingkey") == 0)
-        return (config_set(&routingkey, value));
+        return (config_set(&conf_routingkey, value));
     else if (strcasecmp ("Persistent", key) == 0)
     {
         if (IS_TRUE (value))
-            delivery_mode = AMQP_DM_PERSISTENT;
+            conf_delivery_mode = AMQP_DM_PERSISTENT;
         else
-            delivery_mode = AMQP_DM_VOLATILE;
+            conf_delivery_mode = AMQP_DM_VOLATILE;
         return (0);
     }
     else if (strcasecmp ("StoreRates", key) == 0)
     {
         if (IS_TRUE (value))
-            store_rates = 1;
+            conf_store_rates = 1;
         else
-            store_rates = 0;
+            conf_store_rates = 0;
         return (0);
     }
     return (-1);
@@ -150,7 +159,7 @@ static int amqp_connect (void)
         return (ENOMEM);
     }
 
-    sockfd = amqp_open_socket (host, port);
+    sockfd = amqp_open_socket (CONF(host), conf_port);
     if (sockfd < 0)
     {
         char errbuf[1024];
@@ -164,17 +173,18 @@ static int amqp_connect (void)
 
     amqp_set_sockfd (amqp_conn, sockfd);
 
-    reply = amqp_login(amqp_conn, vhost,
+    reply = amqp_login (amqp_conn, CONF(vhost),
             /* channel max = */      0,
-            /* frame max = */   131072,
-            /* heartbeat = */        0,
-            /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+            /* frame max   = */ 131072,
+            /* heartbeat   = */      0,
+            /* authentication = */ AMQP_SASL_METHOD_PLAIN,
+            CONF(user), CONF(password));
     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
     {
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
-                vhost, user);
-        amqp_destroy_connection(amqp_conn);
-        close(sockfd);
+                CONF(vhost), CONF(user));
+        amqp_destroy_connection (amqp_conn);
+        close (sockfd);
         amqp_conn = NULL;
         return (1);
     }
@@ -193,7 +203,7 @@ static int amqp_connect (void)
     }
 
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
-            "on %s:%i.", vhost, host, port);
+            "on %s:%i.", CONF(vhost), CONF(host), conf_port);
     return (0);
 } /* int amqp_connect */
 
@@ -207,14 +217,17 @@ static int amqp_write_locked (const char *buffer)
         return (status);
 
     memset (&props, 0, sizeof (props));
-    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+        | AMQP_BASIC_DELIVERY_MODE_FLAG
+        | AMQP_BASIC_APP_ID_FLAG;
     props.content_type = amqp_cstring_bytes("application/json");
-    props.delivery_mode = delivery_mode;
+    props.delivery_mode = conf_delivery_mode;
+    props.app_id = amqp_cstring_bytes("collectd");
 
     status = amqp_basic_publish(amqp_conn,
                 /* channel = */ 1,
-                amqp_cstring_bytes(exchange),
-                amqp_cstring_bytes(routingkey),
+                amqp_cstring_bytes(CONF(exchange)),
+                amqp_cstring_bytes(CONF(routingkey)),
                 /* mandatory = */ 0,
                 /* immediate = */ 0,
                 &props,
@@ -253,7 +266,7 @@ static int amqp_write (const data_set_t *ds, const value_list_t *vl,
     bfill = 0;
 
     format_json_initialize (buffer, &bfill, &bfree);
-    format_json_value_list (buffer, &bfill, &bfree, ds, vl, store_rates);
+    format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
     format_json_finalize (buffer, &bfill, &bfree);
 
     pthread_mutex_lock (&amqp_conn_lock);
@@ -279,12 +292,12 @@ static int shutdown (void)
     }
     pthread_mutex_unlock (&amqp_conn_lock);
 
-    sfree(host);
-    sfree(vhost);
-    sfree(user);
-    sfree(password);
-    sfree(exchange);
-    sfree(routingkey);
+    sfree(conf_host);
+    sfree(conf_vhost);
+    sfree(conf_user);
+    sfree(conf_password);
+    sfree(conf_exchange);
+    sfree(conf_routingkey);
 
     return (0);
 } /* int shutdown */