2 * collectd - src/riemann.c
4 * Copyright (C) 2012 Pierre-Yves Ritschard <pyr@spootnik.org>
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
33 #define RIEMANN_DELAY 1
34 #define RIEMANN_PORT 5555
35 #define RIEMANN_MAX_TAGS 37
36 #define RIEMANN_EXTRA_TAGS 32
39 struct riemann_host *next;
40 #define F_CONNECT 0x01
44 char name[DATA_MAX_NAME_LEN];
49 struct riemann_event {
51 char service[DATA_MAX_NAME_LEN];
52 const char *tags[RIEMANN_MAX_TAGS];
55 char *riemann_tags[RIEMANN_EXTRA_TAGS];
58 int riemann_send(struct riemann_host *, Msg *);
59 int riemann_notification(const notification_t *, user_data_t *);
60 int riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
61 int riemann_connect(struct riemann_host *);
62 void riemann_free(void *);
63 int riemann_config_host(oconfig_item_t *);
64 int riemann_config(oconfig_item_t *);
65 void module_register(void);
68 riemann_send(struct riemann_host *host, Msg *msg)
73 len = msg__get_packed_size(msg);
74 DEBUG("riemann_write: packed size computed: %ld", len);
75 if ((buf = calloc(1, len)) == NULL) {
76 WARNING("riemann_write: failing to alloc buf!");
82 if (write(host->s, buf, len) != len) {
83 WARNING("riemann_write: could not send out full packet");
92 riemann_notification(const notification_t *n, user_data_t *ud)
95 struct riemann_host *host = ud->data;
97 Event ev = EVENT__INIT;
99 const char *tags[RIEMANN_MAX_TAGS];
100 char service[DATA_MAX_NAME_LEN];
101 notification_meta_t *meta;
106 { NOTIF_OKAY, "ok" },
107 { NOTIF_WARNING, "warning" },
108 { NOTIF_FAILURE, "critical" },
116 ev.host = host->name;
117 ev.time = CDTIME_T_TO_TIME_T(n->time);
121 severities[i].code > 0 && severities[i].code != n->severity;
124 ev.state = severities[i].name;
127 ev.tags = (char **)tags;
129 tags[1] = "notification";
131 for (i = 0; i < riemann_tagcount; i++)
132 tags[ev.n_tags++] = riemann_tags[i];
134 ssnprintf(service, sizeof(service),
135 "%s-%s-%s-%s", n->plugin, n->plugin_instance,
136 n->type, n->type_instance);
137 ev.service = service;
138 ev.description = (char *)n->message;
141 * Pull in values from threshold
144 meta != NULL && strcasecmp(meta->name, "CurrentValue") != 0;
150 ev.metric_d = meta->nm_value.nm_double;
153 return riemann_send(host, &msg);
157 riemann_write(const data_set_t *ds,
158 const value_list_t *vl,
163 struct riemann_host *host = ud->data;
166 struct riemann_event *event_tab, *event;
168 if ((status = riemann_connect(host)) != 0)
171 msg.n_events = vl->values_len;
174 * Get rid of allocations up front
176 if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
177 (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
184 * Now produce valid protobuf structures
186 for (i = 0; i < vl->values_len; i++) {
187 event = &event_tab[i];
188 event__init(&event->ev);
192 ev->host = host->name;
194 ev->time = CDTIME_T_TO_TIME_T(vl->time);
196 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
198 ev->tags = (char **)event->tags;
199 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
200 event->tags[1] = vl->plugin;
201 event->tags[2] = ds->ds[i].name;
202 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
203 event->tags[ev->n_tags++] = vl->plugin_instance;
205 if (vl->type && strlen(vl->type)) {
206 event->tags[ev->n_tags++] = vl->type;
208 if (vl->type_instance && strlen(vl->type_instance)) {
209 event->tags[ev->n_tags++] = vl->type_instance;
212 /* add user defined extra tags */
213 for (j = 0; j < riemann_tagcount; j++)
214 event->tags[ev->n_tags++] = riemann_tags[j];
216 switch (ds->ds[i].type) {
217 case DS_TYPE_COUNTER:
218 ev->has_metric_sint64 = 1;
219 ev->metric_sint64 = vl->values[i].counter;
222 ev->has_metric_d = 1;
223 ev->metric_d = vl->values[i].gauge;
226 ev->has_metric_sint64 = 1;
227 ev->metric_sint64 = vl->values[i].derive;
229 case DS_TYPE_ABSOLUTE:
230 ev->has_metric_sint64 = 1;
231 ev->metric_sint64 = vl->values[i].absolute;
234 WARNING("riemann_write: unknown metric type: %d",
238 ssnprintf(event->service, sizeof(event->service),
239 "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
240 vl->type, vl->type_instance, ds->ds[i].name);
241 ev->service = event->service;
242 DEBUG("riemann_write: %s ready to send", ev->service);
246 status = riemann_send(host, &msg);
252 riemann_connect(struct riemann_host *host)
255 struct addrinfo *ai, *res, hints;
256 struct sockaddr_in *sin4;
257 struct sockaddr_in6 *sin6;
259 if (host->flags & F_CONNECT)
262 memset(&hints, 0, sizeof(hints));
263 hints.ai_family = PF_UNSPEC;
264 hints.ai_socktype = SOCK_DGRAM;
266 if ((e = getaddrinfo(host->name, NULL, &hints, &res)) != 0) {
267 WARNING("could not resolve host \"%s\": %s",
268 host->name, gai_strerror(e));
272 for (ai = res; ai != NULL; ai = ai->ai_next) {
273 pthread_mutex_lock(&host->lock);
275 * check if another thread did not already succesfully connect
277 if (host->flags & F_CONNECT) {
282 if ((host->s = socket(ai->ai_family, SOCK_DGRAM, 0)) == -1) {
283 pthread_mutex_unlock(&host->lock);
284 WARNING("riemann_connect: could not open socket");
289 switch (ai->ai_family) {
291 sin4 = (struct sockaddr_in *)ai->ai_addr;
292 sin4->sin_port = ntohs(host->port);
295 sin6 = (struct sockaddr_in6 *)ai->ai_addr;
296 sin6->sin6_port = ntohs(host->port);
299 WARNING("riemann_connect: unsupported address family");
301 pthread_mutex_unlock(&host->lock);
306 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
308 host->flags |= ~F_CONNECT;
309 pthread_mutex_unlock(&host->lock);
313 host->flags |= F_CONNECT;
314 DEBUG("got a succesful connection for: %s", host->name);
315 pthread_mutex_unlock(&host->lock);
321 WARNING("riemann_connect: no suitable hosts found");
329 riemann_free(void *p)
331 struct riemann_host *host = p;
333 if (host->flags & F_CONNECT)
339 riemann_config_host(oconfig_item_t *ci)
341 struct riemann_host *host = NULL;
344 oconfig_item_t *child;
345 char w_cb_name[DATA_MAX_NAME_LEN];
346 char n_cb_name[DATA_MAX_NAME_LEN];
349 if (ci->values_num != 1 ||
350 ci->values[0].type != OCONFIG_TYPE_STRING) {
351 WARNING("riemann hosts need one string argument");
355 if ((host = calloc(1, sizeof (*host))) == NULL) {
356 WARNING("riemann host allocation failed");
360 if (cf_util_get_string_buffer(ci, host->name,
361 sizeof(host->name)) != 0) {
362 WARNING("riemann host name too long");
367 host->port = RIEMANN_PORT;
368 host->delay = RIEMANN_DELAY;
369 for (i = 0; i < ci->children_num; i++) {
371 * The code here could be simplified but makes room
372 * for easy adding of new options later on.
374 child = &ci->children[i];
377 if (strcasecmp(child->key, "port") == 0) {
378 if ((status = cf_util_get_port_number(child)) < 0) {
379 WARNING("invalid port number");
384 } else if (strcasecmp(child->key, "delay") == 0) {
385 if ((status = cf_util_get_int(ci, &host->delay)) != 0)
388 WARNING("riemann plugin: ignoring unknown config "
389 "option: \"%s\"", child->key);
397 pthread_mutex_init(&host->lock, NULL);
398 ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%d",
399 host->name, host->port);
400 ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%d",
401 host->name, host->port);
402 DEBUG("riemann w_cb_name: %s", w_cb_name);
403 DEBUG("riemann n_cb_name: %s", n_cb_name);
405 ud.free_func = riemann_free;
407 if ((status = plugin_register_write(w_cb_name, riemann_write, &ud)) != 0)
410 if ((status = plugin_register_notification(n_cb_name,
411 riemann_notification,
413 plugin_unregister_write(w_cb_name);
420 riemann_config(oconfig_item_t *ci)
424 oconfig_item_t *child;
426 for (i = 0; i < ci->children_num; i++) {
427 child = &ci->children[i];
429 if (strcasecmp(child->key, "host") == 0) {
430 riemann_config_host(child);
431 } else if (strcasecmp(child->key, "tag") == 0) {
432 if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
433 WARNING("riemann plugin: too many tags");
437 cf_util_get_string(child, &newtag);
440 riemann_tags[riemann_tagcount++] = newtag;
441 DEBUG("riemann_config: got tag: %s", newtag);
444 WARNING ("riemann plugin: Ignoring unknown "
445 "configuration option \"%s\" at top level.",
453 module_register(void)
455 DEBUG("riemann: module_register");
457 plugin_register_complex_config ("riemann", riemann_config);