2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
31 #include <riemann/riemann-client.h>
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
/* Compiled-in defaults used when a node's configuration leaves a setting
 * unset. */
/* Node name wrr_connect() falls back to when host->node is NULL. */
42 #define RIEMANN_HOST "localhost"
/* Port wrr_connect() falls back to when host->port is zero. */
43 #define RIEMANN_PORT 5555
/* Initial ttl_factor assigned in wrr_config_node(); wrr_value_to_event()
 * multiplies the value list's interval by ttl_factor to get the event TTL. */
44 #define RIEMANN_TTL_FACTOR 2.0
/* Initial batch_max assigned in wrr_config_node(); wrr_batch_add_value_list()
 * flushes the batch once its packed size reaches batch_max. */
45 #define RIEMANN_BATCH_MAX 8192
/* Members of struct riemann_host. NOTE(review): the opening of the
 * declaration and several other members used later in this file (lock, name,
 * node, port, batch_mode, ...) are not visible in this excerpt. */
/* Optional string placed in front of the service name of metric events. */
49 char *event_service_prefix;
/* When set, wrr_write() runs write_riemann_threshold_check() and
 * wrr_value_to_event() sets a state field on the event. */
53 _Bool check_thresholds;
/* When set, the data source name is appended to the service name even if the
 * data set has a single data source. */
55 _Bool always_append_ds;
/* Client type passed to riemann_client_create(); compared against
 * RIEMANN_CLIENT_UDP to decide on batching and on reading a response. */
58 riemann_client_type_t client_type;
/* Client handle assigned in wrr_connect() and released in wrr_disconnect(). */
59 riemann_client_t *client;
/* Message accumulating batched events; NULL when no batch is pending. */
64 riemann_message_t *batch_msg;
/* Plugin-wide tags and attributes, filled by wrr_config() and attached to
 * every event built in this file. */
/* Tag strings; riemann_tags_num is the number of entries. */
70 static char **riemann_tags;
71 static size_t riemann_tags_num;
/* Attributes stored as flattened pairs: the key at an even index and its
 * value at the following index; riemann_attrs_num counts individual strings,
 * not pairs. */
72 static char **riemann_attrs;
73 static size_t riemann_attrs_num;
/*
 * wrr_connect - create the riemann client handle for a host.
 *
 * Visible behaviour: picks the node name and port (falling back to
 * RIEMANN_HOST / RIEMANN_PORT when unset), hands the configured TLS CA,
 * certificate and key file settings to riemann_client_create() and stores
 * the result in host->client. A NULL result is reported with a WARNING,
 * success with a DEBUG message.
 *
 * NOTE(review): the first lines of the body and the return statements are
 * not visible in this excerpt, so whether an existing client is reused and
 * which values are returned cannot be confirmed from here.
 */
75 /* host->lock must be held when calling this function. */
76 static int wrr_connect(struct riemann_host *host) /* {{{ */
/* Use the compiled-in defaults when node or port were not configured. */
84 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
85 port = (host->port) ? host->port : RIEMANN_PORT;
89 host->client = riemann_client_create(host->client_type, node, port,
90 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
91 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
92 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
93 RIEMANN_CLIENT_OPTION_NONE);
/* No client handle could be created: log it. */
94 if (host->client == NULL) {
95 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
99 DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
103 } /* }}} int wrr_connect */
/*
 * wrr_disconnect - release the host's riemann client handle with
 * riemann_client_free().
 *
 * NOTE(review): the lines around the free call (any NULL check, resetting
 * host->client, the return value) are not visible in this excerpt.
 */
105 /* host->lock must be held when calling this function. */
106 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
111 riemann_client_free(host->client);
115 } /* }}} int wrr_disconnect */
/*
 * wrr_send - connect if needed, send one message and, for non-UDP clients,
 * read the server's response.
 *
 * The function takes host->lock itself, so callers must not already hold it.
 * Both visible failure paths (after the send, and when no response is
 * received) drop the client via wrr_disconnect() and release the lock.
 *
 * NOTE(review): the status checks guarding the failure paths and the return
 * statements are not visible in this excerpt.
 */
