Merge pull request #3339 from jkohen/patch-1
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "utils_cache.h"
32 #include <sys/socket.h>
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <netdb.h>
36 #include <inttypes.h>
37 #include <pthread.h>
38 #include <stddef.h>
39
40 #include <stdlib.h>
41 #ifndef HAVE_ASPRINTF
42 /*
43  * Uses asprintf() portable implementation from
44  * https://github.com/littlstar/asprintf.c/blob/master/
45  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
46  */
47 #include <stdio.h>
48 #include <stdarg.h>
49
50 int vasprintf(char **str, const char *fmt, va_list args) {
51         int size = 0;
52         va_list tmpa;
53         // copy
54         va_copy(tmpa, args);
55         // apply variadic arguments to
56         // sprintf with format to get size
57         size = vsnprintf(NULL, size, fmt, tmpa);
58         // toss args
59         va_end(tmpa);
60         // return -1 to be compliant if
61         // size is less than 0
62         if (size < 0) { return -1; }
63         // alloc with size plus 1 for `\0'
64         *str = (char *) malloc(size + 1);
65         // return -1 to be compliant
66         // if pointer is `NULL'
67         if (NULL == *str) { return -1; }
68         // format string with original
69         // variadic arguments and set new size
70         size = vsprintf(*str, fmt, args);
71         return size;
72 }
73
74 int asprintf(char **str, const char *fmt, ...) {
75         int size = 0;
76         va_list args;
77         // init variadic argumens
78         va_start(args, fmt);
79         // format and get size
80         size = vasprintf(str, fmt, args);
81         // toss args
82         va_end(args);
83         return size;
84 }
85
86 #endif
87
88 #define SENSU_HOST              "localhost"
89 #define SENSU_PORT              "3030"
90
91 struct str_list {
92         int nb_strs;
93         char **strs;
94 };
95
96 struct sensu_host {
97         char                    *name;
98         char                    *event_service_prefix;
99         struct str_list metric_handlers;
100         struct str_list notification_handlers;
101 #define F_READY      0x01
102         uint8_t                  flags;
103         pthread_mutex_t  lock;
104         _Bool            notifications;
105         _Bool            metrics;
106         _Bool                    store_rates;
107         _Bool                    always_append_ds;
108         char                    *separator;
109         char                    *node;
110         char                    *service;
111         int              s;
112         struct addrinfo *res;
113         int                          reference_count;
114 };
115
116 static char     *sensu_tags;
117 static char     **sensu_attrs;
118 static size_t sensu_attrs_num;
119
120 static int add_str_to_list(struct str_list *strs,
121                 const char *str_to_add) /* {{{ */
122 {
123         char **old_strs_ptr = strs->strs;
124         char *newstr = strdup(str_to_add);
125         if (newstr == NULL) {
126                 ERROR("write_sensu plugin: Unable to alloc memory");
127                 return -1;
128         }
129         strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
130         if (strs->strs == NULL) {
131                 strs->strs = old_strs_ptr;
132                 free(newstr);
133                 ERROR("write_sensu plugin: Unable to alloc memory");
134                 return -1;
135         }
136         strs->strs[strs->nb_strs] = newstr;
137         strs->nb_strs++;
138         return 0;
139 }
140 /* }}} int add_str_to_list */
141
142 static void free_str_list(struct str_list *strs) /* {{{ */
143 {
144         int i;
145         for (i=0; i<strs->nb_strs; i++)
146                 free(strs->strs[i]);
147         free(strs->strs);
148 }
149 /* }}} void free_str_list */
150
151 static int sensu_connect(struct sensu_host *host) /* {{{ */
152 {
153         int                      e;
154         struct addrinfo         *ai, hints;
155         char const              *node;
156         char const              *service;
157
158         // Resolve the target if we haven't done already
159         if (!(host->flags & F_READY)) {
160                 memset(&hints, 0, sizeof(hints));
161                 memset(&service, 0, sizeof(service));
162                 host->res = NULL;
163                 hints.ai_family = AF_INET;
164                 hints.ai_socktype = SOCK_STREAM;
165 #ifdef AI_ADDRCONFIG
166                 hints.ai_flags |= AI_ADDRCONFIG;
167 #endif
168
169                 node = (host->node != NULL) ? host->node : SENSU_HOST;
170                 service = (host->service != NULL) ? host->service : SENSU_PORT;
171
172                 if ((e = getaddrinfo(node, service, &hints, &(host->res))) != 0) {
173                         ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
174                                         node, gai_strerror(e));
175                         return -1;
176                 }
177                 DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
178                                 node, service);
179                 host->flags |= F_READY;
180         }
181
182         struct linger so_linger;
183         host->s = -1;
184         for (ai = host->res; ai != NULL; ai = ai->ai_next) {
185                 // create the socket
186                 if ((host->s = socket(ai->ai_family,
187                                       ai->ai_socktype,
188                                       ai->ai_protocol)) == -1) {
189                         continue;
190                 }
191
192                 // Set very low close() lingering
193                 so_linger.l_onoff = 1;
194                 so_linger.l_linger = 3;
195                 if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
196                         WARNING("write_sensu plugin: failed to set socket close() lingering");
197
198                 // connect the socket
199                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
200                         close(host->s);
201                         host->s = -1;
202                         continue;
203                 }
204                 DEBUG("write_sensu plugin: connected");
205                 break;
206         }
207
208         if (host->s < 0) {
209                 WARNING("write_sensu plugin: Unable to connect to sensu client");
210                 return -1;
211         }
212         return 0;
213 } /* }}} int sensu_connect */
214
215 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
216 {
217         if (host->s != -1)
218                 close(host->s);
219         host->s = -1;
220
221 } /* }}} void sensu_close_socket */
222
223 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
224 {
225         int res;
226         char *ret_str;
227         char *temp_str;
228         int i;
229         if (list->nb_strs == 0) {
230                 ret_str = malloc(sizeof(char));
231                 if (ret_str == NULL) {
232                         ERROR("write_sensu plugin: Unable to alloc memory");
233                         return NULL;
234                 }
235                 ret_str[0] = '\0';
236         }
237
238         res = asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
239         if (res == -1) {
240                 ERROR("write_sensu plugin: Unable to alloc memory");
241                 return NULL;
242         }
243         for (i=1; i<list->nb_strs; i++) {
244                 res = asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
245                 free(temp_str);
246                 if (res == -1) {
247                         ERROR("write_sensu plugin: Unable to alloc memory");
248                         return NULL;
249                 }
250                 temp_str = ret_str;
251         }
252         res = asprintf(&ret_str, "%s]", temp_str);
253         free(temp_str);
254         if (res == -1) {
255                 ERROR("write_sensu plugin: Unable to alloc memory");
256                 return NULL;
257         }
258
259         return ret_str;
260 } /* }}} char *build_json_str_list*/
261
262 int sensu_format_name2(char *ret, int ret_len,
263                 const char *hostname,
264                 const char *plugin, const char *plugin_instance,
265                 const char *type, const char *type_instance,
266                 const char *separator)
267 {
268         char *buffer;
269         size_t buffer_size;
270
271         buffer = ret;
272         buffer_size = (size_t) ret_len;
273
274 #define APPEND(str) do {          \
275         size_t l = strlen (str);        \
276         if (l >= buffer_size)           \
277                 return (ENOBUFS);             \
278         memcpy (buffer, (str), l);      \
279         buffer += l; buffer_size -= l;  \
280 } while (0)
281
282         assert (plugin != NULL);
283         assert (type != NULL);
284
285         APPEND (hostname);
286         APPEND (separator);
287         APPEND (plugin);
288         if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
289         {
290                 APPEND ("-");
291                 APPEND (plugin_instance);
292         }
293         APPEND (separator);
294         APPEND (type);
295         if ((type_instance != NULL) && (type_instance[0] != 0))
296         {
297                 APPEND ("-");
298                 APPEND (type_instance);
299         }
300         assert (buffer_size > 0);
301         buffer[0] = 0;
302
303 #undef APPEND
304         return (0);
305 } /* int sensu_format_name2 */
306
307 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
308 {
309         int i;
310         int len=strlen(orig_name);
311         for (i=0; i<len; i++) {
312                 // some plugins like ipmi generate special characters in metric name
313                 switch(orig_name[i]) {
314                         case '(': orig_name[i] = '_'; break;
315                         case ')': orig_name[i] = '_'; break;
316                         case ' ': orig_name[i] = '_'; break;
317                         case '"': orig_name[i] = '_'; break;
318                         case '\'': orig_name[i] = '_'; break;
319                         case '+': orig_name[i] = '_'; break;
320                 }
321         }
322 } /* }}} char *replace_sensu_name_reserved */
323
324 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
325                 data_set_t const *ds,
326                 value_list_t const *vl, size_t index,
327                 gauge_t const *rates,
328                 int status)
329 {
330         char name_buffer[5 * DATA_MAX_NAME_LEN];
331         char service_buffer[6 * DATA_MAX_NAME_LEN];
332         int i;
333         char *ret_str;
334         char *temp_str;
335         char *value_str;
336         int res;
337         // First part of the JSON string
338         const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
339
340         char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
341         if (handlers_str == NULL) {
342                 ERROR("write_sensu plugin: Unable to alloc memory");
343                 return NULL;
344         }
345
346         // incorporate the handlers
347         if (strlen(handlers_str) == 0) {
348                 free(handlers_str);
349                 ret_str = strdup(part1);
350                 if (ret_str == NULL) {
351                         ERROR("write_sensu plugin: Unable to alloc memory");
352                         return NULL;
353                 }
354         }
355         else {
356                 res = asprintf(&ret_str, "%s, %s", part1, handlers_str);
357                 free(handlers_str);
358                 if (res == -1) {
359                         ERROR("write_sensu plugin: Unable to alloc memory");
360                         return NULL;
361                 }
362         }
363
364         // incorporate the plugin name information
365         res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
366         free(ret_str);
367         if (res == -1) {
368                 ERROR("write_sensu plugin: Unable to alloc memory");
369                 return NULL;
370         }
371         ret_str = temp_str;
372
373         // incorporate the plugin type
374         res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
375         free(ret_str);
376         if (res == -1) {
377                 ERROR("write_sensu plugin: Unable to alloc memory");
378                 return NULL;
379         }
380         ret_str = temp_str;
381
382         // incorporate the plugin instance if any
383         if (vl->plugin_instance[0] != 0) {
384                 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
385                 free(ret_str);
386                 if (res == -1) {
387                         ERROR("write_sensu plugin: Unable to alloc memory");
388                         return NULL;
389                 }
390                 ret_str = temp_str;
391         }
392
393         // incorporate the plugin type instance if any
394         if (vl->type_instance[0] != 0) {
395                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
396                 free(ret_str);
397                 if (res == -1) {
398                         ERROR("write_sensu plugin: Unable to alloc memory");
399                         return NULL;
400                 }
401                 ret_str = temp_str;
402         }
403
404         // incorporate the data source type
405         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
406                 char ds_type[DATA_MAX_NAME_LEN];
407                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
408                 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
409                 free(ret_str);
410                 if (res == -1) {
411                         ERROR("write_sensu plugin: Unable to alloc memory");
412                         return NULL;
413                 }
414                 ret_str = temp_str;
415         } else {
416                 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
417                 free(ret_str);
418                 if (res == -1) {
419                         ERROR("write_sensu plugin: Unable to alloc memory");
420                         return NULL;
421                 }
422                 ret_str = temp_str;
423         }
424
425         // incorporate the data source name
426         res = asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
427         free(ret_str);
428         if (res == -1) {
429                 ERROR("write_sensu plugin: Unable to alloc memory");
430                 return NULL;
431         }
432         ret_str = temp_str;
433
434         // incorporate the data source index
435         {
436                 char ds_index[DATA_MAX_NAME_LEN];
437                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
438                 res = asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
439                 free(ret_str);
440                 if (res == -1) {
441                         ERROR("write_sensu plugin: Unable to alloc memory");
442                         return NULL;
443                 }
444                 ret_str = temp_str;
445         }
446
447         // add key value attributes from config if any
448         for (i = 0; i < sensu_attrs_num; i += 2) {
449                 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
450                 free(ret_str);
451                 if (res == -1) {
452                         ERROR("write_sensu plugin: Unable to alloc memory");
453                         return NULL;
454                 }
455                 ret_str = temp_str;
456         }
457
458         // incorporate sensu tags from config if any
459         if (strlen(sensu_tags) != 0) {
460                 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
461                 free(ret_str);
462                 if (res == -1) {
463                         ERROR("write_sensu plugin: Unable to alloc memory");
464                         return NULL;
465                 }
466                 ret_str = temp_str;
467         }
468
469         // calculate the value and set to a string
470         if (ds->ds[index].type == DS_TYPE_GAUGE) {
471                 double tmp_v = (double) vl->values[index].gauge;
472                 res = asprintf(&value_str, "%.8f", tmp_v);
473                 if (res == -1) {
474                         free(ret_str);
475                         ERROR("write_sensu plugin: Unable to alloc memory");
476                         return NULL;
477                 }
478         } else if (rates != NULL) {
479                 double tmp_v = (double) rates[index];
480                 res = asprintf(&value_str, "%.8f", tmp_v);
481                 if (res == -1) {
482                         free(ret_str);
483                         ERROR("write_sensu plugin: Unable to alloc memory");
484                         return NULL;
485                 }
486         } else {
487                 if (ds->ds[index].type == DS_TYPE_DERIVE) {
488                         res = asprintf(&value_str, "%"PRIi64, vl->values[index].derive);
489                         if (res == -1) {
490                                 free(ret_str);
491                                 ERROR("write_sensu plugin: Unable to alloc memory");
492                                 return NULL;
493                         }
494                 }
495                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
496                         res = asprintf(&value_str, "%"PRIu64, vl->values[index].absolute);
497                         if (res == -1) {
498                                 free(ret_str);
499                                 ERROR("write_sensu plugin: Unable to alloc memory");
500                                 return NULL;
501                         }
502                 }
503                 else {
504                         res = asprintf(&value_str, "%llu", vl->values[index].counter);
505                         if (res == -1) {
506                                 free(ret_str);
507                                 ERROR("write_sensu plugin: Unable to alloc memory");
508                                 return NULL;
509                         }
510                 }
511         }
512
513         // Generate the full service name
514         sensu_format_name2(name_buffer, sizeof(name_buffer),
515                 vl->host, vl->plugin, vl->plugin_instance,
516                 vl->type, vl->type_instance, host->separator);
517         if (host->always_append_ds || (ds->ds_num > 1)) {
518                 if (host->event_service_prefix == NULL)
519                         ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
520                                         name_buffer, ds->ds[index].name);
521                 else
522                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
523                                         host->event_service_prefix, name_buffer, ds->ds[index].name);
524         } else {
525                 if (host->event_service_prefix == NULL)
526                         sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
527                 else
528                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
529                                         host->event_service_prefix, name_buffer);
530         }
531
532         // Replace collectd sensor name reserved characters so that time series DB is happy
533         in_place_replace_sensu_name_reserved(service_buffer);
534
535         // finalize the buffer by setting the output and closing curly bracket
536         res = asprintf(&temp_str, "%s, \"output\": \"%s %s %ld\"}\n", ret_str, service_buffer, value_str, CDTIME_T_TO_TIME_T(vl->time));
537         free(ret_str);
538         free(value_str);
539         if (res == -1) {
540                 ERROR("write_sensu plugin: Unable to alloc memory");
541                 return NULL;
542         }
543         ret_str = temp_str;
544
545         DEBUG("write_sensu plugin: Successfully created json for metric: "
546                         "host = \"%s\", service = \"%s\"",
547                         vl->host, service_buffer);
548         return ret_str;
549 } /* }}} char *sensu_value_to_json */
550
551 /*
552  * Uses replace_str2() implementation from
553  * http://creativeandcritical.net/str-replace-c/
554  * copyright (c) Laird Shaw, under public domain.
555  */
556 char *replace_str(const char *str, const char *old, /* {{{ */
557                 const char *new)
558 {
559         char *ret, *r;
560         const char *p, *q;
561         size_t oldlen = strlen(old);
562         size_t count = strlen(new);
563         size_t retlen = count;
564         size_t newlen = count;
565         int samesize = (oldlen == newlen);
566
567         if (!samesize) {
568                 for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
569                         count++;
570                 /* This is undefined if p - str > PTRDIFF_MAX */
571                 retlen = p - str + strlen(p) + count * (newlen - oldlen);
572         } else
573                 retlen = strlen(str);
574
575         ret = malloc(retlen + 1);
576         if (ret == NULL)
577                 return NULL;
578         // added to original: not optimized, but keeps valgrind happy.
579         memset(ret, 0, retlen + 1);
580
581         r = ret;
582         p = str;
583         while (1) {
584                 /* If the old and new strings are different lengths - in other
585                  * words we have already iterated through with strstr above,
586                  * and thus we know how many times we need to call it - then we
587                  * can avoid the final (potentially lengthy) call to strstr,
588                  * which we already know is going to return NULL, by
589                  * decrementing and checking count.
590                  */
591                 if (!samesize && !count--)
592                         break;
593                 /* Otherwise i.e. when the old and new strings are the same
594                  * length, and we don't know how many times to call strstr,
595                  * we must check for a NULL return here (we check it in any
596                  * event, to avoid further conditions, and because there's
597                  * no harm done with the check even when the old and new
598                  * strings are different lengths).
599                  */
600                 if ((q = strstr(p, old)) == NULL)
601                         break;
602                 /* This is undefined if q - p > PTRDIFF_MAX */
603                 ptrdiff_t l = q - p;
604                 memcpy(r, p, l);
605                 r += l;
606                 memcpy(r, new, newlen);
607                 r += newlen;
608                 p = q + oldlen;
609         }
610         strncpy(r, p, strlen(p));
611
612         return ret;
613 } /* }}} char *replace_str */
614
615 static char *replace_json_reserved(const char *message) /* {{{ */
616 {
617         char *msg = replace_str(message, "\\", "\\\\");
618         if (msg == NULL) {
619                 ERROR("write_sensu plugin: Unable to alloc memory");
620                 return NULL;
621         }
622         char *tmp = replace_str(msg, "\"", "\\\"");
623         free(msg);
624         if (tmp == NULL) {
625                 ERROR("write_sensu plugin: Unable to alloc memory");
626                 return NULL;
627         }
628         msg = replace_str(tmp, "\n", "\\\n");
629         free(tmp);
630         if (msg == NULL) {
631                 ERROR("write_sensu plugin: Unable to alloc memory");
632                 return NULL;
633         }
634         return msg;
635 } /* }}} char *replace_json_reserved */
636
637 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
638                 notification_t const *n)
639 {
640         char service_buffer[6 * DATA_MAX_NAME_LEN];
641         char const *severity;
642         notification_meta_t *meta;
643         char *ret_str;
644         char *temp_str;
645         int status;
646         int i;
647         int res;
648         // add the severity/status
649         switch (n->severity) {
650                 case NOTIF_OKAY:
651                         severity = "OK";
652                         status = 0;
653                         break;
654                 case NOTIF_WARNING:
655                         severity = "WARNING";
656                         status = 1;
657                         break;
658                 case NOTIF_FAILURE:
659                         severity = "CRITICAL";
660                         status = 2;
661                         break;
662                 default:
663                         severity = "UNKNOWN";
664                         status = 3;
665         }
666         res = asprintf(&temp_str, "{\"status\": %d", status);
667         if (res == -1) {
668                 ERROR("write_sensu plugin: Unable to alloc memory");
669                 return NULL;
670         }
671         ret_str = temp_str;
672
673         // incorporate the timestamp
674         res = asprintf(&temp_str, "%s, \"timestamp\": %ld", ret_str, CDTIME_T_TO_TIME_T(n->time));
675         free(ret_str);
676         if (res == -1) {
677                 ERROR("write_sensu plugin: Unable to alloc memory");
678                 return NULL;
679         }
680         ret_str = temp_str;
681
682         char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
683         if (handlers_str == NULL) {
684                 ERROR("write_sensu plugin: Unable to alloc memory");
685                 return NULL;
686         }
687         // incorporate the handlers
688         if (strlen(handlers_str) != 0) {
689                 res = asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
690                 free(ret_str);
691                 free(handlers_str);
692                 if (res == -1) {
693                         ERROR("write_sensu plugin: Unable to alloc memory");
694                         return NULL;
695                 }
696                 ret_str = temp_str;
697         } else {
698                 free(handlers_str);
699         }
700
701         // incorporate the plugin name information if any
702         if (n->plugin[0] != 0) {
703                 res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
704                 free(ret_str);
705                 if (res == -1) {
706                         ERROR("write_sensu plugin: Unable to alloc memory");
707                         return NULL;
708                 }
709                 ret_str = temp_str;
710         }
711
712         // incorporate the plugin type if any
713         if (n->type[0] != 0) {
714                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
715                 free(ret_str);
716                 if (res == -1) {
717                         ERROR("write_sensu plugin: Unable to alloc memory");
718                         return NULL;
719                 }
720                 ret_str = temp_str;
721         }
722
723         // incorporate the plugin instance if any
724         if (n->plugin_instance[0] != 0) {
725                 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
726                 free(ret_str);
727                 if (res == -1) {
728                         ERROR("write_sensu plugin: Unable to alloc memory");
729                         return NULL;
730                 }
731                 ret_str = temp_str;
732         }
733
734         // incorporate the plugin type instance if any
735         if (n->type_instance[0] != 0) {
736                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
737                 free(ret_str);
738                 if (res == -1) {
739                         ERROR("write_sensu plugin: Unable to alloc memory");
740                         return NULL;
741                 }
742                 ret_str = temp_str;
743         }
744
745         // add key value attributes from config if any
746         for (i = 0; i < sensu_attrs_num; i += 2) {
747                 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
748                 free(ret_str);
749                 if (res == -1) {
750                         ERROR("write_sensu plugin: Unable to alloc memory");
751                         return NULL;
752                 }
753                 ret_str = temp_str;
754         }
755
756         // incorporate sensu tags from config if any
757         if (strlen(sensu_tags) != 0) {
758                 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
759                 free(ret_str);
760                 if (res == -1) {
761                         ERROR("write_sensu plugin: Unable to alloc memory");
762                         return NULL;
763                 }
764                 ret_str = temp_str;
765         }
766
767         // incorporate the service name
768         sensu_format_name2(service_buffer, sizeof(service_buffer),
769                                 /* host */ "", n->plugin, n->plugin_instance,
770                                 n->type, n->type_instance, host->separator);
771         // replace sensu event name chars that are considered illegal
772         in_place_replace_sensu_name_reserved(service_buffer);
773         res = asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
774         free(ret_str);
775         if (res == -1) {
776                 ERROR("write_sensu plugin: Unable to alloc memory");
777                 return NULL;
778         }
779         ret_str = temp_str;
780
781         // incorporate the check output
782         if (n->message[0] != 0) {
783                 char *msg = replace_json_reserved(n->message);
784                 if (msg == NULL) {
785                         ERROR("write_sensu plugin: Unable to alloc memory");
786                         return NULL;
787                 }
788                 res = asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
789                 free(ret_str);
790                 free(msg);
791                 if (res == -1) {
792                         ERROR("write_sensu plugin: Unable to alloc memory");
793                         return NULL;
794                 }
795                 ret_str = temp_str;
796         }
797
798         // Pull in values from threshold and add extra attributes
799         for (meta = n->meta; meta != NULL; meta = meta->next) {
800                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
801                         res = asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
802                         free(ret_str);
803                         if (res == -1) {
804                                 ERROR("write_sensu plugin: Unable to alloc memory");
805                                 return NULL;
806                         }
807                         ret_str = temp_str;
808                 }
809                 if (meta->type == NM_TYPE_STRING) {
810                         res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
811                         free(ret_str);
812                         if (res == -1) {
813                                 ERROR("write_sensu plugin: Unable to alloc memory");
814                                 return NULL;
815                         }
816                         ret_str = temp_str;
817                 }
818         }
819
820         // close the curly bracket
821         res = asprintf(&temp_str, "%s}\n", ret_str);
822         free(ret_str);
823         if (res == -1) {
824                 ERROR("write_sensu plugin: Unable to alloc memory");
825                 return NULL;
826         }
827         ret_str = temp_str;
828
829         DEBUG("write_sensu plugin: Successfully created JSON for notification: "
830                                 "host = \"%s\", service = \"%s\", state = \"%s\"",
831                                 n->host, service_buffer, severity);
832         return ret_str;
833 } /* }}} char *sensu_notification_to_json */
834
835 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
836 {
837         int status = 0;
838         size_t  buffer_len;
839
840         status = sensu_connect(host);
841         if (status != 0)
842                 return status;
843
844         buffer_len = strlen(msg);
845
846         status = (int) swrite(host->s, msg, buffer_len);
847         sensu_close_socket(host);
848
849         if (status != 0) {
850                 char errbuf[1024];
851                 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
852                                 (host->node != NULL) ? host->node : SENSU_HOST,
853                                 (host->service != NULL) ? host->service : SENSU_PORT,
854                                 sstrerror(errno, errbuf, sizeof(errbuf)));
855                 return -1;
856         }
857
858         return 0;
859 } /* }}} int sensu_send_msg */
860
861
862 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
863 {
864         int status = 0;
865
866         status = sensu_send_msg(host, msg);
867         if (status != 0) {
868                 host->flags &= ~F_READY;
869                 if (host->res != NULL) {
870                         freeaddrinfo(host->res);
871                         host->res = NULL;
872                 }
873                 return status;
874         }
875
876         return 0;
877 } /* }}} int sensu_send */
878
879
880 static int sensu_write(const data_set_t *ds, /* {{{ */
881               const value_list_t *vl,
882               user_data_t *ud)
883 {
884         int status = 0;
885         int statuses[vl->values_len];
886         struct sensu_host       *host = ud->data;
887         gauge_t *rates = NULL;
888         int i;
889         char *msg;
890
891         pthread_mutex_lock(&host->lock);
892         memset(statuses, 0, vl->values_len * sizeof(*statuses));
893
894         if (host->store_rates) {
895                 rates = uc_get_rate(ds, vl);
896                 if (rates == NULL) {
897                         ERROR("write_sensu plugin: uc_get_rate failed.");
898                         pthread_mutex_unlock(&host->lock);
899                         return -1;
900                 }
901         }
902         for (i = 0; i < (size_t) vl->values_len; i++) {
903                 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
904                 if (msg == NULL) {
905                         sfree(rates);
906                         pthread_mutex_unlock(&host->lock);
907                         return -1;
908                 }
909                 status = sensu_send(host, msg);
910                 free(msg);
911                 if (status != 0) {
912                         ERROR("write_sensu plugin: sensu_send failed with status %i", status);
913                         pthread_mutex_unlock(&host->lock);
914                         sfree(rates);
915                         return status;
916                 }
917         }
918         sfree(rates);
919         pthread_mutex_unlock(&host->lock);
920         return status;
921 } /* }}} int sensu_write */
922
923 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
924 {
925         int     status;
926         struct sensu_host *host = ud->data;
927         char *msg;
928
929         pthread_mutex_lock(&host->lock);
930
931         msg = sensu_notification_to_json(host, n);
932         if (msg == NULL) {
933                 pthread_mutex_unlock(&host->lock);
934                 return -1;
935         }
936
937         status = sensu_send(host, msg);
938         free(msg);
939         if (status != 0)
940                 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
941         pthread_mutex_unlock(&host->lock);
942
943         return status;
944 } /* }}} int sensu_notification */
945
946 static void sensu_free(void *p) /* {{{ */
947 {
948         struct sensu_host *host = p;
949
950         if (host == NULL)
951                 return;
952
953         pthread_mutex_lock(&host->lock);
954
955         host->reference_count--;
956         if (host->reference_count > 0) {
957                 pthread_mutex_unlock(&host->lock);
958                 return;
959         }
960
961         sensu_close_socket(host);
962         if (host->res != NULL) {
963                 freeaddrinfo(host->res);
964                 host->res = NULL;
965         }
966         sfree(host->service);
967         sfree(host->event_service_prefix);
968         sfree(host->name);
969         sfree(host->node);
970         sfree(host->separator);
971         free_str_list(&(host->metric_handlers));
972         free_str_list(&(host->notification_handlers));
973         pthread_mutex_destroy(&host->lock);
974         sfree(host);
975 } /* }}} void sensu_free */
976
977
978 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
979 {
980         struct sensu_host       *host = NULL;
981         int                                     status = 0;
982         int                                     i;
983         oconfig_item_t          *child;
984         char                            callback_name[DATA_MAX_NAME_LEN];
985         user_data_t                     ud;
986
987         if ((host = calloc(1, sizeof(*host))) == NULL) {
988                 ERROR("write_sensu plugin: calloc failed.");
989                 return ENOMEM;
990         }
991         pthread_mutex_init(&host->lock, NULL);
992         host->reference_count = 1;
993         host->node = NULL;
994         host->service = NULL;
995         host->notifications = 0;
996         host->metrics = 0;
997         host->store_rates = 1;
998         host->always_append_ds = 0;
999         host->metric_handlers.nb_strs = 0;
1000         host->metric_handlers.strs = NULL;
1001         host->notification_handlers.nb_strs = 0;
1002         host->notification_handlers.strs = NULL;
1003         host->separator = strdup("/");
1004         if (host->separator == NULL) {
1005                 ERROR("write_sensu plugin: Unable to alloc memory");
1006                 sensu_free(host);
1007                 return -1;
1008         }
1009
1010         status = cf_util_get_string(ci, &host->name);
1011         if (status != 0) {
1012                 WARNING("write_sensu plugin: Required host name is missing.");
1013                 sensu_free(host);
1014                 return -1;
1015         }
1016
1017         for (i = 0; i < ci->children_num; i++) {
1018                 child = &ci->children[i];
1019                 status = 0;
1020
1021                 if (strcasecmp("Host", child->key) == 0) {
1022                         status = cf_util_get_string(child, &host->node);
1023                         if (status != 0)
1024                                 break;
1025                 } else if (strcasecmp("Notifications", child->key) == 0) {
1026                         status = cf_util_get_boolean(child, &host->notifications);
1027                         if (status != 0)
1028                                 break;
1029                 } else if (strcasecmp("Metrics", child->key) == 0) {
1030                                         status = cf_util_get_boolean(child, &host->metrics);
1031                                         if (status != 0)
1032                                                 break;
1033                 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1034                         status = cf_util_get_string(child, &host->event_service_prefix);
1035                         if (status != 0)
1036                                 break;
1037                 } else if (strcasecmp("Separator", child->key) == 0) {
1038                                 status = cf_util_get_string(child, &host->separator);
1039                                 if (status != 0)
1040                                         break;
1041                 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1042                         char *temp_str = NULL;
1043                         status = cf_util_get_string(child, &temp_str);
1044                         if (status != 0)
1045                                 break;
1046                         status = add_str_to_list(&(host->metric_handlers), temp_str);
1047                         free(temp_str);
1048                         if (status != 0)
1049                                 break;
1050                 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1051                         char *temp_str = NULL;
1052                         status = cf_util_get_string(child, &temp_str);
1053                         if (status != 0)
1054                                 break;
1055                         status = add_str_to_list(&(host->notification_handlers), temp_str);
1056                         free(temp_str);
1057                         if (status != 0)
1058                                 break;
1059                 } else if (strcasecmp("Port", child->key) == 0) {
1060                         status = cf_util_get_service(child, &host->service);
1061                         if (status != 0) {
1062                                 ERROR("write_sensu plugin: Invalid argument "
1063                                                 "configured for the \"Port\" "
1064                                                 "option.");
1065                                 break;
1066                         }
1067                 } else if (strcasecmp("StoreRates", child->key) == 0) {
1068                         status = cf_util_get_boolean(child, &host->store_rates);
1069                         if (status != 0)
1070                                 break;
1071                 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1072                         status = cf_util_get_boolean(child,
1073                                         &host->always_append_ds);
1074                         if (status != 0)
1075                                 break;
1076                 } else {
1077                         WARNING("write_sensu plugin: ignoring unknown config "
1078                                 "option: \"%s\"", child->key);
1079                 }
1080         }
1081         if (status != 0) {
1082                 sensu_free(host);
1083                 return status;
1084         }
1085
1086         if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1087                         sensu_free(host);
1088                         WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1089                         return -1;
1090                 }
1091
1092         if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1093                 sensu_free(host);
1094                 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1095                 return -1;
1096         }
1097
1098         if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1099                 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1100                 host->notifications = 1;
1101         }
1102
1103         if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1104                 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1105                 host->metrics = 1;
1106         }
1107
1108         if (!(host->notifications || host->metrics)) {
1109                 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1110                 sensu_free(host);
1111                 return -1;
1112         }
1113
1114         ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1115         ud.data = host;
1116         ud.free_func = sensu_free;
1117
1118         pthread_mutex_lock(&host->lock);
1119
1120         if (host->metrics) {
1121                 status = plugin_register_write(callback_name, sensu_write, &ud);
1122                 if (status != 0)
1123                         WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1124                                         "failed with status %i.",
1125                                         callback_name, status);
1126                 else /* success */
1127                         host->reference_count++;
1128         }
1129
1130         if (host->notifications) {
1131                 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1132                 if (status != 0)
1133                         WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1134                                         "failed with status %i.",
1135                                         callback_name, status);
1136                 else
1137                         host->reference_count++;
1138         }
1139
1140         if (host->reference_count <= 1) {
1141                 /* Both callbacks failed => free memory.
1142                  * We need to unlock here, because sensu_free() will lock.
1143                  * This is not a race condition, because we're the only one
1144                  * holding a reference. */
1145                 pthread_mutex_unlock(&host->lock);
1146                 sensu_free(host);
1147                 return -1;
1148         }
1149
1150         host->reference_count--;
1151         pthread_mutex_unlock(&host->lock);
1152
1153         return status;
1154 } /* }}} int sensu_config_node */
1155
1156 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1157 {
1158         int              i;
1159         oconfig_item_t  *child;
1160         int              status;
1161         struct str_list sensu_tags_arr;
1162
1163         sensu_tags_arr.nb_strs = 0;
1164         sensu_tags_arr.strs = NULL;
1165         sensu_tags = malloc(sizeof(char));
1166         if (sensu_tags == NULL) {
1167                 ERROR("write_sensu plugin: Unable to alloc memory");
1168                 return -1;
1169         }
1170         sensu_tags[0] = '\0';
1171
1172         for (i = 0; i < ci->children_num; i++)  {
1173                 child = &ci->children[i];
1174
1175                 if (strcasecmp("Node", child->key) == 0) {
1176                         sensu_config_node(child);
1177                 } else if (strcasecmp(child->key, "attribute") == 0) {
1178                         char *key = NULL;
1179                         char *val = NULL;
1180
1181                         if (child->values_num != 2) {
1182                                 WARNING("sensu attributes need both a key and a value.");
1183                                 free(sensu_tags);
1184                                 return -1;
1185                         }
1186                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
1187                             child->values[1].type != OCONFIG_TYPE_STRING) {
1188                                 WARNING("sensu attribute needs string arguments.");
1189                                 free(sensu_tags);
1190                                 return -1;
1191                         }
1192                         if ((key = strdup(child->values[0].value.string)) == NULL) {
1193                                 ERROR("write_sensu plugin: Unable to alloc memory");
1194                                 free(sensu_tags);
1195                                 return -1;
1196                         }
1197                         if ((val = strdup(child->values[1].value.string)) == NULL) {
1198                                 free(sensu_tags);
1199                                 free(key);
1200                                 ERROR("write_sensu plugin: Unable to alloc memory");
1201                                 return -1;
1202                         }
1203                         strarray_add(&sensu_attrs, &sensu_attrs_num, key);
1204                         strarray_add(&sensu_attrs, &sensu_attrs_num, val);
1205                         DEBUG("write_sensu: got attr: %s => %s", key, val);
1206                         sfree(key);
1207                         sfree(val);
1208                 } else if (strcasecmp(child->key, "tag") == 0) {
1209                         char *tmp = NULL;
1210                         status = cf_util_get_string(child, &tmp);
1211                         if (status != 0)
1212                                 continue;
1213
1214                         status = add_str_to_list(&sensu_tags_arr, tmp);
1215                         sfree(tmp);
1216                         if (status != 0)
1217                                 continue;
1218                         DEBUG("write_sensu plugin: Got tag: %s", tmp);
1219                 } else {
1220                         WARNING("write_sensu plugin: Ignoring unknown "
1221                                  "configuration option \"%s\" at top level.",
1222                                  child->key);
1223                 }
1224         }
1225         if (sensu_tags_arr.nb_strs > 0) {
1226                 free(sensu_tags);
1227                 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1228                 free_str_list(&sensu_tags_arr);
1229                 if (sensu_tags == NULL) {
1230                         ERROR("write_sensu plugin: Unable to alloc memory");
1231                         return -1;
1232                 }
1233         }
1234         return 0;
1235 } /* }}} int sensu_config */
1236
1237 void module_register(void)
1238 {
1239         plugin_register_complex_config("write_sensu", sensu_config);
1240 }
1241
1242 /* vim: set sw=8 sts=8 ts=8 noet : */