amqp plugin: Enable the "ExchangeType" option in Publish blocks, too.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 8 Aug 2010 12:45:27 +0000 (14:45 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 8 Aug 2010 12:45:27 +0000 (14:45 +0200)
src/amqp.c
src/collectd.conf.pod

index 7b9f41b..d6cd275 100644 (file)
@@ -220,6 +220,37 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
     return (buffer);
 } /* }}} char *camqp_strerror */
 
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+    amqp_exchange_declare_ok_t *ed_ret;
+
+    if (conf->exchange_type == NULL)
+        return (0);
+
+    ed_ret = amqp_exchange_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+            /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if ((ed_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    INFO ("amqp plugin: Successfully created exchange \"%s\" "
+            "with type \"%s\".",
+            conf->exchange, conf->exchange_type);
+
+    return (0);
+} /* }}} int camqp_create_exchange */
+
 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 {
     amqp_queue_declare_ok_t *qd_ret;
@@ -261,29 +292,6 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
     {
         amqp_queue_bind_ok_t *qb_ret;
 
-        /* create the exchange */
-        if (conf->exchange_type != NULL)
-        {
-            amqp_exchange_declare_ok_t *ed_ret;
-
-            ed_ret = amqp_exchange_declare (conf->connection,
-                    /* channel     = */ CAMQP_CHANNEL,
-                    /* exchange    = */ amqp_cstring_bytes (conf->exchange),
-                    /* type        = */ amqp_cstring_bytes (conf->exchange_type),
-                    /* passive     = */ 0,
-                    /* durable     = */ 0,
-                    /* auto_delete = */ 1,
-                    /* arguments   = */ AMQP_EMPTY_TABLE);
-            if ((ed_ret == NULL) && camqp_is_error (conf))
-            {
-                char errbuf[1024];
-                ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
-                        camqp_strerror (conf, errbuf, sizeof (errbuf)));
-                camqp_close_connection (conf);
-                return (-1);
-            }
-        }
-
         assert (conf->queue != NULL);
         qb_ret = amqp_queue_bind (conf->connection,
                 /* channel     = */ CAMQP_CHANNEL,
@@ -386,6 +394,10 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
 
+    status = camqp_create_exchange (conf);
+    if (status != 0)
+        return (status);
+
     if (!conf->publish)
         return (camqp_setup_queue (conf));
     return (0);
index 195ad79..36245e2 100644 (file)
@@ -170,6 +170,7 @@ possibly filtering or messages.
      User "guest"
      Password "guest"
      Exchange "amq.fanout"
+ #   ExchangeType "fanout"
  #   RoutingKey "collectd"
  #   Persistent false
  #   Format "command"
@@ -230,10 +231,11 @@ In I<Subscribe> blocks this option is optional. If given, a I<binding> between
 the given exchange and the I<queue> is created, using the I<routing key> if
 configured. See the B<Queue> and B<RoutingKey> options below.
 
-=item B<ExchangeType> I<Type> (Subscribe only)
+=item B<ExchangeType> I<Type>
 
 If given, the plugin will try to create the configured I<exchange> with this
-I<type> after connecting and bind its I<queue> to it.
+I<type> after connecting. When in a I<Subscribe> block, the I<queue> will then
+be bound to this exchange.
 
 =item B<Queue> I<Queue> (Subscribe only)