amqp plugin: First step towards subscribing to data via AMQP.
[collectd.git] / src / amqp.c
index eccdaff..1924ce7 100644 (file)
@@ -84,6 +84,10 @@ static const char *def_password   = "guest";
 static const char *def_exchange   = "amq.fanout";
 static const char *def_routingkey = "collectd";
 
+static pthread_t *subscriber_threads     = NULL;
+static size_t     subscriber_threads_num = 0;
+static _Bool      subscriber_threads_running = 1;
+
 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
 
 /*
@@ -126,6 +130,196 @@ static void camqp_config_free (void *ptr) /* {{{ */
     sfree (conf);
 } /* }}} void camqp_config_free */
 
+static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
+{
+    char *ret;
+
+    if ((in == NULL) || (in->bytes == NULL))
+        return (NULL);
+
+    ret = malloc (in->len + 1);
+    if (ret == NULL)
+        return (NULL);
+
+    memcpy (ret, in->bytes, in->len);
+    ret[in->len] = 0;
+
+    return (ret);
+} /* }}} char *camqp_bytes_cstring */
+
+static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
+{
+    amqp_rpc_reply_t r;
+
+    r = amqp_get_rpc_reply (conf->connection);
+    if (r.reply_type == AMQP_RESPONSE_NORMAL)
+        return (0);
+
+    return (1);
+} /* }}} _Bool camqp_is_error */
+
+static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
+        char *buffer, size_t buffer_size)
+{
+    amqp_rpc_reply_t r;
+
+    r = amqp_get_rpc_reply (conf->connection);
+    switch (r.reply_type)
+    {
+        case AMQP_RESPONSE_NORMAL:
+            sstrncpy (buffer, "Success", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_NONE:
+            sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+            if (r.library_errno)
+                return (sstrerror (r.library_errno, buffer, buffer_size));
+            else
+                sstrncpy (buffer, "End of stream", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_SERVER_EXCEPTION:
+            if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
+            {
+                amqp_connection_close_t *m = r.reply.decoded;
+                char *tmp = camqp_bytes_cstring (&m->reply_text);
+                ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
+                        m->reply_code, tmp);
+                sfree (tmp);
+            }
+            else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
+            {
+                amqp_channel_close_t *m = r.reply.decoded;
+                char *tmp = camqp_bytes_cstring (&m->reply_text);
+                ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
+                        m->reply_code, tmp);
+                sfree (tmp);
+            }
+            else
+            {
+                ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
+                        r.reply.id);
+            }
+            break;
+
+        default:
+            ssnprintf (buffer, buffer_size, "Unknown reply type %i",
+                    (int) r.reply_type);
+    }
+
+    return (buffer);
+} /* }}} char *camqp_strerror */
+
+static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
+{
+    amqp_queue_declare_ok_t *qd_ret;
+    amqp_basic_consume_ok_t *cm_ret;
+
+    qd_ret = amqp_queue_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* queue       = */ (conf->queue != NULL)
+            ? amqp_cstring_bytes (conf->queue)
+            : AMQP_EMPTY_BYTES,
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* exclusive   = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if (qd_ret == NULL)
+    {
+        ERROR ("amqp plugin: amqp_queue_declare failed.");
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    if (conf->queue == NULL)
+    {
+        conf->queue = camqp_bytes_cstring (&qd_ret->queue);
+        if (conf->queue == NULL)
+        {
+            ERROR ("amqp plugin: camqp_bytes_cstring failed.");
+            camqp_close_connection (conf);
+            return (-1);
+        }
+
+        INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
+    }
+    DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
+
+    /* bind to an exchange */
+    if (conf->exchange != NULL)
+    {
+        amqp_queue_bind_ok_t *qb_ret;
+
+        /* create the exchange */
+        if (conf->exchange_type != NULL)
+        {
+            amqp_exchange_declare_ok_t *ed_ret;
+
+            ed_ret = amqp_exchange_declare (conf->connection,
+                    /* channel     = */ CAMQP_CHANNEL,
+                    /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+                    /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+                    /* passive     = */ 0,
+                    /* durable     = */ 0,
+                    /* auto_delete = */ 1,
+                    /* arguments   = */ AMQP_EMPTY_TABLE);
+            if ((ed_ret == NULL) && camqp_is_error (conf))
+            {
+                char errbuf[1024];
+                ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                        camqp_strerror (conf, errbuf, sizeof (errbuf)));
+                camqp_close_connection (conf);
+                return (-1);
+            }
+        }
+
+        DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
+                conf->queue, conf->exchange, CONF (conf, routingkey));
+
+        assert (conf->queue != NULL);
+        qb_ret = amqp_queue_bind (conf->connection,
+                /* channel     = */ CAMQP_CHANNEL,
+                /* queue       = */ amqp_cstring_bytes (conf->queue),
+                /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+#if 1
+                /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
+#else
+                /* routing_key = */ AMQP_EMPTY_BYTES,
+#endif
+                /* arguments   = */ AMQP_EMPTY_TABLE);
+        if ((qb_ret == NULL) && camqp_is_error (conf))
+        {
+            char errbuf[1024];
+            ERROR ("amqp plugin: amqp_queue_bind failed: %s",
+                    camqp_strerror (conf, errbuf, sizeof (errbuf)));
+            camqp_close_connection (conf);
+            return (-1);
+        }
+    } /* if (conf->exchange != NULL) */
+
+    cm_ret = amqp_basic_consume (conf->connection,
+            /* channel      = */ CAMQP_CHANNEL,
+            /* queue        = */ amqp_cstring_bytes (conf->queue),
+            /* consumer_tag = */ AMQP_EMPTY_BYTES,
+            /* no_local     = */ 0,
+            /* no_ack       = */ 1,
+            /* exclusive    = */ 0);
+    if ((cm_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_basic_consume failed: %s",
+                    camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    return (0);
+} /* }}} int camqp_setup_queue */
+
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
@@ -186,9 +380,172 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
 
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
+
+    if (!conf->publish)
+        return (camqp_setup_queue (conf));
     return (0);
 } /* }}} int camqp_connect */
 
