#include "common.h"
#include "configfile.h"
#include "utils_cache.h"
+#include "utils_complain.h"
#include "write_riemann_threshold.h"
#define RIEMANN_HOST "localhost"
#define RIEMANN_BATCH_MAX 8192
struct riemann_host {
+ c_complain_t init_complaint;
char *name;
char *event_service_prefix;
pthread_mutex_t lock;
double ttl_factor;
cdtime_t batch_init;
int batch_max;
+ int batch_timeout;
int reference_count;
riemann_message_t *batch_msg;
char *tls_ca_file;
char *tls_cert_file;
char *tls_key_file;
+ struct timeval timeout;
};
static char **riemann_tags;
RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
RIEMANN_CLIENT_OPTION_NONE);
if (host->client == NULL) {
- WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
- node, port);
+ c_complain (LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+ node, port);
return -1;
}
- DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
- node, port);
+ if (host->timeout.tv_sec != 0) {
+ if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
+ riemann_client_free(host->client);
+ host->client = NULL;
+ c_complain (LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+ node, port);
+ return -1;
+ }
+ }
+
+ c_release (LOG_INFO, &host->init_complaint,
+ "write_riemann plugin: Successfully connected to %s:%d",
+ node, port);
return 0;
} /* }}} int wrr_connect */
*
* Acquires the host lock, disconnects on errors.
*/
-static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
+static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
{
int status = 0;
- pthread_mutex_lock (&host->lock);
status = wrr_connect(host);
- if (status != 0)
+ if (status != 0) {
return status;
+ }
status = riemann_client_send_message(host->client, msg);
if (status != 0) {
wrr_disconnect(host);
- pthread_mutex_unlock(&host->lock);
return status;
}
if (response == NULL)
{
wrr_disconnect(host);
- pthread_mutex_unlock(&host->lock);
return errno;
}
riemann_message_free(response);
}
- pthread_mutex_unlock (&host->lock);
return 0;
} /* }}} int wrr_send */
+static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
+{
+ int status = 0;
+
+ pthread_mutex_lock (&host->lock);
+ status = wrr_send_nolock(host, msg);
+ pthread_mutex_unlock (&host->lock);
+ return status;
+}
+
static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
notification_t const *n)
{
RIEMANN_EVENT_FIELD_NONE);
if (n->host[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("host", n->host));
+ riemann_event_string_attribute_add(event, "host", n->host);
if (n->plugin[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("plugin", n->plugin));
+ riemann_event_string_attribute_add(event, "plugin", n->plugin);
if (n->plugin_instance[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("plugin_instance",
- n->plugin_instance));
+ riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
if (n->type[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("type", n->type));
+ riemann_event_string_attribute_add(event, "type", n->type);
if (n->type_instance[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("type_instance",
- n->type_instance));
+ riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_attribute_add(event,
- riemann_attribute_create(riemann_attrs[i],
- riemann_attrs[i +1]));
+ riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
for (i = 0; i < riemann_tags_num; i++)
riemann_event_tag_add(event, riemann_tags[i]);
if (n->message[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("description", n->message));
+ riemann_event_string_attribute_add(event, "description", n->message);
/* Pull in values from threshold and add extra attributes */
for (meta = n->meta; meta != NULL; meta = meta->next)
}
if (meta->type == NM_TYPE_STRING) {
- riemann_event_attribute_add(event,
- riemann_attribute_create(meta->name,
- meta->nm_value.nm_string));
+ riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
continue;
}
}
RIEMANN_EVENT_FIELD_HOST, vl->host,
RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
- RIEMANN_EVENT_FIELD_ATTRIBUTES,
- riemann_attribute_create("plugin", vl->plugin),
- riemann_attribute_create("type", vl->type),
- riemann_attribute_create("ds_name", ds->ds[index].name),
+ RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
+ "plugin", vl->plugin,
+ "type", vl->type,
+ "ds_name", ds->ds[index].name,
NULL,
RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
RIEMANN_EVENT_FIELD_NONE);
}
if (vl->plugin_instance[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("plugin_instance",
- vl->plugin_instance));
+ riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
if (vl->type_instance[0] != 0)
- riemann_event_attribute_add(event,
- riemann_attribute_create("type_instance",
- vl->type_instance));
+ riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
{
ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
DS_TYPE_TO_STRING(ds->ds[index].type));
- riemann_event_attribute_add(event,
- riemann_attribute_create("ds_type", ds_type));
+ riemann_event_string_attribute_add(event, "ds_type", ds_type);
}
else
{
- riemann_event_attribute_add(event,
- riemann_attribute_create("ds_type",
- DS_TYPE_TO_STRING(ds->ds[index].type)));
+ riemann_event_string_attribute_add(event, "ds_type",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
}
{
char ds_index[DATA_MAX_NAME_LEN];
ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
- riemann_event_attribute_add(event,
- riemann_attribute_create("ds_index", ds_index));
+ riemann_event_string_attribute_add(event, "ds_index", ds_index);
}
for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_attribute_add(event,
- riemann_attribute_create(riemann_attrs[i],
- riemann_attrs[i +1]));
+ riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
for (i = 0; i < riemann_tags_num; i++)
riemann_event_tag_add(event, riemann_tags[i]);
cdtime_t now;
int status = 0;
+ now = cdtime();
if (timeout > 0) {
- now = cdtime();
- if ((host->batch_init + timeout) > now)
+ if ((host->batch_init + timeout) > now) {
return status;
+ }
}
- wrr_send(host, host->batch_msg);
+ wrr_send_nolock(host, host->batch_msg);
riemann_message_free(host->batch_msg);
- if (host->client_type != RIEMANN_CLIENT_UDP)
- {
- riemann_message_t *response;
-
- response = riemann_client_recv_message(host->client);
-
- if (!response)
- {
- wrr_disconnect(host);
- return errno;
- }
-
- riemann_message_free(response);
- }
-
- host->batch_init = cdtime();
+ host->batch_init = now;
host->batch_msg = NULL;
return status;
}
pthread_mutex_lock(&host->lock);
status = wrr_batch_flush_nolock(timeout, host);
if (status != 0)
- ERROR("write_riemann plugin: riemann_client_send failed with status %i",
- status);
+ c_complain (LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: riemann_client_send failed with status %i",
+ status);
+ else
+ c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
pthread_mutex_unlock(&host->lock);
return status;
riemann_message_t *msg;
size_t len;
int ret;
+ cdtime_t timeout;
msg = wrr_value_list_to_message(host, ds, vl, statuses);
if (msg == NULL)
}
}
- len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg));
+ len = riemann_message_get_packed_size(host->batch_msg);
ret = 0;
if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
ret = wrr_batch_flush_nolock(0, host);
- }
+ } else {
+ if (host->batch_timeout > 0) {
+ timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
+ ret = wrr_batch_flush_nolock(timeout, host);
+ }
+ }
pthread_mutex_unlock(&host->lock);
return ret;
status = wrr_send(host, msg);
if (status != 0)
- ERROR("write_riemann plugin: riemann_client_send failed with status %i",
- status);
+ c_complain (LOG_ERR, &host->init_complaint,
+ "write_riemann plugin: riemann_client_send failed with status %i",
+ status);
+ else
+ c_release (LOG_DEBUG, &host->init_complaint,
+ "write_riemann plugin: riemann_client_send succeeded");
riemann_message_free(msg);
return (status);
}
if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
- wrr_batch_add_value_list(host, ds, vl, statuses);
+ wrr_batch_add_value_list(host, ds, vl, statuses);
} else {
msg = wrr_value_list_to_message(host, ds, vl, statuses);
if (msg == NULL)
return (-1);
status = wrr_send(host, msg);
- if (status != 0)
- ERROR("write_riemann plugin: riemann_client_send failed with status %i",
- status);
riemann_message_free(msg);
}
return ENOMEM;
}
pthread_mutex_init(&host->lock, NULL);
+ C_COMPLAIN_INIT (&host->init_complaint);
host->reference_count = 1;
host->node = NULL;
host->port = 0;
host->batch_mode = 1;
host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
host->batch_init = cdtime();
+ host->batch_timeout = 0;
host->ttl_factor = RIEMANN_TTL_FACTOR;
host->client = NULL;
host->client_type = RIEMANN_CLIENT_TCP;
+ host->timeout.tv_sec = 0;
+ host->timeout.tv_usec = 0;
status = cf_util_get_string(ci, &host->name);
if (status != 0) {
status = cf_util_get_int(child, &host->batch_max);
if (status != 0)
break;
+ } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
+ status = cf_util_get_int(child, &host->batch_timeout);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("Timeout", child->key) == 0) {
+ status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
+ if (status != 0)
+ break;
} else if (strcasecmp("Port", child->key) == 0) {
host->port = cf_util_get_port_number(child);
if (host->port == -1) {