118 * Function to send messages to riemann.
120 * Acquires the host lock, disconnects on errors.
122 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
125 pthread_mutex_lock (&host->lock);
127 status = wrr_connect(host);
131 status = riemann_client_send_message(host->client, msg);
/* Send failure path: drop the client and release the lock. */
133 wrr_disconnect(host);
134 pthread_mutex_unlock(&host->lock);
139 * For TCP we need to receive message acknowledgemenent.
/* Every client type other than UDP waits for a response message. */
141 if (host->client_type != RIEMANN_CLIENT_UDP)
143 riemann_message_t *response;
145 response = riemann_client_recv_message(host->client);
/* No response: treat as an error, drop the client and release the lock. */
147 if (response == NULL)
149 wrr_disconnect(host);
150 pthread_mutex_unlock(&host->lock);
/* The response content is not inspected here; it is only freed. */
153 riemann_message_free(response);
156 pthread_mutex_unlock (&host->lock);
158 } /* }}} int wrr_send */
/*
 * wrr_notification_to_message - build a riemann message holding a single
 * event that describes a collectd notification.
 *
 * The event carries the notification's host, time, a "notification" tag, a
 * state derived from the severity and a service name derived from the
 * plugin/type identifiers. Identifier parts, the globally configured
 * attributes and tags, the notification text and the notification's meta
 * data are added on top.
 *
 * NOTE(review): several lines (some conditions, the NULL check on msg and
 * the return statements) are not visible in this excerpt.
 */
160 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
161 notification_t const *n)
163 riemann_message_t *msg;
164 riemann_event_t *event;
165 char service_buffer[6 * DATA_MAX_NAME_LEN];
166 char const *severity;
167 notification_meta_t *meta;
/* Map the collectd severity onto the state string used for the event. */
172 case NOTIF_OKAY: severity = "ok"; break;
173 case NOTIF_WARNING: severity = "warning"; break;
174 case NOTIF_FAILURE: severity = "critical"; break;
175 default: severity = "unknown";
/* The name is formatted with an empty host part; the service field below
 * starts at service_buffer[1], i.e. skips the first character of the result
 * (presumably the separator left by the empty host - confirm against
 * format_name()). */
178 format_name(service_buffer, sizeof(service_buffer),
179 /* host = */ "", n->plugin, n->plugin_instance,
180 n->type, n->type_instance);
182 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
183 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
184 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
185 RIEMANN_EVENT_FIELD_STATE, severity,
186 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
187 RIEMANN_EVENT_FIELD_NONE);
/* Identifier parts become string attributes; the visible checks skip empty
 * parts. */
