multiple memcached instances, v2
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c, based on src/hddtemp.c
3  * Copyright (C) 2007       Antony Dovgal
4  * Copyright (C) 2007-2010  Florian Forster
5  * Copyright (C) 2009       Doug MacEachern
6  * Copyright (C) 2009       Franck Lombardi
7  * Copyright (C) 2012       Nicolas Szalay
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
22  *
23  * Authors:
24  *   Antony Dovgal <tony at daylessday dot org>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Doug MacEachern <dougm at hyperic.com>
27  *   Franck Lombardi
28  *   Nicolas Szalay
29  **/
30
31 #include "collectd.h"
32 #include "common.h"
33 #include "plugin.h"
34 #include "configfile.h"
35
36 # include <poll.h>
37 # include <netdb.h>
38 # include <sys/socket.h>
39 # include <sys/un.h>
40 # include <netinet/in.h>
41 # include <netinet/tcp.h>
42
43 /* Hack to work around the missing define in AIX */
44 #ifndef MSG_DONTWAIT
45 # define MSG_DONTWAIT MSG_NONBLOCK
46 #endif
47
48 #define MEMCACHED_DEF_HOST "127.0.0.1"
49 #define MEMCACHED_DEF_PORT "11211"
50
51 #define MEMCACHED_RETRY_COUNT 100
52
53 struct memcached_s
54 {
55   char *name;
56   char *socket;
57   char *host;
58   char *port;
59 };
60
61 typedef struct memcached_s memcached_t;
62
63 static int memcached_read (user_data_t *user_data);
64
65 static void memcached_free (memcached_t *st)
66 {
67   if (st == NULL)
68     return;
69
70   sfree (st->name);
71   sfree (st->socket);
72   sfree (st->host);
73   sfree (st->port);
74 }
75
76 static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data)
77 {
78   int fd=-1;
79   ssize_t status;
80   int buffer_fill;
81   int i = 0;
82
83   memcached_t *st;
84   st = user_data->data;
85   if (st->socket != NULL) {
86     struct sockaddr_un serv_addr;
87
88      memset (&serv_addr, 0, sizeof (serv_addr));
89      serv_addr.sun_family = AF_UNIX;
90      sstrncpy (serv_addr.sun_path, st->socket,
91      sizeof (serv_addr.sun_path));
92
93      /* create our socket descriptor */
94      fd = socket (AF_UNIX, SOCK_STREAM, 0);
95      if (fd < 0) {
96        char errbuf[1024];
97        ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
98        sizeof (errbuf)));
99        return -1;
100      }
101   }
102   else {
103     if (st->port != NULL) {
104       const char *host;
105       const char *port;
106
107       struct addrinfo  ai_hints;
108       struct addrinfo *ai_list, *ai_ptr;
109       int              ai_return = 0;
110
111       memset (&ai_hints, '\0', sizeof (ai_hints));
112       ai_hints.ai_flags    = 0;
113 #ifdef AI_ADDRCONFIG
114     /*  ai_hints.ai_flags   |= AI_ADDRCONFIG; */
115 #endif
116       ai_hints.ai_family   = AF_INET;
117       ai_hints.ai_socktype = SOCK_STREAM;
118       ai_hints.ai_protocol = 0;
119
120       host = st->host;
121       if (host == NULL) {
122         host = MEMCACHED_DEF_HOST;
123       }
124
125       port = st->port;
126       if (strlen (port) == 0) {
127         port = MEMCACHED_DEF_PORT;
128       }
129
130       if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
131         char errbuf[1024];
132         ERROR ("memcached: getaddrinfo (%s, %s): %s",
133           host, port,
134           (ai_return == EAI_SYSTEM)
135           ? sstrerror (errno, errbuf, sizeof (errbuf))
136           : gai_strerror (ai_return));
137         return -1;
138       }
139
140       fd = -1;
141       for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
142         /* create our socket descriptor */
143         fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
144         if (fd < 0) {
145           char errbuf[1024];
146           ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
147           continue;
148         }
149
150         /* connect to the memcached daemon */
151         status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen);
152         if (status != 0) {
153           shutdown (fd, SHUT_RDWR);
154           close (fd);
155           fd = -1;
156           continue;
157         }
158
159         /* A socket could be opened and connecting succeeded. We're
160          * done. */
161         break;
162       }
163
164       freeaddrinfo (ai_list);
165     }
166   }
167
168   if (fd < 0) {
169     ERROR ("memcached: Could not connect to daemon.");
170     return -1;
171   }
172
173   if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
174     ERROR ("memcached: Could not send command to the memcached daemon.");
175     return -1;
176   }
177
178   {
179     struct pollfd p;
180     int status;
181
182     memset (&p, 0, sizeof (p));
183     p.fd = fd;
184     p.events = POLLIN | POLLERR | POLLHUP;
185     p.revents = 0;
186
187     status = poll (&p, /* nfds = */ 1,
188         /* timeout = */ CDTIME_T_TO_MS (interval_g));
189     if (status <= 0)
190     {
191       if (status == 0)
192       {
193         ERROR ("memcached: poll(2) timed out after %.3f seconds.",
194             CDTIME_T_TO_DOUBLE (interval_g));
195       }
196       else
197       {
198         char errbuf[1024];
199         ERROR ("memcached: poll(2) failed: %s",
200             sstrerror (errno, errbuf, sizeof (errbuf)));
201       }
202       shutdown (fd, SHUT_RDWR);
203       close (fd);
204       return (-1);
205     }
206   }
207
208   /* receive data from the memcached daemon */
209   memset (buffer, '\0', buffer_size);
210
211   buffer_fill = 0;
212   while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
213     if (i > MEMCACHED_RETRY_COUNT) {
214       ERROR("recv() timed out");
215       break;
216     }
217     i++;
218
219     if (status == -1) {
220       char errbuf[1024];
221
222       if (errno == EAGAIN) {
223         continue;
224       }
225
226       ERROR ("memcached: Error reading from socket: %s",
227           sstrerror (errno, errbuf, sizeof (errbuf)));
228       shutdown(fd, SHUT_RDWR);
229       close (fd);
230       return -1;
231     }
232     buffer_fill += status;
233
234     if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
235       /* we got all the data */
236       break;
237     }
238   }
239
240   if (buffer_fill >= buffer_size) {
241     buffer[buffer_size - 1] = '\0';
242     WARNING ("memcached: Message from memcached has been truncated.");
243   } else if (buffer_fill == 0) {
244     WARNING ("memcached: Peer has unexpectedly shut down the socket. "
245         "Buffer: `%s'", buffer);
246     shutdown(fd, SHUT_RDWR);
247     close(fd);
248     return -1;
249   }
250
251   shutdown(fd, SHUT_RDWR);
252   close(fd);
253   return 0;
254 }
255
256 /* Configuration handling functiions
257  * <Plugin memcached>
258  *   <Instance "instance_name">
259  *     Host foo.zomg.com
260  *     Port "1234"
261  *   </Instance>
262  * </Plugin>
263  */
264 static int config_set_string (char **ret_string, oconfig_item_t *ci)
265 {
266   char *string;
267
268   if ((ci->values_num != 1)
269       || (ci->values[0].type != OCONFIG_TYPE_STRING))
270   {
271     WARNING ("memcached plugin: The `%s' config option "
272         "needs exactly one string argument.", ci->key);
273     return (-1);
274   }
275
276   string = strdup (ci->values[0].value.string);
277   if (string == NULL)
278   {
279     ERROR ("memcached plugin: strdup failed.");
280     return (-1);
281   }
282
283   if (*ret_string != NULL)
284     free (*ret_string);
285   *ret_string = string;
286
287   return (0);
288 }
289
290 static int config_add_instance(oconfig_item_t *ci)
291 {
292   memcached_t *st;
293   int i;
294   int status;
295
296   if ((ci->values_num != 1)
297     || (ci->values[0].type != OCONFIG_TYPE_STRING))
298   {
299     WARNING ("memcached plugin: The `%s' config option "
300       "needs exactly one string argument.", ci->key);
301     return (-1);
302   }
303
304   st = (memcached_t *) malloc (sizeof (*st));
305   if (st == NULL)
306   {
307     ERROR ("memcached plugin: malloc failed.");
308     return (-1);
309   }
310
311   st->name = NULL;
312   st->socket = NULL;
313   st->host = NULL;
314   st->port = NULL;
315   memset (st, 0, sizeof (*st));
316
317   status = config_set_string (&st->name, ci);
318   if (status != 0)
319   {
320     sfree (st);
321     return (status);
322   }
323   assert (st->name != NULL);
324
325   for (i = 0; i < ci->children_num; i++)
326   {
327     oconfig_item_t *child = ci->children + i;
328
329     if (strcasecmp ("Socket", child->key) == 0)
330       status = config_set_string (&st->socket, child);
331     else if (strcasecmp ("Host", child->key) == 0)
332       status = config_set_string (&st->host, child);
333     else if (strcasecmp ("Port", child->key) == 0)
334       status = config_set_string (&st->port, child);
335     else
336     {
337       WARNING ("memcached plugin: Option `%s' not allowed here.",
338           child->key);
339       status = -1;
340     }
341
342     if (status != 0)
343       break;
344   }
345
346   if (status == 0)
347   {
348     user_data_t ud;
349     char callback_name[3*DATA_MAX_NAME_LEN];
350
351     memset (&ud, 0, sizeof (ud));
352     ud.data = st;
353     ud.free_func = (void *) memcached_free;
354
355     memset (callback_name, 0, sizeof (callback_name));
356     ssnprintf (callback_name, sizeof (callback_name),
357         "memcached/%s/%s",
358         (st->host != NULL) ? st->host : hostname_g,
359         (st->port != NULL) ? st->port : "default"),
360
361     status = plugin_register_complex_read (/* group = */ NULL,
362         /* name      = */ callback_name,
363         /* callback  = */ memcached_read,
364         /* interval  = */ NULL,
365         /* user_data = */ &ud);
366   }
367
368   if (status != 0)
369   {
370     memcached_free(st);
371     return (-1);
372   }
373
374   return (0);
375 }
376
377 static int config (oconfig_item_t *ci)
378 {
379   int status = 0;
380   int i;
381
382   for (i = 0; i < ci->children_num; i++)
383   {
384     oconfig_item_t *child = ci->children + i;
385
386     if (strcasecmp ("Instance", child->key) == 0)
387       config_add_instance (child);
388     else
389       WARNING ("memcached plugin: The configuration option "
390           "\"%s\" is not allowed here. Did you "
391           "forget to add an <Instance /> block "
392           "around the configuration?",
393           child->key);
394   } /* for (ci->children) */
395
396   return (status);
397 }
398
399 static void submit_derive (const char *type, const char *type_inst,
400     derive_t value, memcached_t *st)
401 {
402   value_t values[1];
403   value_list_t vl = VALUE_LIST_INIT;
404
405   values[0].derive = value;
406
407   vl.values = values;
408   vl.values_len = 1;
409   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
410   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
411   if (st->name != NULL)
412     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
413   sstrncpy (vl.type, type, sizeof (vl.type));
414   if (type_inst != NULL)
415     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
416
417   plugin_dispatch_values (&vl);
418 }
419
420 static void submit_derive2 (const char *type, const char *type_inst,
421     derive_t value0, derive_t value1, memcached_t *st)
422 {
423   value_t values[2];
424   value_list_t vl = VALUE_LIST_INIT;
425
426   values[0].derive = value0;
427   values[1].derive = value1;
428
429   vl.values = values;
430   vl.values_len = 2;
431   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
432   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
433   if (st->name != NULL)
434     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
435   sstrncpy (vl.type, type, sizeof (vl.type));
436   if (type_inst != NULL)
437     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
438
439   plugin_dispatch_values (&vl);
440 }
441
442 static void submit_gauge (const char *type, const char *type_inst,
443     gauge_t value, memcached_t *st)
444 {
445   value_t values[1];
446   value_list_t vl = VALUE_LIST_INIT;
447
448   values[0].gauge = value;
449
450   vl.values = values;
451   vl.values_len = 1;
452   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
453   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
454   if (st->name != NULL)
455     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
456   sstrncpy (vl.type, type, sizeof (vl.type));
457   if (type_inst != NULL)
458     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
459
460   plugin_dispatch_values (&vl);
461 }
462
463 static void submit_gauge2 (const char *type, const char *type_inst,
464     gauge_t value0, gauge_t value1, memcached_t *st)
465 {
466   value_t values[2];
467   value_list_t vl = VALUE_LIST_INIT;
468
469   values[0].gauge = value0;
470   values[1].gauge = value1;
471
472   vl.values = values;
473   vl.values_len = 2;
474   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
475   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
476   if (st->name != NULL)
477     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
478   sstrncpy (vl.type, type, sizeof (vl.type));
479   if (type_inst != NULL)
480     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
481
482   plugin_dispatch_values (&vl);
483 }
484
485 static int memcached_read (user_data_t *user_data)
486 {
487   char buf[4096];
488   char *fields[3];
489   char *ptr;
490   char *line;
491   char *saveptr;
492   int fields_num;
493
494   gauge_t bytes_used = NAN;
495   gauge_t bytes_total = NAN;
496   gauge_t hits = NAN;
497   gauge_t gets = NAN;
498   derive_t rusage_user = 0;
499   derive_t rusage_syst = 0;
500   derive_t octets_rx = 0;
501   derive_t octets_tx = 0;
502
503   memcached_t *st;
504   st = user_data->data;
505
506   /* get data from daemon */
507   if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) {
508     return -1;
509   }
510
511 #define FIELD_IS(cnst) \
512   (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
513
514   ptr = buf;
515   saveptr = NULL;
516   while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
517   {
518     int name_len;
519
520     ptr = NULL;
521
522     fields_num = strsplit(line, fields, 3);
523     if (fields_num != 3)
524       continue;
525
526     name_len = strlen(fields[1]);
527     if (name_len == 0)
528       continue;
529
530     /*
531      * For an explanation on these fields please refer to
532      * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
533      */
534
535     /*
536      * CPU time consumed by the memcached process
537      */
538     if (FIELD_IS ("rusage_user"))
539     {
540       rusage_user = atoll (fields[2]);
541     }
542     else if (FIELD_IS ("rusage_system"))
543     {
544       rusage_syst = atoll(fields[2]);
545     }
546
547     /*
548      * Number of threads of this instance
549      */
550     else if (FIELD_IS ("threads"))
551     {
552       submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
553     }
554
555     /*
556      * Number of items stored
557      */
558     else if (FIELD_IS ("curr_items"))
559     {
560       submit_gauge ("memcached_items", "current", atof (fields[2]), st);
561     }
562
563     /*
564      * Number of bytes used and available (total - used)
565      */
566     else if (FIELD_IS ("bytes"))
567     {
568       bytes_used = atof (fields[2]);
569     }
570     else if (FIELD_IS ("limit_maxbytes"))
571     {
572       bytes_total = atof(fields[2]);
573     }
574
575     /*
576      * Connections
577      */
578     else if (FIELD_IS ("curr_connections"))
579     {
580       submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
581     }
582
583     /*
584      * Commands
585      */
586     else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
587     {
588       const char *name = fields[1] + 4;
589       submit_derive ("memcached_command", name, atoll (fields[2]), st);
590       if (strcmp (name, "get") == 0)
591         gets = atof (fields[2]);
592     }
593
594     /*
595      * Operations on the cache, i. e. cache hits, cache misses and evictions of items
596      */
597     else if (FIELD_IS ("get_hits"))
598     {
599       submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
600       hits = atof (fields[2]);
601     }
602     else if (FIELD_IS ("get_misses"))
603     {
604       submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
605     }
606     else if (FIELD_IS ("evictions"))
607     {
608       submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
609     }
610
611     /*
612      * Network traffic
613      */
614     else if (FIELD_IS ("bytes_read"))
615     {
616       octets_rx = atoll (fields[2]);
617     }
618     else if (FIELD_IS ("bytes_written"))
619     {
620       octets_tx = atoll (fields[2]);
621     }
622   } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
623
624   if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
625     submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
626
627   if ((rusage_user != 0) || (rusage_syst != 0))
628     submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
629
630   if ((octets_rx != 0) || (octets_tx != 0))
631     submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
632
633   if (!isnan (gets) && !isnan (hits))
634   {
635     gauge_t rate = NAN;
636
637     if (gets != 0.0)
638       rate = 100.0 * hits / gets;
639
640     submit_gauge ("percent", "hitratio", rate, st);
641   }
642
643   return 0;
644 }
645
646 void module_register (void)
647 {
648   plugin_register_complex_config ("memcached", config);
649 }