amqp plugin: Implement publishing to multiple brokers.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 5 Aug 2010 09:18:53 +0000 (11:18 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 5 Aug 2010 09:18:53 +0000 (11:18 +0200)
src/amqp.c
src/collectd.conf.in

index 748ff9f..a72cec3 100644 (file)
 
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
-#define AMQP_DM_VOLATILE   1
-#define AMQP_DM_PERSISTENT 2
+#define CAMQP_DM_VOLATILE   1
+#define CAMQP_DM_PERSISTENT 2
+
+#define CAMQP_CHANNEL 1
+
+/*
+ * Data types
+ */
+struct camqp_config_s
+{
+    _Bool   publish;
+    char   *name;
+
+    char   *host;
+    int     port;
+    char   *vhost;
+    char   *user;
+    char   *password;
+
+    char   *exchange;
+    char   *exchange_type;
+    char   *queue;
+    char   *routingkey;
+    uint8_t delivery_mode;
+
+    _Bool   store_rates;
+
+    amqp_connection_state_t connection;
+    pthread_mutex_t lock;
+};
+typedef struct camqp_config_s camqp_config_t;
 
 /*
  * Global variables
@@ -55,164 +84,118 @@ static const char *def_password   = "guest";
 static const char *def_exchange   = "amq.fanout";
 static const char *def_routingkey = "collectd";
 
-static char *conf_host       = NULL;
-static char *conf_vhost      = NULL;
-static char *conf_user       = NULL;
-static char *conf_password   = NULL;
-static char *conf_exchange   = NULL;
-static char *conf_routingkey = NULL;
-static int   conf_port       = 5672;
-static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
-static _Bool conf_store_rates = 0;
-
-#define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
-
-static amqp_connection_state_t amqp_conn = NULL;
-static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
-
-static const char *config_keys[] =
-{
-    "Host",
-    "Port",
-    "VHost",
-    "User",
-    "Password",
-    "Exchange",
-    "RoutingKey",
-    "Persistent",
-    "StoreRates"
-};
-static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
 
 /*
  * Functions
  */
-static int config_set(char **var, const char *value)
+static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
 {
-    sfree(*var);
-    if ((*var = strdup(value)) == NULL)
-        return (1);
-    return (0);
-} /* int config_set */
+    int sockfd;
+
+    if ((conf == NULL) || (conf->connection == NULL))
+        return;
 
-static int config(const char *key, const char *value)
+    sockfd = amqp_get_sockfd (conf->connection);
+    amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
+    amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+    amqp_destroy_connection (conf->connection);
+    close (sockfd);
+    conf->connection = NULL;
+} /* }}} void camqp_close_connection */
+
+static void camqp_config_free (void *ptr) /* {{{ */
 {
-    if (strcasecmp(key, "host") == 0)
-        return (config_set(&conf_host, value));
-    else if(strcasecmp(key, "port") == 0)
-    {
-        int tmp;
+    camqp_config_t *conf = ptr;
 
-        tmp = service_name_to_port_number (value);
-        if (tmp <= 0)
-        {
-            ERROR ("AMQP plugin: Cannot parse `%s' as a "
-                    "service name (port number).", value);
-            return (1);
-        }
+    if (conf == NULL)
+        return;
 
-        conf_port = tmp;
-        return (0);
-    }
-    else if (strcasecmp(key, "vhost") == 0)
-        return (config_set(&conf_vhost, value));
-    else if (strcasecmp(key, "user") == 0)
-        return (config_set(&conf_user, value));
-    else if (strcasecmp(key, "password") == 0)
-        return (config_set(&conf_password, value));
-    else if (strcasecmp(key, "exchange") == 0)
-        return (config_set(&conf_exchange, value));
-    else if (strcasecmp(key, "routingkey") == 0)
-        return (config_set(&conf_routingkey, value));
-    else if (strcasecmp ("Persistent", key) == 0)
-    {
-        if (IS_TRUE (value))
-            conf_delivery_mode = AMQP_DM_PERSISTENT;
-        else
-            conf_delivery_mode = AMQP_DM_VOLATILE;
-        return (0);
-    }
-    else if (strcasecmp ("StoreRates", key) == 0)
-    {
-        if (IS_TRUE (value))
-            conf_store_rates = 1;
-        else
-            conf_store_rates = 0;
-        return (0);
-    }
-    return (-1);
-} /* int config */
+    camqp_close_connection (conf);
+
+    sfree (conf->name);
+    sfree (conf->host);
+    sfree (conf->vhost);
+    sfree (conf->user);
+    sfree (conf->password);
+    sfree (conf->exchange);
+    sfree (conf->exchange_type);
+    sfree (conf->queue);
+    sfree (conf->routingkey);
 