190 riemann_event_string_attribute_add(event, "host", n->host);
191 if (n->plugin[0] != 0)
192 riemann_event_string_attribute_add(event, "plugin", n->plugin);
193 if (n->plugin_instance[0] != 0)
194 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
197 riemann_event_string_attribute_add(event, "type", n->type);
198 if (n->type_instance[0] != 0)
199 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
/* Global attributes are stored as key/value pairs, hence the step of two. */
201 for (i = 0; i < riemann_attrs_num; i += 2)
202 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
204 for (i = 0; i < riemann_tags_num; i++)
205 riemann_event_tag_add(event, riemann_tags[i]);
/* The notification text, when present, is exposed as "description". */
207 if (n->message[0] != 0)
208 riemann_event_string_attribute_add(event, "description", n->message);
210 /* Pull in values from threshold and add extra attributes */
211 for (meta = n->meta; meta != NULL; meta = meta->next)
/* A double-typed "CurrentValue" entry becomes the event's metric. */
213 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
215 riemann_event_set(event,
216 RIEMANN_EVENT_FIELD_METRIC_D,
217 (double) meta->nm_value.nm_double,
218 RIEMANN_EVENT_FIELD_NONE);
/* String-typed meta entries are copied as attributes under their own name. */
222 if (meta->type == NM_TYPE_STRING) {
223 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
228 msg = riemann_message_create_with_events(event, NULL);
/* Failure path: the event is freed here because no message took it over. */
231 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
232 riemann_event_free (event);
236 DEBUG("write_riemann plugin: Successfully created message for notification: "
237 "host = \"%s\", service = \"%s\", state = \"%s\"",
238 event->host, event->service, event->state);
240 } /* }}} riemann_message_t *wrr_notification_to_message */
/*
 * wrr_value_to_event - build one riemann event for the data source at
 * position `index` of a value list.
 *
 * The service name is derived from the plugin/type identifiers, optionally
 * prefixed with host->event_service_prefix and optionally suffixed with the
 * data source name. The metric is the gauge value, the rate from `rates`, or
 * the raw integer value, depending on the data source type and on whether
 * `rates` is NULL.
 *
 * NOTE(review): the last parameter, some declarations, the threshold state
 * selection and the return statements are not visible in this excerpt.
 */
242 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
243 data_set_t const *ds,
244 value_list_t const *vl, size_t index,
245 gauge_t const *rates,
248 riemann_event_t *event;
249 char name_buffer[5 * DATA_MAX_NAME_LEN];
250 char service_buffer[6 * DATA_MAX_NAME_LEN];
253 event = riemann_event_new();
256 ERROR("write_riemann plugin: riemann_event_new() failed.");
/* The name is formatted with an empty host part; the code below uses it from
 * name_buffer[1] onward, i.e. without its first character. */
260 format_name(name_buffer, sizeof(name_buffer),
261 /* host = */ "", vl->plugin, vl->plugin_instance,
262 vl->type, vl->type_instance);
/* Append "/<data source name>" when forced by configuration or when the data
 * set has more than one data source. */
263 if (host->always_append_ds || (ds->ds_num > 1))
265 if (host->event_service_prefix == NULL)
266 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
267 &name_buffer[1], ds->ds[index].name);
269 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
270 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
/* Otherwise the service is the name alone, with the prefix if configured. */
274 if (host->event_service_prefix == NULL)
275 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
277 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
278 host->event_service_prefix, &name_buffer[1]);
/* The TTL is the value list's interval scaled by the host's ttl_factor. */
281 riemann_event_set(event,
282 RIEMANN_EVENT_FIELD_HOST, vl->host,
283 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
284 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
285 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
286 "plugin", vl->plugin,
288 "ds_name", ds->ds[index].name,
290 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
291 RIEMANN_EVENT_FIELD_NONE);
/* With threshold checking enabled the event also gets a state field.
 * NOTE(review): how `state` is chosen is not visible in this excerpt. */
293 if (host->check_thresholds) {
294 const char *state = NULL;
311 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
312 RIEMANN_EVENT_FIELD_NONE);
/* Instance names are only added when they are non-empty. */
315 if (vl->plugin_instance[0] != 0)
316 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
317 if (vl->type_instance[0] != 0)
318 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
/* Non-gauge values reported as rates get ":rate" appended to "ds_type". */
320 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
322 char ds_type[DATA_MAX_NAME_LEN];
324 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
325 DS_TYPE_TO_STRING(ds->ds[index].type));
326 riemann_event_string_attribute_add(event, "ds_type", ds_type);
330 riemann_event_string_attribute_add(event, "ds_type",
331 DS_TYPE_TO_STRING(ds->ds[index].type));
/* The data source position is exposed as the decimal string "ds_index". */
335 char ds_index[DATA_MAX_NAME_LEN];
337 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
338 riemann_event_string_attribute_add(event, "ds_index", ds_index);
/* Global attributes are stored as key/value pairs, hence the step of two. */
341 for (i = 0; i < riemann_attrs_num; i += 2)
342 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
344 for (i = 0; i < riemann_tags_num; i++)
345 riemann_event_tag_add(event, riemann_tags[i]);
/* Metric selection: gauges are sent as-is, ... */
347 if (ds->ds[index].type == DS_TYPE_GAUGE)
349 riemann_event_set(event,
350 RIEMANN_EVENT_FIELD_METRIC_D,
351 (double) vl->values[index].gauge,
352 RIEMANN_EVENT_FIELD_NONE);
/* ... other types use the supplied rate when there is one, ... */
354 else if (rates != NULL)
356 riemann_event_set(event,
357 RIEMANN_EVENT_FIELD_METRIC_D,
358 (double) rates[index],
359 RIEMANN_EVENT_FIELD_NONE);
/* ... and otherwise the raw derive/absolute/counter value as a signed 64-bit
 * integer. */
