#include "collectd.h"
-#include "plugin.h"
#include "common.h"
-#include "configfile.h"
+#include "plugin.h"
#include "utils_cache.h"
-#include "riemann.pb-c.h"
+#include "utils_complain.h"
#include "write_riemann_threshold.h"
- #include <errno.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netdb.h>
+#include <riemann/riemann-client.h>
-#define RIEMANN_HOST "localhost"
-#define RIEMANN_PORT "5555"
-#define RIEMANN_TTL_FACTOR 2.0
-#define RIEMANN_BATCH_MAX 8192
+#define RIEMANN_HOST "localhost"
+#define RIEMANN_PORT 5555
+#define RIEMANN_TTL_FACTOR 2.0
+#define RIEMANN_BATCH_MAX 8192
struct riemann_host {
- char *name;
- char *event_service_prefix;
-#define F_CONNECT 0x01
- uint8_t flags;
- pthread_mutex_t lock;
- _Bool batch_mode;
- _Bool notifications;
- _Bool check_thresholds;
- _Bool store_rates;
- _Bool always_append_ds;
- char *node;
- char *service;
- _Bool use_tcp;
- int s;
- double ttl_factor;
- Msg *batch_msg;
- cdtime_t batch_init;
- int batch_max;
- int reference_count;
+ c_complain_t init_complaint;
+ char *name;
+ char *event_service_prefix;
+ pthread_mutex_t lock;
+ _Bool batch_mode;
+ _Bool notifications;
+ _Bool check_thresholds;
+ _Bool store_rates;
+ _Bool always_append_ds;
+ char *node;
+ int port;
+ riemann_client_type_t client_type;
+ riemann_client_t *client;
+ double ttl_factor;
+ cdtime_t batch_init;
+ int batch_max;
+ int batch_timeout;
+ int reference_count;
+ riemann_message_t *batch_msg;
+ char *tls_ca_file;
+ char *tls_cert_file;
+ char *tls_key_file;
+ struct timeval timeout;
};
-static char **riemann_tags;
-static size_t riemann_tags_num;
-static char **riemann_attrs;
-static size_t riemann_attrs_num;
-
-static void riemann_event_protobuf_free (Event *event) /* {{{ */
-{
- size_t i;
-
- if (event == NULL)
- return;
-
- sfree (event->state);
- sfree (event->service);
- sfree (event->host);
- sfree (event->description);
-
- strarray_free (event->tags, event->n_tags);
- event->tags = NULL;
- event->n_tags = 0;
-
- for (i = 0; i < event->n_attributes; i++)
- {
- sfree (event->attributes[i]->key);
- sfree (event->attributes[i]->value);
- sfree (event->attributes[i]);
- }
- sfree (event->attributes);
- event->n_attributes = 0;
-
- sfree (event);
-} /* }}} void riemann_event_protobuf_free */
-
-static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
-{
- size_t i;
-
- if (msg == NULL)
- return;
-
- for (i = 0; i < msg->n_events; i++)
- {
- riemann_event_protobuf_free (msg->events[i]);
- msg->events[i] = NULL;
- }
-
- sfree (msg->events);
- msg->n_events = 0;
-
- sfree (msg);
-} /* }}} void riemann_msg_protobuf_free */
+static char **riemann_tags;
+static size_t riemann_tags_num;
+static char **riemann_attrs;
+static size_t riemann_attrs_num;
/* host->lock must be held when calling this function. */
-static int riemann_connect(struct riemann_host *host) /* {{{ */
+static int wrr_connect(struct riemann_host *host) /* {{{ */
{
- int e;
- struct addrinfo *ai, *res, hints;
- char const *node;
- char const *service;
-
- if (host->flags & F_CONNECT)
- return 0;
-
- memset(&hints, 0, sizeof(hints));
- memset(&service, 0, sizeof(service));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
-#ifdef AI_ADDRCONFIG
- hints.ai_flags |= AI_ADDRCONFIG;
+ char const *node;
+ int port;
+
+ if (host->client)
+ return 0;
+
+ node = (host->node != NULL) ? host->node : RIEMANN_HOST;
+ port = (host->port) ? host->port : RIEMANN_PORT;
+
+ host->client = NULL;
+
+ host->client = riemann_client_create(
+ host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
+ host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
+ host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
+ host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
+ if (host->client == NULL) {
+ c_complain(LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+ node, port);
+ return -1;
+ }
+#if RCC_VERSION_NUMBER >= 0x010800
+ if (host->timeout.tv_sec != 0) {
+ if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
+ riemann_client_free(host->client);
+ host->client = NULL;
+ c_complain(LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+ node, port);
+ return -1;
+ }
+ }
#endif
- node = (host->node != NULL) ? host->node : RIEMANN_HOST;
- service = (host->service != NULL) ? host->service : RIEMANN_PORT;
-
- if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
- ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
- node, gai_strerror(e));
- return -1;
- }
-
- host->s = -1;
- for (ai = res; ai != NULL; ai = ai->ai_next) {
- if ((host->s = socket(ai->ai_family,
- ai->ai_socktype,
- ai->ai_protocol)) == -1) {
- continue;
- }
-
- if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
- close(host->s);
- host->s = -1;
- continue;
- }
-
- host->flags |= F_CONNECT;
- DEBUG("write_riemann plugin: got a successful connection for: %s:%s",
- node, service);
- break;
- }
-
- freeaddrinfo(res);
-
- if (host->s < 0) {
- WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
- node, service);
- return -1;
- }
- return 0;
-} /* }}} int riemann_connect */
+ set_sock_opts(riemann_client_get_fd(host->client));
+
+ c_release(LOG_INFO, &host->init_complaint,
+ "write_riemann plugin: Successfully connected to %s:%d", node,
+ port);
+
+ return 0;
+} /* }}} int wrr_connect */
/* host->lock must be held when calling this function. */
-static int riemann_disconnect (struct riemann_host *host) /* {{{ */
+static int wrr_disconnect(struct riemann_host *host) /* {{{ */
{
- if ((host->flags & F_CONNECT) == 0)
- return (0);
+ if (!host->client)
+ return (0);
- close (host->s);
- host->s = -1;
- host->flags &= ~F_CONNECT;
+ riemann_client_free(host->client);
+ host->client = NULL;
- return (0);
-} /* }}} int riemann_disconnect */
+ return (0);
+} /* }}} int wrr_disconnect */
-static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */
-{
- int status = 0;
- u_char *buffer = NULL;
- size_t buffer_len;
-
- status = riemann_connect (host);
- if (status != 0)
- return status;
-
- buffer_len = msg__get_packed_size(msg);
-
- if (host->use_tcp)
- buffer_len += 4;
-
- buffer = malloc (buffer_len);
- if (buffer == NULL) {
- ERROR ("write_riemann plugin: malloc failed.");
- return ENOMEM;
- }
- memset (buffer, 0, buffer_len);
-
- if (host->use_tcp)
- {
- uint32_t length = htonl ((uint32_t) (buffer_len - 4));
- memcpy (buffer, &length, 4);
- msg__pack(msg, buffer + 4);
- }
- else
- {
- msg__pack(msg, buffer);
- }
-
- status = (int) swrite (host->s, buffer, buffer_len);
- if (status != 0)
- {
- char errbuf[1024];
- ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
- (host->node != NULL) ? host->node : RIEMANN_HOST,
- (host->service != NULL) ? host->service : RIEMANN_PORT,
- sstrerror (errno, errbuf, sizeof (errbuf)));
- sfree (buffer);
- return -1;
- }
-
- sfree (buffer);
- return 0;
-} /* }}} int riemann_send_msg */
-
-static int riemann_recv_ack(struct riemann_host *host) /* {{{ */
+/**
+ * Function to send messages to riemann.
+ *
+ * Acquires the host lock, disconnects on errors.
+ */
+static int wrr_send_nolock(struct riemann_host *host,
+ riemann_message_t *msg) /* {{{ */
{
- int status = 0;
- Msg *msg = NULL;
- uint32_t header;
+ int status = 0;
- status = (int) sread (host->s, &header, 4);
+ status = wrr_connect(host);
+ if (status != 0) {
+ return status;
+ }
- if (status != 0)
- return -1;
+ status = riemann_client_send_message(host->client, msg);
+ if (status != 0) {
+ wrr_disconnect(host);
+ return status;
+ }
- size_t size = ntohl(header);
+ /*
+ * For TCP we need to receive message acknowledgemenent.
+ */
+ if (host->client_type != RIEMANN_CLIENT_UDP) {
+ riemann_message_t *response;
- // Buffer on the stack since acknowledges are typically small.
- u_char buffer[size];
- memset (buffer, 0, size);
+ response = riemann_client_recv_message(host->client);
- status = (int) sread (host->s, buffer, size);
+ if (response == NULL) {
+ wrr_disconnect(host);
+ return errno;
+ }
+ riemann_message_free(response);
+ }
- if (status != 0)
- return status;
+ return 0;
+} /* }}} int wrr_send */
- msg = msg__unpack (NULL, size, buffer);
+static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
+ int status = 0;
- if (msg == NULL)
- return -1;
+ pthread_mutex_lock(&host->lock);
+ status = wrr_send_nolock(host, msg);
+ pthread_mutex_unlock(&host->lock);
+ return status;
+}
- if (!msg->ok)
- {
- ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
- (host->node != NULL) ? host->node : RIEMANN_HOST,
- (host->service != NULL) ? host->service : RIEMANN_PORT,
- msg->error);
+static riemann_message_t *
+wrr_notification_to_message(struct riemann_host *host, /* {{{ */
+ notification_t const *n) {
+ riemann_message_t *msg;
+ riemann_event_t *event;
+ char service_buffer[6 * DATA_MAX_NAME_LEN];
+ char const *severity;
+
+ switch (n->severity) {
+ case NOTIF_OKAY:
+ severity = "ok";
+ break;
+ case NOTIF_WARNING:
+ severity = "warning";
+ break;
+ case NOTIF_FAILURE:
+ severity = "critical";
+ break;
+ default:
+ severity = "unknown";
+ }
+
+ format_name(service_buffer, sizeof(service_buffer),
+ /* host = */ "", n->plugin, n->plugin_instance, n->type,
+ n->type_instance);
+
+ event = riemann_event_create(
+ RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
+ (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
+ "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
+ RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
+ RIEMANN_EVENT_FIELD_NONE);
+
+ if (n->host[0] != 0)
+ riemann_event_string_attribute_add(event, "host", n->host);
+ if (n->plugin[0] != 0)
+ riemann_event_string_attribute_add(event, "plugin", n->plugin);
+ if (n->plugin_instance[0] != 0)
+ riemann_event_string_attribute_add(event, "plugin_instance",
+ n->plugin_instance);
+
+ if (n->type[0] != 0)
+ riemann_event_string_attribute_add(event, "type", n->type);
+ if (n->type_instance[0] != 0)
+ riemann_event_string_attribute_add(event, "type_instance",
+ n->type_instance);
+
+ for (size_t i = 0; i < riemann_attrs_num; i += 2)
+ riemann_event_string_attribute_add(event, riemann_attrs[i],
+ riemann_attrs[i + 1]);
+
+ for (size_t i = 0; i < riemann_tags_num; i++)
+ riemann_event_tag_add(event, riemann_tags[i]);
+
+ if (n->message[0] != 0)
+ riemann_event_string_attribute_add(event, "description", n->message);
+
+ /* Pull in values from threshold and add extra attributes */
+ for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
+ if (strcasecmp("CurrentValue", meta->name) == 0 &&
+ meta->type == NM_TYPE_DOUBLE) {
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
+ (double)meta->nm_value.nm_double,
+ RIEMANN_EVENT_FIELD_NONE);
+ continue;
+ }
- msg__free_unpacked(msg, NULL);
- return -1;
- }
+ if (meta->type == NM_TYPE_STRING) {
+ riemann_event_string_attribute_add(event, meta->name,
+ meta->nm_value.nm_string);
+ continue;
+ }
+ }
+
+ msg = riemann_message_create_with_events(event, NULL);
+ if (msg == NULL) {
+ ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
+ riemann_event_free(event);
+ return (NULL);
+ }
+
+ DEBUG("write_riemann plugin: Successfully created message for notification: "
+ "host = \"%s\", service = \"%s\", state = \"%s\"",
+ event->host, event->service, event->state);
+ return (msg);
+} /* }}} riemann_message_t *wrr_notification_to_message */
+
+static riemann_event_t *
+wrr_value_to_event(struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl, size_t index,
+ gauge_t const *rates, int status) {
+ riemann_event_t *event;
+ char name_buffer[5 * DATA_MAX_NAME_LEN];
+ char service_buffer[6 * DATA_MAX_NAME_LEN];
+ size_t i;
+
+ event = riemann_event_new();
+ if (event == NULL) {
+ ERROR("write_riemann plugin: riemann_event_new() failed.");
+ return (NULL);
+ }
+
+ format_name(name_buffer, sizeof(name_buffer),
+ /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
+ vl->type_instance);
+ if (host->always_append_ds || (ds->ds_num > 1)) {
+ if (host->event_service_prefix == NULL)
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
+ &name_buffer[1], ds->ds[index].name);
+ else
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
+ host->event_service_prefix, &name_buffer[1],
+ ds->ds[index].name);
+ } else {
+ if (host->event_service_prefix == NULL)
+ sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
+ else
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
+ host->event_service_prefix, &name_buffer[1]);
+ }
+
+ riemann_event_set(
+ event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
+ (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
+ (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
+ RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
+ vl->type, "ds_name", ds->ds[index].name, NULL,
+ RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
+
+ if (host->check_thresholds) {
+ const char *state = NULL;
+
+ switch (status) {
+ case STATE_OKAY:
+ state = "ok";
+ break;
+ case STATE_ERROR:
+ state = "critical";
+ break;
+ case STATE_WARNING:
+ state = "warning";
+ break;
+ case STATE_MISSING:
+ state = "unknown";
+ break;
+ }
+ if (state)
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
+ RIEMANN_EVENT_FIELD_NONE);
+ }
+
+ if (vl->plugin_instance[0] != 0)
+ riemann_event_string_attribute_add(event, "plugin_instance",
+ vl->plugin_instance);
+ if (vl->type_instance[0] != 0)
+ riemann_event_string_attribute_add(event, "type_instance",
+ vl->type_instance);
+
+ if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
+ char ds_type[DATA_MAX_NAME_LEN];
+
+ ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
+ riemann_event_string_attribute_add(event, "ds_type", ds_type);
+ } else {
+ riemann_event_string_attribute_add(event, "ds_type",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
+ }
+
+ {
+ char ds_index[DATA_MAX_NAME_LEN];
+
+ ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
+ riemann_event_string_attribute_add(event, "ds_index", ds_index);
+ }
+
+ for (i = 0; i < riemann_attrs_num; i += 2)
+ riemann_event_string_attribute_add(event, riemann_attrs[i],
+ riemann_attrs[i + 1]);
+
+ for (i = 0; i < riemann_tags_num; i++)
+ riemann_event_tag_add(event, riemann_tags[i]);
+
+ if (ds->ds[index].type == DS_TYPE_GAUGE) {
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
+ (double)vl->values[index].gauge,
+ RIEMANN_EVENT_FIELD_NONE);
+ } else if (rates != NULL) {
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
+ RIEMANN_EVENT_FIELD_NONE);
+ } else {
+ int64_t metric;
+
+ if (ds->ds[index].type == DS_TYPE_DERIVE)
+ metric = (int64_t)vl->values[index].derive;
+ else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
+ metric = (int64_t)vl->values[index].absolute;
+ else
+ metric = (int64_t)vl->values[index].counter;
+
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
+ RIEMANN_EVENT_FIELD_NONE);
+ }
+
+ DEBUG("write_riemann plugin: Successfully created message for metric: "
+ "host = \"%s\", service = \"%s\"",
+ event->host, event->service);
+ return (event);
+} /* }}} riemann_event_t *wrr_value_to_event */
+
+static riemann_message_t *
+wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ int *statuses) {
+ riemann_message_t *msg;
+ size_t i;
+ gauge_t *rates = NULL;
+
+ /* Initialize the Msg structure. */
+ msg = riemann_message_new();
+ if (msg == NULL) {
+ ERROR("write_riemann plugin: riemann_message_new failed.");
+ return (NULL);
+ }
+
+ if (host->store_rates) {
+ rates = uc_get_rate(ds, vl);
+ if (rates == NULL) {
+ ERROR("write_riemann plugin: uc_get_rate failed.");
+ riemann_message_free(msg);
+ return (NULL);
+ }
+ }
- msg__free_unpacked (msg, NULL);
- return 0;
-} /* }}} int riemann_recv_ack */
+ for (i = 0; i < vl->values_len; i++) {
+ riemann_event_t *event;
-/**
- * Function to send messages (Msg) to riemann.
- *
- * Acquires the host lock, disconnects on errors.
- */
-static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */
-{
- int status = 0;
- pthread_mutex_lock (&host->lock);
-
- status = riemann_send_msg(host, msg);
- if (status != 0) {
- riemann_disconnect (host);
- pthread_mutex_unlock (&host->lock);
- return status;
- }
-
- /*
- * For TCP we need to receive message acknowledgemenent.
- */
- if (host->use_tcp)
- {
- status = riemann_recv_ack(host);
-
- if (status != 0)
- {
- riemann_disconnect (host);
- pthread_mutex_unlock (&host->lock);
- return status;
- }
- }
-
- pthread_mutex_unlock (&host->lock);
- return 0;
-} /* }}} int riemann_send */
-
-static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
-{
- return (strarray_add (&event->tags, &event->n_tags, tag));
-} /* }}} int riemann_event_add_tag */
-
-static int riemann_event_add_attribute(Event *event, /* {{{ */
- char const *key, char const *value)
-{
- Attribute **new_attributes;
- Attribute *a;
-
- new_attributes = realloc (event->attributes,
- sizeof (*event->attributes) * (event->n_attributes + 1));
- if (new_attributes == NULL)
- {
- ERROR ("write_riemann plugin: realloc failed.");
- return (ENOMEM);
- }
- event->attributes = new_attributes;
-
- a = malloc (sizeof (*a));
- if (a == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- return (ENOMEM);
- }
- attribute__init (a);
-
- a->key = strdup (key);
- if (value != NULL)
- a->value = strdup (value);
-
- event->attributes[event->n_attributes] = a;
- event->n_attributes++;
-
- return (0);
-} /* }}} int riemann_event_add_attribute */
-
-static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
- notification_t const *n)
-{
- Msg *msg;
- Event *event;
- char service_buffer[6 * DATA_MAX_NAME_LEN];
- char const *severity;
- notification_meta_t *meta;
- int i;
-
- msg = malloc (sizeof (*msg));
- if (msg == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- return (NULL);
- }
- memset (msg, 0, sizeof (*msg));
- msg__init (msg);
-
- msg->events = malloc (sizeof (*msg->events));
- if (msg->events == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- sfree (msg);
- return (NULL);
- }
-
- event = malloc (sizeof (*event));
- if (event == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- sfree (msg->events);
- sfree (msg);
- return (NULL);
- }
- memset (event, 0, sizeof (*event));
- event__init (event);
-
- msg->events[0] = event;
- msg->n_events = 1;
-
- event->host = strdup (n->host);
- event->time = CDTIME_T_TO_TIME_T (n->time);
- event->has_time = 1;
-
- switch (n->severity)
- {
- case NOTIF_OKAY: severity = "ok"; break;
- case NOTIF_WARNING: severity = "warning"; break;
- case NOTIF_FAILURE: severity = "critical"; break;
- default: severity = "unknown";
- }
- event->state = strdup (severity);
-
- riemann_event_add_tag (event, "notification");
- if (n->host[0] != 0)
- riemann_event_add_attribute (event, "host", n->host);
- if (n->plugin[0] != 0)
- riemann_event_add_attribute (event, "plugin", n->plugin);
- if (n->plugin_instance[0] != 0)
- riemann_event_add_attribute (event, "plugin_instance",
- n->plugin_instance);
-
- if (n->type[0] != 0)
- riemann_event_add_attribute (event, "type", n->type);
- if (n->type_instance[0] != 0)
- riemann_event_add_attribute (event, "type_instance",
- n->type_instance);
-
- for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_add_attribute(event,
- riemann_attrs[i],
- riemann_attrs[i +1]);
-
- for (i = 0; i < riemann_tags_num; i++)
- riemann_event_add_tag (event, riemann_tags[i]);
-
- format_name (service_buffer, sizeof (service_buffer),
- /* host = */ "", n->plugin, n->plugin_instance,
- n->type, n->type_instance);
- event->service = strdup (&service_buffer[1]);
-
- if (n->message[0] != 0)
- riemann_event_add_attribute (event, "description", n->message);
-
- /* Pull in values from threshold and add extra attributes */
- for (meta = n->meta; meta != NULL; meta = meta->next)
- {
- if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
- {
- event->metric_d = meta->nm_value.nm_double;
- event->has_metric_d = 1;
- continue;
- }
-
- if (meta->type == NM_TYPE_STRING) {
- riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string);
- continue;
- }
- }
-
- DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
- "host = \"%s\", service = \"%s\", state = \"%s\"",
- event->host, event->service, event->state);
- return (msg);
-} /* }}} Msg *riemann_notification_to_protobuf */
-
-static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
- data_set_t const *ds,
- value_list_t const *vl, size_t index,
- gauge_t const *rates,
- int status)
-{
- Event *event;
- char name_buffer[5 * DATA_MAX_NAME_LEN];
- char service_buffer[6 * DATA_MAX_NAME_LEN];
- double ttl;
- int i;
-
- event = malloc (sizeof (*event));
- if (event == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- return (NULL);
- }
- memset (event, 0, sizeof (*event));
- event__init (event);
-
- event->host = strdup (vl->host);
- event->time = CDTIME_T_TO_TIME_T (vl->time);
- event->has_time = 1;
-
- if (host->check_thresholds) {
- switch (status) {
- case STATE_OKAY:
- event->state = strdup("ok");
- break;
- case STATE_ERROR:
- event->state = strdup("critical");
- break;
- case STATE_WARNING:
- event->state = strdup("warning");
- break;
- case STATE_MISSING:
- event->state = strdup("unknown");
- break;
- }
- }
-
- ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
- event->ttl = (float) ttl;
- event->has_ttl = 1;
-
- riemann_event_add_attribute (event, "plugin", vl->plugin);
- if (vl->plugin_instance[0] != 0)
- riemann_event_add_attribute (event, "plugin_instance",
- vl->plugin_instance);
-
- riemann_event_add_attribute (event, "type", vl->type);
- if (vl->type_instance[0] != 0)
- riemann_event_add_attribute (event, "type_instance",
- vl->type_instance);
-
- if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
- {
- char ds_type[DATA_MAX_NAME_LEN];
-
- ssnprintf (ds_type, sizeof (ds_type), "%s:rate",
- DS_TYPE_TO_STRING(ds->ds[index].type));
- riemann_event_add_attribute (event, "ds_type", ds_type);
- }
- else
- {
- riemann_event_add_attribute (event, "ds_type",
- DS_TYPE_TO_STRING(ds->ds[index].type));
- }
- riemann_event_add_attribute (event, "ds_name", ds->ds[index].name);
- {
- char ds_index[DATA_MAX_NAME_LEN];
-
- ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
- riemann_event_add_attribute (event, "ds_index", ds_index);
- }
-
- for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_add_attribute(event,
- riemann_attrs[i],
- riemann_attrs[i +1]);
-
- for (i = 0; i < riemann_tags_num; i++)
- riemann_event_add_tag (event, riemann_tags[i]);
-
- if (ds->ds[index].type == DS_TYPE_GAUGE)
- {
- event->has_metric_d = 1;
- event->metric_d = (double) vl->values[index].gauge;
- }
- else if (rates != NULL)
- {
- event->has_metric_d = 1;
- event->metric_d = (double) rates[index];
- }
- else
- {
- event->has_metric_sint64 = 1;
- if (ds->ds[index].type == DS_TYPE_DERIVE)
- event->metric_sint64 = (int64_t) vl->values[index].derive;
- else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
- event->metric_sint64 = (int64_t) vl->values[index].absolute;
- else
- event->metric_sint64 = (int64_t) vl->values[index].counter;
- }
-
- format_name (name_buffer, sizeof (name_buffer),
- /* host = */ "", vl->plugin, vl->plugin_instance,
- vl->type, vl->type_instance);
- if (host->always_append_ds || (ds->ds_num > 1))
- {
- if (host->event_service_prefix == NULL)
- ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s",
- &name_buffer[1], ds->ds[index].name);
- else
- ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s",
- host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
- }
- else
- {
- if (host->event_service_prefix == NULL)
- sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer));
- else
- ssnprintf (service_buffer, sizeof (service_buffer), "%s%s",
- host->event_service_prefix, &name_buffer[1]);
- }
-
- event->service = strdup (service_buffer);
-
- DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
- "host = \"%s\", service = \"%s\"",
- event->host, event->service);
- return (event);
-} /* }}} Event *riemann_value_to_protobuf */
-
-static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
- data_set_t const *ds,
- value_list_t const *vl,
- int *statuses)
-{
- Msg *msg;
- size_t i;
- gauge_t *rates = NULL;
-
- /* Initialize the Msg structure. */
- msg = malloc (sizeof (*msg));
- if (msg == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- return (NULL);
- }
- memset (msg, 0, sizeof (*msg));
- msg__init (msg);
-
- /* Set up events. First, the list of pointers. */
- msg->n_events = (size_t) vl->values_len;
- msg->events = calloc (msg->n_events, sizeof (*msg->events));
- if (msg->events == NULL)
- {
- ERROR ("write_riemann plugin: calloc failed.");
- riemann_msg_protobuf_free (msg);
- return (NULL);
- }
-
- if (host->store_rates)
- {
- rates = uc_get_rate (ds, vl);
- if (rates == NULL)
- {
- ERROR ("write_riemann plugin: uc_get_rate failed.");
- riemann_msg_protobuf_free (msg);
- return (NULL);
- }
- }
-
- for (i = 0; i < msg->n_events; i++)
- {
- msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
- (int) i, rates, statuses[i]);
- if (msg->events[i] == NULL)
- {
- riemann_msg_protobuf_free (msg);
- sfree (rates);
- return (NULL);
- }
- }
-
- sfree (rates);
- return (msg);
-} /* }}} Msg *riemann_value_list_to_protobuf */
+ event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
+ if (event == NULL) {
+ riemann_message_free(msg);
+ sfree(rates);
+ return (NULL);
+ }
+ riemann_message_append_events(msg, event, NULL);
+ }
+ sfree(rates);
+ return (msg);
+} /* }}} riemann_message_t *wrr_value_list_to_message */
/*
* Always call while holding host->lock !