char passwd[MAX_REDIS_PASSWD_LENGTH];
int port;
struct timeval timeout;
+ redisContext *redisContext;
redis_query_t *queries;
redis_node_t *next;
return 0;
} /* }}} int redis_init */
+static void *c_redisCommand(redis_node_t *rn, const char *format, ...) {
+ redisContext *c = rn->redisContext;
+
+ if (c == NULL)
+ return NULL;
+
+ va_list ap;
+ va_start(ap, format);
+ void *reply = redisvCommand(c, format, ap);
+ va_end(ap);
+
+ if (reply == NULL) {
+ ERROR("redis plugin: Connection error: %s", c->errstr);
+ redisFree(rn->redisContext);
+ rn->redisContext = NULL;
+ }
+
+ return reply;
+} /* void c_redisCommand */
+
static int redis_handle_info(char *node, char const *info_line,
char const *type, char const *type_instance,
char const *field_name, int ds_type) /* {{{ */
} /* }}} int redis_handle_info */
-static int redis_handle_query(redisContext *rh, redis_node_t *rn,
- redis_query_t *rq) /* {{{ */
+static int redis_handle_query(redis_node_t *rn, redis_query_t *rq) /* {{{ */
{
redisReply *rr;
const data_set_t *ds;
return -1;
}
- if ((rr = redisCommand(rh, "SELECT %d", rq->database)) == NULL) {
+ if ((rr = c_redisCommand(rn, "SELECT %d", rq->database)) == NULL) {
WARNING("redis plugin: unable to switch to database `%d' on node `%s'.",
rq->database, rn->name);
return -1;
}
- if ((rr = redisCommand(rh, rq->query)) == NULL) {
+ if ((rr = c_redisCommand(rn, rq->query)) == NULL) {
WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
return -1;
}
} /* }}} int redis_db_stats */
-static int redis_read(void) /* {{{ */
-{
- for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
- redisContext *rh;
+static void redis_check_connection(redis_node_t *rn) {
+ if (rn->redisContext)
+ return;
+
+ redisContext *rh =
+ redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
+
+ if (rh == NULL) {
+ ERROR("redis plugin: can't allocate redis context");
+ return;
+ }
+ if (rh->err) {
+ ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.", rn->name,
+ rn->host, rn->port, rh->errstr);
+ redisFree(rh);
+ return;
+ }
+
+ rn->redisContext = rh;
+
+ if (strlen(rn->passwd) > 0) {
redisReply *rr;
- DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
- rn->host, rn->port);
+ DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
+ rn->passwd);
- rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
- if (rh == NULL) {
- ERROR("redis plugin: can't allocate redis context");
- continue;
+ if ((rr = c_redisCommand(rn, "AUTH %s", rn->passwd)) == NULL) {
+ WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
+ return;
}
- if (rh->err) {
- ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.",
- rn->name, rn->host, rn->port, rh->errstr);
- redisFree(rh);
- continue;
+
+ if (rr->type != REDIS_REPLY_STATUS) {
+ WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
+ freeReplyObject(rr);
+ redisFree(rn->redisContext);
+ rn->redisContext = NULL;
+ return;
}
- if (strlen(rn->passwd) > 0) {
- DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
- rn->passwd);
+ freeReplyObject(rr);
+ }
+ return;
+} /* void redis_check_connection */
+
+static void redis_read_server_info(redis_node_t *rn) {
+ redisReply *rr;
- if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) {
- WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
- goto redis_fail;
- }
+ if ((rr = c_redisCommand(rn, "INFO")) == NULL) {
+ WARNING("redis plugin: unable to get INFO from node `%s'.", rn->name);
+ return;
+ }
- if (rr->type != REDIS_REPLY_STATUS) {
- WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
- goto redis_fail;
- }
+ redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
+ DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "current_connections", "clients",
+ "connected_clients", DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
+ "blocked_clients", DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
+ DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
+ DS_TYPE_GAUGE);
+ /* changes_since_last_save: Deprecated in redis version 2.6 and above */
+ redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
+ "changes_since_last_save", DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "total_connections", NULL,
+ "total_connections_received", DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "total_operations", NULL,
+ "total_commands_processed", DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
+ "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
+ DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
+ DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "pubsub", "channels", "pubsub_channels",
+ DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns",
+ DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
+ "connected_slaves", DS_TYPE_GAUGE);
+ redis_handle_info(rn->name, rr->str, "cache_result", "hits", "keyspace_hits",
+ DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "cache_result", "misses",
+ "keyspace_misses", DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "total_bytes", "input",
+ "total_net_input_bytes", DS_TYPE_DERIVE);
+ redis_handle_info(rn->name, rr->str, "total_bytes", "output",
+ "total_net_output_bytes", DS_TYPE_DERIVE);
+
+ redis_db_stats(rn->name, rr->str);
- freeReplyObject(rr);
- }
+ freeReplyObject(rr);
+} /* void redis_read_server_info */
- if ((rr = redisCommand(rh, "INFO")) == NULL) {
- WARNING("redis plugin: unable to get info from node `%s'.", rn->name);
- goto redis_fail;
- }
+static int redis_read(void) /* {{{ */
+{
+ for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
+ DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
+ rn->host, rn->port);
- redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
- DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "current_connections", "clients",
- "connected_clients", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
- "blocked_clients", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
- DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
- DS_TYPE_GAUGE);
- /* changes_since_last_save: Deprecated in redis version 2.6 and above */
- redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
- "changes_since_last_save", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "total_connections", NULL,
- "total_connections_received", DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "total_operations", NULL,
- "total_commands_processed", DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
- "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
- DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
- DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "pubsub", "channels",
- "pubsub_channels", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "pubsub", "patterns",
- "pubsub_patterns", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
- "connected_slaves", DS_TYPE_GAUGE);
- redis_handle_info(rn->name, rr->str, "cache_result", "hits",
- "keyspace_hits", DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "cache_result", "misses",
- "keyspace_misses", DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "total_bytes", "input",
- "total_net_input_bytes", DS_TYPE_DERIVE);
- redis_handle_info(rn->name, rr->str, "total_bytes", "output",
- "total_net_output_bytes", DS_TYPE_DERIVE);
-
- redis_db_stats(rn->name, rr->str);
-
- for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next)
- redis_handle_query(rh, rn, rq);
-
- redis_fail:
- if (rr != NULL)
- freeReplyObject(rr);
- redisFree(rh);
+ redis_check_connection(rn);
+
+ if (!rn->redisContext) /* no connection */
+ continue;
+
+ redis_read_server_info(rn);
+
+ if (!rn->redisContext) /* connection lost */
+ continue;
+
+ for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
+ redis_handle_query(rn, rq);
+ if (!rn->redisContext) /* connection lost */
+ break;
+ }
}
return 0;