365 if (ds->ds[index].type == DS_TYPE_DERIVE)
366 metric = (int64_t) vl->values[index].derive;
367 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
368 metric = (int64_t) vl->values[index].absolute;
370 metric = (int64_t) vl->values[index].counter;
372 riemann_event_set(event,
373 RIEMANN_EVENT_FIELD_METRIC_S64,
375 RIEMANN_EVENT_FIELD_NONE);
378 DEBUG("write_riemann plugin: Successfully created message for metric: "
379 "host = \"%s\", service = \"%s\"",
380 event->host, event->service);
382 } /* }}} riemann_event_t *wrr_value_to_event */
/*
 * wrr_value_list_to_message - build a riemann message with one event per
 * value in the value list.
 *
 * When host->store_rates is set the rates are fetched with uc_get_rate() and
 * handed to wrr_value_to_event(); otherwise `rates` stays NULL. The message
 * is freed again on the visible failure paths.
 *
 * NOTE(review): the last parameter, the NULL checks and the return
 * statements are not visible in this excerpt, nor is any release of `rates`.
 */
384 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
385 data_set_t const *ds,
386 value_list_t const *vl,
389 riemann_message_t *msg;
391 gauge_t *rates = NULL;
393 /* Initialize the Msg structure. */
394 msg = riemann_message_new();
397 ERROR ("write_riemann plugin: riemann_message_new failed.");
401 if (host->store_rates)
403 rates = uc_get_rate(ds, vl);
406 ERROR("write_riemann plugin: uc_get_rate failed.");
407 riemann_message_free(msg);
/* One event per value; the loop index is passed on as the data source index
 * together with that value's entry from `statuses`. */
412 for (i = 0; i < vl->values_len; i++)
414 riemann_event_t *event;
416 event = wrr_value_to_event(host, ds, vl,
417 (int) i, rates, statuses[i]);
/* Event creation failure path: the partially built message is discarded. */
420 riemann_message_free(msg);
424 riemann_message_append_events(msg, event, NULL);
429 } /* }}} riemann_message_t *wrr_value_list_to_message */
/*
 * wrr_batch_flush_nolock - send the pending batch message and reset the
 * batch state (batch_init is set to the current time, batch_msg to NULL).
 *
 * The visible comparison against `timeout` checks whether batch_init plus
 * the timeout still lies in the future; presumably the flush is skipped in
 * that case (the surrounding lines are not visible in this excerpt).
 *
 * NOTE(review): this function is documented as being called with host->lock
 * held, yet it calls wrr_send(), which locks host->lock itself. The mutex is
 * initialised with default attributes in wrr_config_node(), so this looks
 * like a self-deadlock - confirm and consider a send variant that does not
 * take the lock.
 * NOTE(review): wrr_send() already reads the response for non-UDP clients;
 * the second riemann_client_recv_message() below would then wait for a
 * response that is not coming - confirm.
 * NOTE(review): the return value of wrr_send() is not used on the visible
 * line.
 */