-static int amqp_connect (void)
+    sfree (conf);
+} /* }}} void camqp_config_free */
+
+static int amqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
     int sockfd;
     int status;
 
-    if (amqp_conn != NULL)
+    if (conf->connection != NULL)
         return (0);
 
-    amqp_conn = amqp_new_connection ();
-    if (amqp_conn == NULL)
+    conf->connection = amqp_new_connection ();
+    if (conf->connection == NULL)
     {
         ERROR ("amqp plugin: amqp_new_connection failed.");
         return (ENOMEM);
     }
 
-    sockfd = amqp_open_socket (CONF(host), conf_port);
+    sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
     {
         char errbuf[1024];
         status = (-1) * sockfd;
         ERROR ("amqp plugin: amqp_open_socket failed: %s",
                 sstrerror (status, errbuf, sizeof (errbuf)));
-        amqp_destroy_connection(amqp_conn);
-        amqp_conn = NULL;
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
         return (status);
     }
+    amqp_set_sockfd (conf->connection, sockfd);
 
-    amqp_set_sockfd (amqp_conn, sockfd);
-
-    reply = amqp_login (amqp_conn, CONF(vhost),
+    reply = amqp_login (conf->connection, CONF(conf, vhost),
             /* channel max = */      0,
             /* frame max   = */ 131072,
             /* heartbeat   = */      0,
             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
-            CONF(user), CONF(password));
+            CONF(conf, user), CONF(conf, password));
     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
     {
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
-                CONF(vhost), CONF(user));
-        amqp_destroy_connection (amqp_conn);
+                CONF(conf, vhost), CONF(conf, user));
+        amqp_destroy_connection (conf->connection);
         close (sockfd);
-        amqp_conn = NULL;
+        conf->connection = NULL;
         return (1);
     }
 
-    amqp_channel_open (amqp_conn, /* channel = */ 1);
+    amqp_channel_open (conf->connection, /* channel = */ 1);
     /* FIXME: Is checking "reply.reply_type" really correct here? How does
      * it get set? --octo */
     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
     {
         ERROR ("amqp plugin: amqp_channel_open failed.");
-        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
-        amqp_destroy_connection(amqp_conn);
+        amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection (conf->connection);
         close(sockfd);
-        amqp_conn = NULL;
+        conf->connection = NULL;
         return (1);
     }
 
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
-            "on %s:%i.", CONF(vhost), CONF(host), conf_port);
+            "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
     return (0);
-} /* int amqp_connect */
+} /* }}} int amqp_connect */
 
