X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fceph.c;h=419ca6e56170758c5c05dea2e1e20994b53b2ff3;hb=3348ace2cdd95bc1de5e28d11f3999cf5bd4ebf4;hp=e8bde9b4967936dfbfaeb0b3cd28b1ff15f8a7fe;hpb=d3bc2e679b189e3512b38d9f6b3ffd8af2a46317;p=collectd.git diff --git a/src/ceph.c b/src/ceph.c index e8bde9b4..419ca6e5 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -21,6 +21,7 @@ * Dan Ryder **/ +#define _DEFAULT_SOURCE #define _BSD_SOURCE #include "collectd.h" @@ -42,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -114,6 +114,14 @@ struct ceph_daemon uint32_t *ds_types; /** Track ds names to match with types */ char **ds_names; + + /** + * Keep track of last data for latency values so we can calculate rate + * since last poll. + */ + struct last_data **last_poll_data; + /** index of last poll data */ + int last_idx; }; /******* JSON parsing *******/ @@ -132,14 +140,6 @@ struct yajl_struct }; typedef struct yajl_struct yajl_struct; -/** - * Keep track of last data for latency values so we can calculate rate - * since last poll. - */ -struct last_data **last_poll_data = NULL; -/** index of last poll data */ -int last_idx = 0; - enum perfcounter_type_d { PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8, @@ -200,7 +200,6 @@ struct last_data uint64_t last_count; }; - /******* network I/O *******/ enum cstate_t { @@ -256,7 +255,7 @@ static int ceph_cb_boolean(void *ctx, int bool_val) return CEPH_CB_CONTINUE; } -static int +static int ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) { yajl_struct *yajl = (yajl_struct*)ctx; @@ -321,7 +320,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) return CEPH_CB_CONTINUE; } -static int ceph_cb_string(void *ctx, const unsigned char *string_val, +static int ceph_cb_string(void *ctx, const unsigned char *string_val, yajl_len_t string_len) { return CEPH_CB_CONTINUE; @@ -404,7 +403,14 @@ static void ceph_daemons_print(void) static void ceph_daemon_free(struct ceph_daemon *d) { int i = 0; - for(; i < d->ds_num; i++) + for(; i < d->last_idx; i++) + { + sfree(d->last_poll_data[i]); + } + sfree(d->last_poll_data); + d->last_poll_data = NULL; + d->last_idx = 0; + for(i = 0; i < d->ds_num; i++) { sfree(d->ds_names[i]); } @@ -522,7 +528,7 @@ static int parse_keys(const char *key_str, char *ds_name) int max_str_len = 100; char tmp_ds_name[max_str_len]; memset(tmp_ds_name, 0, sizeof(tmp_ds_name)); - if(ds_name == NULL || key_str == NULL || key_str[0] == '\0' || + if(ds_name == NULL || key_str == NULL || key_str[0] == '\0' || ds_name[0] != '\0') { return -1; @@ -533,6 +539,7 @@ static int parse_keys(const char *key_str, char *ds_name) memcpy(tmp_ds_name, key_str, max_str_len - 1); goto compact; } + ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr); if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4)) { /** copy whole key **/ @@ -543,7 +550,7 @@ static int parse_keys(const char *key_str, char *ds_name) memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ? (max_str_len - 1) : (rptr - key_str))); } - + compact: compact_ds_name(tmp_ds_name, ds_name); return 0; } @@ -558,7 +565,7 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, uint32_t type; char ds_name[DATA_MAX_NAME_LEN]; memset(ds_name, 0, sizeof(ds_name)); - + if(convert_special_metrics) { /** @@ -604,7 +611,7 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1); d->ds_num = (d->ds_num + 1); - + return 0; } @@ -649,7 +656,8 @@ static int cc_handle_bool(struct oconfig_item_s *item, int *dest) static int cc_add_daemon_config(oconfig_item_t *ci) { int ret, i; - struct ceph_daemon *array, *nd, cd; + struct ceph_daemon *nd, cd; + struct ceph_daemon **tmp; memset(&cd, 0, sizeof(struct ceph_daemon)); if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) @@ -700,21 +708,22 @@ static int cc_add_daemon_config(oconfig_item_t *ci) "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path); return -EINVAL; } - array = realloc(g_daemons, - sizeof(struct ceph_daemon *) * (g_num_daemons + 1)); - if(array == NULL) + + tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons)); + if(tmp == NULL) { /* The positive return value here indicates that this is a * runtime error, not a configuration error. */ return ENOMEM; } - g_daemons = (struct ceph_daemon**) array; - nd = malloc(sizeof(struct ceph_daemon)); + g_daemons = tmp; + + nd = malloc(sizeof(*nd)); if(!nd) { return ENOMEM; } - memcpy(nd, &cd, sizeof(struct ceph_daemon)); + memcpy(nd, &cd, sizeof(*nd)); g_daemons[g_num_daemons++] = nd; return 0; } @@ -798,86 +807,118 @@ node_handler_define_schema(void *arg, const char *val, const char *key) struct ceph_daemon *d = (struct ceph_daemon *) arg; int pc_type; pc_type = atoi(val); - DEBUG("ceph plugin: ceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)", - d->name, key, pc_type); return ceph_daemon_add_ds_entry(d, key, pc_type); } /** * Latency counter does not yet have an entry in last poll data - add it. */ -static int add_last(const char *ds_n, double cur_sum, uint64_t cur_count) +static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum, + uint64_t cur_count) { - last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data)); - if(!last_poll_data[last_idx]) + d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data)); + if(!d->last_poll_data[d->last_idx]) { return -ENOMEM; } - sstrncpy(last_poll_data[last_idx]->ds_name,ds_n, - sizeof(last_poll_data[last_idx]->ds_name)); - last_poll_data[last_idx]->last_sum = cur_sum; - last_poll_data[last_idx]->last_count = cur_count; - last_idx++; + sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n, + sizeof(d->last_poll_data[d->last_idx]->ds_name)); + d->last_poll_data[d->last_idx]->last_sum = cur_sum; + d->last_poll_data[d->last_idx]->last_count = cur_count; + d->last_idx = (d->last_idx + 1); return 0; } /** * Update latency counter or add new entry if it doesn't exist */ -static int update_last(const char *ds_n, int index, double cur_sum, - uint64_t cur_count) +static int update_last(struct ceph_daemon *d, const char *ds_n, int index, + double cur_sum, uint64_t cur_count) { - if((last_idx > index) && (strcmp(last_poll_data[index]->ds_name, ds_n) == 0)) + if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)) { - last_poll_data[index]->last_sum = cur_sum; - last_poll_data[index]->last_count = cur_count; + d->last_poll_data[index]->last_sum = cur_sum; + d->last_poll_data[index]->last_count = cur_count; return 0; } - if(!last_poll_data) + if(!d->last_poll_data) { - last_poll_data = malloc(1 * sizeof(struct last_data *)); - if(!last_poll_data) + d->last_poll_data = malloc(1 * sizeof(struct last_data *)); + if(!d->last_poll_data) { return -ENOMEM; } } else { - struct last_data **tmp_last = realloc(last_poll_data, - ((last_idx+1) * sizeof(struct last_data *))); + struct last_data **tmp_last = realloc(d->last_poll_data, + ((d->last_idx+1) * sizeof(struct last_data *))); if(!tmp_last) { return -ENOMEM; } - last_poll_data = tmp_last; + d->last_poll_data = tmp_last; + } + return add_last(d, ds_n, cur_sum, cur_count); +} + +/** + * If using index guess failed (shouldn't happen, but possible if counters + * get rearranged), resort to searching for counter name + */ +static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n) +{ + int i = 0; + for(; i < d->last_idx; i++) + { + if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0) + { + return i; + } } - return add_last(ds_n, cur_sum, cur_count); + return -1; } /** * Calculate average b/t current data and last poll data * if last poll data exists */ -static double get_last_avg(const char *ds_n, int index, +static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index, double cur_sum, uint64_t cur_count) { double result = -1.1, sum_delt = 0.0; uint64_t count_delt = 0; - if((last_idx > index) && - (strcmp(last_poll_data[index]->ds_name, ds_n) == 0) && - (cur_count > last_poll_data[index]->last_count)) + int tmp_index = 0; + if(d->last_idx > index) { - sum_delt = (cur_sum - last_poll_data[index]->last_sum); - count_delt = (cur_count - last_poll_data[index]->last_count); - result = (sum_delt / count_delt); + if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0) + { + tmp_index = index; + } + //test previous index + else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0)) + { + tmp_index = (index - 1); + } + else + { + tmp_index = backup_search_for_last_avg(d, ds_n); + } + + if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count)) + { + sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum); + count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count); + result = (sum_delt / count_delt); + } } if(result == -1.1) { result = NAN; } - if(update_last(ds_n, index, cur_sum, cur_count) == -ENOMEM) + if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM) { return -ENOMEM; } @@ -925,7 +966,7 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key) //don't overflow bounds of array index = (vtmp->d->ds_num - 1); } - + /** * counters should remain in same order we parsed schema... we maintain the * index variable to keep track of current point in list of counters. first @@ -964,29 +1005,24 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key) { double sum, result; sscanf(val, "%lf", &sum); - DEBUG("ceph plugin: avgcount:%" PRIu64,vtmp->avgcount); - DEBUG("ceph plugin: sum:%lf",sum); if(vtmp->avgcount == 0) { vtmp->avgcount = 1; } - + /** User wants latency values as long run avg */ if(long_run_latency_avg) { result = (sum / vtmp->avgcount); - DEBUG("ceph plugin: uv->gauge = sumd / avgcounti = :%lf", result); } else { - result = get_last_avg(ds_name, vtmp->latency_index, sum, vtmp->avgcount); + result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount); if(result == -ENOMEM) { return -ENOMEM; } - DEBUG("ceph plugin: uv->gauge = (sumd_now - sumd_last) / " - "(avgcounti_now - avgcounti_last) = :%lf", result); } uv.gauge = result; @@ -997,12 +1033,10 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key) case DSET_BYTES: sscanf(val, "%lf", &tmp_d); uv.gauge = tmp_d; - DEBUG("ceph plugin: uv->gauge = %lf",uv.gauge); break; case DSET_RATE: sscanf(val, "%" PRIu64, &tmp_u); uv.derive = tmp_u; - DEBUG("ceph plugin: uv->derive = %" PRIu64 "",(uint64_t)uv.derive); break; case DSET_TYPE_UNFOUND: default: @@ -1015,7 +1049,6 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key) vtmp->vlist.values = &uv; vtmp->vlist.values_len = 1; - DEBUG("ceph plugin: dispatching %s\n", ds_name); vtmp->index = (vtmp->index + 1); plugin_dispatch_values(&vtmp->vlist); @@ -1147,6 +1180,10 @@ static int cconn_process_json(struct cconn *io) result = cconn_process_data(io, &io->yajl, hand); break; case ASOK_REQ_SCHEMA: + //init daemon specific variables + io->d->ds_num = 0; + io->d->last_idx = 0; + io->d->last_poll_data = NULL; io->yajl.handler = node_handler_define_schema; io->yajl.handler_arg = io->d; result = traverse_json(io->json, io->json_len, hand); @@ -1196,7 +1233,6 @@ static int cconn_validate_revents(struct cconn *io, int revents) case CSTATE_READ_AMT: case CSTATE_READ_JSON: return (revents & POLLIN) ? 0 : -EINVAL; - return (revents & POLLIN) ? 0 : -EINVAL; default: ERROR("ceph plugin: cconn_validate_revents(name=%s) got to " "illegal state on line %d", io->d->name, __LINE__); @@ -1435,8 +1471,6 @@ static int cconn_main_loop(uint32_t request_type) } else if(ret == 1) { - DEBUG("ceph plugin: did cconn_prepare(name=%s,i=%d,st=%d)", - io->d->name, i, io->state); polled_io_array[nfds++] = io_array + i; } } @@ -1444,7 +1478,6 @@ static int cconn_main_loop(uint32_t request_type) { /* finished */ ret = 0; - DEBUG("ceph plugin: cconn_main_loop: no more cconn to manage."); goto done; } gettimeofday(&tv, NULL); @@ -1517,7 +1550,6 @@ static int ceph_read(void) static int ceph_init(void) { int ret; - DEBUG("ceph plugin: ceph_init"); ceph_daemons_print(); ret = cconn_main_loop(ASOK_REQ_VERSION); @@ -1535,13 +1567,6 @@ static int ceph_shutdown(void) sfree(g_daemons); g_daemons = NULL; g_num_daemons = 0; - for(i = 0; i < last_idx; i++) - { - sfree(last_poll_data[i]); - } - sfree(last_poll_data); - last_poll_data = NULL; - last_idx = 0; DEBUG("ceph plugin: finished ceph_shutdown"); return 0; }