432 * Always call while holding host->lock !
434 static int wrr_batch_flush_nolock(cdtime_t timeout,
435 struct riemann_host *host)
442 if ((host->batch_init + timeout) > now)
/* The batch message is freed right after the send attempt. */
445 wrr_send(host, host->batch_msg);
446 riemann_message_free(host->batch_msg);
448 if (host->client_type != RIEMANN_CLIENT_UDP)
450 riemann_message_t *response;
452 response = riemann_client_recv_message(host->client);
456 wrr_disconnect(host);
460 riemann_message_free(response);
/* Start a fresh batch window. */
463 host->batch_init = cdtime();
464 host->batch_msg = NULL;
/*
 * wrr_batch_flush - flush callback registered with plugin_register_flush().
 *
 * Takes the host from user_data, locks host->lock and delegates to
 * wrr_batch_flush_nolock(). `identifier` is unused.
 *
 * NOTE(review): the handling of a NULL user_data, the status check in front
 * of the ERROR message and the return statements are not visible in this
 * excerpt.
 */
468 static int wrr_batch_flush(cdtime_t timeout,
469 const char *identifier __attribute__((unused)),
470 user_data_t *user_data)
472 struct riemann_host *host;
475 if (user_data == NULL)
478 host = user_data->data;
/* The lock is held for the whole flush. */
479 pthread_mutex_lock(&host->lock);
480 status = wrr_batch_flush_nolock(timeout, host);
482 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
485 pthread_mutex_unlock(&host->lock);
/*
 * wrr_batch_add_value_list - convert a value list into events and add them
 * to the host's pending batch, flushing the batch when it has grown large
 * enough.
 *
 * The message is built before the lock is taken. If no batch is pending the
 * new message becomes the batch; otherwise its events are appended to the
 * existing batch. A flush (timeout 0) is triggered when batch_max is
 * negative or the packed size of the batch has reached batch_max.
 *
 * NOTE(review): the last parameter, the arguments of the append call, the
 * status checks and the return statements are not visible in this excerpt.
 */
489 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
490 data_set_t const *ds,
491 value_list_t const *vl,
494 riemann_message_t *msg;
498 msg = wrr_value_list_to_message(host, ds, vl, statuses);
502 pthread_mutex_lock(&host->lock);
/* First message since the last flush: adopt it as the batch. */
504 if (host->batch_msg == NULL) {
505 host->batch_msg = msg;
/* Otherwise add the new message's events to the pending batch. */
509 status = riemann_message_append_events_n(host->batch_msg,
515 riemann_message_free(msg);
/* Failure path: release the lock and report. */
518 pthread_mutex_unlock(&host->lock);
519 ERROR("write_riemann plugin: out of memory");
/* Flush when batch_max is negative or the packed size has reached it. */
524 len = riemann_message_get_packed_size(host->batch_msg);
526 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
527 ret = wrr_batch_flush_nolock(0, host);
530 pthread_mutex_unlock(&host->lock);
532 } /* }}} riemann_message_t *wrr_batch_add_value_list */
/*
 * wrr_notification - notification callback: convert a notification into a
 * riemann message and send it immediately with wrr_send().
 *
 * host->notifications is checked first; presumably nothing is sent when it
 * is off (the statement under the check is not visible in this excerpt).
 * The message is freed after the send attempt.
 *
 * NOTE(review): the NULL check on msg, the status check in front of the
 * ERROR message and the return statements are not visible in this excerpt.
 */
534 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
537 struct riemann_host *host = ud->data;
538 riemann_message_t *msg;
540 if (!host->notifications)
544 * Never batch for notifications, send them ASAP
546 msg = wrr_notification_to_message(host, n);
550 status = wrr_send(host, msg);
552 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
555 riemann_message_free(msg);
557 } /* }}} int wrr_notification */
/*
 * wrr_write - write callback: forward a value list to riemann, either via
 * the batch or as an individual message.
 *
 * `statuses` holds one entry per value. With threshold checking enabled it
 * is filled by write_riemann_threshold_check(); the memset() below zeroes it
 * (presumably on the other branch - the `else` is not visible in this
 * excerpt). Batching is used only for non-UDP clients with batch_mode set;
 * otherwise the message is built, sent with wrr_send() and freed.
 *
 * NOTE(review): the last parameter, the status checks and the return
 * statements are not visible in this excerpt. The return value of
 * wrr_batch_add_value_list() is not used on the visible line.
 */