+static int camqp_read_body (camqp_config_t *conf, /* {{{ */
+        size_t body_size)
+{
+    char body[body_size + 1];
+    char *body_ptr;
+    size_t received;
+    amqp_frame_t frame;
+    int status;
+
+    memset (body, 0, sizeof (body));
+    body_ptr = &body[0];
+    received = 0;
+
+    while (received < body_size)
+    {
+        status = amqp_simple_wait_frame (conf->connection, &frame);
+        if (status < 0)
+        {
+            char errbuf[1024];
+            status = (-1) * status;
+            ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+                    sstrerror (status, errbuf, sizeof (errbuf)));
+            camqp_close_connection (conf);
+            return (status);
+        }
+
+        if (frame.frame_type != AMQP_FRAME_BODY)
+        {
+            NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                    frame.frame_type);
+            return (-1);
+        }
+
+        if ((body_size - received) < frame.payload.body_fragment.len)
+        {
+            WARNING ("amqp plugin: Body is larger than indicated by header.");
+            return (-1);
+        }
+
+        memcpy (body_ptr, frame.payload.body_fragment.bytes,
+                frame.payload.body_fragment.len);
+        body_ptr += frame.payload.body_fragment.len;
+        received += frame.payload.body_fragment.len;
+    } /* while (received < body_size) */
+
+    DEBUG ("amqp plugin: camqp_read_body: body = %s", body);
+
+    return (0);
+} /* }}} int camqp_read_body */
+
+static int camqp_read_header (camqp_config_t *conf) /* {{{ */
+{
+    int status;
+    amqp_frame_t frame;
+
+    status = amqp_simple_wait_frame (conf->connection, &frame);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status = (-1) * status;
+        ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+                    sstrerror (status, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (status);
+    }
+
+    if (frame.frame_type != AMQP_FRAME_HEADER)
+    {
+        NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                frame.frame_type);
+        return (-1);
+    }
+
+    return (camqp_read_body (conf, frame.payload.properties.body_size));
+} /* }}} int camqp_read_header */
+
+static void *camqp_subscribe_thread (void *user_data) /* {{{ */
+{
+    camqp_config_t *conf = user_data;
+    int status;
+
+    while (subscriber_threads_running)
+    {
+        amqp_frame_t frame;
+
+        status = camqp_connect (conf);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: camqp_connect failed. "
+                    "Will sleep for %i seconds.", interval_g);
+            sleep (interval_g);
+            continue;
+        }
+
+        status = amqp_simple_wait_frame (conf->connection, &frame);
+        if (status < 0)
+        {
+            ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
+                    "Will sleep for %i seconds.", interval_g);
+            camqp_close_connection (conf);
+            sleep (interval_g);
+            continue;
+        }
+
+        if (frame.frame_type != AMQP_FRAME_METHOD)
+        {
+            DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                    frame.frame_type);
+            continue;
+        }
+
+        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+        {
+            DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
+                    frame.payload.method.id);
+            continue;
+        }
+
+        status = camqp_read_header (conf);
+
+        amqp_maybe_release_buffers (conf->connection);
+    } /* while (subscriber_threads_running) */
+
+    camqp_config_free (conf);
+    pthread_exit (NULL);
+} /* }}} void *camqp_subscribe_thread */
+
+static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
+{
+    int status;
+    pthread_t *tmp;
+
+    tmp = realloc (subscriber_threads,
+            sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
+    if (tmp == NULL)
+    {
+        ERROR ("amqp plugin: realloc failed.");
+        camqp_config_free (conf);
+        return (ENOMEM);
+    }
+    subscriber_threads = tmp;
+    tmp = subscriber_threads + subscriber_threads_num;
+    memset (tmp, 0, sizeof (*tmp));
+
+    status = pthread_create (tmp, /* attr = */ NULL,
+            camqp_subscribe_thread, conf);
+    if (status != 0)
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: pthread_create failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        camqp_config_free (conf);
+        return (status);
+    }
+
+    subscriber_threads_num++;
+
+    return (0);
+} /* }}} int camqp_subscribe_init */
+
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         const char *buffer)
 {
@@ -297,7 +654,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         oconfig_item_t *child = ci->children + i;
 
         if (strcasecmp ("Host", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->host);
+            status = cf_util_get_string (child, &conf->host);
         else if (strcasecmp ("Port", child->key) == 0)
         {
             status = cf_util_get_port_number (child);
@@ -308,30 +665,30 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             }
         }
         else if (strcasecmp ("VHost", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->vhost);
+            status = cf_util_get_string (child, &conf->vhost);
         else if (strcasecmp ("User", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->user);
+            status = cf_util_get_string (child, &conf->user);
         else if (strcasecmp ("Password", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->password);
+            status = cf_util_get_string (child, &conf->password);
         else if (strcasecmp ("Exchange", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->exchange);
+            status = cf_util_get_string (child, &conf->exchange);
         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
-            status = cf_util_get_string (ci, &conf->exchange_type);
+            status = cf_util_get_string (child, &conf->exchange_type);
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
-            status = cf_util_get_string (ci, &conf->queue);
+            status = cf_util_get_string (child, &conf->queue);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
-            status = cf_util_get_string (ci, &conf->routingkey);
+            status = cf_util_get_string (child, &conf->routingkey);
         else if (strcasecmp ("Persistent", child->key) == 0)
         {
             _Bool tmp = 0;
-            status = cf_util_get_boolean (ci, &tmp);
+            status = cf_util_get_boolean (child, &tmp);
             if (tmp)
                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
             else
                 conf->delivery_mode = CAMQP_DM_VOLATILE;
         }
         else if (strcasecmp ("StoreRates", child->key) == 0)
-            status = cf_util_get_boolean (ci, &conf->store_rates);
+            status = cf_util_get_boolean (child, &conf->store_rates);
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);
@@ -357,6 +714,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         return (status);
     }
 
+    if (conf->exchange != NULL)
+    {
+        DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
+                conf->exchange);
+    }
+
     if (publish)
     {
         char cbname[128];
@@ -371,6 +734,15 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             return (status);
         }
     }
+    else
+    {
+        status = camqp_subscribe_init (conf);
+        if (status != 0)
+        {
+            camqp_config_free (conf);
+            return (status);
+        }
+    }
 
     return (0);
 } /* }}} int camqp_config_connection */
@@ -385,6 +757,8 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */
 
         if (strcasecmp ("Publish", child->key) == 0)
             camqp_config_connection (child, /* publish = */ 1);
+        else if (strcasecmp ("Subscribe", child->key) == 0)
+            camqp_config_connection (child, /* publish = */ 0);
         else
             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
                     child->key);
@@ -395,7 +769,26 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */
 
 static int shutdown (void) /* {{{ */
 {
-    /* FIXME: Set a global shutdown variable here. */
+    size_t i;
+
+    DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+            subscriber_threads_num);
+
+    subscriber_threads_running = 0;
+    for (i = 0; i < subscriber_threads_num; i++)
+    {
+        /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+         * to use a timeout in the thread and check for the variable in regular
+         * intervals. */
+        pthread_kill (subscriber_threads[i], SIGTERM);
+        pthread_join (subscriber_threads[i], /* retval = */ NULL);
+    }
+
+    subscriber_threads_num = 0;
+    sfree (subscriber_threads);
+
+    DEBUG ("amqp plugin: All subscriber threads exited.");
+
     return (0);
 } /* }}} int shutdown */