write_riemann: add a BatchFlushTimeout option
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "write_riemann_threshold.h"
42
43 #define RIEMANN_HOST            "localhost"
44 #define RIEMANN_PORT            5555
45 #define RIEMANN_TTL_FACTOR      2.0
46 #define RIEMANN_BATCH_MAX      8192
47
48 struct riemann_host {
49     c_complain_t init_complaint;
50         char                    *name;
51         char                    *event_service_prefix;
52         pthread_mutex_t  lock;
53     _Bool            batch_mode;
54         _Bool            notifications;
55         _Bool            check_thresholds;
56         _Bool                    store_rates;
57         _Bool                    always_append_ds;
58         char                    *node;
59         int                      port;
60         riemann_client_type_t    client_type;
61         riemann_client_t        *client;
62         double                   ttl_factor;
63     cdtime_t         batch_init;
64     int              batch_max;
65     int              batch_timeout;
66         int                          reference_count;
67   riemann_message_t     *batch_msg;
68         char                     *tls_ca_file;
69         char                     *tls_cert_file;
70         char                     *tls_key_file;
71         struct timeval timeout;
72 };
73
/* Tags added to every event that is built in this file. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Extra string attributes; consumed as consecutive key/value pairs. */
static char **riemann_attrs;
static size_t riemann_attrs_num;
78
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82         char const              *node;
83         int                      port;
84
85         if (host->client)
86                 return 0;
87
88         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89         port = (host->port) ? host->port : RIEMANN_PORT;
90
91         host->client = NULL;
92
93         host->client = riemann_client_create(host->client_type, node, port,
94                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
95                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
96                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
97                                              RIEMANN_CLIENT_OPTION_NONE);
98         if (host->client == NULL) {
99         c_complain (LOG_ERR, &host->init_complaint,
100                     "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101                     node, port);
102                 return -1;
103         }
104         if (host->timeout.tv_sec != 0) {
105                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106                         riemann_client_free(host->client);
107                         host->client = NULL;
108             c_complain (LOG_ERR, &host->init_complaint,
109                         "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110                         node, port);
111                         return -1;
112                 }
113         }
114
115     c_release (LOG_INFO, &host->init_complaint,
116                "write_riemann plugin: Successfully connected to %s:%d",
117                node, port);
118
119         return 0;
120 } /* }}} int wrr_connect */
121
122 /* host->lock must be held when calling this function. */
123 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
124 {
125         if (!host->client)
126                 return (0);
127
128         riemann_client_free(host->client);
129         host->client = NULL;
130
131         return (0);
132 } /* }}} int wrr_disconnect */
133
134 /**
135  * Function to send messages to riemann.
136  *
137  * Acquires the host lock, disconnects on errors.
138  */
139 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
140 {
141         int status = 0;
142         pthread_mutex_lock (&host->lock);
143
144         status = wrr_connect(host);
145         if (status != 0) {
146         pthread_mutex_unlock(&host->lock);
147                 return status;
148     }
149
150         status = riemann_client_send_message(host->client, msg);
151         if (status != 0) {
152                 wrr_disconnect(host);
153                 pthread_mutex_unlock(&host->lock);
154                 return status;
155         }
156
157         /*
158          * For TCP we need to receive message acknowledgemenent.
159          */
160         if (host->client_type != RIEMANN_CLIENT_UDP)
161         {
162                 riemann_message_t *response;
163
164                 response = riemann_client_recv_message(host->client);
165
166                 if (response == NULL)
167                 {
168                         wrr_disconnect(host);
169                         pthread_mutex_unlock(&host->lock);
170                         return errno;
171                 }
172                 riemann_message_free(response);
173         }
174
175         pthread_mutex_unlock (&host->lock);
176         return 0;
177 } /* }}} int wrr_send */
178
179 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
180                 notification_t const *n)
181 {
182         riemann_message_t *msg;
183         riemann_event_t *event;
184         char service_buffer[6 * DATA_MAX_NAME_LEN];
185         char const *severity;
186         notification_meta_t *meta;
187         size_t i;
188
189         switch (n->severity)
190         {
191                 case NOTIF_OKAY:        severity = "ok"; break;
192                 case NOTIF_WARNING:     severity = "warning"; break;
193                 case NOTIF_FAILURE:     severity = "critical"; break;
194                 default:                severity = "unknown";
195         }
196
197         format_name(service_buffer, sizeof(service_buffer),
198                     /* host = */ "", n->plugin, n->plugin_instance,
199                     n->type, n->type_instance);
200
201         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
202                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
203                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
204                                      RIEMANN_EVENT_FIELD_STATE, severity,
205                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
206                                      RIEMANN_EVENT_FIELD_NONE);
207
208         if (n->host[0] != 0)
209                 riemann_event_string_attribute_add(event, "host", n->host);
210         if (n->plugin[0] != 0)
211                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
212         if (n->plugin_instance[0] != 0)
213                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
214
215         if (n->type[0] != 0)
216                 riemann_event_string_attribute_add(event, "type", n->type);
217         if (n->type_instance[0] != 0)
218                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
219
220         for (i = 0; i < riemann_attrs_num; i += 2)
221                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
222
223         for (i = 0; i < riemann_tags_num; i++)
224                 riemann_event_tag_add(event, riemann_tags[i]);
225
226         if (n->message[0] != 0)
227                 riemann_event_string_attribute_add(event, "description", n->message);
228
229         /* Pull in values from threshold and add extra attributes */
230         for (meta = n->meta; meta != NULL; meta = meta->next)
231         {
232                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
233                 {
234                         riemann_event_set(event,
235                                           RIEMANN_EVENT_FIELD_METRIC_D,
236                                           (double) meta->nm_value.nm_double,
237                                           RIEMANN_EVENT_FIELD_NONE);
238                         continue;
239                 }
240
241                 if (meta->type == NM_TYPE_STRING) {
242                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
243                         continue;
244                 }
245         }
246
247         msg = riemann_message_create_with_events(event, NULL);
248         if (msg == NULL)
249         {
250                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
251                 riemann_event_free (event);
252                 return (NULL);
253         }
254
255         DEBUG("write_riemann plugin: Successfully created message for notification: "
256               "host = \"%s\", service = \"%s\", state = \"%s\"",
257               event->host, event->service, event->state);
258         return (msg);
259 } /* }}} riemann_message_t *wrr_notification_to_message */
260
261 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
262                                            data_set_t const *ds,
263                                            value_list_t const *vl, size_t index,
264                                            gauge_t const *rates,
265                                            int status)
266 {
267         riemann_event_t *event;
268         char name_buffer[5 * DATA_MAX_NAME_LEN];
269         char service_buffer[6 * DATA_MAX_NAME_LEN];
270         size_t i;
271
272         event = riemann_event_new();
273         if (event == NULL)
274         {
275                 ERROR("write_riemann plugin: riemann_event_new() failed.");
276                 return (NULL);
277         }
278
279         format_name(name_buffer, sizeof(name_buffer),
280                     /* host = */ "", vl->plugin, vl->plugin_instance,
281                     vl->type, vl->type_instance);
282         if (host->always_append_ds || (ds->ds_num > 1))
283         {
284                 if (host->event_service_prefix == NULL)
285                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
286                                   &name_buffer[1], ds->ds[index].name);
287                 else
288                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
289                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
290         }
291         else
292         {
293                 if (host->event_service_prefix == NULL)
294                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
295                 else
296                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
297                                   host->event_service_prefix, &name_buffer[1]);
298         }
299
300         riemann_event_set(event,
301                           RIEMANN_EVENT_FIELD_HOST, vl->host,
302                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
303                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
304                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
305                           "plugin", vl->plugin,
306                           "type", vl->type,
307                           "ds_name", ds->ds[index].name,
308                           NULL,
309                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
310                           RIEMANN_EVENT_FIELD_NONE);
311
312         if (host->check_thresholds) {
313                 const char *state = NULL;
314
315                 switch (status) {
316                         case STATE_OKAY:
317                                 state = "ok";
318                                 break;
319                         case STATE_ERROR:
320                                 state = "critical";
321                                 break;
322                         case STATE_WARNING:
323                                 state = "warning";
324                                 break;
325                         case STATE_MISSING:
326                                 state = "unknown";
327                                 break;
328                 }
329                 if (state)
330                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
331                                           RIEMANN_EVENT_FIELD_NONE);
332         }
333
334         if (vl->plugin_instance[0] != 0)
335                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
336         if (vl->type_instance[0] != 0)
337                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
338
339         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
340         {
341                 char ds_type[DATA_MAX_NAME_LEN];
342
343                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
344                           DS_TYPE_TO_STRING(ds->ds[index].type));
345                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
346         }
347         else
348         {
349                 riemann_event_string_attribute_add(event, "ds_type",
350                                        DS_TYPE_TO_STRING(ds->ds[index].type));
351         }
352
353         {
354                 char ds_index[DATA_MAX_NAME_LEN];
355
356                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
357                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
358         }
359
360         for (i = 0; i < riemann_attrs_num; i += 2)
361                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
362
363         for (i = 0; i < riemann_tags_num; i++)
364                 riemann_event_tag_add(event, riemann_tags[i]);
365
366         if (ds->ds[index].type == DS_TYPE_GAUGE)
367         {
368                 riemann_event_set(event,
369                                   RIEMANN_EVENT_FIELD_METRIC_D,
370                                   (double) vl->values[index].gauge,
371                                   RIEMANN_EVENT_FIELD_NONE);
372         }
373         else if (rates != NULL)
374         {
375                 riemann_event_set(event,
376                                   RIEMANN_EVENT_FIELD_METRIC_D,
377                                   (double) rates[index],
378                                   RIEMANN_EVENT_FIELD_NONE);
379         }
380         else
381         {
382                 int64_t metric;
383
384                 if (ds->ds[index].type == DS_TYPE_DERIVE)
385                         metric = (int64_t) vl->values[index].derive;
386                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
387                         metric = (int64_t) vl->values[index].absolute;
388                 else
389                         metric = (int64_t) vl->values[index].counter;
390
391                 riemann_event_set(event,
392                                   RIEMANN_EVENT_FIELD_METRIC_S64,
393                                   (int64_t) metric,
394                                   RIEMANN_EVENT_FIELD_NONE);
395         }
396
397         DEBUG("write_riemann plugin: Successfully created message for metric: "
398               "host = \"%s\", service = \"%s\"",
399               event->host, event->service);
400         return (event);
401 } /* }}} riemann_event_t *wrr_value_to_event */
402
403 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
404                                                     data_set_t const *ds,
405                                                     value_list_t const *vl,
406                                                     int *statuses)
407 {
408         riemann_message_t *msg;
409         size_t i;
410         gauge_t *rates = NULL;
411
412         /* Initialize the Msg structure. */
413         msg = riemann_message_new();
414         if (msg == NULL)
415         {
416                 ERROR ("write_riemann plugin: riemann_message_new failed.");
417                 return (NULL);
418         }
419
420         if (host->store_rates)
421         {
422                 rates = uc_get_rate(ds, vl);
423                 if (rates == NULL)
424                 {
425                         ERROR("write_riemann plugin: uc_get_rate failed.");
426                         riemann_message_free(msg);
427                         return (NULL);
428                 }
429         }
430
431         for (i = 0; i < vl->values_len; i++)
432         {
433                 riemann_event_t *event;
434
435                 event = wrr_value_to_event(host, ds, vl,
436                                            (int) i, rates, statuses[i]);
437                 if (event == NULL)
438                 {
439                         riemann_message_free(msg);
440                         sfree(rates);
441                         return (NULL);
442                 }
443                 riemann_message_append_events(msg, event, NULL);
444         }
445
446         sfree(rates);
447         return (msg);
448 } /* }}} riemann_message_t *wrr_value_list_to_message */
449
450 /*
451  * Always call while holding host->lock !
452  */
453 static int wrr_batch_flush_nolock(cdtime_t timeout,
454                                   struct riemann_host *host)
455 {
456         cdtime_t    now;
457         int         status = 0;
458
459         if (timeout > 0) {
460                 now = cdtime();
461                 if ((host->batch_init + timeout) > now)
462                         return status;
463         }
464         wrr_send(host, host->batch_msg);
465         riemann_message_free(host->batch_msg);
466
467         if (host->client_type != RIEMANN_CLIENT_UDP)
468         {
469                 riemann_message_t *response;
470
471                 response = riemann_client_recv_message(host->client);
472
473                 if (!response)
474                 {
475                         wrr_disconnect(host);
476                         return errno;
477                 }
478
479                 riemann_message_free(response);
480         }
481
482         host->batch_init = cdtime();
483         host->batch_msg = NULL;
484         return status;
485 }
486
487 static int wrr_batch_flush(cdtime_t timeout,
488         const char *identifier __attribute__((unused)),
489         user_data_t *user_data)
490 {
491         struct riemann_host *host;
492         int status;
493
494         if (user_data == NULL)
495                 return (-EINVAL);
496
497         host = user_data->data;
498         pthread_mutex_lock(&host->lock);
499         status = wrr_batch_flush_nolock(timeout, host);
500         if (status != 0)
501         c_complain (LOG_ERR, &host->init_complaint,
502                     "write_riemann plugin: riemann_client_send failed with status %i",
503                     status);
504     else
505         c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
506
507         pthread_mutex_unlock(&host->lock);
508         return status;
509 }
510
511 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
512                                     data_set_t const *ds,
513                                     value_list_t const *vl,
514                                     int *statuses)
515 {
516         riemann_message_t *msg;
517         size_t len;
518         int ret;
519     cdtime_t timeout;
520
521         msg = wrr_value_list_to_message(host, ds, vl, statuses);
522         if (msg == NULL)
523                 return -1;
524
525         pthread_mutex_lock(&host->lock);
526
527         if (host->batch_msg == NULL) {
528                 host->batch_msg = msg;
529         } else {
530                 int status;
531
532                 status = riemann_message_append_events_n(host->batch_msg,
533                                                          msg->n_events,
534                                                          msg->events);
535                 msg->n_events = 0;
536                 msg->events = NULL;
537
538                 riemann_message_free(msg);
539
540                 if (status != 0) {
541                         pthread_mutex_unlock(&host->lock);
542                         ERROR("write_riemann plugin: out of memory");
543                         return -1;
544                 }
545         }
546
547         len = riemann_message_get_packed_size(host->batch_msg);
548         ret = 0;
549         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
550                 ret = wrr_batch_flush_nolock(0, host);
551         } else {
552         if (host->batch_timeout > 0) {
553             timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
554             ret = wrr_batch_flush_nolock(timeout, host);
555         }
556     }
557
558         pthread_mutex_unlock(&host->lock);
559         return ret;
560 } /* }}} riemann_message_t *wrr_batch_add_value_list */
561
562 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
563 {
564         int                      status;
565         struct riemann_host     *host = ud->data;
566         riemann_message_t       *msg;
567
568         if (!host->notifications)
569                 return 0;
570
571     /*
572      * Never batch for notifications, send them ASAP
573      */
574         msg = wrr_notification_to_message(host, n);
575         if (msg == NULL)
576                 return (-1);
577
578         status = wrr_send(host, msg);
579         if (status != 0)
580         c_complain (LOG_ERR, &host->init_complaint,
581                     "write_riemann plugin: riemann_client_send failed with status %i",
582                     status);
583     else
584         c_release (LOG_DEBUG, &host->init_complaint,
585                    "write_riemann plugin: riemann_client_send succeeded");
586
587         riemann_message_free(msg);
588         return (status);
589 } /* }}} int wrr_notification */
590
591 static int wrr_write(const data_set_t *ds, /* {{{ */
592               const value_list_t *vl,
593               user_data_t *ud)
594 {
595         int                      status = 0;
596         int                      statuses[vl->values_len];
597         struct riemann_host     *host = ud->data;
598         riemann_message_t       *msg;
599
600         if (host->check_thresholds) {
601                 status = write_riemann_threshold_check(ds, vl, statuses);
602     if (status != 0)
603       return status;
604   } else {
605     memset (statuses, 0, sizeof (statuses));
606   }
607
608   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
609     wrr_batch_add_value_list(host, ds, vl, statuses);
610   } else {
611     msg = wrr_value_list_to_message(host, ds, vl, statuses);
612     if (msg == NULL)
613       return (-1);
614
615     status = wrr_send(host, msg);
616
617     riemann_message_free(msg);
618   }
619   return status;
620 } /* }}} int wrr_write */
621
622 static void wrr_free(void *p) /* {{{ */
623 {
624   struct riemann_host   *host = p;
625
626   if (host == NULL)
627     return;
628
629   pthread_mutex_lock(&host->lock);
630
631   host->reference_count--;
632   if (host->reference_count > 0)
633     {
634       pthread_mutex_unlock(&host->lock);
635       return;
636     }
637
638   wrr_disconnect(host);
639
640   pthread_mutex_destroy(&host->lock);
641   sfree(host);
642 } /* }}} void wrr_free */
643
644 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
645 {
646   struct riemann_host   *host = NULL;
647   int                    status = 0;
648   int                    i;
649   oconfig_item_t                *child;
650   char                   callback_name[DATA_MAX_NAME_LEN];
651   user_data_t            ud;
652
653   if ((host = calloc(1, sizeof(*host))) == NULL) {
654     ERROR ("write_riemann plugin: calloc failed.");
655     return ENOMEM;
656   }
657   pthread_mutex_init(&host->lock, NULL);
658   C_COMPLAIN_INIT (&host->init_complaint);
659   host->reference_count = 1;
660   host->node = NULL;
661   host->port = 0;
662   host->notifications = 1;
663   host->check_thresholds = 0;
664   host->store_rates = 1;
665   host->always_append_ds = 0;
666   host->batch_mode = 1;
667   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
668   host->batch_init = cdtime();
669   host->batch_timeout = 0;
670   host->ttl_factor = RIEMANN_TTL_FACTOR;
671   host->client = NULL;
672   host->client_type = RIEMANN_CLIENT_TCP;
673   host->timeout.tv_sec = 0;
674   host->timeout.tv_usec = 0;
675
676   status = cf_util_get_string(ci, &host->name);
677   if (status != 0) {
678     WARNING("write_riemann plugin: Required host name is missing.");
679     wrr_free(host);
680     return -1;
681   }
682
683   for (i = 0; i < ci->children_num; i++) {
684     /*
685      * The code here could be simplified but makes room
686      * for easy adding of new options later on.
687      */
688     child = &ci->children[i];
689     status = 0;
690
691     if (strcasecmp("Host", child->key) == 0) {
692       status = cf_util_get_string(child, &host->node);
693       if (status != 0)
694         break;
695     } else if (strcasecmp("Notifications", child->key) == 0) {
696       status = cf_util_get_boolean(child, &host->notifications);
697       if (status != 0)
698         break;
699     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
700       status = cf_util_get_string(child, &host->event_service_prefix);
701       if (status != 0)
702         break;
703     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
704       status = cf_util_get_boolean(child, &host->check_thresholds);
705       if (status != 0)
706         break;
707     } else if (strcasecmp("Batch", child->key) == 0) {
708       status = cf_util_get_boolean(child, &host->batch_mode);
709       if (status != 0)
710         break;
711     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
712       status = cf_util_get_int(child, &host->batch_max);
713       if (status != 0)
714         break;
715     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
716       status = cf_util_get_int(child, &host->batch_timeout);
717       if (status != 0)
718         break;
719     } else if (strcasecmp("Timeout", child->key) == 0) {
720       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
721       if (status != 0)
722         break;
723     } else if (strcasecmp("Port", child->key) == 0) {
724       host->port = cf_util_get_port_number(child);
725       if (host->port == -1) {
726         ERROR("write_riemann plugin: Invalid argument "
727               "configured for the \"Port\" "
728               "option.");
729         break;
730       }
731     } else if (strcasecmp("Protocol", child->key) == 0) {
732       char tmp[16];
733       status = cf_util_get_string_buffer(child,
734                                          tmp, sizeof(tmp));
735       if (status != 0)
736         {
737           ERROR("write_riemann plugin: cf_util_get_"
738                 "string_buffer failed with "
739                 "status %i.", status);
740           break;
741         }
742
743       if (strcasecmp("UDP", tmp) == 0)
744         host->client_type = RIEMANN_CLIENT_UDP;
745       else if (strcasecmp("TCP", tmp) == 0)
746         host->client_type = RIEMANN_CLIENT_TCP;
747       else if (strcasecmp("TLS", tmp) == 0)
748         host->client_type = RIEMANN_CLIENT_TLS;
749       else
750         WARNING("write_riemann plugin: The value "
751                 "\"%s\" is not valid for the "
752                 "\"Protocol\" option. Use "
753                 "either \"UDP\", \"TCP\" or \"TLS\".",
754                 tmp);
755     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
756       status = cf_util_get_string(child, &host->tls_ca_file);
757       if (status != 0)
758         {
759           ERROR("write_riemann plugin: cf_util_get_"
760                 "string_buffer failed with "
761                 "status %i.", status);
762           break;
763         }
764     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
765       status = cf_util_get_string(child, &host->tls_cert_file);
766       if (status != 0)
767         {
768           ERROR("write_riemann plugin: cf_util_get_"
769                 "string_buffer failed with "
770                 "status %i.", status);
771           break;
772         }
773     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
774       status = cf_util_get_string(child, &host->tls_key_file);
775       if (status != 0)
776         {
777           ERROR("write_riemann plugin: cf_util_get_"
778                 "string_buffer failed with "
779                 "status %i.", status);
780           break;
781         }
782     } else if (strcasecmp("StoreRates", child->key) == 0) {
783       status = cf_util_get_boolean(child, &host->store_rates);
784       if (status != 0)
785         break;
786     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
787       status = cf_util_get_boolean(child,
788                                    &host->always_append_ds);
789       if (status != 0)
790         break;
791     } else if (strcasecmp("TTLFactor", child->key) == 0) {
792       double tmp = NAN;
793       status = cf_util_get_double(child, &tmp);
794       if (status != 0)
795         break;
796       if (tmp >= 2.0) {
797         host->ttl_factor = tmp;
798       } else if (tmp >= 1.0) {
799         NOTICE("write_riemann plugin: The configured "
800                "TTLFactor is very small "
801                "(%.1f). A value of 2.0 or "
802                "greater is recommended.",
803                tmp);
804         host->ttl_factor = tmp;
805       } else if (tmp > 0.0) {
806         WARNING("write_riemann plugin: The configured "
807                 "TTLFactor is too small to be "
808                 "useful (%.1f). I'll use it "
809                 "since the user knows best, "
810                 "but under protest.",
811                 tmp);
812         host->ttl_factor = tmp;
813       } else { /* zero, negative and NAN */
814         ERROR("write_riemann plugin: The configured "
815               "TTLFactor is invalid (%.1f).",
816               tmp);
817       }
818     } else {
819       WARNING("write_riemann plugin: ignoring unknown config "
820               "option: \"%s\"", child->key);
821     }
822   }
823   if (status != 0) {
824     wrr_free(host);
825     return status;
826   }
827
828   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
829             host->name);
830   ud.data = host;
831   ud.free_func = wrr_free;
832
833   pthread_mutex_lock(&host->lock);
834
835   status = plugin_register_write(callback_name, wrr_write, &ud);
836
837   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
838     ud.free_func = NULL;
839     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
840   }
841   if (status != 0)
842     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
843             "failed with status %i.",
844             callback_name, status);
845   else /* success */
846     host->reference_count++;
847
848   status = plugin_register_notification(callback_name,
849                                         wrr_notification, &ud);
850   if (status != 0)
851     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
852             "failed with status %i.",
853             callback_name, status);
854   else /* success */
855     host->reference_count++;
856
857   if (host->reference_count <= 1)
858     {
859       /* Both callbacks failed => free memory.
860        * We need to unlock here, because riemann_free() will lock.
861        * This is not a race condition, because we're the only one
862        * holding a reference. */
863       pthread_mutex_unlock(&host->lock);
864       wrr_free(host);
865       return (-1);
866     }
867
868   host->reference_count--;
869   pthread_mutex_unlock(&host->lock);
870
871   return status;
872 } /* }}} int wrr_config_node */
873
874 static int wrr_config(oconfig_item_t *ci) /* {{{ */
875 {
876   int            i;
877   oconfig_item_t        *child;
878   int            status;
879
880   for (i = 0; i < ci->children_num; i++)  {
881     child = &ci->children[i];
882
883     if (strcasecmp("Node", child->key) == 0) {
884       wrr_config_node (child);
885     } else if (strcasecmp(child->key, "attribute") == 0) {
886       char *key = NULL;
887       char *val = NULL;
888
889       if (child->values_num != 2) {
890         WARNING("riemann attributes need both a key and a value.");
891         return (-1);
892       }
893       if (child->values[0].type != OCONFIG_TYPE_STRING ||
894           child->values[1].type != OCONFIG_TYPE_STRING) {
895         WARNING("riemann attribute needs string arguments.");
896         return (-1);
897       }
898       if ((key = strdup(child->values[0].value.string)) == NULL) {
899         WARNING("cannot allocate memory for attribute key.");
900         return (-1);
901       }
902       if ((val = strdup(child->values[1].value.string)) == NULL) {
903         WARNING("cannot allocate memory for attribute value.");
904         sfree(key);
905         return (-1);
906       }
907       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
908       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
909       DEBUG("write_riemann: got attr: %s => %s", key, val);
910       sfree(key);
911       sfree(val);
912     } else if (strcasecmp(child->key, "tag") == 0) {
913       char *tmp = NULL;
914       status = cf_util_get_string(child, &tmp);
915       if (status != 0)
916         continue;
917
918       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
919       DEBUG("write_riemann plugin: Got tag: %s", tmp);
920       sfree(tmp);
921     } else {
922       WARNING("write_riemann plugin: Ignoring unknown "
923               "configuration option \"%s\" at top level.",
924               child->key);
925     }
926   }
927   return (0);
928 } /* }}} int wrr_config */
929
930 void module_register(void)
931 {
932   plugin_register_complex_config("write_riemann", wrr_config);
933 }
934
935 /* vim: set sw=8 sts=8 ts=8 noet : */