559 static int wrr_write(const data_set_t *ds, /* {{{ */
560 const value_list_t *vl,
/* Variable-length array sized by the number of values in the list. */
564 int statuses[vl->values_len];
565 struct riemann_host *host = ud->data;
566 riemann_message_t *msg;
568 if (host->check_thresholds) {
569 status = write_riemann_threshold_check(ds, vl, statuses);
573 memset (statuses, 0, sizeof (statuses));
/* Batch only when the client is not UDP and batching is enabled. */
576 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
577 wrr_batch_add_value_list(host, ds, vl, statuses);
/* Unbatched path: build, send and free a message for this value list. */
579 msg = wrr_value_list_to_message(host, ds, vl, statuses);
583 status = wrr_send(host, msg);
585 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
588 riemann_message_free(msg);
591 } /* }}} int wrr_write */
/*
 * wrr_free - drop one reference to a host; installed as the user data
 * free_func in wrr_config_node().
 *
 * Under host->lock the reference count is decremented. While references
 * remain the lock is simply released; otherwise the client is dropped via
 * wrr_disconnect() and the mutex is destroyed.
 *
 * NOTE(review): the NULL check on `p`, the early return and the release of
 * the host's memory are not visible in this excerpt.
 */
593 static void wrr_free(void *p) /* {{{ */
595 struct riemann_host *host = p;
600 pthread_mutex_lock(&host->lock);
602 host->reference_count--;
/* Other holders remain: only release the lock. */
603 if (host->reference_count > 0)
605 pthread_mutex_unlock(&host->lock);
/* Last reference gone: tear down the client and the mutex. */
609 wrr_disconnect(host);
611 pthread_mutex_destroy(&host->lock);
613 } /* }}} void wrr_free */
/*
 * wrr_config_node - handle one "Node" configuration block: allocate a host,
 * apply defaults, parse the block's options and register the write, flush
 * and notification callbacks.
 *
 * Defaults set below: notifications on, threshold checks off, rates stored,
 * data source name not forced, batching on with RIEMANN_BATCH_MAX, TTL
 * factor RIEMANN_TTL_FACTOR, client type TCP.
 *
 * The host starts with a reference count of 1 held by this function. The
 * count is incremented next to each callback registration and the initial
 * reference is dropped at the end; a count still at 1 after the
 * registrations means no callback took a reference.
 *
 * NOTE(review): many lines (status checks, `break`s, some call arguments,
 * the error exits and the return statements) are not visible in this
 * excerpt.
 */
615 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
617 struct riemann_host *host = NULL;
620 oconfig_item_t *child;
621 char callback_name[DATA_MAX_NAME_LEN];
/* calloc() zeroes the structure, so unset pointers start out NULL and the
 * port starts out 0. */
