Merge pull request #3339 from jkohen/patch-1
[collectd.git] / src / ceph.c
1 /**
2  * collectd - src/ceph.c
3  * Copyright (C) 2011  New Dream Network
4  * Copyright (C) 2015  Florian octo Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Colin McCabe <cmccabe at alumni.cmu.edu>
21  *   Dennis Zou <yunzou at cisco.com>
22  *   Dan Ryder <daryder at cisco.com>
23  *   Florian octo Forster <octo at collectd.org>
24  **/
25
26 #define _DEFAULT_SOURCE
27 #define _BSD_SOURCE
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <yajl/yajl_parse.h>
37 #if HAVE_YAJL_YAJL_VERSION_H
38 #include <yajl/yajl_version.h>
39 #endif
40
41 #include <limits.h>
42 #include <poll.h>
43 #include <stdint.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <strings.h>
48 #include <sys/time.h>
49 #include <sys/types.h>
50 #include <sys/un.h>
51 #include <unistd.h>
52 #include <math.h>
53 #include <inttypes.h>
54
55 #define RETRY_AVGCOUNT -1
56
57 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
58 # define HAVE_YAJL_V2 1
59 #endif
60
61 #define RETRY_ON_EINTR(ret, expr) \
62     while(1) { \
63         ret = expr; \
64         if(ret >= 0) \
65             break; \
66         ret = -errno; \
67         if(ret != -EINTR) \
68             break; \
69     }
70
71 /** Timeout interval in seconds */
72 #define CEPH_TIMEOUT_INTERVAL 1
73
74 /** Maximum path length for a UNIX domain socket on this system */
75 #define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
76
77 /** Yajl callback returns */
78 #define CEPH_CB_CONTINUE 1
79 #define CEPH_CB_ABORT 0
80
81 #if HAVE_YAJL_V2
82 typedef size_t yajl_len_t;
83 #else
84 typedef unsigned int yajl_len_t;
85 #endif
86
87 /** Number of types for ceph defined in types.db */
88 #define CEPH_DSET_TYPES_NUM 3
89 /** ceph types enum */
90 enum ceph_dset_type_d
91 {
92     DSET_LATENCY = 0,
93     DSET_BYTES = 1,
94     DSET_RATE = 2,
95     DSET_TYPE_UNFOUND = 1000
96 };
97
98 /** Valid types for ceph defined in types.db */
99 const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
100                                    {"ceph_latency", "ceph_bytes", "ceph_rate"};
101
102 /******* ceph_daemon *******/
103 struct ceph_daemon
104 {
105     /** Version of the admin_socket interface */
106     uint32_t version;
107     /** daemon name **/
108     char name[DATA_MAX_NAME_LEN];
109
110     /** Path to the socket that we use to talk to the ceph daemon */
111     char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
112
113     /** Number of counters */
114     int ds_num;
115     /** Track ds types */
116     uint32_t *ds_types;
117     /** Track ds names to match with types */
118     char **ds_names;
119
120     /**
121      * Keep track of last data for latency values so we can calculate rate
122      * since last poll.
123      */
124     struct last_data **last_poll_data;
125     /** index of last poll data */
126     int last_idx;
127 };
128
129 /******* JSON parsing *******/
130 typedef int (*node_handler_t)(void *, const char*, const char*);
131
132 /** Track state and handler while parsing JSON */
133 struct yajl_struct
134 {
135     node_handler_t handler;
136     void * handler_arg;
137     struct {
138       char key[DATA_MAX_NAME_LEN];
139       int key_len;
140     } state[YAJL_MAX_DEPTH];
141     int depth;
142 };
143 typedef struct yajl_struct yajl_struct;
144
145 enum perfcounter_type_d
146 {
147     PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
148 };
149
150 /** Give user option to use default (long run = since daemon started) avg */
151 static int long_run_latency_avg = 0;
152
153 /**
154  * Give user option to use default type for special cases -
155  * filestore.journal_wr_bytes is currently only metric here. Ceph reports the
156  * type as a sum/count pair and will calculate it the same as a latency value.
157  * All other "bytes" metrics (excluding the used/capacity bytes for the OSD)
158  * use the DERIVE type. Unless user specifies to use given type, convert this
159  * metric to use DERIVE.
160  */
161 static int convert_special_metrics = 1;
162
163 /** Array of daemons to monitor */
164 static struct ceph_daemon **g_daemons = NULL;
165
166 /** Number of elements in g_daemons */
167 static int g_num_daemons = 0;
168
169 /**
170  * A set of data that we build up in memory while parsing the JSON.
171  */
172 struct values_tmp
173 {
174     /** ceph daemon we are processing data for*/
175     struct ceph_daemon *d;
176     /** track avgcount across counters for avgcount/sum latency pairs */
177     uint64_t avgcount;
178     /** current index of counters - used to get type of counter */
179     int index;
180     /** do we already have an avgcount for latency pair */
181     int avgcount_exists;
182     /**
183      * similar to index, but current index of latency type counters -
184      * used to get last poll data of counter
185      */
186     int latency_index;
187     /**
188      * values list - maintain across counters since
189      * host/plugin/plugin instance are always the same
190      */
191     value_list_t vlist;
192 };
193
194 /**
195  * A set of count/sum pairs to keep track of latency types and get difference
196  * between this poll data and last poll data.
197  */
198 struct last_data
199 {
200     char ds_name[DATA_MAX_NAME_LEN];
201     double last_sum;
202     uint64_t last_count;
203 };
204
205 /******* network I/O *******/
206 enum cstate_t
207 {
208     CSTATE_UNCONNECTED = 0,
209     CSTATE_WRITE_REQUEST,
210     CSTATE_READ_VERSION,
211     CSTATE_READ_AMT,
212     CSTATE_READ_JSON,
213 };
214
215 enum request_type_t
216 {
217     ASOK_REQ_VERSION = 0,
218     ASOK_REQ_DATA = 1,
219     ASOK_REQ_SCHEMA = 2,
220     ASOK_REQ_NONE = 1000,
221 };
222
223 struct cconn
224 {
225     /** The Ceph daemon that we're talking to */
226     struct ceph_daemon *d;
227
228     /** Request type */
229     uint32_t request_type;
230
231     /** The connection state */
232     enum cstate_t state;
233
234     /** The socket we use to talk to this daemon */
235     int asok;
236
237     /** The amount of data remaining to read / write. */
238     uint32_t amt;
239
240     /** Length of the JSON to read */
241     uint32_t json_len;
242
243     /** Buffer containing JSON data */
244     unsigned char *json;
245
246     /** Keep data important to yajl processing */
247     struct yajl_struct yajl;
248 };
249
250 static int ceph_cb_null(void *ctx)
251 {
252     return CEPH_CB_CONTINUE;
253 }
254
255 static int ceph_cb_boolean(void *ctx, int bool_val)
256 {
257     return CEPH_CB_CONTINUE;
258 }
259
260 static int
261 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
262 {
263     yajl_struct *yajl = (yajl_struct*)ctx;
264     char buffer[number_len+1];
265     int i, latency_type = 0, result;
266     char key[128];
267
268     memcpy(buffer, number_val, number_len);
269     buffer[sizeof(buffer) - 1] = 0;
270
271     ssnprintf(key, yajl->state[0].key_len, "%s", yajl->state[0].key);
272     for(i = 1; i < yajl->depth; i++)
273     {
274         if((i == yajl->depth-1) && ((strcmp(yajl->state[i].key,"avgcount") == 0)
275                 || (strcmp(yajl->state[i].key,"sum") == 0)))
276         {
277             if(convert_special_metrics)
278             {
279                 /**
280                  * Special case for filestore:JournalWrBytes. For some reason,
281                  * Ceph schema encodes this as a count/sum pair while all
282                  * other "Bytes" data (excluding used/capacity bytes for OSD
283                  * space) uses a single "Derive" type. To spare further
284                  * confusion, keep this KPI as the same type of other "Bytes".
285                  * Instead of keeping an "average" or "rate", use the "sum" in
286                  * the pair and assign that to the derive value.
287                  */
288                 if((strcmp(yajl->state[i-1].key, "journal_wr_bytes") == 0) &&
289                         (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
290                         (strcmp(yajl->state[i].key,"avgcount") == 0))
291                 {
292                     DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
293                     yajl->depth = (yajl->depth - 1);
294                     return CEPH_CB_CONTINUE;
295                 }
296             }
297             //probably a avgcount/sum pair. if not - we'll try full key later
298             latency_type = 1;
299             break;
300         }
301         strncat(key, ".", 1);
302         strncat(key, yajl->state[i].key, yajl->state[i].key_len+1);
303     }
304
305     result = yajl->handler(yajl->handler_arg, buffer, key);
306
307     if((result == RETRY_AVGCOUNT) && latency_type)
308     {
309         strncat(key, ".", 1);
310         strncat(key, yajl->state[yajl->depth-1].key,
311                 yajl->state[yajl->depth-1].key_len+1);
312         result = yajl->handler(yajl->handler_arg, buffer, key);
313     }
314
315     if(result == -ENOMEM)
316     {
317         ERROR("ceph plugin: memory allocation failed");
318         return CEPH_CB_ABORT;
319     }
320
321     yajl->depth = (yajl->depth - 1);
322     return CEPH_CB_CONTINUE;
323 }
324
325 static int ceph_cb_string(void *ctx, const unsigned char *string_val,
326         yajl_len_t string_len)
327 {
328     return CEPH_CB_CONTINUE;
329 }
330
331 static int ceph_cb_start_map(void *ctx)
332 {
333     return CEPH_CB_CONTINUE;
334 }
335
336 static int
337 ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
338 {
339     yajl_struct *yajl = (yajl_struct*)ctx;
340
341     if((yajl->depth+1)  >= YAJL_MAX_DEPTH)
342     {
343         ERROR("ceph plugin: depth exceeds max, aborting.");
344         return CEPH_CB_ABORT;
345     }
346
347     char buffer[string_len+1];
348
349     memcpy(buffer, key, string_len);
350     buffer[sizeof(buffer) - 1] = 0;
351
352     snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer);
353     yajl->state[yajl->depth].key_len = sizeof(buffer);
354     yajl->depth = (yajl->depth + 1);
355
356     return CEPH_CB_CONTINUE;
357 }
358
359 static int ceph_cb_end_map(void *ctx)
360 {
361     yajl_struct *yajl = (yajl_struct*)ctx;
362
363     yajl->depth = (yajl->depth - 1);
364     return CEPH_CB_CONTINUE;
365 }
366
367 static int ceph_cb_start_array(void *ctx)
368 {
369     return CEPH_CB_CONTINUE;
370 }
371
372 static int ceph_cb_end_array(void *ctx)
373 {
374     return CEPH_CB_CONTINUE;
375 }
376
377 static yajl_callbacks callbacks = {
378         ceph_cb_null,
379         ceph_cb_boolean,
380         NULL,
381         NULL,
382         ceph_cb_number,
383         ceph_cb_string,
384         ceph_cb_start_map,
385         ceph_cb_map_key,
386         ceph_cb_end_map,
387         ceph_cb_start_array,
388         ceph_cb_end_array
389 };
390
391 static void ceph_daemon_print(const struct ceph_daemon *d)
392 {
393     DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
394 }
395
396 static void ceph_daemons_print(void)
397 {
398     int i;
399     for(i = 0; i < g_num_daemons; ++i)
400     {
401         ceph_daemon_print(g_daemons[i]);
402     }
403 }
404
405 static void ceph_daemon_free(struct ceph_daemon *d)
406 {
407     int i = 0;
408     for(; i < d->last_idx; i++)
409     {
410         sfree(d->last_poll_data[i]);
411     }
412     sfree(d->last_poll_data);
413     d->last_poll_data = NULL;
414     d->last_idx = 0;
415     for(i = 0; i < d->ds_num; i++)
416     {
417         sfree(d->ds_names[i]);
418     }
419     sfree(d->ds_types);
420     sfree(d->ds_names);
421     sfree(d);
422 }
423
424 /* compact_ds_name removed the special characters ":", "_", "-" and "+" from the
425  * intput string. Characters following these special characters are capitalized.
426  * Trailing "+" and "-" characters are replaces with the strings "Plus" and
427  * "Minus". */
428 static int compact_ds_name (char *buffer, size_t buffer_size, char const *src)
429 {
430     char *src_copy;
431     size_t src_len;
432     char *ptr = buffer;
433     size_t ptr_size = buffer_size;
434     _Bool append_plus = 0;
435     _Bool append_minus = 0;
436
437     if ((buffer == NULL) || (buffer_size <= strlen ("Minus")) || (src == NULL))
438       return EINVAL;
439
440     src_copy = strdup (src);
441     src_len = strlen(src);
442
443     /* Remove trailing "+" and "-". */
444     if (src_copy[src_len - 1] == '+')
445     {
446         append_plus = 1;
447         src_len--;
448         src_copy[src_len] = 0;
449     }
450     else if (src_copy[src_len - 1] == '-')
451     {
452         append_minus = 1;
453         src_len--;
454         src_copy[src_len] = 0;
455     }
456
457     /* Split at special chars, capitalize first character, append to buffer. */
458     char *dummy = src_copy;
459     char *token;
460     char *save_ptr = NULL;
461     while ((token = strtok_r (dummy, ":_-+", &save_ptr)) != NULL)
462     {
463         size_t len;
464
465         dummy = NULL;
466
467         token[0] = toupper ((int) token[0]);
468
469         assert (ptr_size > 1);
470
471         len = strlen (token);
472         if (len >= ptr_size)
473             len = ptr_size - 1;
474
475         assert (len > 0);
476         assert (len < ptr_size);
477
478         sstrncpy (ptr, token, len + 1);
479         ptr += len;
480         ptr_size -= len;
481
482         assert (*ptr == 0);
483         if (ptr_size <= 1)
484             break;
485     }
486
487     /* Append "Plus" or "Minus" if "+" or "-" has been stripped above. */
488     if (append_plus || append_minus)
489     {
490         char const *append = "Plus";
491         if (append_minus)
492             append = "Minus";
493
494         size_t offset = buffer_size - (strlen (append) + 1);
495         if (offset > strlen (buffer))
496             offset = strlen (buffer);
497
498         sstrncpy (buffer + offset, append, buffer_size - offset);
499     }
500
501     sfree (src_copy);
502     return 0;
503 }
504
505 static _Bool has_suffix (char const *str, char const *suffix)
506 {
507     size_t str_len = strlen (str);
508     size_t suffix_len = strlen (suffix);
509     size_t offset;
510
511     if (suffix_len > str_len)
512         return 0;
513     offset = str_len - suffix_len;
514
515     if (strcmp (str + offset, suffix) == 0)
516         return 1;
517
518     return 0;
519 }
520
521 /* count_parts returns the number of elements a "foo.bar.baz" style key has. */
522 static size_t count_parts (char const *key)
523 {
524     char const *ptr;
525     size_t parts_num = 0;
526
527     for (ptr = key; ptr != NULL; ptr = strchr (ptr + 1, '.'))
528         parts_num++;
529
530     return parts_num;
531 }
532
533 /**
534  * Parse key to remove "type" if this is for schema and initiate compaction
535  */
536 static int parse_keys (char *buffer, size_t buffer_size, const char *key_str)
537 {
538     char tmp[2 * buffer_size];
539
540     if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen (key_str) == 0)
541         return EINVAL;
542
543     if ((count_parts (key_str) > 2) && has_suffix (key_str, ".type"))
544     {
545         /* strip ".type" suffix iff the key has more than two parts. */
546         size_t sz = strlen (key_str) - strlen (".type") + 1;
547
548         if (sz > sizeof (tmp))
549             sz = sizeof (tmp);
550         sstrncpy (tmp, key_str, sz);
551     }
552     else
553     {
554         sstrncpy (tmp, key_str, sizeof (tmp));
555     }
556
557     return compact_ds_name (buffer, buffer_size, tmp);
558 }
559
560 /**
561  * while parsing ceph admin socket schema, save counter name and type for later
562  * data processing
563  */
564 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
565         int pc_type)
566 {
567     uint32_t type;
568     char ds_name[DATA_MAX_NAME_LEN];
569     memset(ds_name, 0, sizeof(ds_name));
570
571     if(convert_special_metrics)
572     {
573         /**
574          * Special case for filestore:JournalWrBytes. For some reason, Ceph
575          * schema encodes this as a count/sum pair while all other "Bytes" data
576          * (excluding used/capacity bytes for OSD space) uses a single "Derive"
577          * type. To spare further confusion, keep this KPI as the same type of
578          * other "Bytes". Instead of keeping an "average" or "rate", use the
579          * "sum" in the pair and assign that to the derive value.
580          */
581         if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
582         {
583             pc_type = 10;
584         }
585     }
586
587     d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
588     if(!d->ds_names)
589     {
590         return -ENOMEM;
591     }
592
593     d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
594     if(!d->ds_types)
595     {
596         return -ENOMEM;
597     }
598
599     d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
600     if(!d->ds_names[d->ds_num])
601     {
602         return -ENOMEM;
603     }
604
605     type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
606             ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
607     d->ds_types[d->ds_num] = type;
608
609     if (parse_keys(ds_name, sizeof (ds_name), name))
610     {
611         return 1;
612     }
613
614     sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
615     d->ds_num = (d->ds_num + 1);
616
617     return 0;
618 }
619
620 /******* ceph_config *******/
621 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
622 {
623     const char *val;
624     if(item->values_num != 1)
625     {
626         return -ENOTSUP;
627     }
628     if(item->values[0].type != OCONFIG_TYPE_STRING)
629     {
630         return -ENOTSUP;
631     }
632     val = item->values[0].value.string;
633     if(snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
634     {
635         ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
636                 item->key);
637         return -ENAMETOOLONG;
638     }
639     return 0;
640 }
641
642 static int cc_handle_bool(struct oconfig_item_s *item, int *dest)
643 {
644     if(item->values_num != 1)
645     {
646         return -ENOTSUP;
647     }
648
649     if(item->values[0].type != OCONFIG_TYPE_BOOLEAN)
650     {
651         return -ENOTSUP;
652     }
653
654     *dest = (item->values[0].value.boolean) ? 1 : 0;
655     return 0;
656 }
657
658 static int cc_add_daemon_config(oconfig_item_t *ci)
659 {
660     int ret, i;
661     struct ceph_daemon *nd, cd;
662     struct ceph_daemon **tmp;
663     memset(&cd, 0, sizeof(struct ceph_daemon));
664
665     if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
666     {
667         WARNING("ceph plugin: `Daemon' blocks need exactly one string "
668                 "argument.");
669         return (-1);
670     }
671
672     ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
673     if(ret)
674     {
675         return ret;
676     }
677
678     for(i=0; i < ci->children_num; i++)
679     {
680         oconfig_item_t *child = ci->children + i;
681
682         if(strcasecmp("SocketPath", child->key) == 0)
683         {
684             ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
685             if(ret)
686             {
687                 return ret;
688             }
689         }
690         else
691         {
692             WARNING("ceph plugin: ignoring unknown option %s", child->key);
693         }
694     }
695     if(cd.name[0] == '\0')
696     {
697         ERROR("ceph plugin: you must configure a daemon name.\n");
698         return -EINVAL;
699     }
700     else if(cd.asok_path[0] == '\0')
701     {
702         ERROR("ceph plugin(name=%s): you must configure an administrative "
703         "socket path.\n", cd.name);
704         return -EINVAL;
705     }
706     else if(!((cd.asok_path[0] == '/') ||
707             (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
708     {
709         ERROR("ceph plugin(name=%s): administrative socket paths must begin "
710                 "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
711         return -EINVAL;
712     }
713
714     tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons));
715     if(tmp == NULL)
716     {
717         /* The positive return value here indicates that this is a
718          * runtime error, not a configuration error.  */
719         return ENOMEM;
720     }
721     g_daemons = tmp;
722
723     nd = malloc(sizeof(*nd));
724     if(!nd)
725     {
726         return ENOMEM;
727     }
728     memcpy(nd, &cd, sizeof(*nd));
729     g_daemons[g_num_daemons++] = nd;
730     return 0;
731 }
732
733 static int ceph_config(oconfig_item_t *ci)
734 {
735     int ret, i;
736
737     for(i = 0; i < ci->children_num; ++i)
738     {
739         oconfig_item_t *child = ci->children + i;
740         if(strcasecmp("Daemon", child->key) == 0)
741         {
742             ret = cc_add_daemon_config(child);
743             if(ret == ENOMEM)
744             {
745                 ERROR("ceph plugin: Couldn't allocate memory");
746                 return ret;
747             }
748             else if(ret)
749             {
750                 //process other daemons and ignore this one
751                 continue;
752             }
753         }
754         else if(strcasecmp("LongRunAvgLatency", child->key) == 0)
755         {
756             ret = cc_handle_bool(child, &long_run_latency_avg);
757             if(ret)
758             {
759                 return ret;
760             }
761         }
762         else if(strcasecmp("ConvertSpecialMetricTypes", child->key) == 0)
763         {
764             ret = cc_handle_bool(child, &convert_special_metrics);
765             if(ret)
766             {
767                 return ret;
768             }
769         }
770         else
771         {
772             WARNING("ceph plugin: ignoring unknown option %s", child->key);
773         }
774     }
775     return 0;
776 }
777
778 /**
779  * Parse JSON and get error message if present
780  */
781 static int
782 traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
783 {
784     yajl_status status = yajl_parse(hand, json, json_len);
785     unsigned char *msg;
786
787     switch(status)
788     {
789         case yajl_status_error:
790             msg = yajl_get_error(hand, /* verbose = */ 1,
791                                        /* jsonText = */ (unsigned char *) json,
792                                                       (unsigned int) json_len);
793             ERROR ("ceph plugin: yajl_parse failed: %s", msg);
794             yajl_free_error(hand, msg);
795             return 1;
796         case yajl_status_client_canceled:
797             return 1;
798         default:
799             return 0;
800     }
801 }
802
803 /**
804  * Add entry for each counter while parsing schema
805  */
806 static int
807 node_handler_define_schema(void *arg, const char *val, const char *key)
808 {
809     struct ceph_daemon *d = (struct ceph_daemon *) arg;
810     int pc_type;
811     pc_type = atoi(val);
812     return ceph_daemon_add_ds_entry(d, key, pc_type);
813 }
814
815 /**
816  * Latency counter does not yet have an entry in last poll data - add it.
817  */
818 static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
819         uint64_t cur_count)
820 {
821     d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
822     if(!d->last_poll_data[d->last_idx])
823     {
824         return -ENOMEM;
825     }
826     sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
827             sizeof(d->last_poll_data[d->last_idx]->ds_name));
828     d->last_poll_data[d->last_idx]->last_sum = cur_sum;
829     d->last_poll_data[d->last_idx]->last_count = cur_count;
830     d->last_idx = (d->last_idx + 1);
831     return 0;
832 }
833
834 /**
835  * Update latency counter or add new entry if it doesn't exist
836  */
837 static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
838         double cur_sum, uint64_t cur_count)
839 {
840     if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
841     {
842         d->last_poll_data[index]->last_sum = cur_sum;
843         d->last_poll_data[index]->last_count = cur_count;
844         return 0;
845     }
846
847     if(!d->last_poll_data)
848     {
849         d->last_poll_data = malloc(1 * sizeof(struct last_data *));
850         if(!d->last_poll_data)
851         {
852             return -ENOMEM;
853         }
854     }
855     else
856     {
857         struct last_data **tmp_last = realloc(d->last_poll_data,
858                 ((d->last_idx+1) * sizeof(struct last_data *)));
859         if(!tmp_last)
860         {
861             return -ENOMEM;
862         }
863         d->last_poll_data = tmp_last;
864     }
865     return add_last(d, ds_n, cur_sum, cur_count);
866 }
867
868 /**
869  * If using index guess failed (shouldn't happen, but possible if counters
870  * get rearranged), resort to searching for counter name
871  */
872 static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
873 {
874     int i = 0;
875     for(; i < d->last_idx; i++)
876     {
877         if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
878         {
879             return i;
880         }
881     }
882     return -1;
883 }
884
885 /**
886  * Calculate average b/t current data and last poll data
887  * if last poll data exists
888  */
889 static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
890         double cur_sum, uint64_t cur_count)
891 {
892     double result = -1.1, sum_delt = 0.0;
893     uint64_t count_delt = 0;
894     int tmp_index = 0;
895     if(d->last_idx > index)
896     {
897         if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
898         {
899             tmp_index = index;
900         }
901         //test previous index
902         else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
903         {
904             tmp_index = (index - 1);
905         }
906         else
907         {
908             tmp_index = backup_search_for_last_avg(d, ds_n);
909         }
910
911         if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
912         {
913             sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
914             count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
915             result = (sum_delt / count_delt);
916         }
917     }
918
919     if(result == -1.1)
920     {
921         result = NAN;
922     }
923     if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
924     {
925         return -ENOMEM;
926     }
927     return result;
928 }
929
930 /**
931  * If using index guess failed, resort to searching for counter name
932  */
933 static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
934 {
935     int idx = 0;
936     for(; idx < d->ds_num; idx++)
937     {
938         if(strcmp(d->ds_names[idx], ds_name) == 0)
939         {
940             return d->ds_types[idx];
941         }
942     }
943     return DSET_TYPE_UNFOUND;
944 }
945
946 /**
947  * Process counter data and dispatch values
948  */
949 static int node_handler_fetch_data(void *arg, const char *val, const char *key)
950 {
951     value_t uv;
952     double tmp_d;
953     uint64_t tmp_u;
954     struct values_tmp *vtmp = (struct values_tmp*) arg;
955     uint32_t type = DSET_TYPE_UNFOUND;
956     int index = vtmp->index;
957
958     char ds_name[DATA_MAX_NAME_LEN];
959     memset(ds_name, 0, sizeof(ds_name));
960
961     if (parse_keys (ds_name, sizeof (ds_name), key))
962     {
963         return 1;
964     }
965
966     if(index >= vtmp->d->ds_num)
967     {
968         //don't overflow bounds of array
969         index = (vtmp->d->ds_num - 1);
970     }
971
972     /**
973      * counters should remain in same order we parsed schema... we maintain the
974      * index variable to keep track of current point in list of counters. first
975      * use index to guess point in array for retrieving type. if that doesn't
976      * work, use the old way to get the counter type
977      */
978     if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
979     {
980         //found match
981         type = vtmp->d->ds_types[index];
982     }
983     else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
984     {
985         //try previous key
986         type = vtmp->d->ds_types[index-1];
987     }
988
989     if(type == DSET_TYPE_UNFOUND)
990     {
991         //couldn't find right type by guessing, check the old way
992         type = backup_search_for_type(vtmp->d, ds_name);
993     }
994
995     switch(type)
996     {
997         case DSET_LATENCY:
998             if(vtmp->avgcount_exists == -1)
999             {
1000                 sscanf(val, "%" PRIu64, &vtmp->avgcount);
1001                 vtmp->avgcount_exists = 0;
1002                 //return after saving avgcount - don't dispatch value
1003                 //until latency calculation
1004                 return 0;
1005             }
1006             else
1007             {
1008                 double sum, result;
1009                 sscanf(val, "%lf", &sum);
1010
1011                 if(vtmp->avgcount == 0)
1012                 {
1013                     vtmp->avgcount = 1;
1014                 }
1015
1016                 /** User wants latency values as long run avg */
1017                 if(long_run_latency_avg)
1018                 {
1019                     result = (sum / vtmp->avgcount);
1020                 }
1021                 else
1022                 {
1023                     result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
1024                     if(result == -ENOMEM)
1025                     {
1026                         return -ENOMEM;
1027                     }
1028                 }
1029
1030                 uv.gauge = result;
1031                 vtmp->avgcount_exists = -1;
1032                 vtmp->latency_index = (vtmp->latency_index + 1);
1033             }
1034             break;
1035         case DSET_BYTES:
1036             sscanf(val, "%lf", &tmp_d);
1037             uv.gauge = tmp_d;
1038             break;
1039         case DSET_RATE:
1040             sscanf(val, "%" PRIu64, &tmp_u);
1041             uv.derive = tmp_u;
1042             break;
1043         case DSET_TYPE_UNFOUND:
1044         default:
1045             ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
1046             return -1;
1047     }
1048
1049     sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
1050     sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
1051     vtmp->vlist.values = &uv;
1052     vtmp->vlist.values_len = 1;
1053
1054     vtmp->index = (vtmp->index + 1);
1055     plugin_dispatch_values(&vtmp->vlist);
1056
1057     return 0;
1058 }
1059
1060 static int cconn_connect(struct cconn *io)
1061 {
1062     struct sockaddr_un address;
1063     int flags, fd, err;
1064     if(io->state != CSTATE_UNCONNECTED)
1065     {
1066         ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED");
1067         return -EDOM;
1068     }
1069     fd = socket(PF_UNIX, SOCK_STREAM, 0);
1070     if(fd < 0)
1071     {
1072         int err = -errno;
1073         ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) "
1074             "failed: error %d", err);
1075         return err;
1076     }
1077     memset(&address, 0, sizeof(struct sockaddr_un));
1078     address.sun_family = AF_UNIX;
1079     snprintf(address.sun_path, sizeof(address.sun_path), "%s",
1080             io->d->asok_path);
1081     RETRY_ON_EINTR(err,
1082         connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
1083     if(err < 0)
1084     {
1085         ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d",
1086             fd, err);
1087         close(fd);
1088         return err;
1089     }
1090
1091     flags = fcntl(fd, F_GETFL, 0);
1092     if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
1093     {
1094         err = -errno;
1095         ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d",
1096             fd, err);
1097         close(fd);
1098         return err;
1099     }
1100     io->asok = fd;
1101     io->state = CSTATE_WRITE_REQUEST;
1102     io->amt = 0;
1103     io->json_len = 0;
1104     io->json = NULL;
1105     return 0;
1106 }
1107
1108 static void cconn_close(struct cconn *io)
1109 {
1110     io->state = CSTATE_UNCONNECTED;
1111     if(io->asok != -1)
1112     {
1113         int res;
1114         RETRY_ON_EINTR(res, close(io->asok));
1115     }
1116     io->asok = -1;
1117     io->amt = 0;
1118     io->json_len = 0;
1119     sfree(io->json);
1120     io->json = NULL;
1121 }
1122
1123 /* Process incoming JSON counter data */
1124 static int
1125 cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
1126 {
1127     int ret;
1128     struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
1129     if(!vtmp)
1130     {
1131         return -ENOMEM;
1132     }
1133
1134     vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
1135     sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
1136     sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
1137     sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
1138
1139     vtmp->d = io->d;
1140     vtmp->avgcount_exists = -1;
1141     vtmp->latency_index = 0;
1142     vtmp->index = 0;
1143     yajl->handler_arg = vtmp;
1144     ret = traverse_json(io->json, io->json_len, hand);
1145     sfree(vtmp);
1146     return ret;
1147 }
1148
1149 /**
1150  * Initiate JSON parsing and print error if one occurs
1151  */
1152 static int cconn_process_json(struct cconn *io)
1153 {
1154     if((io->request_type != ASOK_REQ_DATA) &&
1155             (io->request_type != ASOK_REQ_SCHEMA))
1156     {
1157         return -EDOM;
1158     }
1159
1160     int result = 1;
1161     yajl_handle hand;
1162     yajl_status status;
1163
1164     hand = yajl_alloc(&callbacks,
1165 #if HAVE_YAJL_V2
1166       /* alloc funcs = */ NULL,
1167 #else
1168       /* alloc funcs = */ NULL, NULL,
1169 #endif
1170       /* context = */ (void *)(&io->yajl));
1171
1172     if(!hand)
1173     {
1174         ERROR ("ceph plugin: yajl_alloc failed.");
1175         return ENOMEM;
1176     }
1177
1178     io->yajl.depth = 0;
1179
1180     switch(io->request_type)
1181     {
1182         case ASOK_REQ_DATA:
1183             io->yajl.handler = node_handler_fetch_data;
1184             result = cconn_process_data(io, &io->yajl, hand);
1185             break;
1186         case ASOK_REQ_SCHEMA:
1187             //init daemon specific variables
1188             io->d->ds_num = 0;
1189             io->d->last_idx = 0;
1190             io->d->last_poll_data = NULL;
1191             io->yajl.handler = node_handler_define_schema;
1192             io->yajl.handler_arg = io->d;
1193             result = traverse_json(io->json, io->json_len, hand);
1194             break;
1195     }
1196
1197     if(result)
1198     {
1199         goto done;
1200     }
1201
1202 #if HAVE_YAJL_V2
1203     status = yajl_complete_parse(hand);
1204 #else
1205     status = yajl_parse_complete(hand);
1206 #endif
1207
1208     if (status != yajl_status_ok)
1209     {
1210       unsigned char *errmsg = yajl_get_error (hand, /* verbose = */ 0,
1211           /* jsonText = */ NULL, /* jsonTextLen = */ 0);
1212       ERROR ("ceph plugin: yajl_parse_complete failed: %s",
1213           (char *) errmsg);
1214       yajl_free_error (hand, errmsg);
1215       yajl_free (hand);
1216       return 1;
1217     }
1218
1219     done:
1220     yajl_free (hand);
1221     return result;
1222 }
1223
1224 static int cconn_validate_revents(struct cconn *io, int revents)
1225 {
1226     if(revents & POLLERR)
1227     {
1228         ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR",
1229             io->d->name);
1230         return -EIO;
1231     }
1232     switch (io->state)
1233     {
1234         case CSTATE_WRITE_REQUEST:
1235             return (revents & POLLOUT) ? 0 : -EINVAL;
1236         case CSTATE_READ_VERSION:
1237         case CSTATE_READ_AMT:
1238         case CSTATE_READ_JSON:
1239             return (revents & POLLIN) ? 0 : -EINVAL;
1240         default:
1241             ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
1242                 "illegal state on line %d", io->d->name, __LINE__);
1243             return -EDOM;
1244     }
1245 }
1246
1247 /** Handle a network event for a connection */
1248 static int cconn_handle_event(struct cconn *io)
1249 {
1250     int ret;
1251     switch (io->state)
1252     {
1253         case CSTATE_UNCONNECTED:
1254             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1255                 "state on line %d", io->d->name, __LINE__);
1256
1257             return -EDOM;
1258         case CSTATE_WRITE_REQUEST:
1259         {
1260             char cmd[32];
1261             snprintf(cmd, sizeof(cmd), "%s%d%s", "{ \"prefix\": \"",
1262                     io->request_type, "\" }\n");
1263             size_t cmd_len = strlen(cmd);
1264             RETRY_ON_EINTR(ret,
1265                   write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
1266             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
1267                     io->d->name, io->state, io->amt, ret);
1268             if(ret < 0)
1269             {
1270                 return ret;
1271             }
1272             io->amt += ret;
1273             if(io->amt >= cmd_len)
1274             {
1275                 io->amt = 0;
1276                 switch (io->request_type)
1277                 {
1278                     case ASOK_REQ_VERSION:
1279                         io->state = CSTATE_READ_VERSION;
1280                         break;
1281                     default:
1282                         io->state = CSTATE_READ_AMT;
1283                         break;
1284                 }
1285             }
1286             return 0;
1287         }
1288         case CSTATE_READ_VERSION:
1289         {
1290             RETRY_ON_EINTR(ret,
1291                     read(io->asok, ((char*)(&io->d->version)) + io->amt,
1292                             sizeof(io->d->version) - io->amt));
1293             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1294                     io->d->name, io->state, ret);
1295             if(ret < 0)
1296             {
1297                 return ret;
1298             }
1299             io->amt += ret;
1300             if(io->amt >= sizeof(io->d->version))
1301             {
1302                 io->d->version = ntohl(io->d->version);
1303                 if(io->d->version != 1)
1304                 {
1305                     ERROR("ceph plugin: cconn_handle_event(name=%s) not "
1306                         "expecting version %d!", io->d->name, io->d->version);
1307                     return -ENOTSUP;
1308                 }
1309                 DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
1310                         "version %d", io->d->name, io->d->version);
1311                 io->amt = 0;
1312                 cconn_close(io);
1313                 io->request_type = ASOK_REQ_SCHEMA;
1314             }
1315             return 0;
1316         }
1317         case CSTATE_READ_AMT:
1318         {
1319             RETRY_ON_EINTR(ret,
1320                     read(io->asok, ((char*)(&io->json_len)) + io->amt,
1321                             sizeof(io->json_len) - io->amt));
1322             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1323                     io->d->name, io->state, ret);
1324             if(ret < 0)
1325             {
1326                 return ret;
1327             }
1328             io->amt += ret;
1329             if(io->amt >= sizeof(io->json_len))
1330             {
1331                 io->json_len = ntohl(io->json_len);
1332                 io->amt = 0;
1333                 io->state = CSTATE_READ_JSON;
1334                 io->json = calloc(1, io->json_len + 1);
1335                 if(!io->json)
1336                 {
1337                     ERROR("ceph plugin: error callocing io->json");
1338                     return -ENOMEM;
1339                 }
1340             }
1341             return 0;
1342         }
1343         case CSTATE_READ_JSON:
1344         {
1345             RETRY_ON_EINTR(ret,
1346                    read(io->asok, io->json + io->amt, io->json_len - io->amt));
1347             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1348                     io->d->name, io->state, ret);
1349             if(ret < 0)
1350             {
1351                 return ret;
1352             }
1353             io->amt += ret;
1354             if(io->amt >= io->json_len)
1355             {
1356                 ret = cconn_process_json(io);
1357                 if(ret)
1358                 {
1359                     return ret;
1360                 }
1361                 cconn_close(io);
1362                 io->request_type = ASOK_REQ_NONE;
1363             }
1364             return 0;
1365         }
1366         default:
1367             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1368                 "state on line %d", io->d->name, __LINE__);
1369             return -EDOM;
1370     }
1371 }
1372
1373 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1374 {
1375     int ret;
1376     if(io->request_type == ASOK_REQ_NONE)
1377     {
1378         /* The request has already been serviced. */
1379         return 0;
1380     }
1381     else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
1382     {
1383         /* If there are no counters to report on, don't bother
1384          * connecting */
1385         return 0;
1386     }
1387
1388     switch (io->state)
1389     {
1390         case CSTATE_UNCONNECTED:
1391             ret = cconn_connect(io);
1392             if(ret > 0)
1393             {
1394                 return -ret;
1395             }
1396             else if(ret < 0)
1397             {
1398                 return ret;
1399             }
1400             fds->fd = io->asok;
1401             fds->events = POLLOUT;
1402             return 1;
1403         case CSTATE_WRITE_REQUEST:
1404             fds->fd = io->asok;
1405             fds->events = POLLOUT;
1406             return 1;
1407         case CSTATE_READ_VERSION:
1408         case CSTATE_READ_AMT:
1409         case CSTATE_READ_JSON:
1410             fds->fd = io->asok;
1411             fds->events = POLLIN;
1412             return 1;
1413         default:
1414             ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state "
1415                 "on line %d", io->d->name, __LINE__);
1416             return -EDOM;
1417     }
1418 }
1419
1420 /** Returns the difference between two struct timevals in milliseconds.
1421  * On overflow, we return max/min int.
1422  */
1423 static int milli_diff(const struct timeval *t1, const struct timeval *t2)
1424 {
1425     int64_t ret;
1426     int sec_diff = t1->tv_sec - t2->tv_sec;
1427     int usec_diff = t1->tv_usec - t2->tv_usec;
1428     ret = usec_diff / 1000;
1429     ret += (sec_diff * 1000);
1430     return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret);
1431 }
1432
1433 /** This handles the actual network I/O to talk to the Ceph daemons.
1434  */
1435 static int cconn_main_loop(uint32_t request_type)
1436 {
1437     int i, ret, some_unreachable = 0;
1438     struct timeval end_tv;
1439     struct cconn io_array[g_num_daemons];
1440
1441     DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
1442
1443     /* create cconn array */
1444     memset(io_array, 0, sizeof(io_array));
1445     for(i = 0; i < g_num_daemons; ++i)
1446     {
1447         io_array[i].d = g_daemons[i];
1448         io_array[i].request_type = request_type;
1449         io_array[i].state = CSTATE_UNCONNECTED;
1450     }
1451
1452     /** Calculate the time at which we should give up */
1453     gettimeofday(&end_tv, NULL);
1454     end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1455
1456     while (1)
1457     {
1458         int nfds, diff;
1459         struct timeval tv;
1460         struct cconn *polled_io_array[g_num_daemons];
1461         struct pollfd fds[g_num_daemons];
1462         memset(fds, 0, sizeof(fds));
1463         nfds = 0;
1464         for(i = 0; i < g_num_daemons; ++i)
1465         {
1466             struct cconn *io = io_array + i;
1467             ret = cconn_prepare(io, fds + nfds);
1468             if(ret < 0)
1469             {
1470                 WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1471                         io->d->name, i, io->state, ret);
1472                 cconn_close(io);
1473                 io->request_type = ASOK_REQ_NONE;
1474                 some_unreachable = 1;
1475             }
1476             else if(ret == 1)
1477             {
1478                 polled_io_array[nfds++] = io_array + i;
1479             }
1480         }
1481         if(nfds == 0)
1482         {
1483             /* finished */
1484             ret = 0;
1485             goto done;
1486         }
1487         gettimeofday(&tv, NULL);
1488         diff = milli_diff(&end_tv, &tv);
1489         if(diff <= 0)
1490         {
1491             /* Timed out */
1492             ret = -ETIMEDOUT;
1493             WARNING("ceph plugin: cconn_main_loop: timed out.");
1494             goto done;
1495         }
1496         RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1497         if(ret < 0)
1498         {
1499             ERROR("ceph plugin: poll(2) error: %d", ret);
1500             goto done;
1501         }
1502         for(i = 0; i < nfds; ++i)
1503         {
1504             struct cconn *io = polled_io_array[i];
1505             int revents = fds[i].revents;
1506             if(revents == 0)
1507             {
1508                 /* do nothing */
1509             }
1510             else if(cconn_validate_revents(io, revents))
1511             {
1512                 WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): "
1513                 "revents validation error: "
1514                 "revents=0x%08x", io->d->name, i, io->state, revents);
1515                 cconn_close(io);
1516                 io->request_type = ASOK_REQ_NONE;
1517                 some_unreachable = 1;
1518             }
1519             else
1520             {
1521                 int ret = cconn_handle_event(io);
1522                 if(ret)
1523                 {
1524                     WARNING("ceph plugin: cconn_handle_event(name=%s,"
1525                     "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1526                     cconn_close(io);
1527                     io->request_type = ASOK_REQ_NONE;
1528                     some_unreachable = 1;
1529                 }
1530             }
1531         }
1532     }
1533     done: for(i = 0; i < g_num_daemons; ++i)
1534     {
1535         cconn_close(io_array + i);
1536     }
1537     if(some_unreachable)
1538     {
1539         DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
1540     }
1541     else
1542     {
1543         DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
1544     }
1545     return ret;
1546 }
1547
1548 static int ceph_read(void)
1549 {
1550     return cconn_main_loop(ASOK_REQ_DATA);
1551 }
1552
1553 /******* lifecycle *******/
1554 static int ceph_init(void)
1555 {
1556     int ret;
1557     ceph_daemons_print();
1558
1559     ret = cconn_main_loop(ASOK_REQ_VERSION);
1560
1561     return (ret) ? ret : 0;
1562 }
1563
1564 static int ceph_shutdown(void)
1565 {
1566     int i;
1567     for(i = 0; i < g_num_daemons; ++i)
1568     {
1569         ceph_daemon_free(g_daemons[i]);
1570     }
1571     sfree(g_daemons);
1572     g_daemons = NULL;
1573     g_num_daemons = 0;
1574     DEBUG("ceph plugin: finished ceph_shutdown");
1575     return 0;
1576 }
1577
1578 void module_register(void)
1579 {
1580     plugin_register_complex_config("ceph", ceph_config);
1581     plugin_register_init("ceph", ceph_init);
1582     plugin_register_read("ceph", ceph_read);
1583     plugin_register_shutdown("ceph", ceph_shutdown);
1584 }