#include <math.h>
#include <inttypes.h>
-#define MAX_RRD_DS_NAME_LEN 20
-
#define RETRY_AVGCOUNT -1
#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
typedef unsigned int yajl_len_t;
#endif
+/** Number of types for ceph defined in types.db */
+#define CEPH_DSET_TYPES_NUM 3
+/** ceph types enum */
+enum ceph_dset_type_d
+{
+ DSET_LATENCY = 0,
+ DSET_BYTES = 1,
+ DSET_RATE = 2,
+ DSET_TYPE_UNFOUND = 1000
+};
+
+/** Valid types for ceph defined in types.db */
+const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
+ {"ceph_latency", "ceph_bytes", "ceph_rate"};
+
/******* ceph_daemon *******/
struct ceph_daemon
{
/** daemon name **/
char name[DATA_MAX_NAME_LEN];
- int dset_num;
-
/** Path to the socket that we use to talk to the ceph daemon */
char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
- /** The set of key/value pairs that this daemon reports
- * dset.type The daemon name
- * dset.ds_num Number of data sources (key/value pairs)
- * dset.ds Dynamically allocated array of key/value pairs
- */
- /** Dynamically allocated array **/
- struct data_set_s *dset;
- int **pc_types;
+ /** Number of counters */
+ int ds_num;
+ /** Track ds types */
+ uint32_t *ds_types;
+ /** Track ds names to match with types */
+ char **ds_names;
};
/******* JSON parsing *******/
* since last poll.
*/
struct last_data **last_poll_data = NULL;
+/** index of last poll data */
int last_idx = 0;
enum perfcounter_type_d
/** Number of elements in g_daemons */
static int g_num_daemons = 0;
-struct values_holder
-{
- int values_len;
- value_t *values;
-};
-
/**
- * A set of values_t data that we build up in memory while parsing the JSON.
+ * A set of data that we build up in memory while parsing the JSON.
*/
struct values_tmp
{
+ /** ceph daemon we are processing data for*/
struct ceph_daemon *d;
- int holder_num;
- struct values_holder vh[0];
+ /** track avgcount across counters for avgcount/sum latency pairs */
uint64_t avgcount;
+ /** current index of counters - used to get type of counter */
+ int index;
+ /** do we already have an avgcount for latency pair */
+ int avgcount_exists;
+ /**
+ * similar to index, but current index of latency type counters -
+ * used to get last poll data of counter
+ */
+ int latency_index;
+ /**
+ * values list - maintain across counters since
+ * host/plugin/plugin instance are always the same
+ */
+ value_list_t vlist;
};
/**
*/
struct last_data
{
- char dset_name[DATA_MAX_NAME_LEN];
- char ds_name[MAX_RRD_DS_NAME_LEN];
+ char ds_name[DATA_MAX_NAME_LEN];
double last_sum;
uint64_t last_count;
};
(strcmp(yajl->state[i-2].key,"filestore") == 0) &&
(strcmp(yajl->state[i].key,"avgcount") == 0))
{
- DEBUG("Skipping avgcount for filestore.JournalWrBytes");
+ DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
yajl->depth = (yajl->depth - 1);
return CEPH_CB_CONTINUE;
}
static void ceph_daemon_print(const struct ceph_daemon *d)
{
- DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
+ DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
}
static void ceph_daemons_print(void)
static void ceph_daemon_free(struct ceph_daemon *d)
{
int i = 0;
- for(; i < d->dset_num; i++)
+ for(; i < d->ds_num; i++)
{
- plugin_unregister_data_set((d->dset + i)->type);
- sfree(d->dset->ds);
- sfree(d->pc_types[i]);
+ sfree(d->ds_names[i]);
}
- sfree(d->dset);
- sfree(d->pc_types);
+ sfree(d->ds_types);
+ sfree(d->ds_names);
sfree(d);
}
+/**
+ * Compact ds name by removing special characters and trimming length to
+ * DATA_MAX_NAME_LEN if necessary
+ */
static void compact_ds_name(char *source, char *dest)
{
int keys_num = 0, i;
strncat(tmp, keys[i], key_chars_remaining);
key_chars_remaining -= strlen(keys[i]);
}
- /** to coordinate limitation of length of ds name from RRD
+ tmp[DATA_MAX_NAME_LEN - 1] = '\0';
+ /** to coordinate limitation of length of type_instance
* we will truncate ds_name
* when the its length is more than
- * MAX_RRD_DS_NAME_LEN
+ * DATA_MAX_NAME_LEN
*/
- if(strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
+ if(strlen(tmp) > DATA_MAX_NAME_LEN - 1)
{
append_status |= 0x4;
/** we should reserve space for
*/
reserved += 4;
}
- snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
+ snprintf(dest, DATA_MAX_NAME_LEN - reserved, "%s", tmp);
offset = strlen(dest);
switch (append_status)
{
break;
}
}
-static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
+
+/**
+ * Parse key to remove "type" if this is for schema and initiate compaction
+ */
+static int parse_keys(const char *key_str, char *ds_name)
{
char *ptr, *rptr;
- size_t dset_name_len = 0;
size_t ds_name_len = 0;
- char tmp_ds_name[DATA_MAX_NAME_LEN];
+ /**
+ * allow up to 100 characters before compaction - compact_ds_name will not
+ * allow more than DATA_MAX_NAME_LEN chars
+ */
+ int max_str_len = 100;
+ char tmp_ds_name[max_str_len];
memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
- if(dset_name == NULL || ds_name == NULL || key_str == NULL ||
- key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
+ if(ds_name == NULL || key_str == NULL || key_str[0] == '\0' ||
+ ds_name[0] != '\0')
{
return -1;
}
if((ptr = strchr(key_str, '.')) == NULL
|| (rptr = strrchr(key_str, '.')) == NULL)
{
- strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
- strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
+ memcpy(tmp_ds_name, key_str, max_str_len - 1);
goto compact;
}
- dset_name_len =
- (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
- (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
- memcpy(dset_name, key_str, dset_name_len);
- ds_name_len =
- (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
- if(ds_name_len == 0)
- { /** only have two keys **/
- if(!strncmp(rptr + 1, "type", 4))
- {/** if last key is "type",ignore **/
- strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
- }
- else
- {/** if last key isn't "type", copy last key **/
- strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
- }
- }
- else if(!strncmp(rptr + 1, "type", 4))
- {/** more than two keys **/
- memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
+ ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr);
+ if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4))
+ { /** copy whole key **/
+ memcpy(tmp_ds_name, key_str, max_str_len - 1);
}
else
- {/** copy whole keys **/
- strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
+ {/** more than two keys **/
+ memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ?
+ (max_str_len - 1) : (rptr - key_str)));
}
+
compact: compact_ds_name(tmp_ds_name, ds_name);
return 0;
}
-static int get_matching_dset(const struct ceph_daemon *d, const char *name)
-{
- int idx;
- for(idx = 0; idx < d->dset_num; ++idx)
- {
- if(strcmp(d->dset[idx].type, name) == 0)
- {
- return idx;
- }
- }
- return -1;
-}
-
-static int get_matching_value(const struct data_set_s *dset, const char *name,
- int num_values)
-{
- int idx;
- for(idx = 0; idx < num_values; ++idx)
- {
- if(strcmp(dset->ds[idx].name, name) == 0)
- {
- return idx;
- }
- }
- return -1;
-}
-
+/**
+ * while parsing ceph admin socket schema, save counter name and type for later
+ * data processing
+ */
static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
int pc_type)
{
- struct data_source_s *ds;
- struct data_set_s *dset;
- struct data_set_s *dset_array;
- int **pc_types_array = NULL;
- int *pc_types;
- int *pc_types_new;
- int idx = 0;
- if(strlen(name) + 1 > DATA_MAX_NAME_LEN)
- {
- return -ENAMETOOLONG;
- }
- char dset_name[DATA_MAX_NAME_LEN];
- char ds_name[MAX_RRD_DS_NAME_LEN];
- memset(dset_name, 0, sizeof(dset_name));
+ uint32_t type;
+ char ds_name[DATA_MAX_NAME_LEN];
memset(ds_name, 0, sizeof(ds_name));
- if(parse_keys(name, dset_name, ds_name))
- {
- return 1;
- }
- idx = get_matching_dset(d, dset_name);
- if(idx == -1)
- {/* need to add a dset **/
- dset_array = realloc(d->dset,
- sizeof(struct data_set_s) * (d->dset_num + 1));
- if(!dset_array)
- {
- return -ENOMEM;
- }
- pc_types_array = realloc(d->pc_types,
- sizeof(int *) * (d->dset_num + 1));
- if(!pc_types_array)
- {
- return -ENOMEM;
- }
- dset = &dset_array[d->dset_num];
- /** this step is very important, otherwise,
- * realloc for dset->ds will tricky because of
- * a random addr in dset->ds
- */
- memset(dset, 0, sizeof(struct data_set_s));
- dset->ds_num = 0;
- snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
- pc_types = pc_types_array[d->dset_num] = NULL;
- d->dset = dset_array;
- }
- else
- {
- dset = &d->dset[idx];
- pc_types = d->pc_types[idx];
- }
- struct data_source_s *ds_array = realloc(dset->ds,
- sizeof(struct data_source_s) * (dset->ds_num + 1));
- if(!ds_array)
- {
- return -ENOMEM;
- }
- pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
- if(!pc_types_new)
- {
- return -ENOMEM;
- }
- dset->ds = ds_array;
-
+
if(convert_special_metrics)
{
/**
* other "Bytes". Instead of keeping an "average" or "rate", use the
* "sum" in the pair and assign that to the derive value.
*/
- if((strcmp(dset_name,"filestore") == 0) &&
- strcmp(ds_name, "JournalWrBytes") == 0)
+ if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
{
pc_type = 10;
}
}
- if(idx == -1)
+ d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
+ if(!d->ds_names)
{
- pc_types_array[d->dset_num] = pc_types_new;
- d->pc_types = pc_types_array;
- d->pc_types[d->dset_num][dset->ds_num] = pc_type;
- d->dset_num++;
+ return -ENOMEM;
}
- else
+
+ d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
+ if(!d->ds_types)
+ {
+ return -ENOMEM;
+ }
+
+ d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
+ if(!d->ds_names[d->ds_num])
{
- d->pc_types[idx] = pc_types_new;
- d->pc_types[idx][dset->ds_num] = pc_type;
+ return -ENOMEM;
}
- ds = &ds_array[dset->ds_num++];
- snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
- ds->type = (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
- /**
- * Use min of 0 for DERIVE types so we don't get negative values on Ceph
- * service restart
- */
- ds->min = (ds->type == DS_TYPE_DERIVE) ? 0 : NAN;
- ds->max = NAN;
+ type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
+ ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
+ d->ds_types[d->ds_num] = type;
+
+ if(parse_keys(name, ds_name))
+ {
+ return 1;
+ }
+
+ sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
+ d->ds_num = (d->ds_num + 1);
+
return 0;
}
return 0;
}
+/**
+ * Parse JSON and get error message if present
+ */
static int
traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
{
}
}
+/**
+ * Add entry for each counter while parsing schema
+ */
static int
node_handler_define_schema(void *arg, const char *val, const char *key)
{
struct ceph_daemon *d = (struct ceph_daemon *) arg;
int pc_type;
pc_type = atoi(val);
- DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
+ DEBUG("ceph plugin: ceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
d->name, key, pc_type);
return ceph_daemon_add_ds_entry(d, key, pc_type);
}
-static int add_last(const char *dset_n, const char *ds_n, double cur_sum,
- uint64_t cur_count)
+/**
+ * Latency counter does not yet have an entry in last poll data - add it.
+ */
+static int add_last(const char *ds_n, double cur_sum, uint64_t cur_count)
{
last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
if(!last_poll_data[last_idx])
{
return -ENOMEM;
}
- sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,
- sizeof(last_poll_data[last_idx]->dset_name));
sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,
sizeof(last_poll_data[last_idx]->ds_name));
last_poll_data[last_idx]->last_sum = cur_sum;
return 0;
}
-static int update_last(const char *dset_n, const char *ds_n, double cur_sum,
+/**
+ * Update latency counter or add new entry if it doesn't exist
+ */
+static int update_last(const char *ds_n, int index, double cur_sum,
uint64_t cur_count)
{
- int i;
- for(i = 0; i < last_idx; i++)
+ if((last_idx > index) && (strcmp(last_poll_data[index]->ds_name, ds_n) == 0))
{
- if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0 &&
- (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
- {
- last_poll_data[i]->last_sum = cur_sum;
- last_poll_data[i]->last_count = cur_count;
- return 0;
- }
+ last_poll_data[index]->last_sum = cur_sum;
+ last_poll_data[index]->last_count = cur_count;
+ return 0;
}
if(!last_poll_data)
}
last_poll_data = tmp_last;
}
- return add_last(dset_n,ds_n,cur_sum,cur_count);
+ return add_last(ds_n, cur_sum, cur_count);
}
-static double get_last_avg(const char *dset_n, const char *ds_n,
+/**
+ * Calculate average b/t current data and last poll data
+ * if last poll data exists
+ */
+static double get_last_avg(const char *ds_n, int index,
double cur_sum, uint64_t cur_count)
{
- int i;
double result = -1.1, sum_delt = 0.0;
uint64_t count_delt = 0;
- for(i = 0; i < last_idx; i++)
+ if((last_idx > index) &&
+ (strcmp(last_poll_data[index]->ds_name, ds_n) == 0) &&
+ (cur_count > last_poll_data[index]->last_count))
{
- if((strcmp(last_poll_data[i]->dset_name,dset_n) == 0) &&
- (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
- {
- if(cur_count < last_poll_data[i]->last_count)
- {
- break;
- }
- sum_delt = (cur_sum - last_poll_data[i]->last_sum);
- count_delt = (cur_count - last_poll_data[i]->last_count);
- result = (sum_delt / count_delt);
- break;
- }
+ sum_delt = (cur_sum - last_poll_data[index]->last_sum);
+ count_delt = (cur_count - last_poll_data[index]->last_count);
+ result = (sum_delt / count_delt);
}
if(result == -1.1)
{
result = NAN;
}
- if(update_last(dset_n,ds_n,cur_sum,cur_count) == -ENOMEM)
+ if(update_last(ds_n, index, cur_sum, cur_count) == -ENOMEM)
{
return -ENOMEM;
}
return result;
}
+/**
+ * If using index guess failed, resort to searching for counter name
+ */
+static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
+{
+ int idx = 0;
+ for(; idx < d->ds_num; idx++)
+ {
+ if(strcmp(d->ds_names[idx], ds_name) == 0)
+ {
+ return d->ds_types[idx];
+ }
+ }
+ return DSET_TYPE_UNFOUND;
+}
+
+/**
+ * Process counter data and dispatch values
+ */
static int node_handler_fetch_data(void *arg, const char *val, const char *key)
{
- int dset_idx, ds_idx;
- value_t *uv;
- char dset_name[DATA_MAX_NAME_LEN];
- char ds_name[MAX_RRD_DS_NAME_LEN];
+ value_t uv;
+ double tmp_d;
+ uint64_t tmp_u;
struct values_tmp *vtmp = (struct values_tmp*) arg;
- memset(dset_name, 0, sizeof(dset_name));
+ uint32_t type = DSET_TYPE_UNFOUND;
+ int index = vtmp->index;
+
+ char ds_name[DATA_MAX_NAME_LEN];
memset(ds_name, 0, sizeof(ds_name));
- if(parse_keys(key, dset_name, ds_name))
+
+ if(parse_keys(key, ds_name))
{
- DEBUG("enter node_handler_fetch_data");
return 1;
}
- dset_idx = get_matching_dset(vtmp->d, dset_name);
- if(dset_idx == -1)
+
+ if(index >= vtmp->d->ds_num)
{
- return 1;
+ //don't overflow bounds of array
+ index = (vtmp->d->ds_num - 1);
}
- ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
- vtmp->d->dset[dset_idx].ds_num);
- if(ds_idx == -1)
+
+ /**
+ * counters should remain in same order we parsed schema... we maintain the
+ * index variable to keep track of current point in list of counters. first
+ * use index to guess point in array for retrieving type. if that doesn't
+ * work, use the old way to get the counter type
+ */
+ if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
{
- DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
- dset_name,ds_name,dset_idx,ds_idx);
- return RETRY_AVGCOUNT;
+ //found match
+ type = vtmp->d->ds_types[index];
}
- uv = &(vtmp->vh[dset_idx].values[ds_idx]);
-
- if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
+ else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
{
- if(vtmp->avgcount == -1)
- {
- sscanf(val, "%" PRIu64, &vtmp->avgcount);
- }
- else
- {
- double sum, result;
- sscanf(val, "%lf", &sum);
- DEBUG("avgcount:%" PRIu64 "",vtmp->avgcount);
- DEBUG("sum:%lf",sum);
+ //try previous key
+ type = vtmp->d->ds_types[index-1];
+ }
- if(vtmp->avgcount == 0)
- {
- vtmp->avgcount = 1;
- }
+ if(type == DSET_TYPE_UNFOUND)
+ {
+ //couldn't find right type by guessing, check the old way
+ type = backup_search_for_type(vtmp->d, ds_name);
+ }
- /** User wants latency values as long run avg */
- if(long_run_latency_avg)
+ switch(type)
+ {
+ case DSET_LATENCY:
+ if(vtmp->avgcount_exists == -1)
{
- result = (sum / vtmp->avgcount);
- DEBUG("uv->gauge = sumd / avgcounti = :%lf", result);
+ sscanf(val, "%" PRIu64, &vtmp->avgcount);
+ vtmp->avgcount_exists = 0;
+ //return after saving avgcount - don't dispatch value
+ //until latency calculation
+ return 0;
}
else
{
- result = get_last_avg(dset_name, ds_name, sum, vtmp->avgcount);
- if(result == -ENOMEM)
+ double sum, result;
+ sscanf(val, "%lf", &sum);
+ DEBUG("ceph plugin: avgcount:%" PRIu64,vtmp->avgcount);
+ DEBUG("ceph plugin: sum:%lf",sum);
+
+ if(vtmp->avgcount == 0)
{
- return -ENOMEM;
+ vtmp->avgcount = 1;
+ }
+
+ /** User wants latency values as long run avg */
+ if(long_run_latency_avg)
+ {
+ result = (sum / vtmp->avgcount);
+ DEBUG("ceph plugin: uv->gauge = sumd / avgcounti = :%lf", result);
+ }
+ else
+ {
+ result = get_last_avg(ds_name, vtmp->latency_index, sum, vtmp->avgcount);
+ if(result == -ENOMEM)
+ {
+ return -ENOMEM;
+ }
+ DEBUG("ceph plugin: uv->gauge = (sumd_now - sumd_last) / "
+ "(avgcounti_now - avgcounti_last) = :%lf", result);
}
- DEBUG("uv->gauge = (sumd_now - sumd_last) / "
- "(avgcounti_now - avgcounti_last) = :%lf", result);
- }
- uv->gauge = result;
- vtmp->avgcount = -1;
- }
- }
- else if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
- {
- uint64_t derive_val;
- sscanf(val, "%" PRIu64, &derive_val);
- uv->derive = derive_val;
- DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
- }
- else
- {
- double other_val;
- sscanf(val, "%lf", &other_val);
- uv->gauge = other_val;
- DEBUG("uv->gauge %lf",uv->gauge);
+ uv.gauge = result;
+ vtmp->avgcount_exists = -1;
+ vtmp->latency_index = (vtmp->latency_index + 1);
+ }
+ break;
+ case DSET_BYTES:
+ sscanf(val, "%lf", &tmp_d);
+ uv.gauge = tmp_d;
+ DEBUG("ceph plugin: uv->gauge = %lf",uv.gauge);
+ break;
+ case DSET_RATE:
+ sscanf(val, "%" PRIu64, &tmp_u);
+ uv.derive = tmp_u;
+ DEBUG("ceph plugin: uv->derive = %" PRIu64 "",(uint64_t)uv.derive);
+ break;
+ case DSET_TYPE_UNFOUND:
+ default:
+ ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
+ return -1;
}
+
+ sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
+ sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
+ vtmp->vlist.values = &uv;
+ vtmp->vlist.values_len = 1;
+
+ DEBUG("ceph plugin: dispatching %s\n", ds_name);
+ vtmp->index = (vtmp->index + 1);
+ plugin_dispatch_values(&vtmp->vlist);
+
return 0;
}
static int
cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
{
- int i, ret = 0;
- struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp)
- + (sizeof(struct values_holder)) * io->d->dset_num);
+ int ret;
+ struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
if(!vtmp)
{
return -ENOMEM;
}
- for(i = 0; i < io->d->dset_num; i++)
- {
- value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
- vtmp->vh[i].values = val;
- vtmp->vh[i].values_len = io->d->dset[i].ds_num;
- }
+ vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
+ sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
+ sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
+ sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
+
vtmp->d = io->d;
- vtmp->holder_num = io->d->dset_num;
- vtmp->avgcount = -1;
+ vtmp->avgcount_exists = -1;
+ vtmp->latency_index = 0;
+ vtmp->index = 0;
yajl->handler_arg = vtmp;
ret = traverse_json(io->json, io->json_len, hand);
- if(ret)
- {
- goto done;
- }
- for(i = 0; i < vtmp->holder_num; i++)
- {
- value_list_t vl = VALUE_LIST_INIT;
- sstrncpy(vl.host, hostname_g, sizeof(vl.host));
- sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
- strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
- sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
- vl.values = vtmp->vh[i].values;
- vl.values_len = io->d->dset[i].ds_num;
- DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
- io->d->name, vl.values_len, io->json);
- ret = plugin_dispatch_values(&vl);
- if(ret)
- {
- goto done;
- }
- }
-
- done: for(i = 0; i < vtmp->holder_num; i++)
- {
- sfree(vtmp->vh[i].values);
- }
sfree(vtmp);
return ret;
}
+/**
+ * Initiate JSON parsing and print error if one occurs
+ */
static int cconn_process_json(struct cconn *io)
{
if((io->request_type != ASOK_REQ_DATA) &&
size_t cmd_len = strlen(cmd);
RETRY_ON_EINTR(ret,
write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
- DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
io->d->name, io->state, io->amt, ret);
if(ret < 0)
{
RETRY_ON_EINTR(ret,
read(io->asok, ((char*)(&io->d->version)) + io->amt,
sizeof(io->d->version) - io->amt));
- DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
io->d->name, io->state, ret);
if(ret < 0)
{
"expecting version %d!", io->d->name, io->d->version);
return -ENOTSUP;
}
- DEBUG("cconn_handle_event(name=%s): identified as "
+ DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
"version %d", io->d->name, io->d->version);
io->amt = 0;
cconn_close(io);
RETRY_ON_EINTR(ret,
read(io->asok, ((char*)(&io->json_len)) + io->amt,
sizeof(io->json_len) - io->amt));
- DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
io->d->name, io->state, ret);
if(ret < 0)
{
{
RETRY_ON_EINTR(ret,
read(io->asok, io->json + io->amt, io->json_len - io->amt));
- DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
io->d->name, io->state, ret);
if(ret < 0)
{
/* The request has already been serviced. */
return 0;
}
- else if((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
+ else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
{
/* If there are no counters to report on, don't bother
* connecting */
struct timeval end_tv;
struct cconn io_array[g_num_daemons];
- DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
+ DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
/* create cconn array */
memset(io_array, 0, sizeof(io_array));
}
else if(ret == 1)
{
- DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
+ DEBUG("ceph plugin: did cconn_prepare(name=%s,i=%d,st=%d)",
io->d->name, i, io->state);
polled_io_array[nfds++] = io_array + i;
}
{
/* finished */
ret = 0;
- DEBUG("cconn_main_loop: no more cconn to manage.");
+ DEBUG("ceph plugin: cconn_main_loop: no more cconn to manage.");
goto done;
}
gettimeofday(&tv, NULL);
}
if(some_unreachable)
{
- DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
+ DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
}
else
{
- DEBUG("cconn_main_loop: reached all Ceph daemons :)");
+ DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
}
return ret;
}
/******* lifecycle *******/
static int ceph_init(void)
{
- int i, ret, j;
- DEBUG("ceph_init");
+ int ret;
+ DEBUG("ceph plugin: ceph_init");
ceph_daemons_print();
ret = cconn_main_loop(ASOK_REQ_VERSION);
- if(ret)
- {
- return ret;
- }
- for(i = 0; i < g_num_daemons; ++i)
- {
- struct ceph_daemon *d = g_daemons[i];
- for(j = 0; j < d->dset_num; j++)
- {
- ret = plugin_register_data_set(d->dset + j);
- if(ret)
- {
- ERROR("ceph plugin: plugin_register_data_set(%s) failed!",
- d->name);
- }
- else
- {
- DEBUG("plugin_register_data_set(%s): "
- "(d->dset)[%d]->ds_num=%d",
- d->name, j, d->dset[j].ds_num);
- }
- }
- }
- return 0;
+
+ return (ret) ? ret : 0;
}
static int ceph_shutdown(void)
sfree(last_poll_data);
last_poll_data = NULL;
last_idx = 0;
- DEBUG("finished ceph_shutdown");
+ DEBUG("ceph plugin: finished ceph_shutdown");
return 0;
}