From: Florian Forster Date: Mon, 7 Nov 2016 07:43:17 +0000 (+0100) Subject: Merge branch 'collectd-5.5' into collectd-5.6 X-Git-Tag: collectd-5.6.2~10 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=1326af38b3ef25c41c994cd76c043202636b3d70;p=collectd.git Merge branch 'collectd-5.5' into collectd-5.6 --- 1326af38b3ef25c41c994cd76c043202636b3d70 diff --cc src/apcups.c index af5f24c7,eb1f0622..70e7fcea --- a/src/apcups.c +++ b/src/apcups.c @@@ -252,8 -263,8 +252,8 @@@ static int net_send (int *sockfd, cons } /* Get and print status from apcupsd NIS server */ -static int apc_query_server (char *host, int port, +static int apc_query_server (char const *node, char const *service, - struct apc_detail_s *apcups_detail) + apc_detail_t *apcups_detail) { int n; char recvline[1024]; @@@ -378,43 -390,43 +379,43 @@@ return (0); } -static int apcups_config (const char *key, const char *value) +static int apcups_config (oconfig_item_t *ci) { - if (strcasecmp (key, "host") == 0) - { - if (conf_host != NULL) - { - free (conf_host); - conf_host = NULL; - } - if ((conf_host = strdup (value)) == NULL) - return (1); - } - else if (strcasecmp (key, "Port") == 0) + _Bool persistent_conn_set = 0; + + for (int i = 0; i < ci->children_num; i++) { - int port_tmp = atoi (value); - if (port_tmp < 1 || port_tmp > 65535) - { - WARNING ("apcups plugin: Invalid port: %i", port_tmp); - return (1); + oconfig_item_t *child = ci->children + i; + + if (strcasecmp (child->key, "Host") == 0) + cf_util_get_string (child, &conf_node); + else if (strcasecmp (child->key, "Port") == 0) + cf_util_get_service (child, &conf_service); + else if (strcasecmp (child->key, "ReportSeconds") == 0) + cf_util_get_boolean (child, &conf_report_seconds); + else if (strcasecmp (child->key, "PersistentConnection") == 0) { + cf_util_get_boolean (child, &conf_persistent_conn); + persistent_conn_set = 1; } - conf_port = port_tmp; - } - else if (strcasecmp (key, "ReportSeconds") == 0) - { - if (IS_TRUE (value)) - conf_report_seconds = 1; else - conf_report_seconds = 0; + ERROR ("apcups plugin: Unknown config option \"%s\".", child->key); } - else - { - return (-1); + + if (!persistent_conn_set) { + double interval = CDTIME_T_TO_DOUBLE(plugin_get_interval()); + if (interval > APCUPS_SERVER_TIMEOUT) { + NOTICE ("apcups plugin: Plugin poll interval set to %.3f seconds. " + "Apcupsd NIS socket timeout is %.3f seconds, " + "PersistentConnection disabled by default.", + interval, APCUPS_SERVER_TIMEOUT); + conf_persistent_conn = 0; + } } + return (0); -} +} /* int apcups_config */ - static void apc_submit_generic (const char *type, const char *type_inst, double value) -static void apc_submit_generic (char *type, char *type_inst, gauge_t value) ++static void apc_submit_generic (const char *type, const char *type_inst, gauge_t value) { value_t values[1]; value_list_t vl = VALUE_LIST_INIT; @@@ -446,33 -461,27 +450,27 @@@ static void apc_submit (apc_detail_t co static int apcups_read (void) { - struct apc_detail_s apcups_detail; - int status; + apc_detail_t apcups_detail = { + .linev = NAN, + .outputv = NAN, + .battv = NAN, + .loadpct = NAN, + .bcharge = NAN, + .timeleft = NAN, + .itemp = NAN, + .linefreq = NAN, + }; - apcups_detail.linev = -1.0; - apcups_detail.outputv = -1.0; - apcups_detail.battv = -1.0; - apcups_detail.loadpct = -1.0; - apcups_detail.bcharge = -1.0; - apcups_detail.timeleft = NAN; - apcups_detail.itemp = -300.0; - apcups_detail.linefreq = -1.0; - - status = apc_query_server ((conf_node == NULL) ? APCUPS_DEFAULT_NODE : conf_node, - (conf_service == NULL) ? APCUPS_DEFAULT_SERVICE : conf_service, - &apcups_detail); - - /* - * if we did not connect then do not bother submitting - * zeros. We want rrd files to have NAN. - */ - int status = apc_query_server (conf_host == NULL - ? APCUPS_DEFAULT_HOST - : conf_host, - conf_port, &apcups_detail); ++ int status = apc_query_server (conf_node == NULL ++ ? APCUPS_DEFAULT_NODE ++ : conf_node, ++ conf_service, &apcups_detail); if (status != 0) { - DEBUG ("apcups plugin: apc_query_server (%s, %s) = %i", - (conf_node == NULL) ? APCUPS_DEFAULT_NODE : conf_node, - (conf_service == NULL) ? APCUPS_DEFAULT_SERVICE : conf_service, - status); - return (-1); - DEBUG ("apcups plugin: apc_query_server (%s, %i) = %i", - conf_host == NULL ? APCUPS_DEFAULT_HOST : conf_host, - conf_port, status); ++ DEBUG ("apcups plugin: apc_query_server (\"%s\", \"%s\") = %d", ++ conf_node == NULL ? APCUPS_DEFAULT_NODE : conf_node, ++ conf_service, status); + return (status); } apc_submit (&apcups_detail); diff --cc src/write_riemann.c index 20f2e10b,a6effa34..92c8d0ca --- a/src/write_riemann.c +++ b/src/write_riemann.c @@@ -30,407 -28,630 +30,406 @@@ #include "collectd.h" -#include "plugin.h" #include "common.h" -#include "configfile.h" +#include "plugin.h" #include "utils_cache.h" -#include "riemann.pb-c.h" +#include "utils_complain.h" #include "write_riemann_threshold.h" - #include -#include -#include -#include +#include -#define RIEMANN_HOST "localhost" -#define RIEMANN_PORT "5555" -#define RIEMANN_TTL_FACTOR 2.0 -#define RIEMANN_BATCH_MAX 8192 +#define RIEMANN_HOST "localhost" +#define RIEMANN_PORT 5555 +#define RIEMANN_TTL_FACTOR 2.0 +#define RIEMANN_BATCH_MAX 8192 struct riemann_host { - char *name; - char *event_service_prefix; -#define F_CONNECT 0x01 - uint8_t flags; - pthread_mutex_t lock; - _Bool batch_mode; - _Bool notifications; - _Bool check_thresholds; - _Bool store_rates; - _Bool always_append_ds; - char *node; - char *service; - _Bool use_tcp; - int s; - double ttl_factor; - Msg *batch_msg; - cdtime_t batch_init; - int batch_max; - int reference_count; + c_complain_t init_complaint; + char *name; + char *event_service_prefix; + pthread_mutex_t lock; + _Bool batch_mode; + _Bool notifications; + _Bool check_thresholds; + _Bool store_rates; + _Bool always_append_ds; + char *node; + int port; + riemann_client_type_t client_type; + riemann_client_t *client; + double ttl_factor; + cdtime_t batch_init; + int batch_max; + int batch_timeout; + int reference_count; + riemann_message_t *batch_msg; + char *tls_ca_file; + char *tls_cert_file; + char *tls_key_file; + struct timeval timeout; }; -static char **riemann_tags; -static size_t riemann_tags_num; -static char **riemann_attrs; -static size_t riemann_attrs_num; - -static void riemann_event_protobuf_free (Event *event) /* {{{ */ -{ - size_t i; - - if (event == NULL) - return; - - sfree (event->state); - sfree (event->service); - sfree (event->host); - sfree (event->description); - - strarray_free (event->tags, event->n_tags); - event->tags = NULL; - event->n_tags = 0; - - for (i = 0; i < event->n_attributes; i++) - { - sfree (event->attributes[i]->key); - sfree (event->attributes[i]->value); - sfree (event->attributes[i]); - } - sfree (event->attributes); - event->n_attributes = 0; - - sfree (event); -} /* }}} void riemann_event_protobuf_free */ - -static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */ -{ - size_t i; - - if (msg == NULL) - return; - - for (i = 0; i < msg->n_events; i++) - { - riemann_event_protobuf_free (msg->events[i]); - msg->events[i] = NULL; - } - - sfree (msg->events); - msg->n_events = 0; - - sfree (msg); -} /* }}} void riemann_msg_protobuf_free */ +static char **riemann_tags; +static size_t riemann_tags_num; +static char **riemann_attrs; +static size_t riemann_attrs_num; /* host->lock must be held when calling this function. */ -static int riemann_connect(struct riemann_host *host) /* {{{ */ +static int wrr_connect(struct riemann_host *host) /* {{{ */ { - int e; - struct addrinfo *ai, *res, hints; - char const *node; - char const *service; - - if (host->flags & F_CONNECT) - return 0; - - memset(&hints, 0, sizeof(hints)); - memset(&service, 0, sizeof(service)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM; -#ifdef AI_ADDRCONFIG - hints.ai_flags |= AI_ADDRCONFIG; + char const *node; + int port; + + if (host->client) + return 0; + + node = (host->node != NULL) ? host->node : RIEMANN_HOST; + port = (host->port) ? host->port : RIEMANN_PORT; + + host->client = NULL; + + host->client = riemann_client_create( + host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE, + host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, + host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, + host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE); + if (host->client == NULL) { + c_complain(LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); + return -1; + } +#if RCC_VERSION_NUMBER >= 0x010800 + if (host->timeout.tv_sec != 0) { + if (riemann_client_set_timeout(host->client, &host->timeout) != 0) { + riemann_client_free(host->client); + host->client = NULL; + c_complain(LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); + return -1; + } + } #endif - node = (host->node != NULL) ? host->node : RIEMANN_HOST; - service = (host->service != NULL) ? host->service : RIEMANN_PORT; - - if ((e = getaddrinfo(node, service, &hints, &res)) != 0) { - ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s", - node, gai_strerror(e)); - return -1; - } - - host->s = -1; - for (ai = res; ai != NULL; ai = ai->ai_next) { - if ((host->s = socket(ai->ai_family, - ai->ai_socktype, - ai->ai_protocol)) == -1) { - continue; - } - - if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) { - close(host->s); - host->s = -1; - continue; - } - - host->flags |= F_CONNECT; - DEBUG("write_riemann plugin: got a successful connection for: %s:%s", - node, service); - break; - } - - freeaddrinfo(res); - - if (host->s < 0) { - WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s", - node, service); - return -1; - } - return 0; -} /* }}} int riemann_connect */ + set_sock_opts(riemann_client_get_fd(host->client)); + + c_release(LOG_INFO, &host->init_complaint, + "write_riemann plugin: Successfully connected to %s:%d", node, + port); + + return 0; +} /* }}} int wrr_connect */ /* host->lock must be held when calling this function. */ -static int riemann_disconnect (struct riemann_host *host) /* {{{ */ +static int wrr_disconnect(struct riemann_host *host) /* {{{ */ { - if ((host->flags & F_CONNECT) == 0) - return (0); + if (!host->client) + return (0); - close (host->s); - host->s = -1; - host->flags &= ~F_CONNECT; + riemann_client_free(host->client); + host->client = NULL; - return (0); -} /* }}} int riemann_disconnect */ + return (0); +} /* }}} int wrr_disconnect */ -static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */ -{ - int status = 0; - u_char *buffer = NULL; - size_t buffer_len; - - status = riemann_connect (host); - if (status != 0) - return status; - - buffer_len = msg__get_packed_size(msg); - - if (host->use_tcp) - buffer_len += 4; - - buffer = malloc (buffer_len); - if (buffer == NULL) { - ERROR ("write_riemann plugin: malloc failed."); - return ENOMEM; - } - memset (buffer, 0, buffer_len); - - if (host->use_tcp) - { - uint32_t length = htonl ((uint32_t) (buffer_len - 4)); - memcpy (buffer, &length, 4); - msg__pack(msg, buffer + 4); - } - else - { - msg__pack(msg, buffer); - } - - status = (int) swrite (host->s, buffer, buffer_len); - if (status != 0) - { - char errbuf[1024]; - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s", - (host->node != NULL) ? host->node : RIEMANN_HOST, - (host->service != NULL) ? host->service : RIEMANN_PORT, - sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (buffer); - return -1; - } - - sfree (buffer); - return 0; -} /* }}} int riemann_send_msg */ - -static int riemann_recv_ack(struct riemann_host *host) /* {{{ */ +/** + * Function to send messages to riemann. + * + * Acquires the host lock, disconnects on errors. + */ +static int wrr_send_nolock(struct riemann_host *host, + riemann_message_t *msg) /* {{{ */ { - int status = 0; - Msg *msg = NULL; - uint32_t header; + int status = 0; - status = (int) sread (host->s, &header, 4); + status = wrr_connect(host); + if (status != 0) { + return status; + } - if (status != 0) - return -1; + status = riemann_client_send_message(host->client, msg); + if (status != 0) { + wrr_disconnect(host); + return status; + } - size_t size = ntohl(header); + /* + * For TCP we need to receive message acknowledgemenent. + */ + if (host->client_type != RIEMANN_CLIENT_UDP) { + riemann_message_t *response; - // Buffer on the stack since acknowledges are typically small. - u_char buffer[size]; - memset (buffer, 0, size); + response = riemann_client_recv_message(host->client); - status = (int) sread (host->s, buffer, size); + if (response == NULL) { + wrr_disconnect(host); + return errno; + } + riemann_message_free(response); + } - if (status != 0) - return status; + return 0; +} /* }}} int wrr_send */ - msg = msg__unpack (NULL, size, buffer); +static int wrr_send(struct riemann_host *host, riemann_message_t *msg) { + int status = 0; - if (msg == NULL) - return -1; + pthread_mutex_lock(&host->lock); + status = wrr_send_nolock(host, msg); + pthread_mutex_unlock(&host->lock); + return status; +} - if (!msg->ok) - { - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s", - (host->node != NULL) ? host->node : RIEMANN_HOST, - (host->service != NULL) ? host->service : RIEMANN_PORT, - msg->error); +static riemann_message_t * +wrr_notification_to_message(struct riemann_host *host, /* {{{ */ + notification_t const *n) { + riemann_message_t *msg; + riemann_event_t *event; + char service_buffer[6 * DATA_MAX_NAME_LEN]; + char const *severity; + + switch (n->severity) { + case NOTIF_OKAY: + severity = "ok"; + break; + case NOTIF_WARNING: + severity = "warning"; + break; + case NOTIF_FAILURE: + severity = "critical"; + break; + default: + severity = "unknown"; + } + + format_name(service_buffer, sizeof(service_buffer), + /* host = */ "", n->plugin, n->plugin_instance, n->type, + n->type_instance); + + event = riemann_event_create( + RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME, + (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS, + "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity, + RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1], + RIEMANN_EVENT_FIELD_NONE); + + if (n->host[0] != 0) + riemann_event_string_attribute_add(event, "host", n->host); + if (n->plugin[0] != 0) + riemann_event_string_attribute_add(event, "plugin", n->plugin); + if (n->plugin_instance[0] != 0) + riemann_event_string_attribute_add(event, "plugin_instance", + n->plugin_instance); + + if (n->type[0] != 0) + riemann_event_string_attribute_add(event, "type", n->type); + if (n->type_instance[0] != 0) + riemann_event_string_attribute_add(event, "type_instance", + n->type_instance); + + for (size_t i = 0; i < riemann_attrs_num; i += 2) + riemann_event_string_attribute_add(event, riemann_attrs[i], + riemann_attrs[i + 1]); + + for (size_t i = 0; i < riemann_tags_num; i++) + riemann_event_tag_add(event, riemann_tags[i]); + + if (n->message[0] != 0) + riemann_event_string_attribute_add(event, "description", n->message); + + /* Pull in values from threshold and add extra attributes */ + for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) { + if (strcasecmp("CurrentValue", meta->name) == 0 && + meta->type == NM_TYPE_DOUBLE) { + riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, + (double)meta->nm_value.nm_double, + RIEMANN_EVENT_FIELD_NONE); + continue; + } - msg__free_unpacked(msg, NULL); - return -1; - } + if (meta->type == NM_TYPE_STRING) { + riemann_event_string_attribute_add(event, meta->name, + meta->nm_value.nm_string); + continue; + } + } + + msg = riemann_message_create_with_events(event, NULL); + if (msg == NULL) { + ERROR("write_riemann plugin: riemann_message_create_with_events() failed."); + riemann_event_free(event); + return (NULL); + } + + DEBUG("write_riemann plugin: Successfully created message for notification: " + "host = \"%s\", service = \"%s\", state = \"%s\"", + event->host, event->service, event->state); + return (msg); +} /* }}} riemann_message_t *wrr_notification_to_message */ + +static riemann_event_t * +wrr_value_to_event(struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, value_list_t const *vl, size_t index, + gauge_t const *rates, int status) { + riemann_event_t *event; + char name_buffer[5 * DATA_MAX_NAME_LEN]; + char service_buffer[6 * DATA_MAX_NAME_LEN]; + size_t i; + + event = riemann_event_new(); + if (event == NULL) { + ERROR("write_riemann plugin: riemann_event_new() failed."); + return (NULL); + } + + format_name(name_buffer, sizeof(name_buffer), + /* host = */ "", vl->plugin, vl->plugin_instance, vl->type, + vl->type_instance); + if (host->always_append_ds || (ds->ds_num > 1)) { + if (host->event_service_prefix == NULL) + ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s", + &name_buffer[1], ds->ds[index].name); + else + ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s", + host->event_service_prefix, &name_buffer[1], + ds->ds[index].name); + } else { + if (host->event_service_prefix == NULL) + sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer)); + else + ssnprintf(service_buffer, sizeof(service_buffer), "%s%s", + host->event_service_prefix, &name_buffer[1]); + } + + riemann_event_set( + event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME, + (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL, + (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor, + RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type", + vl->type, "ds_name", ds->ds[index].name, NULL, + RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE); + + if (host->check_thresholds) { + const char *state = NULL; + + switch (status) { + case STATE_OKAY: + state = "ok"; + break; + case STATE_ERROR: + state = "critical"; + break; + case STATE_WARNING: + state = "warning"; + break; + case STATE_MISSING: + state = "unknown"; + break; + } + if (state) + riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state, + RIEMANN_EVENT_FIELD_NONE); + } + + if (vl->plugin_instance[0] != 0) + riemann_event_string_attribute_add(event, "plugin_instance", + vl->plugin_instance); + if (vl->type_instance[0] != 0) + riemann_event_string_attribute_add(event, "type_instance", + vl->type_instance); + + if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) { + char ds_type[DATA_MAX_NAME_LEN]; + + ssnprintf(ds_type, sizeof(ds_type), "%s:rate", + DS_TYPE_TO_STRING(ds->ds[index].type)); + riemann_event_string_attribute_add(event, "ds_type", ds_type); + } else { + riemann_event_string_attribute_add(event, "ds_type", + DS_TYPE_TO_STRING(ds->ds[index].type)); + } + + { + char ds_index[DATA_MAX_NAME_LEN]; + + ssnprintf(ds_index, sizeof(ds_index), "%zu", index); + riemann_event_string_attribute_add(event, "ds_index", ds_index); + } + + for (i = 0; i < riemann_attrs_num; i += 2) + riemann_event_string_attribute_add(event, riemann_attrs[i], + riemann_attrs[i + 1]); + + for (i = 0; i < riemann_tags_num; i++) + riemann_event_tag_add(event, riemann_tags[i]); + + if (ds->ds[index].type == DS_TYPE_GAUGE) { + riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, + (double)vl->values[index].gauge, + RIEMANN_EVENT_FIELD_NONE); + } else if (rates != NULL) { + riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index], + RIEMANN_EVENT_FIELD_NONE); + } else { + int64_t metric; + + if (ds->ds[index].type == DS_TYPE_DERIVE) + metric = (int64_t)vl->values[index].derive; + else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) + metric = (int64_t)vl->values[index].absolute; + else + metric = (int64_t)vl->values[index].counter; + + riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric, + RIEMANN_EVENT_FIELD_NONE); + } + + DEBUG("write_riemann plugin: Successfully created message for metric: " + "host = \"%s\", service = \"%s\"", + event->host, event->service); + return (event); +} /* }}} riemann_event_t *wrr_value_to_event */ + +static riemann_message_t * +wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, value_list_t const *vl, + int *statuses) { + riemann_message_t *msg; + size_t i; + gauge_t *rates = NULL; + + /* Initialize the Msg structure. */ + msg = riemann_message_new(); + if (msg == NULL) { + ERROR("write_riemann plugin: riemann_message_new failed."); + return (NULL); + } + + if (host->store_rates) { + rates = uc_get_rate(ds, vl); + if (rates == NULL) { + ERROR("write_riemann plugin: uc_get_rate failed."); + riemann_message_free(msg); + return (NULL); + } + } - msg__free_unpacked (msg, NULL); - return 0; -} /* }}} int riemann_recv_ack */ + for (i = 0; i < vl->values_len; i++) { + riemann_event_t *event; -/** - * Function to send messages (Msg) to riemann. - * - * Acquires the host lock, disconnects on errors. - */ -static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */ -{ - int status = 0; - pthread_mutex_lock (&host->lock); - - status = riemann_send_msg(host, msg); - if (status != 0) { - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - return status; - } - - /* - * For TCP we need to receive message acknowledgemenent. - */ - if (host->use_tcp) - { - status = riemann_recv_ack(host); - - if (status != 0) - { - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - return status; - } - } - - pthread_mutex_unlock (&host->lock); - return 0; -} /* }}} int riemann_send */ - -static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */ -{ - return (strarray_add (&event->tags, &event->n_tags, tag)); -} /* }}} int riemann_event_add_tag */ - -static int riemann_event_add_attribute(Event *event, /* {{{ */ - char const *key, char const *value) -{ - Attribute **new_attributes; - Attribute *a; - - new_attributes = realloc (event->attributes, - sizeof (*event->attributes) * (event->n_attributes + 1)); - if (new_attributes == NULL) - { - ERROR ("write_riemann plugin: realloc failed."); - return (ENOMEM); - } - event->attributes = new_attributes; - - a = malloc (sizeof (*a)); - if (a == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - return (ENOMEM); - } - attribute__init (a); - - a->key = strdup (key); - if (value != NULL) - a->value = strdup (value); - - event->attributes[event->n_attributes] = a; - event->n_attributes++; - - return (0); -} /* }}} int riemann_event_add_attribute */ - -static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */ - notification_t const *n) -{ - Msg *msg; - Event *event; - char service_buffer[6 * DATA_MAX_NAME_LEN]; - char const *severity; - notification_meta_t *meta; - int i; - - msg = malloc (sizeof (*msg)); - if (msg == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - return (NULL); - } - memset (msg, 0, sizeof (*msg)); - msg__init (msg); - - msg->events = malloc (sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - sfree (msg); - return (NULL); - } - - event = malloc (sizeof (*event)); - if (event == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - sfree (msg->events); - sfree (msg); - return (NULL); - } - memset (event, 0, sizeof (*event)); - event__init (event); - - msg->events[0] = event; - msg->n_events = 1; - - event->host = strdup (n->host); - event->time = CDTIME_T_TO_TIME_T (n->time); - event->has_time = 1; - - switch (n->severity) - { - case NOTIF_OKAY: severity = "ok"; break; - case NOTIF_WARNING: severity = "warning"; break; - case NOTIF_FAILURE: severity = "critical"; break; - default: severity = "unknown"; - } - event->state = strdup (severity); - - riemann_event_add_tag (event, "notification"); - if (n->host[0] != 0) - riemann_event_add_attribute (event, "host", n->host); - if (n->plugin[0] != 0) - riemann_event_add_attribute (event, "plugin", n->plugin); - if (n->plugin_instance[0] != 0) - riemann_event_add_attribute (event, "plugin_instance", - n->plugin_instance); - - if (n->type[0] != 0) - riemann_event_add_attribute (event, "type", n->type); - if (n->type_instance[0] != 0) - riemann_event_add_attribute (event, "type_instance", - n->type_instance); - - for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_add_attribute(event, - riemann_attrs[i], - riemann_attrs[i +1]); - - for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, riemann_tags[i]); - - format_name (service_buffer, sizeof (service_buffer), - /* host = */ "", n->plugin, n->plugin_instance, - n->type, n->type_instance); - event->service = strdup (&service_buffer[1]); - - if (n->message[0] != 0) - riemann_event_add_attribute (event, "description", n->message); - - /* Pull in values from threshold and add extra attributes */ - for (meta = n->meta; meta != NULL; meta = meta->next) - { - if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) - { - event->metric_d = meta->nm_value.nm_double; - event->has_metric_d = 1; - continue; - } - - if (meta->type == NM_TYPE_STRING) { - riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string); - continue; - } - } - - DEBUG ("write_riemann plugin: Successfully created protobuf for notification: " - "host = \"%s\", service = \"%s\", state = \"%s\"", - event->host, event->service, event->state); - return (msg); -} /* }}} Msg *riemann_notification_to_protobuf */ - -static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, size_t index, - gauge_t const *rates, - int status) -{ - Event *event; - char name_buffer[5 * DATA_MAX_NAME_LEN]; - char service_buffer[6 * DATA_MAX_NAME_LEN]; - double ttl; - int i; - - event = malloc (sizeof (*event)); - if (event == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - return (NULL); - } - memset (event, 0, sizeof (*event)); - event__init (event); - - event->host = strdup (vl->host); - event->time = CDTIME_T_TO_TIME_T (vl->time); - event->has_time = 1; - - if (host->check_thresholds) { - switch (status) { - case STATE_OKAY: - event->state = strdup("ok"); - break; - case STATE_ERROR: - event->state = strdup("critical"); - break; - case STATE_WARNING: - event->state = strdup("warning"); - break; - case STATE_MISSING: - event->state = strdup("unknown"); - break; - } - } - - ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor; - event->ttl = (float) ttl; - event->has_ttl = 1; - - riemann_event_add_attribute (event, "plugin", vl->plugin); - if (vl->plugin_instance[0] != 0) - riemann_event_add_attribute (event, "plugin_instance", - vl->plugin_instance); - - riemann_event_add_attribute (event, "type", vl->type); - if (vl->type_instance[0] != 0) - riemann_event_add_attribute (event, "type_instance", - vl->type_instance); - - if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) - { - char ds_type[DATA_MAX_NAME_LEN]; - - ssnprintf (ds_type, sizeof (ds_type), "%s:rate", - DS_TYPE_TO_STRING(ds->ds[index].type)); - riemann_event_add_attribute (event, "ds_type", ds_type); - } - else - { - riemann_event_add_attribute (event, "ds_type", - DS_TYPE_TO_STRING(ds->ds[index].type)); - } - riemann_event_add_attribute (event, "ds_name", ds->ds[index].name); - { - char ds_index[DATA_MAX_NAME_LEN]; - - ssnprintf (ds_index, sizeof (ds_index), "%zu", index); - riemann_event_add_attribute (event, "ds_index", ds_index); - } - - for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_add_attribute(event, - riemann_attrs[i], - riemann_attrs[i +1]); - - for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, riemann_tags[i]); - - if (ds->ds[index].type == DS_TYPE_GAUGE) - { - event->has_metric_d = 1; - event->metric_d = (double) vl->values[index].gauge; - } - else if (rates != NULL) - { - event->has_metric_d = 1; - event->metric_d = (double) rates[index]; - } - else - { - event->has_metric_sint64 = 1; - if (ds->ds[index].type == DS_TYPE_DERIVE) - event->metric_sint64 = (int64_t) vl->values[index].derive; - else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) - event->metric_sint64 = (int64_t) vl->values[index].absolute; - else - event->metric_sint64 = (int64_t) vl->values[index].counter; - } - - format_name (name_buffer, sizeof (name_buffer), - /* host = */ "", vl->plugin, vl->plugin_instance, - vl->type, vl->type_instance); - if (host->always_append_ds || (ds->ds_num > 1)) - { - if (host->event_service_prefix == NULL) - ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s", - &name_buffer[1], ds->ds[index].name); - else - ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s", - host->event_service_prefix, &name_buffer[1], ds->ds[index].name); - } - else - { - if (host->event_service_prefix == NULL) - sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer)); - else - ssnprintf (service_buffer, sizeof (service_buffer), "%s%s", - host->event_service_prefix, &name_buffer[1]); - } - - event->service = strdup (service_buffer); - - DEBUG ("write_riemann plugin: Successfully created protobuf for metric: " - "host = \"%s\", service = \"%s\"", - event->host, event->service); - return (event); -} /* }}} Event *riemann_value_to_protobuf */ - -static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, - int *statuses) -{ - Msg *msg; - size_t i; - gauge_t *rates = NULL; - - /* Initialize the Msg structure. */ - msg = malloc (sizeof (*msg)); - if (msg == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - return (NULL); - } - memset (msg, 0, sizeof (*msg)); - msg__init (msg); - - /* Set up events. First, the list of pointers. */ - msg->n_events = (size_t) vl->values_len; - msg->events = calloc (msg->n_events, sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("write_riemann plugin: calloc failed."); - riemann_msg_protobuf_free (msg); - return (NULL); - } - - if (host->store_rates) - { - rates = uc_get_rate (ds, vl); - if (rates == NULL) - { - ERROR ("write_riemann plugin: uc_get_rate failed."); - riemann_msg_protobuf_free (msg); - return (NULL); - } - } - - for (i = 0; i < msg->n_events; i++) - { - msg->events[i] = riemann_value_to_protobuf (host, ds, vl, - (int) i, rates, statuses[i]); - if (msg->events[i] == NULL) - { - riemann_msg_protobuf_free (msg); - sfree (rates); - return (NULL); - } - } - - sfree (rates); - return (msg); -} /* }}} Msg *riemann_value_list_to_protobuf */ + event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]); + if (event == NULL) { + riemann_message_free(msg); + sfree(rates); + return (NULL); + } + riemann_message_append_events(msg, event, NULL); + } + sfree(rates); + return (msg); +} /* }}} riemann_message_t *wrr_value_list_to_message */ /* * Always call while holding host->lock !