624 if ((host = calloc(1, sizeof(*host))) == NULL) {
625 ERROR ("write_riemann plugin: calloc failed.");
/* Default mutex attributes. */
628 pthread_mutex_init(&host->lock, NULL);
629 host->reference_count = 1;
632 host->notifications = 1;
633 host->check_thresholds = 0;
634 host->store_rates = 1;
635 host->always_append_ds = 0;
636 host->batch_mode = 1;
637 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
638 host->batch_init = cdtime();
639 host->ttl_factor = RIEMANN_TTL_FACTOR;
641 host->client_type = RIEMANN_CLIENT_TCP;
/* The block's own argument is the node's name. */
643 status = cf_util_get_string(ci, &host->name);
645 WARNING("write_riemann plugin: Required host name is missing.");
650 for (i = 0; i < ci->children_num; i++) {
652 * The code here could be simplified but makes room
653 * for easy adding of new options later on.
655 child = &ci->children[i];
/* Option names are matched case-insensitively. */
658 if (strcasecmp("Host", child->key) == 0) {
659 status = cf_util_get_string(child, &host->node);
662 } else if (strcasecmp("Notifications", child->key) == 0) {
663 status = cf_util_get_boolean(child, &host->notifications);
666 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
667 status = cf_util_get_string(child, &host->event_service_prefix);
670 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
671 status = cf_util_get_boolean(child, &host->check_thresholds);
674 } else if (strcasecmp("Batch", child->key) == 0) {
675 status = cf_util_get_boolean(child, &host->batch_mode);
678 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
679 status = cf_util_get_int(child, &host->batch_max);
682 } else if (strcasecmp("Port", child->key) == 0) {
/* -1 from cf_util_get_port_number() is treated as an invalid port. */
683 host->port = cf_util_get_port_number(child);
684 if (host->port == -1) {
685 ERROR("write_riemann plugin: Invalid argument "
686 "configured for the \"Port\" "
690 } else if (strcasecmp("Protocol", child->key) == 0) {
692 status = cf_util_get_string_buffer(child,
696 ERROR("write_riemann plugin: cf_util_get_"
697 "string_buffer failed with "
698 "status %i.", status);
/* Accepted protocol names: UDP, TCP and TLS (case-insensitive). */
702 if (strcasecmp("UDP", tmp) == 0)
703 host->client_type = RIEMANN_CLIENT_UDP;
704 else if (strcasecmp("TCP", tmp) == 0)
705 host->client_type = RIEMANN_CLIENT_TCP;
706 else if (strcasecmp("TLS", tmp) == 0)
707 host->client_type = RIEMANN_CLIENT_TLS;
709 WARNING("write_riemann plugin: The value "
710 "\"%s\" is not valid for the "
711 "\"Protocol\" option. Use "
712 "either \"UDP\", \"TCP\" or \"TLS\".",
/* NOTE(review): the three TLS file options call cf_util_get_string(), but
 * their error messages name cf_util_get_string_buffer. */
714 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
715 status = cf_util_get_string(child, &host->tls_ca_file);
718 ERROR("write_riemann plugin: cf_util_get_"
719 "string_buffer failed with "
720 "status %i.", status);
723 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
724 status = cf_util_get_string(child, &host->tls_cert_file);
727 ERROR("write_riemann plugin: cf_util_get_"
728 "string_buffer failed with "
729 "status %i.", status);
732 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
733 status = cf_util_get_string(child, &host->tls_key_file);
736 ERROR("write_riemann plugin: cf_util_get_"
737 "string_buffer failed with "
738 "status %i.", status);
741 } else if (strcasecmp("StoreRates", child->key) == 0) {
742 status = cf_util_get_boolean(child, &host->store_rates);
745 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
746 status = cf_util_get_boolean(child,
747 &host->always_append_ds);
750 } else if (strcasecmp("TTLFactor", child->key) == 0) {
/* TTLFactor is accepted with decreasing enthusiasm: values >= 1.0 get a
 * NOTICE, values > 0.0 a WARNING, and anything else (zero, negative, NaN) is
 * rejected with an ERROR. The condition of the first branch is not visible
 * in this excerpt. */
752 status = cf_util_get_double(child, &tmp);
756 host->ttl_factor = tmp;
757 } else if (tmp >= 1.0) {
758 NOTICE("write_riemann plugin: The configured "
759 "TTLFactor is very small "
760 "(%.1f). A value of 2.0 or "
761 "greater is recommended.",
763 host->ttl_factor = tmp;
764 } else if (tmp > 0.0) {
765 WARNING("write_riemann plugin: The configured "
766 "TTLFactor is too small to be "
767 "useful (%.1f). I'll use it "
768 "since the user knows best, "
769 "but under protest.",
771 host->ttl_factor = tmp;
772 } else { /* zero, negative and NAN */
773 ERROR("write_riemann plugin: The configured "
774 "TTLFactor is invalid (%.1f).",
778 WARNING("write_riemann plugin: ignoring unknown config "
779 "option: \"%s\"", child->key);
/* Callbacks are registered under the name "write_riemann/<...>". */
787 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
/* wrr_free() releases a callback's reference to the host. */
790 ud.free_func = wrr_free;
/* The lock is held while registering and adjusting the reference count. */
792 pthread_mutex_lock(&host->lock);
794 status = plugin_register_write(callback_name, wrr_write, &ud);
/* A flush callback is only registered when batching can actually happen. */
796 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
798 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
801 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
802 "failed with status %i.",
803 callback_name, status);
805 host->reference_count++;
807 status = plugin_register_notification(callback_name,
808 wrr_notification, &ud);
810 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
811 "failed with status %i.",
812 callback_name, status);
814 host->reference_count++;
816 if (host->reference_count <= 1)
818 /* Both callbacks failed => free memory.
819 * We need to unlock here, because wrr_free() will lock.
820 * This is not a race condition, because we're the only one
821 * holding a reference. */
822 pthread_mutex_unlock(&host->lock);
/* Drop the initial reference taken when the host was created. */
827 host->reference_count--;
828 pthread_mutex_unlock(&host->lock);
831 } /* }}} int wrr_config_node */
/*
 * wrr_config - top-level configuration callback for the plugin.
 *
 * Walks the children of the plugin's configuration block:
 *   "Node"      - handed to wrr_config_node();
 *   "attribute" - needs exactly two string arguments, which are appended to
 *                 riemann_attrs as key followed by value;
 *   "tag"       - a string appended to riemann_tags;
 *   anything else is reported with a WARNING.
 * Option names are matched case-insensitively.
 *
 * NOTE(review): the return value of wrr_config_node() and of the
 * strarray_add() calls is not used on the visible lines. The statements
 * following each WARNING, any release of key/val/tmp and the return
 * statement are not visible in this excerpt.
 */
833 static int wrr_config(oconfig_item_t *ci) /* {{{ */
836 oconfig_item_t *child;
839 for (i = 0; i < ci->children_num; i++) {
840 child = &ci->children[i];
842 if (strcasecmp("Node", child->key) == 0) {
843 wrr_config_node (child);
844 } else if (strcasecmp(child->key, "attribute") == 0) {
/* An attribute is a key plus a value, both of which must be strings. */
848 if (child->values_num != 2) {
849 WARNING("riemann attributes need both a key and a value.");
852 if (child->values[0].type != OCONFIG_TYPE_STRING ||
853 child->values[1].type != OCONFIG_TYPE_STRING) {
854 WARNING("riemann attribute needs string arguments.");
857 if ((key = strdup(child->values[0].value.string)) == NULL) {
858 WARNING("cannot allocate memory for attribute key.");
861 if ((val = strdup(child->values[1].value.string)) == NULL) {
862 WARNING("cannot allocate memory for attribute value.");
/* Key first, then value: readers of riemann_attrs step through it in pairs. */
866 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
867 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
868 DEBUG("write_riemann: got attr: %s => %s", key, val);
871 } else if (strcasecmp(child->key, "tag") == 0) {
873 status = cf_util_get_string(child, &tmp);
877 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
878 DEBUG("write_riemann plugin: Got tag: %s", tmp);
881 WARNING("write_riemann plugin: Ignoring unknown "
882 "configuration option \"%s\" at top level.",
887 } /* }}} int wrr_config */
/*
 * module_register - plugin entry point: registers wrr_config() as the
 * complex configuration callback under the name "write_riemann".
 */
889 void module_register(void)
891 plugin_register_complex_config("write_riemann", wrr_config);
894 /* vim: set sw=8 sts=8 ts=8 noet : */