X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fwrite_redis.c;h=b4c5e212e300e83a5d5de5d060f02c0940d9b70c;hb=eea01a8f212634414a21462ba79dc058dc5fb304;hp=0b15e0fa95c4e2e37d3ee973bbfca9a47976fe9d;hpb=53181fa3b5a91f47e51d01d56b34906f8b244a1b;p=collectd.git diff --git a/src/write_redis.c b/src/write_redis.c index 0b15e0fa..b4c5e212 100644 --- a/src/write_redis.c +++ b/src/write_redis.c @@ -1,6 +1,6 @@ /** * collectd - src/write_redis.c - * Copyright (C) 2010 Florian Forster + * Copyright (C) 2010 Florian Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -30,7 +30,8 @@ #include "configfile.h" #include -#include +#include +#include struct wr_node_s { @@ -38,9 +39,9 @@ struct wr_node_s char *host; int port; - int timeout; + struct timeval timeout; - REDIS conn; + redisContext *conn; pthread_mutex_t lock; }; typedef struct wr_node_s wr_node_t; @@ -53,43 +54,63 @@ static int wr_write (const data_set_t *ds, /* {{{ */ user_data_t *ud) { wr_node_t *node = ud->data; + char ident[512]; char key[512]; char value[512]; - char tmp[512]; + char time[24]; + size_t value_size; + char *value_ptr; int status; + redisReply *rr; int i; - status = FORMAT_VL (tmp, sizeof (tmp), vl); + status = FORMAT_VL (ident, sizeof (ident), vl); if (status != 0) return (status); - ssnprintf (key, sizeof (key), "collectd/%s", tmp); + ssnprintf (key, sizeof (key), "collectd/%s", ident); + ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time)); + + memset (value, 0, sizeof (value)); + value_size = sizeof (value); + value_ptr = &value[0]; + +#define APPEND(...) do { \ + status = snprintf (value_ptr, value_size, __VA_ARGS__); \ + if (((size_t) status) > value_size) \ + { \ + value_ptr += value_size; \ + value_size = 0; \ + } \ + else \ + { \ + value_ptr += status; \ + value_size -= status; \ + } \ +} while (0) + + APPEND ("%s:", time); - ssnprintf (value, sizeof (value), "%lu", (unsigned long) vl->time); for (i = 0; i < ds->ds_num; i++) { if (ds->ds[i].type == DS_TYPE_COUNTER) - ssnprintf (tmp, sizeof (tmp), "%s:%llu", - value, vl->values[i].counter); + APPEND ("%llu", vl->values[i].counter); else if (ds->ds[i].type == DS_TYPE_GAUGE) - ssnprintf (tmp, sizeof (tmp), "%s:%g", - value, vl->values[i].gauge); + APPEND ("%g", vl->values[i].gauge); else if (ds->ds[i].type == DS_TYPE_DERIVE) - ssnprintf (tmp, sizeof (tmp), "%s:%"PRIi64, - value, vl->values[i].derive); + APPEND ("%"PRIi64, vl->values[i].derive); else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) - ssnprintf (tmp, sizeof (tmp), "%s:%"PRIu64, - value, vl->values[i].absolute); + APPEND ("%"PRIu64, vl->values[i].absolute); else assert (23 == 42); - - memcpy (value, tmp, sizeof (value)); } +#undef APPEND + pthread_mutex_lock (&node->lock); if (node->conn == NULL) { - node->conn = credis_connect (node->host, node->port, node->timeout); + node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout); if (node->conn == NULL) { ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed.", @@ -100,10 +121,14 @@ static int wr_write (const data_set_t *ds, /* {{{ */ } } - /* "credis_zadd" doesn't handle a NULL pointer gracefully, so I'd rather - * have a meaningful assertion message than a normal segmentation fault. */ assert (node->conn != NULL); - status = credis_zadd (node->conn, key, (double) vl->time, value); + rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value); + if (rr==NULL) + WARNING("ZADD command error. key:%s", key); + + rr = redisCommand (node->conn, "SADD collectd/values %s", ident); + if (rr==NULL) + WARNING("SADD command error. ident:%s", ident); pthread_mutex_unlock (&node->lock); @@ -119,7 +144,7 @@ static void wr_config_free (void *ptr) /* {{{ */ if (node->conn != NULL) { - credis_close (node->conn); + redisFree (node->conn); node->conn = NULL; } @@ -130,6 +155,7 @@ static void wr_config_free (void *ptr) /* {{{ */ static int wr_config_node (oconfig_item_t *ci) /* {{{ */ { wr_node_t *node; + int timeout; int status; int i; @@ -139,7 +165,8 @@ static int wr_config_node (oconfig_item_t *ci) /* {{{ */ memset (node, 0, sizeof (*node)); node->host = NULL; node->port = 0; - node->timeout = 1000; + node->timeout.tv_sec = 0; + node->timeout.tv_usec = 1000; node->conn = NULL; pthread_mutex_init (&node->lock, /* attr = */ NULL); @@ -165,8 +192,10 @@ static int wr_config_node (oconfig_item_t *ci) /* {{{ */ status = 0; } } - else if (strcasecmp ("Timeout", child->key) == 0) - status = cf_util_get_int (child, &node->timeout); + else if (strcasecmp ("Timeout", child->key) == 0) { + status = cf_util_get_int (child, &timeout); + if (status == 0) node->timeout.tv_usec = timeout; + } else WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".", child->key);