/**
 * A set of data that we build up in memory while parsing the JSON.
 */
struct values_tmp
{
    /** ceph daemon we are processing data for */
    struct ceph_daemon *d;
    /** running avgcount, carried across counters between the "avgcount" and
     *  "sum" halves of an avgcount/sum latency pair */
    uint64_t avgcount;
    /** current index of counters - used to get type of counter */
    int index;
    /** non-zero once we already have the avgcount half of the current
     *  latency pair */
    int avgcount_exists;
    /**
     * similar to index, but current index of latency type counters -
     * used to get last poll data of counter
     */
    int latency_index;
    /**
     * values list - maintain across counters since
     * host/plugin/plugin instance are always the same
     */
    value_list_t vlist;
};
+
/**
 * A set of count/sum pairs to keep track of latency types and get difference
 * between this poll data and last poll data.
 */
struct last_data
{
    /** data source name identifying which counter this entry belongs to */
    char ds_name[DATA_MAX_NAME_LEN];
    /** "sum" value seen at the previous poll */
    double last_sum;
    /** "avgcount" value seen at the previous poll */
    uint64_t last_count;
};
+
/******* network I/O *******/
/** States of one admin-socket conversation with a daemon. */
enum cstate_t
{
    CSTATE_UNCONNECTED = 0, /**< not connected to the daemon yet */
    CSTATE_WRITE_REQUEST,   /**< writing the request to the socket */
    CSTATE_READ_VERSION,    /**< reading the version reply */
    CSTATE_READ_AMT,        /**< reading the JSON payload length (cconn.json_len) */
    CSTATE_READ_JSON,       /**< reading the JSON payload into cconn.json */
};
+
/**
 * Request codes sent over the admin socket.
 * NOTE(review): meanings inferred from the constant names — confirm against
 * the Ceph admin-socket protocol.
 */
enum request_type_t
{
    ASOK_REQ_VERSION = 0, /**< presumably: query protocol version */
    ASOK_REQ_DATA = 1,    /**< presumably: fetch performance counter data */
    ASOK_REQ_SCHEMA = 2,  /**< presumably: fetch the counter schema */
    ASOK_REQ_NONE = 1000, /**< sentinel: no request outstanding */
};
+
/** State of a single connection to a Ceph daemon's admin socket. */
struct cconn
{
    /** The Ceph daemon that we're talking to */
    struct ceph_daemon *d;

    /** Request type (one of the ASOK_REQ_* values) */
    uint32_t request_type;

    /** The connection state */
    enum cstate_t state;

    /** The socket we use to talk to this daemon */
    int asok;

    /** The amount of data remaining to read / write. */
    uint32_t amt;

    /** Length of the JSON to read */
    uint32_t json_len;

    /** Buffer containing JSON data */
    unsigned char *json;

    /** Keep data important to yajl processing */
    struct yajl_struct yajl;
};
+
+static int ceph_cb_null(void *ctx)
+{
+ return CEPH_CB_CONTINUE;
+}
+
+static int ceph_cb_boolean(void *ctx, int bool_val)
+{
+ return CEPH_CB_CONTINUE;
+}
+
/**
 * Append `src` to the char array `dest`, truncating if necessary and always
 * leaving `dest` NUL-terminated.  `dest` must be an actual array, since its
 * capacity is taken with sizeof.
 *
 * strncat(d, s, n) may write n characters PLUS a terminating NUL, so the
 * bound must reserve one byte for the terminator; the previous bound of
 * `dest_size - strlen (dest)` could write one byte past the end of `dest`.
 */
#define BUFFER_ADD(dest, src) do { \
    size_t dest_size = sizeof (dest); \
    size_t dest_len = strlen (dest); \
    if (dest_len < dest_size) \
        strncat ((dest), (src), dest_size - dest_len - 1); \
    (dest)[dest_size - 1] = 0; \
} while (0)
+
/**
 * yajl callback: invoked for every numeric value in the JSON.
 *
 * Builds a dot-separated key from the map-key stack (e.g. "a.b.c"), then
 * passes the raw number string and that key to state->handler.  For latency
 * pairs ("avgcount"/"sum" members) the final key component is initially
 * omitted; if the handler returns RETRY_AVGCOUNT, the component is appended
 * and the handler is called again with the full key.
 */