-static int amqp_write_locked (const char *buffer)
+static int amqp_write_locked (camqp_config_t *conf, /* {{{ */
+        const char *buffer)
 {
     amqp_basic_properties_t props;
     int status;
 
-    status = amqp_connect ();
+    status = amqp_connect (conf);
     if (status != 0)
         return (status);
 
@@ -221,44 +204,37 @@ static int amqp_write_locked (const char *buffer)
         | AMQP_BASIC_DELIVERY_MODE_FLAG
         | AMQP_BASIC_APP_ID_FLAG;
     props.content_type = amqp_cstring_bytes("application/json");
-    props.delivery_mode = conf_delivery_mode;
+    props.delivery_mode = conf->delivery_mode;
     props.app_id = amqp_cstring_bytes("collectd");
 
-    status = amqp_basic_publish(amqp_conn,
+    status = amqp_basic_publish(conf->connection,
                 /* channel = */ 1,
-                amqp_cstring_bytes(CONF(exchange)),
-                amqp_cstring_bytes(CONF(routingkey)),
+                amqp_cstring_bytes(CONF(conf, exchange)),
+                amqp_cstring_bytes(CONF(conf, routingkey)),
                 /* mandatory = */ 0,
                 /* immediate = */ 0,
                 &props,
                 amqp_cstring_bytes(buffer));
     if (status != 0)
     {
-        int sockfd;
-
         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
                 status);
-
-        sockfd = amqp_get_sockfd (amqp_conn);
-        amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
-        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
-        amqp_destroy_connection (amqp_conn);
-        close (sockfd);
-        amqp_conn = NULL;
+        camqp_close_connection (conf);
     }
 
     return (status);
-} /* int amqp_write_locked */
+} /* }}} int amqp_write_locked */
 
-static int amqp_write (const data_set_t *ds, const value_list_t *vl,
-        __attribute__((unused)) user_data_t *user_data)
+static int amqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+        user_data_t *user_data)
 {
+    camqp_config_t *conf = user_data->data;
     char buffer[4096];
     size_t bfree;
     size_t bfill;
     int status;
 
-    if ((ds == NULL) || (vl == NULL))
+    if ((ds == NULL) || (vl == NULL) || (conf == NULL))
         return (EINVAL);
 
     memset (buffer, 0, sizeof (buffer));
@@ -266,47 +242,167 @@ static int amqp_write (const data_set_t *ds, const value_list_t *vl,
     bfill = 0;
 
     format_json_initialize (buffer, &bfill, &bfree);
-    format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
+    format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
     format_json_finalize (buffer, &bfill, &bfree);
 
-    pthread_mutex_lock (&amqp_conn_lock);
-    status = amqp_write_locked (buffer);
-    pthread_mutex_unlock (&amqp_conn_lock);
+    pthread_mutex_lock (&conf->lock);
+    status = amqp_write_locked (conf, buffer);
+    pthread_mutex_unlock (&conf->lock);
 
     return (status);
-} /* int amqp_write */
+} /* }}} int amqp_write */
 
-static int shutdown (void)
+static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
+        _Bool publish)
 {
-    pthread_mutex_lock (&amqp_conn_lock);
-    if (amqp_conn != NULL)
+    camqp_config_t *conf;
+    int status;
+    int i;
+
+    conf = malloc (sizeof (*conf));
+    if (conf == NULL)
     {
-        int sockfd;
+        ERROR ("amqp plugin: malloc failed.");
+        return (ENOMEM);
+    }
 
-        sockfd = amqp_get_sockfd (amqp_conn);
-        amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
-        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
-        amqp_destroy_connection (amqp_conn);
-        close(sockfd);
-        amqp_conn = NULL;
+    /* Initialize "conf" {{{ */
+    memset (conf, 0, sizeof (*conf));
+    conf->publish = publish;
+    conf->name = NULL;
+    conf->host = NULL;
+    conf->port = 5672;
+    conf->vhost = NULL;
+    conf->user = NULL;
+    conf->password = NULL;
+    conf->exchange = NULL;
+    conf->exchange_type = NULL;
+    conf->queue = NULL;
+    conf->routingkey = NULL;
+    conf->delivery_mode = CAMQP_DM_VOLATILE;
+    conf->store_rates = 0;
+    conf->connection = NULL;
+    pthread_mutex_init (&conf->lock, /* attr = */ NULL);
+    /* }}} */
+
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        sfree (conf);
+        return (status);
     }
-    pthread_mutex_unlock (&amqp_conn_lock);
 
-    sfree(conf_host);
-    sfree(conf_vhost);
-    sfree(conf_user);
-    sfree(conf_password);
-    sfree(conf_exchange);
-    sfree(conf_routingkey);
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
 
