amqp plugin: Add a debug message.
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009  Sebastien Pahl
4  * Copyright (C) 2010  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
44  * library.. */
45 #define CAMQP_DM_VOLATILE   1
46 #define CAMQP_DM_PERSISTENT 2
47
48 #define CAMQP_CHANNEL 1
49
50 /*
51  * Data types
52  */
53 struct camqp_config_s
54 {
55     _Bool   publish;
56     char   *name;
57
58     char   *host;
59     int     port;
60     char   *vhost;
61     char   *user;
62     char   *password;
63
64     char   *exchange;
65     char   *exchange_type;
66     char   *queue;
67     char   *routingkey;
68     uint8_t delivery_mode;
69
70     _Bool   store_rates;
71
72     amqp_connection_state_t connection;
73     pthread_mutex_t lock;
74 };
75 typedef struct camqp_config_s camqp_config_t;
76
77 /*
78  * Global variables
79  */
80 static const char *def_host       = "localhost";
81 static const char *def_vhost      = "/";
82 static const char *def_user       = "guest";
83 static const char *def_password   = "guest";
84 static const char *def_exchange   = "amq.fanout";
85 static const char *def_routingkey = "collectd";
86
87 static pthread_t *subscriber_threads     = NULL;
88 static size_t     subscriber_threads_num = 0;
89 static _Bool      subscriber_threads_running = 1;
90
91 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
92
93 /*
94  * Functions
95  */
96 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
97 {
98     int sockfd;
99
100     if ((conf == NULL) || (conf->connection == NULL))
101         return;
102
103     sockfd = amqp_get_sockfd (conf->connection);
104     amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
105     amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
106     amqp_destroy_connection (conf->connection);
107     close (sockfd);
108     conf->connection = NULL;
109 } /* }}} void camqp_close_connection */
110
111 static void camqp_config_free (void *ptr) /* {{{ */
112 {
113     camqp_config_t *conf = ptr;
114
115     if (conf == NULL)
116         return;
117
118     camqp_close_connection (conf);
119
120     sfree (conf->name);
121     sfree (conf->host);
122     sfree (conf->vhost);
123     sfree (conf->user);
124     sfree (conf->password);
125     sfree (conf->exchange);
126     sfree (conf->exchange_type);
127     sfree (conf->queue);
128     sfree (conf->routingkey);
129
130     sfree (conf);
131 } /* }}} void camqp_config_free */
132
133 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
134 {
135     char *ret;
136
137     if ((in == NULL) || (in->bytes == NULL))
138         return (NULL);
139
140     ret = malloc (in->len + 1);
141     if (ret == NULL)
142         return (NULL);
143
144     memcpy (ret, in->bytes, in->len);
145     ret[in->len] = 0;
146
147     return (ret);
148 } /* }}} char *camqp_bytes_cstring */
149
150 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
151 {
152     amqp_rpc_reply_t r;
153
154     r = amqp_get_rpc_reply (conf->connection);
155     if (r.reply_type == AMQP_RESPONSE_NORMAL)
156         return (0);
157
158     return (1);
159 } /* }}} _Bool camqp_is_error */
160
161 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
162         char *buffer, size_t buffer_size)
163 {
164     amqp_rpc_reply_t r;
165
166     r = amqp_get_rpc_reply (conf->connection);
167     switch (r.reply_type)
168     {
169         case AMQP_RESPONSE_NORMAL:
170             sstrncpy (buffer, "Success", sizeof (buffer));
171             break;
172
173         case AMQP_RESPONSE_NONE:
174             sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
175             break;
176
177         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
178             if (r.library_errno)
179                 return (sstrerror (r.library_errno, buffer, buffer_size));
180             else
181                 sstrncpy (buffer, "End of stream", sizeof (buffer));
182             break;
183
184         case AMQP_RESPONSE_SERVER_EXCEPTION:
185             if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
186             {
187                 amqp_connection_close_t *m = r.reply.decoded;
188                 char *tmp = camqp_bytes_cstring (&m->reply_text);
189                 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
190                         m->reply_code, tmp);
191                 sfree (tmp);
192             }
193             else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
194             {
195                 amqp_channel_close_t *m = r.reply.decoded;
196                 char *tmp = camqp_bytes_cstring (&m->reply_text);
197                 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
198                         m->reply_code, tmp);
199                 sfree (tmp);
200             }
201             else
202             {
203                 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
204                         r.reply.id);
205             }
206             break;
207
208         default:
209             ssnprintf (buffer, buffer_size, "Unknown reply type %i",
210                     (int) r.reply_type);
211     }
212
213     return (buffer);
214 } /* }}} char *camqp_strerror */
215
216 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
217 {
218     amqp_queue_declare_ok_t *qd_ret;
219     amqp_basic_consume_ok_t *cm_ret;
220
221     qd_ret = amqp_queue_declare (conf->connection,
222             /* channel     = */ CAMQP_CHANNEL,
223             /* queue       = */ (conf->queue != NULL)
224             ? amqp_cstring_bytes (conf->queue)
225             : AMQP_EMPTY_BYTES,
226             /* passive     = */ 0,
227             /* durable     = */ 0,
228             /* exclusive   = */ 0,
229             /* auto_delete = */ 1,
230             /* arguments   = */ AMQP_EMPTY_TABLE);
231     if (qd_ret == NULL)
232     {
233         ERROR ("amqp plugin: amqp_queue_declare failed.");
234         camqp_close_connection (conf);
235         return (-1);
236     }
237
238     if (conf->queue == NULL)
239     {
240         conf->queue = camqp_bytes_cstring (&qd_ret->queue);
241         if (conf->queue == NULL)
242         {
243             ERROR ("amqp plugin: camqp_bytes_cstring failed.");
244             camqp_close_connection (conf);
245             return (-1);
246         }
247
248         INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
249     }
250     DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
251
252     /* bind to an exchange */
253     if (conf->exchange != NULL)
254     {
255         amqp_queue_bind_ok_t *qb_ret;
256
257         /* create the exchange */
258         if (conf->exchange_type != NULL)
259         {
260             amqp_exchange_declare_ok_t *ed_ret;
261
262             ed_ret = amqp_exchange_declare (conf->connection,
263                     /* channel     = */ CAMQP_CHANNEL,
264                     /* exchange    = */ amqp_cstring_bytes (conf->exchange),
265                     /* type        = */ amqp_cstring_bytes (conf->exchange_type),
266                     /* passive     = */ 0,
267                     /* durable     = */ 0,
268                     /* auto_delete = */ 1,
269                     /* arguments   = */ AMQP_EMPTY_TABLE);
270             if ((ed_ret == NULL) && camqp_is_error (conf))
271             {
272                 char errbuf[1024];
273                 ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
274                         camqp_strerror (conf, errbuf, sizeof (errbuf)));
275                 camqp_close_connection (conf);
276                 return (-1);
277             }
278         }
279
280         DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
281                 conf->queue, conf->exchange, CONF (conf, routingkey));
282
283         assert (conf->queue != NULL);
284         qb_ret = amqp_queue_bind (conf->connection,
285                 /* channel     = */ CAMQP_CHANNEL,
286                 /* queue       = */ amqp_cstring_bytes (conf->queue),
287                 /* exchange    = */ amqp_cstring_bytes (conf->exchange),
288 #if 1
289                 /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
290 #else
291                 /* routing_key = */ AMQP_EMPTY_BYTES,
292 #endif
293                 /* arguments   = */ AMQP_EMPTY_TABLE);
294         if ((qb_ret == NULL) && camqp_is_error (conf))
295         {
296             char errbuf[1024];
297             ERROR ("amqp plugin: amqp_queue_bind failed: %s",
298                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
299             camqp_close_connection (conf);
300             return (-1);
301         }
302
303         DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
304                 conf->queue, conf->exchange);
305     } /* if (conf->exchange != NULL) */
306
307     cm_ret = amqp_basic_consume (conf->connection,
308             /* channel      = */ CAMQP_CHANNEL,
309             /* queue        = */ amqp_cstring_bytes (conf->queue),
310             /* consumer_tag = */ AMQP_EMPTY_BYTES,
311             /* no_local     = */ 0,
312             /* no_ack       = */ 1,
313             /* exclusive    = */ 0);
314     if ((cm_ret == NULL) && camqp_is_error (conf))
315     {
316         char errbuf[1024];
317         ERROR ("amqp plugin: amqp_basic_consume failed: %s",
318                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
319         camqp_close_connection (conf);
320         return (-1);
321     }
322
323     return (0);
324 } /* }}} int camqp_setup_queue */
325
326 static int camqp_connect (camqp_config_t *conf) /* {{{ */
327 {
328     amqp_rpc_reply_t reply;
329     int sockfd;
330     int status;
331
332     if (conf->connection != NULL)
333         return (0);
334
335     conf->connection = amqp_new_connection ();
336     if (conf->connection == NULL)
337     {
338         ERROR ("amqp plugin: amqp_new_connection failed.");
339         return (ENOMEM);
340     }
341
342     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
343     if (sockfd < 0)
344     {
345         char errbuf[1024];
346         status = (-1) * sockfd;
347         ERROR ("amqp plugin: amqp_open_socket failed: %s",
348                 sstrerror (status, errbuf, sizeof (errbuf)));
349         amqp_destroy_connection (conf->connection);
350         conf->connection = NULL;
351         return (status);
352     }
353     amqp_set_sockfd (conf->connection, sockfd);
354
355     reply = amqp_login (conf->connection, CONF(conf, vhost),
356             /* channel max = */      0,
357             /* frame max   = */ 131072,
358             /* heartbeat   = */      0,
359             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
360             CONF(conf, user), CONF(conf, password));
361     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
362     {
363         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
364                 CONF(conf, vhost), CONF(conf, user));
365         amqp_destroy_connection (conf->connection);
366         close (sockfd);
367         conf->connection = NULL;
368         return (1);
369     }
370
371     amqp_channel_open (conf->connection, /* channel = */ 1);
372     /* FIXME: Is checking "reply.reply_type" really correct here? How does
373      * it get set? --octo */
374     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
375     {
376         ERROR ("amqp plugin: amqp_channel_open failed.");
377         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
378         amqp_destroy_connection (conf->connection);
379         close(sockfd);
380         conf->connection = NULL;
381         return (1);
382     }
383
384     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
385             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
386
387     if (!conf->publish)
388         return (camqp_setup_queue (conf));
389     return (0);
390 } /* }}} int camqp_connect */
391
392 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
393         size_t body_size)
394 {
395     char body[body_size + 1];
396     char *body_ptr;
397     size_t received;
398     amqp_frame_t frame;
399     int status;
400
401     memset (body, 0, sizeof (body));
402     body_ptr = &body[0];
403     received = 0;
404
405     while (received < body_size)
406     {
407         status = amqp_simple_wait_frame (conf->connection, &frame);
408         if (status < 0)
409         {
410             char errbuf[1024];
411             status = (-1) * status;
412             ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
413                     sstrerror (status, errbuf, sizeof (errbuf)));
414             camqp_close_connection (conf);
415             return (status);
416         }
417
418         if (frame.frame_type != AMQP_FRAME_BODY)
419         {
420             NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
421                     frame.frame_type);
422             return (-1);
423         }
424
425         if ((body_size - received) < frame.payload.body_fragment.len)
426         {
427             WARNING ("amqp plugin: Body is larger than indicated by header.");
428             return (-1);
429         }
430
431         memcpy (body_ptr, frame.payload.body_fragment.bytes,
432                 frame.payload.body_fragment.len);
433         body_ptr += frame.payload.body_fragment.len;
434         received += frame.payload.body_fragment.len;
435     } /* while (received < body_size) */
436
437     DEBUG ("amqp plugin: camqp_read_body: body = %s", body);
438
439     return (0);
440 } /* }}} int camqp_read_body */
441
442 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
443 {
444     int status;
445     amqp_frame_t frame;
446
447     status = amqp_simple_wait_frame (conf->connection, &frame);
448     if (status < 0)
449     {
450         char errbuf[1024];
451         status = (-1) * status;
452         ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
453                     sstrerror (status, errbuf, sizeof (errbuf)));
454         camqp_close_connection (conf);
455         return (status);
456     }
457
458     if (frame.frame_type != AMQP_FRAME_HEADER)
459     {
460         NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
461                 frame.frame_type);
462         return (-1);
463     }
464
465     return (camqp_read_body (conf, frame.payload.properties.body_size));
466 } /* }}} int camqp_read_header */
467
468 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
469 {
470     camqp_config_t *conf = user_data;
471     int status;
472
473     while (subscriber_threads_running)
474     {
475         amqp_frame_t frame;
476
477         status = camqp_connect (conf);
478         if (status != 0)
479         {
480             ERROR ("amqp plugin: camqp_connect failed. "
481                     "Will sleep for %i seconds.", interval_g);
482             sleep (interval_g);
483             continue;
484         }
485
486         status = amqp_simple_wait_frame (conf->connection, &frame);
487         if (status < 0)
488         {
489             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
490                     "Will sleep for %i seconds.", interval_g);
491             camqp_close_connection (conf);
492             sleep (interval_g);
493             continue;
494         }
495
496         if (frame.frame_type != AMQP_FRAME_METHOD)
497         {
498             DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
499                     frame.frame_type);
500             continue;
501         }
502
503         if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
504         {
505             DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
506                     frame.payload.method.id);
507             continue;
508         }
509
510         status = camqp_read_header (conf);
511
512         amqp_maybe_release_buffers (conf->connection);
513     } /* while (subscriber_threads_running) */
514
515     camqp_config_free (conf);
516     pthread_exit (NULL);
517 } /* }}} void *camqp_subscribe_thread */
518
519 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
520 {
521     int status;
522     pthread_t *tmp;
523
524     tmp = realloc (subscriber_threads,
525             sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
526     if (tmp == NULL)
527     {
528         ERROR ("amqp plugin: realloc failed.");
529         camqp_config_free (conf);
530         return (ENOMEM);
531     }
532     subscriber_threads = tmp;
533     tmp = subscriber_threads + subscriber_threads_num;
534     memset (tmp, 0, sizeof (*tmp));
535
536     status = pthread_create (tmp, /* attr = */ NULL,
537             camqp_subscribe_thread, conf);
538     if (status != 0)
539     {
540         char errbuf[1024];
541         ERROR ("amqp plugin: pthread_create failed: %s",
542                 sstrerror (status, errbuf, sizeof (errbuf)));
543         camqp_config_free (conf);
544         return (status);
545     }
546
547     subscriber_threads_num++;
548
549     return (0);
550 } /* }}} int camqp_subscribe_init */
551
552 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
553         const char *buffer)
554 {
555     amqp_basic_properties_t props;
556     int status;
557
558     status = camqp_connect (conf);
559     if (status != 0)
560         return (status);
561
562     memset (&props, 0, sizeof (props));
563     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
564         | AMQP_BASIC_DELIVERY_MODE_FLAG
565         | AMQP_BASIC_APP_ID_FLAG;
566     props.content_type = amqp_cstring_bytes("application/json");
567     props.delivery_mode = conf->delivery_mode;
568     props.app_id = amqp_cstring_bytes("collectd");
569
570     status = amqp_basic_publish(conf->connection,
571                 /* channel = */ 1,
572                 amqp_cstring_bytes(CONF(conf, exchange)),
573                 amqp_cstring_bytes(CONF(conf, routingkey)),
574                 /* mandatory = */ 0,
575                 /* immediate = */ 0,
576                 &props,
577                 amqp_cstring_bytes(buffer));
578     if (status != 0)
579     {
580         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
581                 status);
582         camqp_close_connection (conf);
583     }
584
585     return (status);
586 } /* }}} int camqp_write_locked */
587
588 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
589         user_data_t *user_data)
590 {
591     camqp_config_t *conf = user_data->data;
592     char buffer[4096];
593     size_t bfree;
594     size_t bfill;
595     int status;
596
597     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
598         return (EINVAL);
599
600     memset (buffer, 0, sizeof (buffer));
601     bfree = sizeof (buffer);
602     bfill = 0;
603
604     format_json_initialize (buffer, &bfill, &bfree);
605     format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
606     format_json_finalize (buffer, &bfill, &bfree);
607
608     pthread_mutex_lock (&conf->lock);
609     status = camqp_write_locked (conf, buffer);
610     pthread_mutex_unlock (&conf->lock);
611
612     return (status);
613 } /* }}} int camqp_write */
614
615 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
616         _Bool publish)
617 {
618     camqp_config_t *conf;
619     int status;
620     int i;
621
622     conf = malloc (sizeof (*conf));
623     if (conf == NULL)
624     {
625         ERROR ("amqp plugin: malloc failed.");
626         return (ENOMEM);
627     }
628
629     /* Initialize "conf" {{{ */
630     memset (conf, 0, sizeof (*conf));
631     conf->publish = publish;
632     conf->name = NULL;
633     conf->host = NULL;
634     conf->port = 5672;
635     conf->vhost = NULL;
636     conf->user = NULL;
637     conf->password = NULL;
638     conf->exchange = NULL;
639     conf->exchange_type = NULL;
640     conf->queue = NULL;
641     conf->routingkey = NULL;
642     conf->delivery_mode = CAMQP_DM_VOLATILE;
643     conf->store_rates = 0;
644     conf->connection = NULL;
645     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
646     /* }}} */
647
648     status = cf_util_get_string (ci, &conf->name);
649     if (status != 0)
650     {
651         sfree (conf);
652         return (status);
653     }
654
655     for (i = 0; i < ci->children_num; i++)
656     {
657         oconfig_item_t *child = ci->children + i;
658
659         if (strcasecmp ("Host", child->key) == 0)
660             status = cf_util_get_string (child, &conf->host);
661         else if (strcasecmp ("Port", child->key) == 0)
662         {
663             status = cf_util_get_port_number (child);
664             if (status > 0)
665             {
666                 conf->port = status;
667                 status = 0;
668             }
669         }
670         else if (strcasecmp ("VHost", child->key) == 0)
671             status = cf_util_get_string (child, &conf->vhost);
672         else if (strcasecmp ("User", child->key) == 0)
673             status = cf_util_get_string (child, &conf->user);
674         else if (strcasecmp ("Password", child->key) == 0)
675             status = cf_util_get_string (child, &conf->password);
676         else if (strcasecmp ("Exchange", child->key) == 0)
677             status = cf_util_get_string (child, &conf->exchange);
678         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
679             status = cf_util_get_string (child, &conf->exchange_type);
680         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
681             status = cf_util_get_string (child, &conf->queue);
682         else if (strcasecmp ("RoutingKey", child->key) == 0)
683             status = cf_util_get_string (child, &conf->routingkey);
684         else if (strcasecmp ("Persistent", child->key) == 0)
685         {
686             _Bool tmp = 0;
687             status = cf_util_get_boolean (child, &tmp);
688             if (tmp)
689                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
690             else
691                 conf->delivery_mode = CAMQP_DM_VOLATILE;
692         }
693         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
694             status = cf_util_get_boolean (child, &conf->store_rates);
695         else
696             WARNING ("amqp plugin: Ignoring unknown "
697                     "configuration option \"%s\".", child->key);
698
699         if (status != 0)
700             break;
701     } /* for (i = 0; i < ci->children_num; i++) */
702
703     if ((status == 0) && !publish && (conf->exchange == NULL))
704     {
705         if (conf->routingkey != NULL)
706             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
707                     "without the \"Exchange\" option. It will be ignored.");
708
709         if (conf->exchange_type != NULL)
710             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
711                     "without the \"Exchange\" option. It will be ignored.");
712     }
713
714     if (status != 0)
715     {
716         camqp_config_free (conf);
717         return (status);
718     }
719
720     if (conf->exchange != NULL)
721     {
722         DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
723                 conf->exchange);
724     }
725
726     if (publish)
727     {
728         char cbname[128];
729         user_data_t ud = { conf, camqp_config_free };
730
731         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
732
733         status = plugin_register_write (cbname, camqp_write, &ud);
734         if (status != 0)
735         {
736             camqp_config_free (conf);
737             return (status);
738         }
739     }
740     else
741     {
742         status = camqp_subscribe_init (conf);
743         if (status != 0)
744         {
745             camqp_config_free (conf);
746             return (status);
747         }
748     }
749
750     return (0);
751 } /* }}} int camqp_config_connection */
752
753 static int camqp_config (oconfig_item_t *ci) /* {{{ */
754 {
755     int i;
756
757     for (i = 0; i < ci->children_num; i++)
758     {
759         oconfig_item_t *child = ci->children + i;
760
761         if (strcasecmp ("Publish", child->key) == 0)
762             camqp_config_connection (child, /* publish = */ 1);
763         else if (strcasecmp ("Subscribe", child->key) == 0)
764             camqp_config_connection (child, /* publish = */ 0);
765         else
766             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
767                     child->key);
768     } /* for (ci->children_num) */
769
770     return (0);
771 } /* }}} int camqp_config */
772
773 static int shutdown (void) /* {{{ */
774 {
775     size_t i;
776
777     DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
778             subscriber_threads_num);
779
780     subscriber_threads_running = 0;
781     for (i = 0; i < subscriber_threads_num; i++)
782     {
783         /* FIXME: Sending a signal is not very elegant here. Maybe find out how
784          * to use a timeout in the thread and check for the variable in regular
785          * intervals. */
786         pthread_kill (subscriber_threads[i], SIGTERM);
787         pthread_join (subscriber_threads[i], /* retval = */ NULL);
788     }
789
790     subscriber_threads_num = 0;
791     sfree (subscriber_threads);
792
793     DEBUG ("amqp plugin: All subscriber threads exited.");
794
795     return (0);
796 } /* }}} int shutdown */
797
798 void module_register (void)
799 {
800     plugin_register_complex_config ("amqp", camqp_config);
801     plugin_register_shutdown ("amqp", shutdown);
802 } /* void module_register */
803
804 /* vim: set sw=4 sts=4 et fdm=marker : */