X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fredis.c;h=ea888ca682814fe2dc2b210edc5e629886e9a78c;hb=650a058c8284d1fc845f13e11eecee064ea02b6b;hp=85a8354629240553f7e60a65ae0c5cb72446ae29;hpb=ef4a3db895a0aba7107c0f1c6c2ef2a7f128aaf8;p=collectd.git diff --git a/src/redis.c b/src/redis.c index 85a83546..ea888ca6 100644 --- a/src/redis.c +++ b/src/redis.c @@ -21,22 +21,25 @@ **/ #include "collectd.h" + #include "common.h" #include "plugin.h" -#include "configfile.h" -#include -#include +#include +#include #ifndef HOST_NAME_MAX -# define HOST_NAME_MAX _POSIX_HOST_NAME_MAX +#define HOST_NAME_MAX _POSIX_HOST_NAME_MAX #endif -#define REDIS_DEF_HOST "localhost" +#define REDIS_DEF_HOST "localhost" #define REDIS_DEF_PASSWD "" -#define REDIS_DEF_PORT 6379 +#define REDIS_DEF_PORT 6379 #define REDIS_DEF_TIMEOUT 2000 #define MAX_REDIS_NODE_NAME 64 +#define MAX_REDIS_PASSWD_LENGTH 512 +#define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_QUERY 2048 /* Redis plugin configuration example: * @@ -44,59 +47,65 @@ * * Host "localhost" * Port "6379" - * Timeout 2000 + * Timeout 2 + * Password "foobar" * * */ +struct redis_query_s; +typedef struct redis_query_s redis_query_t; +struct redis_query_s { + char query[MAX_REDIS_QUERY]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_query_t *next; +}; + struct redis_node_s; typedef struct redis_node_s redis_node_t; -struct redis_node_s -{ +struct redis_node_s { char name[MAX_REDIS_NODE_NAME]; char host[HOST_NAME_MAX]; - char passwd[HOST_NAME_MAX]; + char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; - int timeout; + struct timeval timeout; + redis_query_t *queries; redis_node_t *next; }; static redis_node_t *nodes_head = NULL; -static int redis_node_add (const redis_node_t *rn) /* {{{ */ +static int redis_node_add(const redis_node_t *rn) /* {{{ */ { redis_node_t *rn_copy; redis_node_t *rn_ptr; /* Check for duplicates first */ for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next) - if (strcmp (rn->name, rn_ptr->name) == 0) + if (strcmp(rn->name, rn_ptr->name) == 0) break; - if (rn_ptr != NULL) - { - ERROR ("redis plugin: A node with the name `%s' already exists.", - rn->name); + if (rn_ptr != NULL) { + ERROR("redis plugin: A node with the name `%s' already exists.", rn->name); return (-1); } - rn_copy = malloc (sizeof (*rn_copy)); - if (rn_copy == NULL) - { - ERROR ("redis plugin: malloc failed adding redis_node to the tree."); + rn_copy = malloc(sizeof(*rn_copy)); + if (rn_copy == NULL) { + ERROR("redis plugin: malloc failed adding redis_node to the tree."); return (-1); } - memcpy (rn_copy, rn, sizeof (*rn_copy)); + memcpy(rn_copy, rn, sizeof(*rn_copy)); rn_copy->next = NULL; - DEBUG ("redis plugin: Adding node \"%s\".", rn->name); + DEBUG("redis plugin: Adding node \"%s\".", rn->name); if (nodes_head == NULL) nodes_head = rn_copy; - else - { + else { rn_ptr = nodes_head; while (rn_ptr->next != NULL) rn_ptr = rn_ptr->next; @@ -106,43 +115,92 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */ return (0); } /* }}} */ -static int redis_config_node (oconfig_item_t *ci) /* {{{ */ +static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */ +{ + redis_query_t *rq; + int status; + + rq = calloc(1, sizeof(*rq)); + if (rq == NULL) { + ERROR("redis plugin: calloc failed adding redis_query."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query)); + if (status != 0) + goto err; + + /* + * Default to a gauge type. + */ + (void)strncpy(rq->type, "gauge", sizeof(rq->type)); + (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance)); + replace_special(rq->instance, sizeof(rq->instance)); + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Type", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = + cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance)); + } else { + WARNING("redis plugin: unknown configuration option: %s", option->key); + status = -1; + } + if (status != 0) + goto err; + } + return rq; +err: + free(rq); + return NULL; +} /* }}} */ + +static int redis_config_node(oconfig_item_t *ci) /* {{{ */ { - redis_node_t rn; - int i; + redis_query_t *rq; int status; + int timeout; - memset (&rn, 0, sizeof (rn)); - sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); - rn.port = REDIS_DEF_PORT; - rn.timeout = REDIS_DEF_TIMEOUT; + redis_node_t rn = {.port = REDIS_DEF_PORT, + .timeout.tv_usec = REDIS_DEF_TIMEOUT}; - status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); + sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host)); + + status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name)); if (status != 0) return (status); - for (i = 0; i < ci->children_num; i++) - { + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; - if (strcasecmp ("Host", option->key) == 0) - status = cf_util_get_string_buffer (option, rn.host, sizeof (rn.host)); - else if (strcasecmp ("Port", option->key) == 0) - { - status = cf_util_get_port_number (option); - if (status > 0) - { + if (strcasecmp("Host", option->key) == 0) + status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host)); + else if (strcasecmp("Port", option->key) == 0) { + status = cf_util_get_port_number(option); + if (status > 0) { rn.port = status; status = 0; } - } - else if (strcasecmp ("Timeout", option->key) == 0) - status = cf_util_get_int (option, &rn.timeout); - else if (strcasecmp ("Password", option->key) == 0) - status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd)); + } else if (strcasecmp("Query", option->key) == 0) { + rq = redis_config_query(option); + if (rq == NULL) { + status = 1; + } else { + rq->next = rn.queries; + rn.queries = rq; + } + } else if (strcasecmp("Timeout", option->key) == 0) { + status = cf_util_get_int(option, &timeout); + if (status == 0) + rn.timeout.tv_usec = timeout; + } else if (strcasecmp("Password", option->key) == 0) + status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd)); else - WARNING ("redis plugin: Option `%s' not allowed inside a `Node' " - "block. I'll ignore this option.", option->key); + WARNING("redis plugin: Option `%s' not allowed inside a `Node' " + "block. I'll ignore this option.", + option->key); if (status != 0) break; @@ -151,181 +209,245 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ if (status != 0) return (status); - return (redis_node_add (&rn)); + return (redis_node_add(&rn)); } /* }}} int redis_config_node */ -static int redis_config (oconfig_item_t *ci) /* {{{ */ +static int redis_config(oconfig_item_t *ci) /* {{{ */ { - int i; - - for (i = 0; i < ci->children_num; i++) - { + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; - if (strcasecmp ("Node", option->key) == 0) - redis_config_node (option); + if (strcasecmp("Node", option->key) == 0) + redis_config_node(option); else - WARNING ("redis plugin: Option `%s' not allowed in redis" - " configuration. It will be ignored.", option->key); + WARNING("redis plugin: Option `%s' not allowed in redis" + " configuration. It will be ignored.", + option->key); } - if (nodes_head == NULL) - { - ERROR ("redis plugin: No valid node configuration could be found."); + if (nodes_head == NULL) { + ERROR("redis plugin: No valid node configuration could be found."); return (ENOENT); } return (0); } /* }}} */ - __attribute__ ((nonnull(2))) -static void redis_submit_g (char *plugin_instance, - const char *type, const char *type_instance, - gauge_t value) /* {{{ */ +__attribute__((nonnull(2))) static void +redis_submit(char *plugin_instance, const char *type, const char *type_instance, + value_t value) /* {{{ */ { - value_t values[1]; value_list_t vl = VALUE_LIST_INIT; - values[0].gauge = value; - - vl.values = values; + vl.values = &value; vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "redis", sizeof (vl.plugin)); + sstrncpy(vl.plugin, "redis", sizeof(vl.plugin)); if (plugin_instance != NULL) - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof (vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); + sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); + sstrncpy(vl.type, type, sizeof(vl.type)); if (type_instance != NULL) - sstrncpy (vl.type_instance, type_instance, - sizeof (vl.type_instance)); + sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance)); - plugin_dispatch_values (&vl); + plugin_dispatch_values(&vl); } /* }}} */ - __attribute__ ((nonnull(2))) -static void redis_submit_d (char *plugin_instance, - const char *type, const char *type_instance, - derive_t value) /* {{{ */ +static int redis_init(void) /* {{{ */ { - value_t values[1]; - value_list_t vl = VALUE_LIST_INIT; - - values[0].derive = value; + redis_node_t rn = {.name = "default", + .host = REDIS_DEF_HOST, + .port = REDIS_DEF_PORT, + .timeout.tv_sec = 0, + .timeout.tv_usec = REDIS_DEF_TIMEOUT, + .next = NULL}; - vl.values = values; - vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "redis", sizeof (vl.plugin)); - if (plugin_instance != NULL) - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof (vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_instance != NULL) - sstrncpy (vl.type_instance, type_instance, - sizeof (vl.type_instance)); + if (nodes_head == NULL) + redis_node_add(&rn); - plugin_dispatch_values (&vl); -} /* }}} */ + return (0); +} /* }}} int redis_init */ -static int redis_init (void) /* {{{ */ +static int redis_handle_info(char *node, char const *info_line, + char const *type, char const *type_instance, + char const *field_name, int ds_type) /* {{{ */ { - redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PASSWD, - REDIS_DEF_PORT, REDIS_DEF_TIMEOUT, /* next = */ NULL }; + char *str = strstr(info_line, field_name); + static char buf[MAX_REDIS_VAL_SIZE]; + value_t val; + if (str) { + int i; + + str += strlen(field_name) + 1; /* also skip the ':' */ + for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.')); + i++, str++) + buf[i] = *str; + buf[i] = '\0'; + + if (parse_value(buf, &val, ds_type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", field_name); + return (-1); + } - if (nodes_head == NULL) - redis_node_add (&rn); + redis_submit(node, type, type_instance, val); + return (0); + } + return (-1); - return (0); -} /* }}} int redis_init */ +} /* }}} int redis_handle_info */ -static int redis_read (void) /* {{{ */ +static int redis_handle_query(redisContext *rh, redis_node_t *rn, + redis_query_t *rq) /* {{{ */ { - redis_node_t *rn; + redisReply *rr; + const data_set_t *ds; + value_t val; - for (rn = nodes_head; rn != NULL; rn = rn->next) - { - REDIS rh; - REDIS_INFO info; + ds = plugin_get_ds(rq->type); + if (!ds) { + ERROR("redis plugin: DataSet `%s' not defined.", rq->type); + return (-1); + } - int status; + if (ds->ds_num != 1) { + ERROR("redis plugin: DS `%s' has too many types.", rq->type); + return (-1); + } - DEBUG ("redis plugin: querying info from node `%s' (%s:%d).", rn->name, rn->host, rn->port); + if ((rr = redisCommand(rh, rq->query)) == NULL) { + WARNING("redis plugin: unable to carry out query `%s'.", rq->query); + return (-1); + } - rh = credis_connect (rn->host, rn->port, rn->timeout); - if (rh == NULL) - { - ERROR ("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, rn->host, rn->port); + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value(rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rq->type); + freeReplyObject(rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rq->type, + (strlen(rq->instance) > 0) ? rq->instance : NULL, val); + freeReplyObject(rr); + return 0; +} /* }}} int redis_handle_query */ + +static int redis_read(void) /* {{{ */ +{ + for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) { + redisContext *rh; + redisReply *rr; + + DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name, + rn->host, rn->port); + + rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout); + if (rh == NULL) { + ERROR("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, + rn->host, rn->port); continue; } - if (strlen (rn->passwd) > 0) - { - DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd); - status = credis_auth(rh, rn->passwd); - if (status != 0) - { - WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name); - credis_close (rh); - continue; + if (strlen(rn->passwd) > 0) { + DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name, + rn->passwd); + + if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) { + WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name); + goto redis_fail; + } + + if (rr->type != REDIS_REPLY_STATUS) { + WARNING("redis plugin: invalid authentication on node `%s'.", rn->name); + goto redis_fail; } + + freeReplyObject(rr); } - memset (&info, 0, sizeof (info)); - status = credis_info (rh, &info); - if (status != 0) - { - WARNING ("redis plugin: unable to get info from node `%s'.", rn->name); - credis_close (rh); - continue; + if ((rr = redisCommand(rh, "INFO")) == NULL) { + WARNING("redis plugin: unable to get info from node `%s'.", rn->name); + goto redis_fail; } - /* typedef struct _cr_info { - * char redis_version[CREDIS_VERSION_STRING_SIZE]; - * int bgsave_in_progress; - * int connected_clients; - * int connected_slaves; - * unsigned int used_memory; - * long long changes_since_last_save; - * int last_save_time; - * long long total_connections_received; - * long long total_commands_processed; - * int uptime_in_seconds; - * int uptime_in_days; - * int role; - * } REDIS_INFO; */ - - DEBUG ("redis plugin: received info from node `%s': connected_clients = %d; " - "connected_slaves = %d; used_memory = %lu; changes_since_last_save = %lld; " - "bgsave_in_progress = %d; total_connections_received = %lld; " - "total_commands_processed = %lld; uptime_in_seconds = %ld", rn->name, - info.connected_clients, info.connected_slaves, info.used_memory, - info.changes_since_last_save, info.bgsave_in_progress, - info.total_connections_received, info.total_commands_processed, - info.uptime_in_seconds); - - redis_submit_g (rn->name, "current_connections", "clients", info.connected_clients); - redis_submit_g (rn->name, "current_connections", "slaves", info.connected_slaves); - redis_submit_g (rn->name, "memory", "used", info.used_memory); - redis_submit_g (rn->name, "volatile_changes", NULL, info.changes_since_last_save); - redis_submit_d (rn->name, "total_connections", NULL, info.total_connections_received); - redis_submit_d (rn->name, "total_operations", NULL, info.total_commands_processed); - - credis_close (rh); + redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "current_connections", "clients", + "connected_clients", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "blocked_clients", NULL, + "blocked_clients", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", + DS_TYPE_GAUGE); + /* changes_since_last_save: Deprecated in redis version 2.6 and above */ + redis_handle_info(rn->name, rr->str, "volatile_changes", NULL, + "changes_since_last_save", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "total_connections", NULL, + "total_connections_received", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_operations", NULL, + "total_commands_processed", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "operations_per_second", NULL, + "instantaneous_ops_per_sec", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys", + DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys", + DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "pubsub", "channels", + "pubsub_channels", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "pubsub", "patterns", + "pubsub_patterns", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "current_connections", "slaves", + "connected_slaves", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "cache_result", "hits", + "keyspace_hits", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "cache_result", "misses", + "keyspace_misses", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_bytes", "input", + "total_net_input_bytes", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_bytes", "output", + "total_net_output_bytes", DS_TYPE_DERIVE); + + for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) + redis_handle_query(rh, rn, rq); + + redis_fail: + if (rr != NULL) + freeReplyObject(rr); + redisFree(rh); } return 0; } /* }}} */ -void module_register (void) /* {{{ */ +void module_register(void) /* {{{ */ { - plugin_register_complex_config ("redis", redis_config); - plugin_register_init ("redis", redis_init); - plugin_register_read ("redis", redis_read); + plugin_register_complex_config("redis", redis_config); + plugin_register_init("redis", redis_init); + plugin_register_read("redis", redis_read); /* TODO: plugin_register_write: one redis list per value id with * X elements */ } /* }}} */ - -/* vim: set sw=2 sts=2 et fdm=marker : */