+        if (strcasecmp ("Host", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
+        {
+            status = cf_util_get_port_number (child);
+            if (status > 0)
+            {
+                conf->port = status;
+                status = 0;
+            }
+        }
+        else if (strcasecmp ("VHost", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->vhost);
+        else if (strcasecmp ("User", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->user);
+        else if (strcasecmp ("Password", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->password);
+        else if (strcasecmp ("Exchange", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->exchange);
+        else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
+            status = cf_util_get_string (ci, &conf->exchange_type);
+        else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
+            status = cf_util_get_string (ci, &conf->queue);
+        else if (strcasecmp ("RoutingKey", child->key) == 0)
+            status = cf_util_get_string (ci, &conf->routingkey);
+        else if (strcasecmp ("Persistent", child->key) == 0)
+        {
+            _Bool tmp = 0;
+            status = cf_util_get_boolean (ci, &tmp);
+            if (tmp)
+                conf->delivery_mode = CAMQP_DM_PERSISTENT;
+            else
+                conf->delivery_mode = CAMQP_DM_VOLATILE;
+        }
+        else if (strcasecmp ("StoreRates", child->key) == 0)
+            status = cf_util_get_boolean (ci, &conf->store_rates);
+        else
+            WARNING ("amqp plugin: Ignoring unknown "
+                    "configuration option \"%s\".", child->key);
+
+        if (status != 0)
+            break;
+    } /* for (i = 0; i < ci->children_num; i++) */
+
+    if ((status == 0) && !publish && (conf->exchange == NULL))
+    {
+        if (conf->routingkey != NULL)
+            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+
+        if (conf->exchange_type != NULL)
+            WARNING ("amqp plugin: The option \"ExchangeType\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+    }
+
+    if (status != 0)
+    {
+        camqp_config_free (conf);
+        return (status);
+    }
+
+    if (publish)
+    {
+        char cbname[128];
+        user_data_t ud = { conf, camqp_config_free };
+
+        ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
+
+        status = plugin_register_write (cbname, amqp_write, &ud);
+        if (status != 0)
+        {
+            camqp_config_free (conf);
+            return (status);
+        }
+    }
+
+    return (0);
+} /* }}} int camqp_config_connection */
+
+static int camqp_config (oconfig_item_t *ci) /* {{{ */
+{
+    int i;
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp ("Publish", child->key) == 0)
+            camqp_config_connection (child, /* publish = */ 1);
+        else
+            WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
+                    child->key);
+    } /* for (ci->children_num) */
+
+    return (0);
+} /* }}} int camqp_config */
+
+static int shutdown (void) /* {{{ */
+{
+    /* FIXME: Set a global shutdown variable here. */
     return (0);
-} /* int shutdown */
+} /* }}} int shutdown */
 
 void module_register (void)
 {
-    plugin_register_config ("amqp", config, config_keys, config_keys_num);
-    plugin_register_write ("amqp", amqp_write, NULL);
+    plugin_register_complex_config ("amqp", camqp_config);
     plugin_register_shutdown ("amqp", shutdown);
 } /* void module_register */
 
-/* vim: set sw=4 sts=4 et : */
+/* vim: set sw=4 sts=4 et fdm=marker : */
index e07920b..31a113a 100644 (file)
 ##############################################################################
 
 #<Plugin "amqp">
-#      Host "localhost"
-#      Port "5672"
-#      VHost "/"
-#      User "collectd"
-#      Password "aiwaeZ0y"
-#      Exchange "amq.direct"
-#      RoutingKey "routing_key"
-#      Persistent false
-#      StoreRates false
+#  <Publish "name">
+#    Host "localhost"
+#    Port "5672"
+#    VHost "/"
+#    User "guest"
+#    Password "guest"
+#    Exchange "amq.fanout"
+#    RoutingKey "collectd"
+#    Persistent false
+#    StoreRates false
+#  </Publish>
 #</Plugin>
 
 #<Plugin apache>