static int
ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
{
    yajl_struct *state = (yajl_struct*) ctx;
    /* yajl hands us a non-NUL-terminated string; copy it into a terminated
     * VLA sized to fit exactly. */
    char buffer[number_len+1];
    char key[2 * DATA_MAX_NAME_LEN];
    _Bool latency_type = 0;
    size_t i;
    int status;

    memcpy(buffer, number_val, number_len);
    buffer[sizeof(buffer) - 1] = 0;

    /* Join all keys currently on the stack with '.' to form the metric
     * name prefix. */
    memset (key, 0, sizeof (key));
    for (i = 0; i < state->depth; i++)
    {
        if (state->stack[i] == NULL)
            continue;

        if (strlen (key) != 0)
            BUFFER_ADD (key, ".");
        BUFFER_ADD (key, state->stack[i]);
    }

    /* Special case for latency metrics. */
    if ((strcmp ("avgcount", state->key) == 0)
        || (strcmp ("sum", state->key) == 0))
    {
        latency_type = 1;

        /* Super-special case for filestore.journal_wr_bytes.avgcount: For
         * some reason, Ceph schema encodes this as a count/sum pair while all
         * other "Bytes" data (excluding used/capacity bytes for OSD space) uses
         * a single "Derive" type. To spare further confusion, keep this KPI as
         * the same type of other "Bytes". Instead of keeping an "average" or
         * "rate", use the "sum" in the pair and assign that to the derive
         * value. */
        if (convert_special_metrics && (state->depth >= 2)
            && (strcmp("filestore", state->stack[state->depth - 2]) == 0)
            && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0)
            && (strcmp("avgcount", state->key) == 0))
        {
            DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
            return CEPH_CB_CONTINUE;
        }
    }
    else /* not a latency type */
    {
        BUFFER_ADD (key, ".");
        BUFFER_ADD (key, state->key);
    }

    status = state->handler(state->handler_arg, buffer, key);
    if((status == RETRY_AVGCOUNT) && latency_type)
    {
        /* Add previously skipped part of the key, either "avgcount" or "sum",
         * and try again. */
        BUFFER_ADD (key, ".");
        BUFFER_ADD (key, state->key);

        status = state->handler(state->handler_arg, buffer, key);
    }

    if (status != 0)
    {
        ERROR("ceph plugin: JSON handler failed with status %d.", status);
        return CEPH_CB_ABORT;
    }

    return CEPH_CB_CONTINUE;
}
+
+static int ceph_cb_string(void *ctx, const unsigned char *string_val,
+ yajl_len_t string_len)
+{
+ return CEPH_CB_CONTINUE;
+}
+
+static int ceph_cb_start_map(void *ctx)
+{
+ yajl_struct *state = (yajl_struct*) ctx;
+
+ /* Push key to the stack */
+ if (state->depth == YAJL_MAX_DEPTH)
+ return CEPH_CB_ABORT;
+
+ state->stack[state->depth] = state->key;
+ state->depth++;
+ state->key = NULL;
+
+ return CEPH_CB_CONTINUE;
+}
+
+static int ceph_cb_end_map(void *ctx)
+{
+ yajl_struct *state = (yajl_struct*) ctx;
+
+ /* Pop key from the stack */
+ if (state->depth == 0)
+ return CEPH_CB_ABORT;
+
+ sfree (state->key);
+ state->depth--;
+ state->key = state->stack[state->depth];
+ state->stack[state->depth] = NULL;
+
+ return CEPH_CB_CONTINUE;
+}
+
+static int
+ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
+{
+ yajl_struct *state = (yajl_struct*) ctx;
+ size_t sz = ((size_t) string_len) + 1;
+
+ sfree (state->key);
+ state->key = malloc (sz);
+ if (state->key == NULL)
+ {
+ ERROR ("ceph plugin: malloc failed.");
+ return CEPH_CB_ABORT;
+ }
+
+ memmove (state->key, key, sz - 1);
+ state->key[sz - 1] = 0;
+
+ return CEPH_CB_CONTINUE;
+}
+
+static int ceph_cb_start_array(void *ctx)
+{
+ return CEPH_CB_CONTINUE;
+}
+
+static int ceph_cb_end_array(void *ctx)
+{
+ return CEPH_CB_CONTINUE;
+}
+
/*
 * Callback table handed to yajl.  The initializers are positional and must
 * follow the yajl_callbacks field order (null, boolean, integer, double,
 * number, string, start_map, map_key, end_map, start_array, end_array).
 * The integer and double slots are NULL so that yajl delivers every numeric
 * value to ceph_cb_number as a raw string instead.
 */
static yajl_callbacks callbacks = {
    ceph_cb_null,        /* yajl_null */
    ceph_cb_boolean,     /* yajl_boolean */
    NULL,                /* yajl_integer (unused; see ceph_cb_number) */
    NULL,                /* yajl_double  (unused; see ceph_cb_number) */
    ceph_cb_number,      /* yajl_number */
    ceph_cb_string,      /* yajl_string */
    ceph_cb_start_map,   /* yajl_start_map */
    ceph_cb_map_key,     /* yajl_map_key */
    ceph_cb_end_map,     /* yajl_end_map */
    ceph_cb_start_array, /* yajl_start_array */
    ceph_cb_end_array    /* yajl_end_array */
};
+