X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fceph.c;h=9c53a3e76590e1ba490c83e24c26551815c935cf;hb=56536633afb68ded68ac7bd012060cb9337fbcc5;hp=62161e79fc291a9025a945082fa7bfac0148a72e;hpb=5f3082295d2fb64aa6dad9bdb995cf2bc5b780d8;p=collectd.git diff --git a/src/ceph.c b/src/ceph.c index 62161e79..9c53a3e7 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -1,6 +1,7 @@ /** * collectd - src/ceph.c * Copyright (C) 2011 New Dream Network + * Copyright (C) 2015 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -16,11 +17,13 @@ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * * Authors: - * Colin McCabe - * Dennis Zou - * Dan Ryder + * Colin McCabe + * Dennis Zou + * Dan Ryder + * Florian octo Forster **/ +#define _DEFAULT_SOURCE #define _BSD_SOURCE #include "collectd.h" @@ -30,8 +33,11 @@ #include #include #include -#include -#include /* need for struct json_object_iter */ +#include +#if HAVE_YAJL_YAJL_VERSION_H +#include +#endif + #include #include #include @@ -44,17 +50,24 @@ #include #include #include -#define MAX_RRD_DS_NAME_LEN 20 +#include +#include + +#define RETRY_AVGCOUNT -1 + +#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1) +# define HAVE_YAJL_V2 1 +#endif #define RETRY_ON_EINTR(ret, expr) \ - while(1) { \ - ret = expr; \ - if (ret >= 0) \ - break; \ - ret = -errno; \ - if (ret != -EINTR) \ - break; \ - } + while(1) { \ + ret = expr; \ + if(ret >= 0) \ + break; \ + ret = -errno; \ + if(ret != -EINTR) \ + break; \ + } /** Timeout interval in seconds */ #define CEPH_TIMEOUT_INTERVAL 1 @@ -62,1020 +75,1374 @@ /** Maximum path length for a UNIX domain socket on this system */ #define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path)) +/** Yajl callback returns */ +#define CEPH_CB_CONTINUE 1 +#define CEPH_CB_ABORT 0 + +#if HAVE_YAJL_V2 +typedef size_t yajl_len_t; +#else +typedef unsigned int yajl_len_t; +#endif + +/** Number of types for ceph defined in types.db */ +#define CEPH_DSET_TYPES_NUM 3 +/** ceph types enum */ +enum ceph_dset_type_d +{ + DSET_LATENCY = 0, + DSET_BYTES = 1, + DSET_RATE = 2, + DSET_TYPE_UNFOUND = 1000 +}; + +/** Valid types for ceph defined in types.db */ +const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] = + {"ceph_latency", "ceph_bytes", "ceph_rate"}; + /******* ceph_daemon *******/ struct ceph_daemon { - /** Version of the admin_socket interface */ - uint32_t version; - /** daemon name **/ - char name[DATA_MAX_NAME_LEN]; - - int dset_num; - - /** Path to the socket that we use to talk to the ceph daemon */ - char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX]; - - /** The set of key/value pairs that this daemon reports - * dset.type The daemon name - * dset.ds_num Number of data sources (key/value pairs) - * dset.ds Dynamically allocated array of key/value pairs - */ - //struct data_set_s dset; - /** Dynamically allocated array **/ - struct data_set_s *dset; - int **pc_types; + /** Version of the admin_socket interface */ + uint32_t version; + /** daemon name **/ + char name[DATA_MAX_NAME_LEN]; + + /** Path to the socket that we use to talk to the ceph daemon */ + char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX]; + + /** Number of counters */ + int ds_num; + /** Track ds types */ + uint32_t *ds_types; + /** Track ds names to match with types */ + char **ds_names; + + /** + * Keep track of last data for latency values so we can calculate rate + * since last poll. + */ + struct last_data **last_poll_data; + /** index of last poll data */ + int last_idx; +}; + +/******* JSON parsing *******/ +typedef int (*node_handler_t)(void *, const char*, const char*); + +/** Track state and handler while parsing JSON */ +struct yajl_struct +{ + node_handler_t handler; + void * handler_arg; + + char *key; + char *stack[YAJL_MAX_DEPTH]; + size_t depth; }; +typedef struct yajl_struct yajl_struct; enum perfcounter_type_d { - PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8, + PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8, }; +/** Give user option to use default (long run = since daemon started) avg */ +static int long_run_latency_avg = 0; + +/** + * Give user option to use default type for special cases - + * filestore.journal_wr_bytes is currently only metric here. Ceph reports the + * type as a sum/count pair and will calculate it the same as a latency value. + * All other "bytes" metrics (excluding the used/capacity bytes for the OSD) + * use the DERIVE type. Unless user specifies to use given type, convert this + * metric to use DERIVE. + */ +static int convert_special_metrics = 1; + /** Array of daemons to monitor */ static struct ceph_daemon **g_daemons = NULL; /** Number of elements in g_daemons */ static int g_num_daemons = 0; -static void ceph_daemon_print(const struct ceph_daemon *d) +/** + * A set of data that we build up in memory while parsing the JSON. + */ +struct values_tmp { - DEBUG("name=%s, asok_path=%s", d->name, d->asok_path); -} + /** ceph daemon we are processing data for*/ + struct ceph_daemon *d; + /** track avgcount across counters for avgcount/sum latency pairs */ + uint64_t avgcount; + /** current index of counters - used to get type of counter */ + int index; + /** do we already have an avgcount for latency pair */ + int avgcount_exists; + /** + * similar to index, but current index of latency type counters - + * used to get last poll data of counter + */ + int latency_index; + /** + * values list - maintain across counters since + * host/plugin/plugin instance are always the same + */ + value_list_t vlist; +}; -static void ceph_daemons_print(void) +/** + * A set of count/sum pairs to keep track of latency types and get difference + * between this poll data and last poll data. + */ +struct last_data { - int i; - for (i = 0; i < g_num_daemons; ++i) - { - ceph_daemon_print(g_daemons[i]); - } -} + char ds_name[DATA_MAX_NAME_LEN]; + double last_sum; + uint64_t last_count; +}; -struct last_data **last_poll_data = NULL; -int last_idx = 0; +/******* network I/O *******/ +enum cstate_t +{ + CSTATE_UNCONNECTED = 0, + CSTATE_WRITE_REQUEST, + CSTATE_READ_VERSION, + CSTATE_READ_AMT, + CSTATE_READ_JSON, +}; -/*static void ceph_daemon_free(struct ceph_daemon *d) - { - plugin_unregister_data_set(d->dset.type); - sfree(d->dset.ds); - sfree(d); - }*/ -static void ceph_daemon_free(struct ceph_daemon *d) +enum request_type_t { - int i = 0; - for (; i < d->dset_num; i++) - { - plugin_unregister_data_set((d->dset + i)->type); - sfree(d->dset->ds); - sfree(d->pc_types[i]); - } - sfree(d->dset); - sfree(d->pc_types); - sfree(d); -} + ASOK_REQ_VERSION = 0, + ASOK_REQ_DATA = 1, + ASOK_REQ_SCHEMA = 2, + ASOK_REQ_NONE = 1000, +}; -static void compact_ds_name(char *source, char *dest) +struct cconn { - int keys_num = 0, i; - char *save_ptr = NULL, *tmp_ptr = source; - char *keys[16]; - char len_str[3]; - char tmp[DATA_MAX_NAME_LEN]; - int reserved = 0; - int offset = 0; - memset(tmp, 0, sizeof(tmp)); - if (source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0') - { - return; - } - size_t src_len = strlen(source); - snprintf(len_str, sizeof(len_str), "%zu", src_len); - unsigned char append_status = 0x0; - append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0; - append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0; - while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL) - { - tmp_ptr = NULL; - /** capitalize 1st char **/ - keys[keys_num][0] = toupper(keys[keys_num][0]); - keys_num++; - if (keys_num >= 16) - break; - } - /** concatenate each part of source string **/ - for (i = 0; i < keys_num; i++) - { - strcat(tmp, keys[i]); - } - tmp[DATA_MAX_NAME_LEN - 1] = '\0'; - /** to coordinate limitation of length of ds name from RRD - * we will truncate ds_name - * when the its length is more than - * MAX_RRD_DS_NAME_LEN - */ - if (strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1) - { - append_status |= 0x4; - /** we should reserve space for - * len_str - */ - reserved += 2; - } - if (append_status & 0x1) - { - /** we should reserve space for - * "Minus" - */ - reserved += 5; - } - if (append_status & 0x2) - { - /** we should reserve space for - * "Plus" - */ - reserved += 4; - } - snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp); - offset = strlen(dest); - switch (append_status) - { - case 0x1: - memcpy(dest + offset, "Minus", 5); - break; - case 0x2: - memcpy(dest + offset, "Plus", 5); - break; - case 0x4: - memcpy(dest + offset, len_str, 2); - break; - case 0x5: - memcpy(dest + offset, "Minus", 5); - memcpy(dest + offset + 5, len_str, 2); - break; - case 0x6: - memcpy(dest + offset, "Plus", 4); - memcpy(dest + offset + 4, len_str, 2); - break; - default: - break; - } + /** The Ceph daemon that we're talking to */ + struct ceph_daemon *d; + + /** Request type */ + uint32_t request_type; + + /** The connection state */ + enum cstate_t state; + + /** The socket we use to talk to this daemon */ + int asok; + + /** The amount of data remaining to read / write. */ + uint32_t amt; + + /** Length of the JSON to read */ + uint32_t json_len; + + /** Buffer containing JSON data */ + unsigned char *json; + + /** Keep data important to yajl processing */ + struct yajl_struct yajl; +}; + +static int ceph_cb_null(void *ctx) +{ + return CEPH_CB_CONTINUE; } -static int parse_keys(const char *key_str, char *dset_name, char *ds_name) + +static int ceph_cb_boolean(void *ctx, int bool_val) { - char *ptr, *rptr; - size_t dset_name_len = 0; - size_t ds_name_len = 0; - char tmp_ds_name[DATA_MAX_NAME_LEN]; - memset(tmp_ds_name, 0, sizeof(tmp_ds_name)); - if (dset_name == NULL || ds_name == NULL || key_str == NULL - || key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0') - { - return -1; - } - if ((ptr = strchr(key_str, '.')) == NULL - || (rptr = strrchr(key_str, '.')) == NULL) - { - strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1); - strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1); - goto compact; - } - dset_name_len = - (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ? - (DATA_MAX_NAME_LEN - 1) : (ptr - key_str); - memcpy(dset_name, key_str, dset_name_len); - ds_name_len = - (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr); - if (ds_name_len == 0) - { /** only have two keys **/ - if (!strncmp(rptr + 1, "type", 4)) - {/** if last key is "type",ignore **/ - strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1); - } - else - {/** if last key isn't "type", copy last key **/ - strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1); - } - } - else if (!strncmp(rptr + 1, "type", 4)) - {/** more than two keys **/ - memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1); - } - else - {/** copy whole keys **/ - strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1); - } - compact: compact_ds_name(tmp_ds_name, ds_name); - return 0; + return CEPH_CB_CONTINUE; } -int get_matching_dset(const struct ceph_daemon *d, const char *name) +#define BUFFER_ADD(dest, src) do { \ + size_t dest_size = sizeof (dest); \ + strncat ((dest), (src), dest_size - strlen (dest)); \ + (dest)[dest_size - 1] = 0; \ +} while (0) + +static int +ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) { - int idx; - for (idx = 0; idx < d->dset_num; ++idx) - { - if (strcmp(d->dset[idx].type, name) == 0) - { - return idx; - } - } - return -1; + yajl_struct *state = (yajl_struct*) ctx; + char buffer[number_len+1]; + char key[2 * DATA_MAX_NAME_LEN]; + _Bool latency_type = 0; + size_t i; + int status; + + memcpy(buffer, number_val, number_len); + buffer[sizeof(buffer) - 1] = 0; + + memset (key, 0, sizeof (key)); + for (i = 0; i < state->depth; i++) + { + if (state->stack[i] == NULL) + continue; + + if (strlen (key) != 0) + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->stack[i]); + } + + /* Special case for latency metrics. */ + if ((strcmp ("avgcount", state->key) == 0) + || (strcmp ("sum", state->key) == 0)) + { + latency_type = 1; + + /* Super-special case for filestore.journal_wr_bytes.avgcount: For + * some reason, Ceph schema encodes this as a count/sum pair while all + * other "Bytes" data (excluding used/capacity bytes for OSD space) uses + * a single "Derive" type. To spare further confusion, keep this KPI as + * the same type of other "Bytes". Instead of keeping an "average" or + * "rate", use the "sum" in the pair and assign that to the derive + * value. */ + if (convert_special_metrics && (state->depth >= 2) + && (strcmp("filestore", state->stack[state->depth - 2]) == 0) + && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) + && (strcmp("avgcount", state->key) == 0)) + { + DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); + return CEPH_CB_CONTINUE; + } + } + else /* not a latency type */ + { + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->key); + } + + status = state->handler(state->handler_arg, buffer, key); + if((status == RETRY_AVGCOUNT) && latency_type) + { + /* Add previously skipped part of the key, either "avgcount" or "sum", + * and try again. */ + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->key); + + status = state->handler(state->handler_arg, buffer, key); + } + + if (status != 0) + { + ERROR("ceph plugin: JSON handler failed with status %d.", status); + return CEPH_CB_ABORT; + } + + return CEPH_CB_CONTINUE; } -int get_matching_value(const struct data_set_s *dset, const char *name, - int num_values) +static int ceph_cb_string(void *ctx, const unsigned char *string_val, + yajl_len_t string_len) { - int idx; - for (idx = 0; idx < num_values; ++idx) - { - if (strcmp(dset->ds[idx].name, name) == 0) - { - return idx; - } - } - return -1; + return CEPH_CB_CONTINUE; } -static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, - int pc_type) +static int ceph_cb_start_map(void *ctx) { - struct data_source_s *ds; - struct data_set_s *dset; - struct data_set_s *dset_array; - int **pc_types_array = NULL; - int *pc_types; - int *pc_types_new; - int idx = 0; - if (strlen(name) + 1 > DATA_MAX_NAME_LEN) - return -ENAMETOOLONG; - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; - memset(dset_name, 0, sizeof(dset_name)); - memset(ds_name, 0, sizeof(ds_name)); - if (parse_keys(name, dset_name, ds_name)) - return 1; - idx = get_matching_dset(d, dset_name); - if (idx == -1) - {/* need to add a dset **/ - dset_array = realloc(d->dset, - sizeof(struct data_set_s) * (d->dset_num + 1)); - if (!dset_array) - return -ENOMEM; - pc_types_array = realloc(d->pc_types, - sizeof(int *) * (d->dset_num + 1)); - if (!pc_types_array) - return -ENOMEM; - dset = &dset_array[d->dset_num]; - /** this step is very important, otherwise, - * realloc for dset->ds will tricky because of - * a random addr in dset->ds - */ - memset(dset, 0, sizeof(struct data_set_s)); - dset->ds_num = 0; - snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name); - pc_types = pc_types_array[d->dset_num] = NULL; - d->dset = dset_array; - } - else - { - dset = &d->dset[idx]; - pc_types = d->pc_types[idx]; - } - struct data_source_s *ds_array = realloc(dset->ds, - sizeof(struct data_source_s) * (dset->ds_num + 1)); - if (!ds_array) - { - return -ENOMEM; - } - pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1)); - if (!pc_types_new) - { - return -ENOMEM; - } - dset->ds = ds_array; - if (idx == -1) - { - pc_types_array[d->dset_num] = pc_types_new; - d->pc_types = pc_types_array; - d->pc_types[d->dset_num][dset->ds_num] = pc_type; - d->dset_num++; - } - else - { - d->pc_types[idx] = pc_types_new; - d->pc_types[idx][dset->ds_num] = pc_type; - } - ds = &ds_array[dset->ds_num++]; - snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name); - ds->type = - (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE; - ds->min = 0; - ds->max = NAN; - return 0; + yajl_struct *state = (yajl_struct*) ctx; + + /* Push key to the stack */ + if (state->depth == YAJL_MAX_DEPTH) + return CEPH_CB_ABORT; + + state->stack[state->depth] = state->key; + state->depth++; + state->key = NULL; + + return CEPH_CB_CONTINUE; } -/******* ceph_config *******/ -static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len) +static int ceph_cb_end_map(void *ctx) { - const char *val; - if (item->values_num != 1) - { - return -ENOTSUP; - } - if (item->values[0].type != OCONFIG_TYPE_STRING) - { - return -ENOTSUP; - } - val = item->values[0].value.string; - if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1)) - { - ERROR("ceph plugin: configuration parameter '%s' is too long.\n", - item->key); - return -ENAMETOOLONG; - } - return 0; + yajl_struct *state = (yajl_struct*) ctx; + + /* Pop key from the stack */ + if (state->depth == 0) + return CEPH_CB_ABORT; + + sfree (state->key); + state->depth--; + state->key = state->stack[state->depth]; + state->stack[state->depth] = NULL; + + return CEPH_CB_CONTINUE; } -static int cc_add_daemon_config(oconfig_item_t *ci) +static int +ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len) { - int ret, i; - struct ceph_daemon *array, *nd, cd; - memset(&cd, 0, sizeof(struct ceph_daemon)); + yajl_struct *state = (yajl_struct*) ctx; + size_t sz = ((size_t) string_len) + 1; - if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) - { - WARNING("ceph plugin: `Daemon' blocks need exactly one string argument."); - return (-1); - } + sfree (state->key); + state->key = malloc (sz); + if (state->key == NULL) + { + ERROR ("ceph plugin: malloc failed."); + return CEPH_CB_ABORT; + } - ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN); - if (ret) - return ret; + memmove (state->key, key, sz - 1); + state->key[sz - 1] = 0; - for (i=0; i < ci->children_num; i++) - { - oconfig_item_t *child = ci->children + i; + return CEPH_CB_CONTINUE; +} - if (strcasecmp("SocketPath", child->key) == 0) - { - ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path)); - if (ret) - return ret; - } - else - { - WARNING("ceph plugin: ignoring unknown option %s", child->key); - } - } - if (cd.name[0] == '\0') - { - ERROR("ceph plugin: you must configure a daemon name.\n"); - return -EINVAL; - } - else if (cd.asok_path[0] == '\0') - { - ERROR("ceph plugin(name=%s): you must configure an administrative " - "socket path.\n", cd.name); - return -EINVAL; - } - else if (!((cd.asok_path[0] == '/') - || (cd.asok_path[0] == '.' && cd.asok_path[1] == '/'))) - { - ERROR("ceph plugin(name=%s): administrative socket paths must begin with " - "'/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path); - return -EINVAL; - } - array = realloc(g_daemons, - sizeof(struct ceph_daemon *) * (g_num_daemons + 1)); - if (array == NULL) - { - /* The positive return value here indicates that this is a - * runtime error, not a configuration error. */ - return ENOMEM; - } - g_daemons = (struct ceph_daemon**) array; - nd = malloc(sizeof(struct ceph_daemon)); - if (!nd) - return ENOMEM; - memcpy(nd, &cd, sizeof(struct ceph_daemon)); - g_daemons[g_num_daemons++] = nd; - return 0; +static int ceph_cb_start_array(void *ctx) +{ + return CEPH_CB_CONTINUE; } -static int ceph_config(oconfig_item_t *ci) +static int ceph_cb_end_array(void *ctx) { - int ret, i; - - for (i = 0; i < ci->children_num; ++i) - { - oconfig_item_t *child = ci->children + i; - if (strcasecmp("Daemon", child->key) == 0) - { - ret = cc_add_daemon_config(child); - if (ret) - return ret; - } - else - { - WARNING("ceph plugin: ignoring unknown option %s", child->key); - } - } - return 0; + return CEPH_CB_CONTINUE; } -/******* JSON parsing *******/ -typedef int (*node_handler_t)(void*, json_object*, const char*); +static yajl_callbacks callbacks = { + ceph_cb_null, + ceph_cb_boolean, + NULL, + NULL, + ceph_cb_number, + ceph_cb_string, + ceph_cb_start_map, + ceph_cb_map_key, + ceph_cb_end_map, + ceph_cb_start_array, + ceph_cb_end_array +}; -/** Perform a depth-first traversal of the JSON parse tree, - * calling node_handler at each node.*/ -static int traverse_json_impl(json_object *jo, char *key, int max_key, - node_handler_t handler, void *handler_arg) +static void ceph_daemon_print(const struct ceph_daemon *d) { - struct json_object_iter iter; - int ret, plen, klen; - - if (json_object_get_type(jo) != json_type_object) - return 0; - plen = strlen(key); - json_object_object_foreachC(jo, iter) - { - klen = strlen(iter.key); - if (plen + klen + 2 > max_key) - return -ENAMETOOLONG; - if (plen != 0) - strncat(key, ".", max_key); /* really should be strcat */ - strncat(key, iter.key, max_key); - - ret = handler(handler_arg, iter.val, key); - if (ret == 1) - { - ret = traverse_json_impl(iter.val, key, max_key, handler, - handler_arg); - } - else if (ret != 0) - { - return ret; - } - - key[plen] = '\0'; - } - return 0; + DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path); } -static int traverse_json(const char *json, node_handler_t handler, - void *handler_arg) +static void ceph_daemons_print(void) { - json_object *root; - char buf[128]; - buf[0] = '\0'; - root = json_tokener_parse(json); - if (!root) - return -EDOM; - int result = traverse_json_impl(root, buf, sizeof(buf), handler, handler_arg); - json_object_put(root); - return result; + int i; + for(i = 0; i < g_num_daemons; ++i) + { + ceph_daemon_print(g_daemons[i]); + } } -static int node_handler_define_schema(void *arg, json_object *jo, - const char *key) +static void ceph_daemon_free(struct ceph_daemon *d) { - struct ceph_daemon *d = (struct ceph_daemon *) arg; - int pc_type; - if (json_object_get_type(jo) == json_type_object) - return 1; - else if (json_object_get_type(jo) != json_type_int) - return -EDOM; - pc_type = json_object_get_int(jo); - DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)", - d->name, key, pc_type); - return ceph_daemon_add_ds_entry(d, key, pc_type); + int i = 0; + for(; i < d->last_idx; i++) + { + sfree(d->last_poll_data[i]); + } + sfree(d->last_poll_data); + d->last_poll_data = NULL; + d->last_idx = 0; + for(i = 0; i < d->ds_num; i++) + { + sfree(d->ds_names[i]); + } + sfree(d->ds_types); + sfree(d->ds_names); + sfree(d); } -struct values_holder + +/* compact_ds_name removed the special characters ":", "_", "-" and "+" from the + * intput string. Characters following these special characters are capitalized. + * Trailing "+" and "-" characters are replaces with the strings "Plus" and + * "Minus". */ +static int compact_ds_name (char *buffer, size_t buffer_size, char const *src) { - int values_len; - value_t *values; -}; + char *src_copy; + size_t src_len; + char *ptr = buffer; + size_t ptr_size = buffer_size; + _Bool append_plus = 0; + _Bool append_minus = 0; + + if ((buffer == NULL) || (buffer_size <= strlen ("Minus")) || (src == NULL)) + return EINVAL; + + src_copy = strdup (src); + src_len = strlen(src); + + /* Remove trailing "+" and "-". */ + if (src_copy[src_len - 1] == '+') + { + append_plus = 1; + src_len--; + src_copy[src_len] = 0; + } + else if (src_copy[src_len - 1] == '-') + { + append_minus = 1; + src_len--; + src_copy[src_len] = 0; + } + + /* Split at special chars, capitalize first character, append to buffer. */ + char *dummy = src_copy; + char *token; + char *save_ptr = NULL; + while ((token = strtok_r (dummy, ":_-+", &save_ptr)) != NULL) + { + size_t len; + + dummy = NULL; + + token[0] = toupper ((int) token[0]); + + assert (ptr_size > 1); + + len = strlen (token); + if (len >= ptr_size) + len = ptr_size - 1; + + assert (len > 0); + assert (len < ptr_size); + + sstrncpy (ptr, token, len + 1); + ptr += len; + ptr_size -= len; + + assert (*ptr == 0); + if (ptr_size <= 1) + break; + } + + /* Append "Plus" or "Minus" if "+" or "-" has been stripped above. */ + if (append_plus || append_minus) + { + char const *append = "Plus"; + if (append_minus) + append = "Minus"; + + size_t offset = buffer_size - (strlen (append) + 1); + if (offset > strlen (buffer)) + offset = strlen (buffer); + + sstrncpy (buffer + offset, append, buffer_size - offset); + } + + sfree (src_copy); + return 0; +} -/** A set of values_t data that we build up in memory while parsing the JSON. */ -struct values_tmp +static _Bool has_suffix (char const *str, char const *suffix) { - struct ceph_daemon *d; - int holder_num; - struct values_holder vh[0]; -}; + size_t str_len = strlen (str); + size_t suffix_len = strlen (suffix); + size_t offset; -struct last_data + if (suffix_len > str_len) + return 0; + offset = str_len - suffix_len; + + if (strcmp (str + offset, suffix) == 0) + return 1; + + return 0; +} + +/* count_parts returns the number of elements a "foo.bar.baz" style key has. */ +static size_t count_parts (char const *key) { - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; - double last_sum; - uint64_t last_count; -}; + char const *ptr; + size_t parts_num = 0; + + for (ptr = key; ptr != NULL; ptr = strchr (ptr + 1, '.')) + parts_num++; + + return parts_num; +} -int add_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count) +/** + * Parse key to remove "type" if this is for schema and initiate compaction + */ +static int parse_keys (char *buffer, size_t buffer_size, const char *key_str) { - last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data)); - if(!last_poll_data[last_idx]) + char tmp[2 * buffer_size]; + + if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen (key_str) == 0) + return EINVAL; + + if ((count_parts (key_str) > 2) && has_suffix (key_str, ".type")) + { + /* strip ".type" suffix iff the key has more than two parts. */ + size_t sz = strlen (key_str) - strlen (".type") + 1; + + if (sz > sizeof (tmp)) + sz = sizeof (tmp); + sstrncpy (tmp, key_str, sz); + } + else + { + sstrncpy (tmp, key_str, sizeof (tmp)); + } + + return compact_ds_name (buffer, buffer_size, tmp); +} + +/** + * while parsing ceph admin socket schema, save counter name and type for later + * data processing + */ +static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, + int pc_type) +{ + uint32_t type; + char ds_name[DATA_MAX_NAME_LEN]; + memset(ds_name, 0, sizeof(ds_name)); + + if(convert_special_metrics) + { + /** + * Special case for filestore:JournalWrBytes. For some reason, Ceph + * schema encodes this as a count/sum pair while all other "Bytes" data + * (excluding used/capacity bytes for OSD space) uses a single "Derive" + * type. To spare further confusion, keep this KPI as the same type of + * other "Bytes". Instead of keeping an "average" or "rate", use the + * "sum" in the pair and assign that to the derive value. + */ + if((strcmp(name,"filestore.journal_wr_bytes.type") == 0)) { - return ENOMEM; + pc_type = 10; } - sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,sizeof(last_poll_data[last_idx]->dset_name)); - sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,sizeof(last_poll_data[last_idx]->ds_name)); - last_poll_data[last_idx]->last_sum = cur_sum; - last_poll_data[last_idx]->last_count = cur_count; - last_idx++; + } + + d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1)); + if(!d->ds_names) + { + return -ENOMEM; + } + + d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1)); + if(!d->ds_types) + { + return -ENOMEM; + } + + d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN); + if(!d->ds_names[d->ds_num]) + { + return -ENOMEM; + } + + type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE : + ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES); + d->ds_types[d->ds_num] = type; + + if (parse_keys(ds_name, sizeof (ds_name), name)) + { return 1; + } + + sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1); + d->ds_num = (d->ds_num + 1); + + return 0; } -int update_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count) +/******* ceph_config *******/ +static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len) { - int i; - for(i = 0; i < last_idx; i++) - { - if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0) - { - if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0) - { - last_poll_data[i]->last_sum = cur_sum; - last_poll_data[i]->last_count = cur_count; - return 1; - } - } - } + const char *val; + if(item->values_num != 1) + { + return -ENOTSUP; + } + if(item->values[0].type != OCONFIG_TYPE_STRING) + { + return -ENOTSUP; + } + val = item->values[0].value.string; + if(snprintf(dest, dest_len, "%s", val) > (dest_len - 1)) + { + ERROR("ceph plugin: configuration parameter '%s' is too long.\n", + item->key); + return -ENAMETOOLONG; + } + return 0; +} - if(NULL == last_poll_data) +static int cc_handle_bool(struct oconfig_item_s *item, int *dest) +{ + if(item->values_num != 1) + { + return -ENOTSUP; + } + + if(item->values[0].type != OCONFIG_TYPE_BOOLEAN) + { + return -ENOTSUP; + } + + *dest = (item->values[0].value.boolean) ? 1 : 0; + return 0; +} + +static int cc_add_daemon_config(oconfig_item_t *ci) +{ + int ret, i; + struct ceph_daemon *nd, cd; + struct ceph_daemon **tmp; + memset(&cd, 0, sizeof(struct ceph_daemon)); + + if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING("ceph plugin: `Daemon' blocks need exactly one string " + "argument."); + return (-1); + } + + ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN); + if(ret) + { + return ret; + } + + for(i=0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if(strcasecmp("SocketPath", child->key) == 0) { - last_poll_data = malloc(1 * sizeof(struct last_data *)); - if(!last_poll_data) - { - return ENOMEM; - } + ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path)); + if(ret) + { + return ret; + } } else { - struct last_data **tmp_last = realloc(last_poll_data, ((last_idx+1) * sizeof(struct last_data *))); - if(!tmp_last) - { - return ENOMEM; - } - last_poll_data = tmp_last; + WARNING("ceph plugin: ignoring unknown option %s", child->key); } - add_last(dset_n,ds_n,cur_sum,cur_count); - return -1; + } + if(cd.name[0] == '\0') + { + ERROR("ceph plugin: you must configure a daemon name.\n"); + return -EINVAL; + } + else if(cd.asok_path[0] == '\0') + { + ERROR("ceph plugin(name=%s): you must configure an administrative " + "socket path.\n", cd.name); + return -EINVAL; + } + else if(!((cd.asok_path[0] == '/') || + (cd.asok_path[0] == '.' && cd.asok_path[1] == '/'))) + { + ERROR("ceph plugin(name=%s): administrative socket paths must begin " + "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path); + return -EINVAL; + } + + tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons)); + if(tmp == NULL) + { + /* The positive return value here indicates that this is a + * runtime error, not a configuration error. */ + return ENOMEM; + } + g_daemons = tmp; + + nd = malloc(sizeof(*nd)); + if(!nd) + { + return ENOMEM; + } + memcpy(nd, &cd, sizeof(*nd)); + g_daemons[g_num_daemons++] = nd; + return 0; } -double get_last_avg(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count) +static int ceph_config(oconfig_item_t *ci) { - int i; - double result = -1.1; - double sum_delt = 0.0; - uint64_t count_delt = 0; - for(i = 0; i < last_idx; i++) + int ret, i; + + for(i = 0; i < ci->children_num; ++i) + { + oconfig_item_t *child = ci->children + i; + if(strcasecmp("Daemon", child->key) == 0) { - if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0) - { - if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0) - { - if(cur_count < last_poll_data[i]->last_count) - { - break; - } - sum_delt = (cur_sum - last_poll_data[i]->last_sum); - count_delt = (cur_count - last_poll_data[i]->last_count); - result = (sum_delt / count_delt); - break; - } - } + ret = cc_add_daemon_config(child); + if(ret == ENOMEM) + { + ERROR("ceph plugin: Couldn't allocate memory"); + return ret; + } + else if(ret) + { + //process other daemons and ignore this one + continue; + } } + else if(strcasecmp("LongRunAvgLatency", child->key) == 0) + { + ret = cc_handle_bool(child, &long_run_latency_avg); + if(ret) + { + return ret; + } + } + else if(strcasecmp("ConvertSpecialMetricTypes", child->key) == 0) + { + ret = cc_handle_bool(child, &convert_special_metrics); + if(ret) + { + return ret; + } + } + else + { + WARNING("ceph plugin: ignoring unknown option %s", child->key); + } + } + return 0; +} - result = (result == -1.1) ? NAN : result; - update_last(dset_n,ds_n,cur_sum,cur_count); - return result; +/** + * Parse JSON and get error message if present + */ +static int +traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand) +{ + yajl_status status = yajl_parse(hand, json, json_len); + unsigned char *msg; + + switch(status) + { + case yajl_status_error: + msg = yajl_get_error(hand, /* verbose = */ 1, + /* jsonText = */ (unsigned char *) json, + (unsigned int) json_len); + ERROR ("ceph plugin: yajl_parse failed: %s", msg); + yajl_free_error(hand, msg); + return 1; + case yajl_status_client_canceled: + return 1; + default: + return 0; + } } -static int node_handler_fetch_data(void *arg, json_object *jo, const char *key) +/** + * Add entry for each counter while parsing schema + */ +static int +node_handler_define_schema(void *arg, const char *val, const char *key) { - int dset_idx, ds_idx; - value_t *uv; - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; - struct values_tmp *vtmp = (struct values_tmp*) arg; - memset(dset_name, 0, sizeof(dset_name)); - memset(ds_name, 0, sizeof(ds_name)); - if (parse_keys(key, dset_name, ds_name)) - return 1;DEBUG("enter node_handler_fetch_data"); - dset_idx = get_matching_dset(vtmp->d, dset_name); - if (dset_idx == -1) - return 1; - ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name, - vtmp->d->dset[dset_idx].ds_num); - if (ds_idx == -1) - return 1;DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d", - dset_name,ds_name,dset_idx,ds_idx); - uv = &(vtmp->vh[dset_idx].values[ds_idx]); - if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY) - { - json_object *avgcount, *sum; - uint64_t avgcounti; - double sumd; - if (json_object_get_type(jo) != json_type_object) - return -EINVAL; - avgcount = json_object_object_get(jo, "avgcount"); - sum = json_object_object_get(jo, "sum"); - if ((!avgcount) || (!sum)) - return -EINVAL; - avgcounti = json_object_get_int(avgcount); - DEBUG("avgcounti:%ld",avgcounti); - if (avgcounti == 0) - avgcounti = 1; - sumd = json_object_get_double(sum); - DEBUG("sumd:%lf",sumd); - double last_avg = get_last_avg(dset_name, ds_name, sumd, avgcounti); - uv->gauge = last_avg; - DEBUG("uv->gauge = (sumd_now - sumd_last) / (avgcounti_now - avgcounti_last) = :%lf",uv->gauge); - } - else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE) - { - /* We use json_object_get_double here because anything > 32 - * bits may get truncated by json_object_get_int */ - uv->derive = (uint64_t) json_object_get_double(jo); - DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive); - } - else - { - uv->gauge = json_object_get_double(jo); - DEBUG("uv->gauge %lf",uv->gauge); - } - return 0; + struct ceph_daemon *d = (struct ceph_daemon *) arg; + int pc_type; + pc_type = atoi(val); + return ceph_daemon_add_ds_entry(d, key, pc_type); } -/******* network I/O *******/ -enum cstate_t +/** + * Latency counter does not yet have an entry in last poll data - add it. + */ +static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum, + uint64_t cur_count) { - CSTATE_UNCONNECTED = 0, - CSTATE_WRITE_REQUEST, - CSTATE_READ_VERSION, - CSTATE_READ_AMT, - CSTATE_READ_JSON, -}; + d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data)); + if(!d->last_poll_data[d->last_idx]) + { + return -ENOMEM; + } + sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n, + sizeof(d->last_poll_data[d->last_idx]->ds_name)); + d->last_poll_data[d->last_idx]->last_sum = cur_sum; + d->last_poll_data[d->last_idx]->last_count = cur_count; + d->last_idx = (d->last_idx + 1); + return 0; +} -enum request_type_t +/** + * Update latency counter or add new entry if it doesn't exist + */ +static int update_last(struct ceph_daemon *d, const char *ds_n, int index, + double cur_sum, uint64_t cur_count) { - ASOK_REQ_VERSION = 0, - ASOK_REQ_DATA = 1, - ASOK_REQ_SCHEMA = 2, - ASOK_REQ_NONE = 1000, -}; + if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)) + { + d->last_poll_data[index]->last_sum = cur_sum; + d->last_poll_data[index]->last_count = cur_count; + return 0; + } -struct cconn + if(!d->last_poll_data) + { + d->last_poll_data = malloc(1 * sizeof(struct last_data *)); + if(!d->last_poll_data) + { + return -ENOMEM; + } + } + else + { + struct last_data **tmp_last = realloc(d->last_poll_data, + ((d->last_idx+1) * sizeof(struct last_data *))); + if(!tmp_last) + { + return -ENOMEM; + } + d->last_poll_data = tmp_last; + } + return add_last(d, ds_n, cur_sum, cur_count); +} + +/** + * If using index guess failed (shouldn't happen, but possible if counters + * get rearranged), resort to searching for counter name + */ +static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n) { - /** The Ceph daemon that we're talking to */ - struct ceph_daemon *d; + int i = 0; + for(; i < d->last_idx; i++) + { + if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0) + { + return i; + } + } + return -1; +} - /** Request type */ - uint32_t request_type; +/** + * Calculate average b/t current data and last poll data + * if last poll data exists + */ +static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index, + double cur_sum, uint64_t cur_count) +{ + double result = -1.1, sum_delt = 0.0; + uint64_t count_delt = 0; + int tmp_index = 0; + if(d->last_idx > index) + { + if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0) + { + tmp_index = index; + } + //test previous index + else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0)) + { + tmp_index = (index - 1); + } + else + { + tmp_index = backup_search_for_last_avg(d, ds_n); + } - /** The connection state */ - enum cstate_t state; + if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count)) + { + sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum); + count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count); + result = (sum_delt / count_delt); + } + } + + if(result == -1.1) + { + result = NAN; + } + if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM) + { + return -ENOMEM; + } + return result; +} - /** The socket we use to talk to this daemon */ - int asok; +/** + * If using index guess failed, resort to searching for counter name + */ +static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name) +{ + int idx = 0; + for(; idx < d->ds_num; idx++) + { + if(strcmp(d->ds_names[idx], ds_name) == 0) + { + return d->ds_types[idx]; + } + } + return DSET_TYPE_UNFOUND; +} - /** The amount of data remaining to read / write. */ - uint32_t amt; +/** + * Process counter data and dispatch values + */ +static int node_handler_fetch_data(void *arg, const char *val, const char *key) +{ + value_t uv; + double tmp_d; + uint64_t tmp_u; + struct values_tmp *vtmp = (struct values_tmp*) arg; + uint32_t type = DSET_TYPE_UNFOUND; + int index = vtmp->index; + + char ds_name[DATA_MAX_NAME_LEN]; + memset(ds_name, 0, sizeof(ds_name)); + + if (parse_keys (ds_name, sizeof (ds_name), key)) + { + return 1; + } + + if(index >= vtmp->d->ds_num) + { + //don't overflow bounds of array + index = (vtmp->d->ds_num - 1); + } + + /** + * counters should remain in same order we parsed schema... we maintain the + * index variable to keep track of current point in list of counters. first + * use index to guess point in array for retrieving type. if that doesn't + * work, use the old way to get the counter type + */ + if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0) + { + //found match + type = vtmp->d->ds_types[index]; + } + else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0)) + { + //try previous key + type = vtmp->d->ds_types[index-1]; + } + + if(type == DSET_TYPE_UNFOUND) + { + //couldn't find right type by guessing, check the old way + type = backup_search_for_type(vtmp->d, ds_name); + } + + switch(type) + { + case DSET_LATENCY: + if(vtmp->avgcount_exists == -1) + { + sscanf(val, "%" PRIu64, &vtmp->avgcount); + vtmp->avgcount_exists = 0; + //return after saving avgcount - don't dispatch value + //until latency calculation + return 0; + } + else + { + double sum, result; + sscanf(val, "%lf", &sum); + + if(vtmp->avgcount == 0) + { + vtmp->avgcount = 1; + } - /** Length of the JSON to read */ - uint32_t json_len; + /** User wants latency values as long run avg */ + if(long_run_latency_avg) + { + result = (sum / vtmp->avgcount); + } + else + { + result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount); + if(result == -ENOMEM) + { + return -ENOMEM; + } + } - /** Buffer containing JSON data */ - char *json; -}; + uv.gauge = result; + vtmp->avgcount_exists = -1; + vtmp->latency_index = (vtmp->latency_index + 1); + } + break; + case DSET_BYTES: + sscanf(val, "%lf", &tmp_d); + uv.gauge = tmp_d; + break; + case DSET_RATE: + sscanf(val, "%" PRIu64, &tmp_u); + uv.derive = tmp_u; + break; + case DSET_TYPE_UNFOUND: + default: + ERROR("ceph plugin: ds %s was not properly initialized.", ds_name); + return -1; + } + + sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type)); + sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance)); + vtmp->vlist.values = &uv; + vtmp->vlist.values_len = 1; + + vtmp->index = (vtmp->index + 1); + plugin_dispatch_values(&vtmp->vlist); + + return 0; +} static int cconn_connect(struct cconn *io) { - struct sockaddr_un address; - int flags, fd, err; - if (io->state != CSTATE_UNCONNECTED) - { - ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED"); - return -EDOM; - } - fd = socket(PF_UNIX, SOCK_STREAM, 0); - if (fd < 0) - { - int err = -errno; - ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: " - "error %d", err); - return err; - } - memset(&address, 0, sizeof(struct sockaddr_un)); - address.sun_family = AF_UNIX; - snprintf(address.sun_path, sizeof(address.sun_path), "%s", - io->d->asok_path); - RETRY_ON_EINTR(err, - connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un))); - if (err < 0) - { - ERROR("cconn_connect: connect(%d) failed: error %d", fd, err); - return err; - } - - flags = fcntl(fd, F_GETFL, 0); - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) - { - err = -errno; - ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err); - return err; - } - io->asok = fd; - io->state = CSTATE_WRITE_REQUEST; - io->amt = 0; - io->json_len = 0; - io->json = NULL; - return 0; + struct sockaddr_un address; + int flags, fd, err; + if(io->state != CSTATE_UNCONNECTED) + { + ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED"); + return -EDOM; + } + fd = socket(PF_UNIX, SOCK_STREAM, 0); + if(fd < 0) + { + err = -errno; + ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) " + "failed: error %d", err); + return err; + } + memset(&address, 0, sizeof(struct sockaddr_un)); + address.sun_family = AF_UNIX; + snprintf(address.sun_path, sizeof(address.sun_path), "%s", + io->d->asok_path); + RETRY_ON_EINTR(err, + connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un))); + if(err < 0) + { + ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d", + fd, err); + close(fd); + return err; + } + + flags = fcntl(fd, F_GETFL, 0); + if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) + { + err = -errno; + ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d", + fd, err); + close(fd); + return err; + } + io->asok = fd; + io->state = CSTATE_WRITE_REQUEST; + io->amt = 0; + io->json_len = 0; + io->json = NULL; + return 0; } static void cconn_close(struct cconn *io) { - io->state = CSTATE_UNCONNECTED; - if (io->asok != -1) - { - int res; - RETRY_ON_EINTR(res, close(io->asok)); - } - io->asok = -1; - io->amt = 0; - io->json_len = 0; - sfree(io->json); - io->json = NULL; + io->state = CSTATE_UNCONNECTED; + if(io->asok != -1) + { + int res; + RETRY_ON_EINTR(res, close(io->asok)); + } + io->asok = -1; + io->amt = 0; + io->json_len = 0; + sfree(io->json); + io->json = NULL; } /* Process incoming JSON counter data */ -/*static int cconn_process_data(struct cconn *io) - { - int ret; - value_list_t vl = VALUE_LIST_INIT; - struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) + - (sizeof(value_t) * io->d->dset.ds_num)); - if (!vtmp) - return -ENOMEM; - vtmp->d = io->d; - vtmp->values_len = io->d->dset.ds_num; - ret = traverse_json(io->json, node_handler_fetch_data, vtmp); - if (ret) - goto done; - sstrncpy(vl.host, hostname_g, sizeof(vl.host)); - sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin)); - sstrncpy(vl.type, io->d->dset.type, sizeof(vl.type)); - vl.values = vtmp->values; - vl.values_len = vtmp->values_len; - DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"", - io->d->dset.type, vl.values_len, io->json); - ret = plugin_dispatch_values(&vl); - done: - sfree(vtmp); - return ret; - }*/ -static int cconn_process_data(struct cconn *io) +static int +cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand) { - int i, ret = 0; - struct values_tmp *vtmp = calloc(1, - sizeof(struct values_tmp) - + (sizeof(struct values_holder)) * io->d->dset_num); - if (!vtmp) - return -ENOMEM; - for (i = 0; i < io->d->dset_num; i++) - { - value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num)); - vtmp->vh[i].values = val; - vtmp->vh[i].values_len = io->d->dset[i].ds_num; - } - vtmp->d = io->d; - vtmp->holder_num = io->d->dset_num; - ret = traverse_json(io->json, node_handler_fetch_data, vtmp); - if (ret) - goto done; - for (i = 0; i < vtmp->holder_num; i++) - { - value_list_t vl = VALUE_LIST_INIT; - sstrncpy(vl.host, hostname_g, sizeof(vl.host)); - sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin)); - strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance)); - sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type)); - vl.values = vtmp->vh[i].values; - vl.values_len = vtmp->vh[i].values_len; - DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"", - io->d->name, vl.values_len, io->json); - ret = plugin_dispatch_values(&vl); - if (ret) - goto done; - } - - done: for (i = 0; i < vtmp->holder_num; i++) - { - sfree(vtmp->vh[i].values); - } - sfree(vtmp); - return ret; + int ret; + struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1); + if(!vtmp) + { + return -ENOMEM; + } + + vtmp->vlist = (value_list_t)VALUE_LIST_INIT; + sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host)); + sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin)); + sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance)); + + vtmp->d = io->d; + vtmp->avgcount_exists = -1; + vtmp->latency_index = 0; + vtmp->index = 0; + yajl->handler_arg = vtmp; + ret = traverse_json(io->json, io->json_len, hand); + sfree(vtmp); + return ret; } +/** + * Initiate JSON parsing and print error if one occurs + */ static int cconn_process_json(struct cconn *io) { - switch (io->request_type) - { - case ASOK_REQ_DATA: - return cconn_process_data(io); - case ASOK_REQ_SCHEMA: - return traverse_json(io->json, node_handler_define_schema, io->d); - default: - return -EDOM; - } + if((io->request_type != ASOK_REQ_DATA) && + (io->request_type != ASOK_REQ_SCHEMA)) + { + return -EDOM; + } + + int result = 1; + yajl_handle hand; + yajl_status status; + + hand = yajl_alloc(&callbacks, +#if HAVE_YAJL_V2 + /* alloc funcs = */ NULL, +#else + /* alloc funcs = */ NULL, NULL, +#endif + /* context = */ (void *)(&io->yajl)); + + if(!hand) + { + ERROR ("ceph plugin: yajl_alloc failed."); + return ENOMEM; + } + + io->yajl.depth = 0; + + switch(io->request_type) + { + case ASOK_REQ_DATA: + io->yajl.handler = node_handler_fetch_data; + result = cconn_process_data(io, &io->yajl, hand); + break; + case ASOK_REQ_SCHEMA: + //init daemon specific variables + io->d->ds_num = 0; + io->d->last_idx = 0; + io->d->last_poll_data = NULL; + io->yajl.handler = node_handler_define_schema; + io->yajl.handler_arg = io->d; + result = traverse_json(io->json, io->json_len, hand); + break; + } + + if(result) + { + goto done; + } + +#if HAVE_YAJL_V2 + status = yajl_complete_parse(hand); +#else + status = yajl_parse_complete(hand); +#endif + + if (status != yajl_status_ok) + { + unsigned char *errmsg = yajl_get_error (hand, /* verbose = */ 0, + /* jsonText = */ NULL, /* jsonTextLen = */ 0); + ERROR ("ceph plugin: yajl_parse_complete failed: %s", + (char *) errmsg); + yajl_free_error (hand, errmsg); + yajl_free (hand); + return 1; + } + + done: + yajl_free (hand); + return result; } static int cconn_validate_revents(struct cconn *io, int revents) { - if (revents & POLLERR) - { - ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name); - return -EIO; - } - switch (io->state) - { - case CSTATE_WRITE_REQUEST: - return (revents & POLLOUT) ? 0 : -EINVAL; - case CSTATE_READ_VERSION: - case CSTATE_READ_AMT: - case CSTATE_READ_JSON: - return (revents & POLLIN) ? 0 : -EINVAL; - return (revents & POLLIN) ? 0 : -EINVAL; - default: - ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d", - io->d->name, __LINE__); - return -EDOM; - } + if(revents & POLLERR) + { + ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR", + io->d->name); + return -EIO; + } + switch (io->state) + { + case CSTATE_WRITE_REQUEST: + return (revents & POLLOUT) ? 0 : -EINVAL; + case CSTATE_READ_VERSION: + case CSTATE_READ_AMT: + case CSTATE_READ_JSON: + return (revents & POLLIN) ? 0 : -EINVAL; + default: + ERROR("ceph plugin: cconn_validate_revents(name=%s) got to " + "illegal state on line %d", io->d->name, __LINE__); + return -EDOM; + } } /** Handle a network event for a connection */ static int cconn_handle_event(struct cconn *io) { - int ret; - switch (io->state) - { - case CSTATE_UNCONNECTED: - ERROR("cconn_handle_event(name=%s) got to illegal state on line %d", - io->d->name, __LINE__); - - return -EDOM; - case CSTATE_WRITE_REQUEST: - { - char cmd[32]; - /*snprintf(cmd, sizeof(cmd), "%s%d%s", "{\"prefix\":\"", io->request_type, - "\"}");*/ - char req_type_str[2]; - snprintf(req_type_str, sizeof(req_type_str), "%1.1d", io->request_type); - json_object *cmd_object = json_object_new_object(); - json_object_object_add(cmd_object, "prefix", - json_object_new_string(req_type_str)); - const char *cmd_json = json_object_to_json_string(cmd_object); - /** we should send '\n' to server **/ - snprintf(cmd, sizeof(cmd), "%s\n", cmd_json); - size_t cmd_len = strlen(cmd); - RETRY_ON_EINTR(ret, - write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)", - io->d->name, io->state, io->amt, ret); - if (ret < 0) - return ret; - io->amt += ret; - if (io->amt >= cmd_len) - { - io->amt = 0; - switch (io->request_type) - { - case ASOK_REQ_VERSION: - io->state = CSTATE_READ_VERSION; - break; - default: - io->state = CSTATE_READ_AMT; - break; - } - } - json_object_put(cmd_object); - return 0; - } - case CSTATE_READ_VERSION: - { - RETRY_ON_EINTR(ret, - read(io->asok, ((char*)(&io->d->version)) + io->amt, - sizeof(io->d->version) - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", - io->d->name, io->state, ret); - if (ret < 0) - return ret; - io->amt += ret; - if (io->amt >= sizeof(io->d->version)) - { - io->d->version = ntohl(io->d->version); - if (io->d->version != 1) - { - ERROR("cconn_handle_event(name=%s) not " - "expecting version %d!", io->d->name, io->d->version); - return -ENOTSUP; - }DEBUG("cconn_handle_event(name=%s): identified as " - "version %d", io->d->name, io->d->version); - io->amt = 0; - cconn_close(io); - io->request_type = ASOK_REQ_SCHEMA; - } - return 0; - } - case CSTATE_READ_AMT: - { - RETRY_ON_EINTR(ret, - read(io->asok, ((char*)(&io->json_len)) + io->amt, - sizeof(io->json_len) - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", - io->d->name, io->state, ret); - if (ret < 0) - return ret; - io->amt += ret; - if (io->amt >= sizeof(io->json_len)) - { - io->json_len = ntohl(io->json_len); - io->amt = 0; - io->state = CSTATE_READ_JSON; - io->json = calloc(1, io->json_len + 1); - if (!io->json) - return -ENOMEM; - } - return 0; - } - case CSTATE_READ_JSON: - { - RETRY_ON_EINTR(ret, - read(io->asok, io->json + io->amt, io->json_len - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", - io->d->name, io->state, ret); - if (ret < 0) - return ret; - io->amt += ret; - if (io->amt >= io->json_len) - { - ret = cconn_process_json(io); - if (ret) - return ret; - cconn_close(io); - io->request_type = ASOK_REQ_NONE; - } - return 0; - } - default: - ERROR("cconn_handle_event(name=%s) got to illegal state on " - "line %d", io->d->name, __LINE__); - return -EDOM; - } + int ret; + switch (io->state) + { + case CSTATE_UNCONNECTED: + ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal " + "state on line %d", io->d->name, __LINE__); + + return -EDOM; + case CSTATE_WRITE_REQUEST: + { + char cmd[32]; + snprintf(cmd, sizeof(cmd), "%s%d%s", "{ \"prefix\": \"", + io->request_type, "\" }\n"); + size_t cmd_len = strlen(cmd); + RETRY_ON_EINTR(ret, + write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt)); + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)", + io->d->name, io->state, io->amt, ret); + if(ret < 0) + { + return ret; + } + io->amt += ret; + if(io->amt >= cmd_len) + { + io->amt = 0; + switch (io->request_type) + { + case ASOK_REQ_VERSION: + io->state = CSTATE_READ_VERSION; + break; + default: + io->state = CSTATE_READ_AMT; + break; + } + } + return 0; + } + case CSTATE_READ_VERSION: + { + RETRY_ON_EINTR(ret, + read(io->asok, ((char*)(&io->d->version)) + io->amt, + sizeof(io->d->version) - io->amt)); + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + io->d->name, io->state, ret); + if(ret < 0) + { + return ret; + } + io->amt += ret; + if(io->amt >= sizeof(io->d->version)) + { + io->d->version = ntohl(io->d->version); + if(io->d->version != 1) + { + ERROR("ceph plugin: cconn_handle_event(name=%s) not " + "expecting version %d!", io->d->name, io->d->version); + return -ENOTSUP; + } + DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as " + "version %d", io->d->name, io->d->version); + io->amt = 0; + cconn_close(io); + io->request_type = ASOK_REQ_SCHEMA; + } + return 0; + } + case CSTATE_READ_AMT: + { + RETRY_ON_EINTR(ret, + read(io->asok, ((char*)(&io->json_len)) + io->amt, + sizeof(io->json_len) - io->amt)); + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + io->d->name, io->state, ret); + if(ret < 0) + { + return ret; + } + io->amt += ret; + if(io->amt >= sizeof(io->json_len)) + { + io->json_len = ntohl(io->json_len); + io->amt = 0; + io->state = CSTATE_READ_JSON; + io->json = calloc(1, io->json_len + 1); + if(!io->json) + { + ERROR("ceph plugin: error callocing io->json"); + return -ENOMEM; + } + } + return 0; + } + case CSTATE_READ_JSON: + { + RETRY_ON_EINTR(ret, + read(io->asok, io->json + io->amt, io->json_len - io->amt)); + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + io->d->name, io->state, ret); + if(ret < 0) + { + return ret; + } + io->amt += ret; + if(io->amt >= io->json_len) + { + ret = cconn_process_json(io); + if(ret) + { + return ret; + } + cconn_close(io); + io->request_type = ASOK_REQ_NONE; + } + return 0; + } + default: + ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal " + "state on line %d", io->d->name, __LINE__); + return -EDOM; + } } static int cconn_prepare(struct cconn *io, struct pollfd* fds) { - int ret; - if (io->request_type == ASOK_REQ_NONE) - { - /* The request has already been serviced. */ - return 0; - } - else if ((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0)) - { - /* If there are no counters to report on, don't bother - * connecting */ - return 0; - } - - switch (io->state) - { - case CSTATE_UNCONNECTED: - ret = cconn_connect(io); - if (ret > 0) - return -ret; - else if (ret < 0) - return ret; - fds->fd = io->asok; - fds->events = POLLOUT; - return 1; - case CSTATE_WRITE_REQUEST: - fds->fd = io->asok; - fds->events = POLLOUT; - return 1; - case CSTATE_READ_VERSION: - case CSTATE_READ_AMT: - case CSTATE_READ_JSON: - fds->fd = io->asok; - fds->events = POLLIN; - return 1; - default: - ERROR("cconn_prepare(name=%s) got to illegal state on line %d", - io->d->name, __LINE__); - return -EDOM; - } + int ret; + if(io->request_type == ASOK_REQ_NONE) + { + /* The request has already been serviced. */ + return 0; + } + else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0)) + { + /* If there are no counters to report on, don't bother + * connecting */ + return 0; + } + + switch (io->state) + { + case CSTATE_UNCONNECTED: + ret = cconn_connect(io); + if(ret > 0) + { + return -ret; + } + else if(ret < 0) + { + return ret; + } + fds->fd = io->asok; + fds->events = POLLOUT; + return 1; + case CSTATE_WRITE_REQUEST: + fds->fd = io->asok; + fds->events = POLLOUT; + return 1; + case CSTATE_READ_VERSION: + case CSTATE_READ_AMT: + case CSTATE_READ_JSON: + fds->fd = io->asok; + fds->events = POLLIN; + return 1; + default: + ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state " + "on line %d", io->d->name, __LINE__); + return -EDOM; + } } /** Returns the difference between two struct timevals in milliseconds. @@ -1083,197 +1450,164 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds) */ static int milli_diff(const struct timeval *t1, const struct timeval *t2) { - int64_t ret; - int sec_diff = t1->tv_sec - t2->tv_sec; - int usec_diff = t1->tv_usec - t2->tv_usec; - ret = usec_diff / 1000; - ret += (sec_diff * 1000); - if (ret > INT_MAX) - return INT_MAX; - else if (ret < INT_MIN) - return INT_MIN; - return (int) ret; + int64_t ret; + int sec_diff = t1->tv_sec - t2->tv_sec; + int usec_diff = t1->tv_usec - t2->tv_usec; + ret = usec_diff / 1000; + ret += (sec_diff * 1000); + return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret); } /** This handles the actual network I/O to talk to the Ceph daemons. */ static int cconn_main_loop(uint32_t request_type) { - int i, ret, some_unreachable = 0; - struct timeval end_tv; - struct cconn io_array[g_num_daemons]; - - DEBUG("entering cconn_main_loop(request_type = %d)", request_type); - - /* create cconn array */ - memset(io_array, 0, sizeof(io_array)); - for (i = 0; i < g_num_daemons; ++i) - { - io_array[i].d = g_daemons[i]; - io_array[i].request_type = request_type; - io_array[i].state = CSTATE_UNCONNECTED; - } - - /** Calculate the time at which we should give up */ - gettimeofday(&end_tv, NULL); - end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL; - - while (1) - { - int nfds, diff; - struct timeval tv; - struct cconn *polled_io_array[g_num_daemons]; - struct pollfd fds[g_num_daemons]; - memset(fds, 0, sizeof(fds)); - nfds = 0; - for (i = 0; i < g_num_daemons; ++i) - { - struct cconn *io = io_array + i; - ret = cconn_prepare(io, fds + nfds); - if (ret < 0) - { - WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d", - io->d->name, i, io->state, ret); - cconn_close(io); - io->request_type = ASOK_REQ_NONE; - some_unreachable = 1; - } - else if (ret == 1) - { - DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)", - io->d->name, i, io->state); - polled_io_array[nfds++] = io_array + i; - } - } - if (nfds == 0) - { - /* finished */ - ret = 0; - DEBUG("cconn_main_loop: no more cconn to manage."); - goto done; - } - gettimeofday(&tv, NULL); - diff = milli_diff(&end_tv, &tv); - if (diff <= 0) - { - /* Timed out */ - ret = -ETIMEDOUT; - WARNING("ERROR: cconn_main_loop: timed out.\n"); - goto done; - } - RETRY_ON_EINTR(ret, poll(fds, nfds, diff)); - if (ret < 0) - { - ERROR("poll(2) error: %d", ret); - goto done; - } - for (i = 0; i < nfds; ++i) - { - struct cconn *io = polled_io_array[i]; - int revents = fds[i].revents; - if (revents == 0) - { - /* do nothing */ - } - else if (cconn_validate_revents(io, revents)) - { - WARNING("ERROR: cconn(name=%s,i=%d,st=%d): " - "revents validation error: " - "revents=0x%08x", io->d->name, i, io->state, revents); - cconn_close(io); - io->request_type = ASOK_REQ_NONE; - some_unreachable = 1; - } - else - { - int ret = cconn_handle_event(io); - if (ret) - { - WARNING("ERROR: cconn_handle_event(name=%s," - "i=%d,st=%d): error %d", io->d->name, i, io->state, ret); - cconn_close(io); - io->request_type = ASOK_REQ_NONE; - some_unreachable = 1; - } - } - } - } - done: for (i = 0; i < g_num_daemons; ++i) - { - cconn_close(io_array + i); - } - if (some_unreachable) - { - DEBUG("cconn_main_loop: some Ceph daemons were unreachable."); - } - else - { - DEBUG("cconn_main_loop: reached all Ceph daemons :)"); - } - return ret; + int i, ret, some_unreachable = 0; + struct timeval end_tv; + struct cconn io_array[g_num_daemons]; + + DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type); + + /* create cconn array */ + memset(io_array, 0, sizeof(io_array)); + for(i = 0; i < g_num_daemons; ++i) + { + io_array[i].d = g_daemons[i]; + io_array[i].request_type = request_type; + io_array[i].state = CSTATE_UNCONNECTED; + } + + /** Calculate the time at which we should give up */ + gettimeofday(&end_tv, NULL); + end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL; + + while (1) + { + int nfds, diff; + struct timeval tv; + struct cconn *polled_io_array[g_num_daemons]; + struct pollfd fds[g_num_daemons]; + memset(fds, 0, sizeof(fds)); + nfds = 0; + for(i = 0; i < g_num_daemons; ++i) + { + struct cconn *io = io_array + i; + ret = cconn_prepare(io, fds + nfds); + if(ret < 0) + { + WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d", + io->d->name, i, io->state, ret); + cconn_close(io); + io->request_type = ASOK_REQ_NONE; + some_unreachable = 1; + } + else if(ret == 1) + { + polled_io_array[nfds++] = io_array + i; + } + } + if(nfds == 0) + { + /* finished */ + ret = 0; + goto done; + } + gettimeofday(&tv, NULL); + diff = milli_diff(&end_tv, &tv); + if(diff <= 0) + { + /* Timed out */ + ret = -ETIMEDOUT; + WARNING("ceph plugin: cconn_main_loop: timed out."); + goto done; + } + RETRY_ON_EINTR(ret, poll(fds, nfds, diff)); + if(ret < 0) + { + ERROR("ceph plugin: poll(2) error: %d", ret); + goto done; + } + for(i = 0; i < nfds; ++i) + { + struct cconn *io = polled_io_array[i]; + int revents = fds[i].revents; + if(revents == 0) + { + /* do nothing */ + } + else if(cconn_validate_revents(io, revents)) + { + WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): " + "revents validation error: " + "revents=0x%08x", io->d->name, i, io->state, revents); + cconn_close(io); + io->request_type = ASOK_REQ_NONE; + some_unreachable = 1; + } + else + { + ret = cconn_handle_event(io); + if(ret) + { + WARNING("ceph plugin: cconn_handle_event(name=%s," + "i=%d,st=%d): error %d", io->d->name, i, io->state, ret); + cconn_close(io); + io->request_type = ASOK_REQ_NONE; + some_unreachable = 1; + } + } + } + } + done: for(i = 0; i < g_num_daemons; ++i) + { + cconn_close(io_array + i); + } + if(some_unreachable) + { + DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable."); + } + else + { + DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)"); + } + return ret; } static int ceph_read(void) { - return cconn_main_loop(ASOK_REQ_DATA); + return cconn_main_loop(ASOK_REQ_DATA); } /******* lifecycle *******/ static int ceph_init(void) { - int i, ret, j; - DEBUG("ceph_init"); - ceph_daemons_print(); - - ret = cconn_main_loop(ASOK_REQ_VERSION); - if (ret) - return ret; - for (i = 0; i < g_num_daemons; ++i) - { - struct ceph_daemon *d = g_daemons[i]; - for (j = 0; j < d->dset_num; j++) - { - ret = plugin_register_data_set(d->dset + j); - if (ret) - { - ERROR("plugin_register_data_set(%s) failed!", d->name); - } - else - { - DEBUG("plugin_register_data_set(%s): " - "(d->dset)[%d]->ds_num=%d", - d->name, j, d->dset[j].ds_num); - } - } - } - return 0; + int ret; + ceph_daemons_print(); + + ret = cconn_main_loop(ASOK_REQ_VERSION); + + return (ret) ? ret : 0; } static int ceph_shutdown(void) { - int i; - for (i = 0; i < g_num_daemons; ++i) - { - ceph_daemon_free(g_daemons[i]); - } - sfree(g_daemons); - g_daemons = NULL; - g_num_daemons = 0; - for(i = 0; i < last_idx; i++) - { - sfree(last_poll_data[i]); - } - sfree(last_poll_data); - last_poll_data = NULL; - last_idx = 0; - DEBUG("finished ceph_shutdown"); - return 0; + int i; + for(i = 0; i < g_num_daemons; ++i) + { + ceph_daemon_free(g_daemons[i]); + } + sfree(g_daemons); + g_daemons = NULL; + g_num_daemons = 0; + DEBUG("ceph plugin: finished ceph_shutdown"); + return 0; } void module_register(void) { - plugin_register_complex_config("ceph", ceph_config); - plugin_register_init("ceph", ceph_init); - plugin_register_read("ceph", ceph_read); - plugin_register_shutdown("ceph", ceph_shutdown); + plugin_register_complex_config("ceph", ceph_config); + plugin_register_init("ceph", ceph_init); + plugin_register_read("ceph", ceph_read); + plugin_register_shutdown("ceph", ceph_shutdown); } +/* vim: set sw=4 sts=4 et : */