initial sync from private repo
[collectd.git] / write_graphite.c
1 /**
2  * collectd - src/write_graphite.c
3  * Copyright (C) 2011  Scott Sanders
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Scott Sanders <scott@jssjr.com>
20  *
21  *   based on the excellent write_http plugin
22  **/
23
24 #include "collectd.h"
25 #include "common.h"
26 #include "plugin.h"
27 #include "configfile.h"
28
29 #include "utils_cache.h"
30 #include "utils_parse_option.h"
31
32 /* Folks without pthread will need to disable this plugin. */
33 #include <pthread.h>
34
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37 #include <sys/types.h>
38
39 #include <netinet/in.h>
40 #include <netdb.h>
41
42 #ifndef WG_FORMAT_NAME
43 #define WG_FORMAT_NAME(ret, ret_len, vl, prefix, name) \
44         wg_format_name (ret, ret_len, (vl)->host, (vl)->plugin, (vl)->plugin_instance, \
45                         (vl)->type, (vl)->type_instance, prefix, name)
46 #endif
47
48 /*
49  * Private variables
50  */
51 struct wg_callback
52 {
53     int      sock_fd;
54     struct hostent *server;
55
56     char    *host;
57     int      port;
58     char    *prefix;
59
60     char     send_buf[4096];
61     size_t   send_buf_free;
62     size_t   send_buf_fill;
63     cdtime_t send_buf_init_time;
64
65     pthread_mutex_t send_lock;
66 };
67
68
69 /*
70  * Functions
71  */
72 static void wg_reset_buffer (struct wg_callback *cb) /* {{{ */
73 {
74     memset (cb->send_buf, 0, sizeof (cb->send_buf));
75     cb->send_buf_free = sizeof (cb->send_buf);
76     cb->send_buf_fill = 0;
77     cb->send_buf_init_time = cdtime ();
78 } /* }}} wg_reset_buffer */
79
80 static int wg_send_buffer (struct wg_callback *cb) /* {{{ */
81 {
82     int status = 0;
83
84     status = write (cb->sock_fd, cb->send_buf, strlen (cb->send_buf));
85     if (status < 0)
86     {
87         ERROR ("write_graphite plugin: send failed with "
88                 "status %i (%s)",
89                 status,
90                 strerror (errno));
91
92         pthread_mutex_lock (&cb->send_lock);
93
94         DEBUG ("write_graphite plugin: closing socket and restting fd "
95                 "so reinit will occur");
96         close (cb->sock_fd);
97         cb->sock_fd = -1;
98
99         pthread_mutex_unlock (&cb->send_lock);
100
101         return (-1);
102     }
103     return (0);
104 } /* }}} wg_send_buffer */
105
106 static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) /* {{{ */
107 {
108     int status;
109
110     DEBUG ("write_graphite plugin: wg_flush_nolock: timeout = %.3f; "
111             "send_buf_fill = %zu;",
112             (double)timeout,
113             cb->send_buf_fill);
114
115     /* timeout == 0  => flush unconditionally */
116     if (timeout > 0)
117     {
118         cdtime_t now;
119
120         now = cdtime ();
121         if ((cb->send_buf_init_time + timeout) > now)
122             return (0);
123     }
124
125     if (cb->send_buf_fill <= 0)
126     {
127         cb->send_buf_init_time = cdtime ();
128         return (0);
129     }
130
131     status = wg_send_buffer (cb);
132     wg_reset_buffer (cb);
133
134     return (status);
135 } /* }}} wg_flush_nolock */
136
137 static int wg_callback_init (struct wg_callback *cb) /* {{{ */
138 {
139     int status;
140
141     struct sockaddr_in serv_addr;
142
143     if (cb->sock_fd > 0)
144         return (0);
145
146     cb->sock_fd = socket (AF_INET, SOCK_STREAM, 0);
147     if (cb->sock_fd < 0)
148     {
149         ERROR ("write_graphite plugin: socket failed: %s", strerror (errno));
150         return (-1);
151     }
152     cb->server = gethostbyname(cb->host);
153     if (cb->server == NULL)
154     {
155         ERROR ("write_graphite plugin: no such host");
156         return (-1);
157     }
158     memset (&serv_addr, 0, sizeof (serv_addr));
159     serv_addr.sin_family = AF_INET;
160     memcpy (&serv_addr.sin_addr.s_addr,
161                 cb->server->h_addr,
162                 cb->server->h_length);
163     serv_addr.sin_port = htons(cb->port);
164
165     status = connect(cb->sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
166     if (status < 0)
167     {
168         char errbuf[1024];
169         sstrerror (errno, errbuf, sizeof (errbuf));
170         ERROR ("write_graphite plugin: connect failed: %s", errbuf);
171         close (cb->sock_fd);
172         cb->sock_fd = -1;
173         return (-1);
174     }
175
176     wg_reset_buffer (cb);
177
178     return (0);
179 } /* }}} int wg_callback_init */
180
181 static void wg_callback_free (void *data) /* {{{ */
182 {
183     struct wg_callback *cb;
184
185     if (data == NULL)
186         return;
187
188     cb = data;
189
190     wg_flush_nolock (/* timeout = */ 0, cb);
191
192     close(cb->sock_fd);
193     sfree(cb->host);
194     sfree(cb->prefix);
195
196     sfree(cb);
197 } /* }}} void wg_callback_free */
198
199 static int wg_flush (cdtime_t timeout, /* {{{ */
200         const char *identifier __attribute__((unused)),
201         user_data_t *user_data)
202 {
203     struct wg_callback *cb;
204     int status;
205
206     if (user_data == NULL)
207         return (-EINVAL);
208
209     cb = user_data->data;
210
211     pthread_mutex_lock (&cb->send_lock);
212
213     if (cb->sock_fd < 0)
214     {
215         status = wg_callback_init (cb);
216         if (status != 0)
217         {
218             ERROR ("write_graphite plugin: wg_callback_init failed.");
219             pthread_mutex_unlock (&cb->send_lock);
220             return (-1);
221         }
222     }
223
224     status = wg_flush_nolock (timeout, cb);
225     pthread_mutex_unlock (&cb->send_lock);
226
227     return (status);
228 } /* }}} int wg_flush */
229
230 static int wg_format_values (char *ret, size_t ret_len, /* {{{ */
231         int ds_num, const data_set_t *ds, const value_list_t *vl,
232         _Bool store_rates)
233 {
234     size_t offset = 0;
235     int status;
236     gauge_t *rates = NULL;
237
238     assert (0 == strcmp (ds->type, vl->type));
239
240     memset (ret, 0, ret_len);
241
242 #define BUFFER_ADD(...) do { \
243     status = ssnprintf (ret + offset, ret_len - offset, \
244             __VA_ARGS__); \
245     if (status < 1) \
246     { \
247         sfree (rates); \
248         return (-1); \
249     } \
250     else if (((size_t) status) >= (ret_len - offset)) \
251     { \
252         sfree (rates); \
253         return (-1); \
254     } \
255     else \
256     offset += ((size_t) status); \
257 } while (0)
258
259     if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
260         BUFFER_ADD ("%f", vl->values[ds_num].gauge);
261     else if (store_rates)
262     {
263         if (rates == NULL)
264             rates = uc_get_rate (ds, vl);
265         if (rates == NULL)
266         {
267             WARNING ("format_values: "
268                     "uc_get_rate failed.");
269             return (-1);
270         }
271         BUFFER_ADD ("%g", rates[ds_num]);
272     }
273     else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
274         BUFFER_ADD ("%llu", vl->values[ds_num].counter);
275     else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
276         BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
277     else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
278         BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
279     else
280     {
281         ERROR ("format_values plugin: Unknown data source type: %i",
282                 ds->ds[ds_num].type);
283         sfree (rates);
284         return (-1);
285     }
286
287 #undef BUFFER_ADD
288
289     sfree (rates);
290     return (0);
291 } /* }}} int wg_format_values */
292
293 static int normalize_hostname (char *dst, const char *src) /* {{{ */
294 {
295     size_t i;
296
297     int reps = 0;
298
299     for (i = 0; i < strlen(src) ; i++)
300     {
301         if (src[i] == '.')
302         {
303             dst[i] = '_';
304             ++reps;
305         }
306         else
307             dst[i] = src[i];
308     }
309     dst[i] = '\0';
310
311     return reps;
312 } /* }}} int normalize_hostname */
313
314 static int wg_format_name (char *ret, int ret_len, /* {{{ */
315                 const char *hostname,
316                 const char *plugin, const char *plugin_instance,
317                 const char *type, const char *type_instance,
318                 const char *prefix, const char *ds_name)
319 {
320     int  status;
321     char *n_hostname;
322
323     assert (plugin != NULL);
324     assert (type != NULL);
325
326     if ((n_hostname = malloc(strlen(hostname)+1)) == NULL)
327     {
328         ERROR ("Unable to allocate memory for normalized hostname buffer");
329         return (-1);
330     }
331
332     if (normalize_hostname(n_hostname, hostname) == -1)
333     {
334         ERROR ("Unable to normalize hostname");
335         return (-1);
336     }
337
338     if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0))
339     {
340         if ((type_instance == NULL) || (strlen (type_instance) == 0))
341         {
342             if ((ds_name == NULL) || (strlen (ds_name) == 0))
343                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s",
344                         prefix, n_hostname, plugin, type);
345             else
346                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s",
347                         prefix, n_hostname, plugin, type, ds_name);
348         }
349         else
350         {
351             if ((ds_name == NULL) || (strlen (ds_name) == 0))
352                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s",
353                         prefix, n_hostname, plugin, type,
354                         type_instance);
355             else
356                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s.%s",
357                         prefix, n_hostname, plugin, type,
358                         type_instance, ds_name);
359         }
360     }
361     else
362     {
363         if ((type_instance == NULL) || (strlen (type_instance) == 0))
364         {
365             if ((ds_name == NULL) || (strlen (ds_name) == 0))
366                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s",
367                         prefix, n_hostname, plugin,
368                         plugin_instance, type);
369             else
370                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s.%s",
371                         prefix, n_hostname, plugin,
372                         plugin_instance, type, ds_name);
373         }
374         else
375         {
376             if ((ds_name == NULL) || (strlen (ds_name) == 0))
377                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s",
378                         prefix, n_hostname, plugin,
379                         plugin_instance, type, type_instance);
380             else
381                 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s.%s",
382                         prefix, n_hostname, plugin,
383                         plugin_instance, type, type_instance, ds_name);
384         }
385     }
386
387     sfree(n_hostname);
388
389     if ((status < 1) || (status >= ret_len))
390         return (-1);
391     return (0);
392 } /* }}} int wg_format_name */
393
394 static int wg_send_message (const char* key, const char* value, cdtime_t time, struct wg_callback *cb) /* {{{ */
395 {
396     int status;
397     size_t message_len;
398     char message[1024];
399
400     message_len = (size_t) ssnprintf (message, sizeof (message),
401             "%s %s %.0f\n",
402             key,
403             value,
404             CDTIME_T_TO_DOUBLE(time));
405     if (message_len >= sizeof (message)) {
406         ERROR ("write_graphite plugin: message buffer too small: "
407                 "Need %zu bytes.", message_len + 1);
408         return (-1);
409     }
410
411
412     pthread_mutex_lock (&cb->send_lock);
413
414     if (cb->sock_fd < 0)
415     {
416         status = wg_callback_init (cb);
417         if (status != 0)
418         {
419             ERROR ("write_graphite plugin: wg_callback_init failed.");
420             pthread_mutex_unlock (&cb->send_lock);
421             return (-1);
422         }
423     }
424
425     if (message_len >= cb->send_buf_free)
426     {
427         status = wg_flush_nolock (/* timeout = */ 0, cb);
428         if (status != 0)
429         {
430             pthread_mutex_unlock (&cb->send_lock);
431             return (status);
432         }
433     }
434     assert (message_len < cb->send_buf_free);
435
436     /* `message_len + 1' because `message_len' does not include the
437      * trailing null byte. Neither does `send_buffer_fill'. */
438     memcpy (cb->send_buf + cb->send_buf_fill,
439             message, message_len + 1);
440     cb->send_buf_fill += message_len;
441     cb->send_buf_free -= message_len;
442
443     DEBUG ("write_graphite plugin: <%s:%d> buf %zu/%zu (%g%%) \"%s\"",
444             cb->host,
445             cb->port,
446             cb->send_buf_fill, sizeof (cb->send_buf),
447             100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)),
448             message);
449
450     /* Check if we have enough space for this message. */
451     pthread_mutex_unlock (&cb->send_lock);
452
453     return (0);
454 } /* }}} int wg_send_message */
455
456 static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, /* {{{ */
457                         struct wg_callback *cb)
458 {
459     char key[10*DATA_MAX_NAME_LEN];
460     char values[512];
461
462     int status, i;
463
464     if (0 != strcmp (ds->type, vl->type))
465     {
466         ERROR ("write_graphite plugin: DS type does not match "
467                 "value list type");
468         return -1;
469     }
470
471     if (ds->ds_num > 1)
472     {
473         for (i = 0; i < ds->ds_num; i++)
474         {
475             /* Copy the identifier to `key' and escape it. */
476             status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, ds->ds[i].name);
477             if (status != 0)
478             {
479                 ERROR ("write_graphite plugin: error with format_name");
480                 return (status);
481             }
482
483             escape_string (key, sizeof (key));
484             /* Convert the values to an ASCII representation and put that into
485              * `values'. */
486             status = wg_format_values (values, sizeof (values), i, ds, vl, 0);
487             if (status != 0)
488             {
489                 ERROR ("write_graphite plugin: error with "
490                         "wg_format_values");
491                 return (status);
492             }
493
494             /* Send the message to graphite */
495             status = wg_send_message (key, values, vl->time, cb);
496             if (status != 0)
497             {
498                 ERROR ("write_graphite plugin: error with "
499                         "wg_send_message");
500                 return (status);
501             }
502         }
503     }
504     else
505     {
506         /* Copy the identifier to `key' and escape it. */
507         status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, NULL);
508         if (status != 0)
509         {
510             ERROR ("write_graphite plugin: error with format_name");
511             return (status);
512         }
513
514         escape_string (key, sizeof (key));
515         /* Convert the values to an ASCII representation and put that into
516          * `values'. */
517         status = wg_format_values (values, sizeof (values), 0, ds, vl, 0);
518         if (status != 0)
519         {
520             ERROR ("write_graphite plugin: error with "
521                     "wg_format_values");
522             return (status);
523         }
524
525         /* Send the message to graphite */
526         status = wg_send_message (key, values, vl->time, cb);
527         if (status != 0)
528         {
529             ERROR ("write_graphite plugin: error with "
530                     "wg_send_message");
531             return (status);
532         }
533     }
534
535     return (0);
536 } /* }}} int wg_write_messages */
537
538 static int wg_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
539         user_data_t *user_data)
540 {
541     struct wg_callback *cb;
542     int status;
543
544     if (user_data == NULL)
545         return (-EINVAL);
546
547     cb = user_data->data;
548
549     status = wg_write_messages (ds, vl, cb);
550
551     return (status);
552 } /* }}} int wg_write */
553
554 static int config_set_number (int *dest, /* {{{ */
555         oconfig_item_t *ci)
556 {
557     if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
558     {
559         WARNING ("write_graphite plugin: The `%s' config option "
560                 "needs exactly one numeric argument.", ci->key);
561         return (-1);
562     }
563
564     *dest = ci->values[0].value.number;
565
566     return (0);
567 } /* }}} int config_set_number */
568
569 static int config_set_string (char **ret_string, /* {{{ */
570         oconfig_item_t *ci)
571 {
572     char *string;
573
574     if ((ci->values_num != 1)
575             || (ci->values[0].type != OCONFIG_TYPE_STRING))
576     {
577         WARNING ("write_graphite plugin: The `%s' config option "
578                 "needs exactly one string argument.", ci->key);
579         return (-1);
580     }
581
582     string = strdup (ci->values[0].value.string);
583     if (string == NULL)
584     {
585         ERROR ("write_graphite plugin: strdup failed.");
586         return (-1);
587     }
588
589     if (*ret_string != NULL)
590         sfree (*ret_string);
591     *ret_string = string;
592
593     return (0);
594 } /* }}} int config_set_string */
595
596 static int wg_config_carbon (oconfig_item_t *ci) /* {{{ */
597 {
598     struct wg_callback *cb;
599     user_data_t user_data;
600     int i;
601
602     cb = malloc (sizeof (*cb));
603     if (cb == NULL)
604     {
605         ERROR ("write_graphite plugin: malloc failed.");
606         return (-1);
607     }
608     memset (cb, 0, sizeof (*cb));
609     cb->sock_fd = -1;
610     cb->host = NULL;
611     cb->port = 2003;
612     cb->prefix = NULL;
613     cb->server = NULL;
614
615     pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
616
617     config_set_string (&cb->host, ci);
618     if (cb->host == NULL)
619         return (-1);
620
621     for (i = 0; i < ci->children_num; i++)
622     {
623         oconfig_item_t *child = ci->children + i;
624
625         if (strcasecmp ("Port", child->key) == 0)
626             config_set_number (&cb->port, child);
627         else if (strcasecmp ("Prefix", child->key) == 0)
628             config_set_string (&cb->prefix, child);
629         else
630         {
631             ERROR ("write_graphite plugin: Invalid configuration "
632                         "option: %s.", child->key);
633         }
634     }
635
636     DEBUG ("write_graphite: Registering write callback to carbon agent "
637             "%s:%d", cb->host, cb->port);
638
639     memset (&user_data, 0, sizeof (user_data));
640     user_data.data = cb;
641     user_data.free_func = NULL;
642     plugin_register_flush ("write_graphite", wg_flush, &user_data);
643
644     user_data.free_func = wg_callback_free;
645     plugin_register_write ("write_graphite", wg_write, &user_data);
646
647     return (0);
648 } /* }}} int wg_config_carbon */
649
650 static int wg_config (oconfig_item_t *ci) /* {{{ */
651 {
652     int i;
653
654     for (i = 0; i < ci->children_num; i++)
655     {
656         oconfig_item_t *child = ci->children + i;
657
658         if (strcasecmp ("Carbon", child->key) == 0)
659             wg_config_carbon (child);
660         else
661         {
662             ERROR ("write_graphite plugin: Invalid configuration "
663                         "option: %s.", child->key);
664         }
665     }
666
667     return (0);
668 } /* }}} int wg_config */
669
670 void module_register (void) /* {{{ */
671 {
672     plugin_register_complex_config ("write_graphite", wg_config);
673 } /* }}} void module_register */
674
675 /* vim: set fdm=marker sw=4 ts=4 sts=4 tw=78 et : */