memcached plugin: Fix backwards compatibility.
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c, based on src/hddtemp.c
3  * Copyright (C) 2007       Antony Dovgal
4  * Copyright (C) 2007-2010  Florian Forster
5  * Copyright (C) 2009       Doug MacEachern
6  * Copyright (C) 2009       Franck Lombardi
7  * Copyright (C) 2012       Nicolas Szalay
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
22  *
23  * Authors:
24  *   Antony Dovgal <tony at daylessday dot org>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Doug MacEachern <dougm at hyperic.com>
27  *   Franck Lombardi
28  *   Nicolas Szalay
29  **/
30
31 #include "collectd.h"
32 #include "common.h"
33 #include "plugin.h"
34 #include "configfile.h"
35
36 # include <poll.h>
37 # include <netdb.h>
38 # include <sys/socket.h>
39 # include <sys/un.h>
40 # include <netinet/in.h>
41 # include <netinet/tcp.h>
42
43 /* Hack to work around the missing define in AIX */
44 #ifndef MSG_DONTWAIT
45 # define MSG_DONTWAIT MSG_NONBLOCK
46 #endif
47
48 #define MEMCACHED_DEF_HOST "127.0.0.1"
49 #define MEMCACHED_DEF_PORT "11211"
50
51 #define MEMCACHED_RETRY_COUNT 100
52
53 struct memcached_s
54 {
55   char *name;
56   char *socket;
57   char *host;
58   char *port;
59 };
60 typedef struct memcached_s memcached_t;
61
62 static _Bool memcached_have_instances = 0;
63
64 static int memcached_read (user_data_t *user_data);
65
66 static void memcached_free (memcached_t *st)
67 {
68   if (st == NULL)
69     return;
70
71   sfree (st->name);
72   sfree (st->socket);
73   sfree (st->host);
74   sfree (st->port);
75 }
76
77 static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data)
78 {
79   int fd = -1;
80   ssize_t status;
81   int buffer_fill;
82   int i = 0;
83
84   memcached_t *st;
85   st = user_data->data;
86   if (st->socket != NULL)
87   {
88     struct sockaddr_un serv_addr;
89
90      memset (&serv_addr, 0, sizeof (serv_addr));
91      serv_addr.sun_family = AF_UNIX;
92      sstrncpy (serv_addr.sun_path, st->socket,
93      sizeof (serv_addr.sun_path));
94
95      /* create our socket descriptor */
96      fd = socket (AF_UNIX, SOCK_STREAM, 0);
97      if (fd < 0) {
98        char errbuf[1024];
99        ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
100        sizeof (errbuf)));
101        return -1;
102      }
103   }
104   else
105   {
106     const char *host;
107     const char *port;
108
109     struct addrinfo  ai_hints;
110     struct addrinfo *ai_list, *ai_ptr;
111     int              ai_return = 0;
112
113     memset (&ai_hints, '\0', sizeof (ai_hints));
114     ai_hints.ai_flags    = 0;
115 #ifdef AI_ADDRCONFIG
116     ai_hints.ai_flags   |= AI_ADDRCONFIG;
117 #endif
118     ai_hints.ai_family   = AF_UNSPEC;
119     ai_hints.ai_socktype = SOCK_STREAM;
120     ai_hints.ai_protocol = 0;
121
122     host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
123     port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
124
125     if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
126       char errbuf[1024];
127       ERROR ("memcached: getaddrinfo (%s, %s): %s",
128           host, port,
129           (ai_return == EAI_SYSTEM)
130           ? sstrerror (errno, errbuf, sizeof (errbuf))
131           : gai_strerror (ai_return));
132       return -1;
133     }
134
135     for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
136       /* create our socket descriptor */
137       fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
138       if (fd < 0) {
139         char errbuf[1024];
140         ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
141         continue;
142       }
143
144       /* connect to the memcached daemon */
145       status = (ssize_t) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
146       if (status != 0) {
147         shutdown (fd, SHUT_RDWR);
148         close (fd);
149         fd = -1;
150         continue;
151       }
152
153       /* A socket could be opened and connecting succeeded. We're
154        * done. */
155       break;
156     }
157
158     freeaddrinfo (ai_list);
159   }
160
161   if (fd < 0) {
162     ERROR ("memcached: Could not connect to daemon.");
163     return -1;
164   }
165
166   if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
167     ERROR ("memcached: Could not send command to the memcached daemon.");
168     return -1;
169   }
170
171   {
172     struct pollfd p;
173     int status;
174
175     memset (&p, 0, sizeof (p));
176     p.fd = fd;
177     p.events = POLLIN | POLLERR | POLLHUP;
178     p.revents = 0;
179
180     status = poll (&p, /* nfds = */ 1,
181         /* timeout = */ CDTIME_T_TO_MS (interval_g));
182     if (status <= 0)
183     {
184       if (status == 0)
185       {
186         ERROR ("memcached: poll(2) timed out after %.3f seconds.",
187             CDTIME_T_TO_DOUBLE (interval_g));
188       }
189       else
190       {
191         char errbuf[1024];
192         ERROR ("memcached: poll(2) failed: %s",
193             sstrerror (errno, errbuf, sizeof (errbuf)));
194       }
195       shutdown (fd, SHUT_RDWR);
196       close (fd);
197       return (-1);
198     }
199   }
200
201   /* receive data from the memcached daemon */
202   memset (buffer, '\0', buffer_size);
203
204   buffer_fill = 0;
205   while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
206     if (i > MEMCACHED_RETRY_COUNT) {
207       ERROR("recv() timed out");
208       break;
209     }
210     i++;
211
212     if (status == -1) {
213       char errbuf[1024];
214
215       if (errno == EAGAIN) {
216         continue;
217       }
218
219       ERROR ("memcached: Error reading from socket: %s",
220           sstrerror (errno, errbuf, sizeof (errbuf)));
221       shutdown(fd, SHUT_RDWR);
222       close (fd);
223       return -1;
224     }
225     buffer_fill += status;
226
227     if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
228       /* we got all the data */
229       break;
230     }
231   }
232
233   if (buffer_fill >= buffer_size) {
234     buffer[buffer_size - 1] = '\0';
235     WARNING ("memcached: Message from memcached has been truncated.");
236   } else if (buffer_fill == 0) {
237     WARNING ("memcached: Peer has unexpectedly shut down the socket. "
238         "Buffer: `%s'", buffer);
239     shutdown(fd, SHUT_RDWR);
240     close(fd);
241     return -1;
242   }
243
244   shutdown(fd, SHUT_RDWR);
245   close(fd);
246   return 0;
247 }
248
249 static int memcached_add_read_callback (memcached_t *st)
250 {
251   user_data_t ud;
252   char callback_name[3*DATA_MAX_NAME_LEN];
253   int status;
254
255   memset (&ud, 0, sizeof (ud));
256   ud.data = st;
257   ud.free_func = (void *) memcached_free;
258
259   assert (st->name != NULL);
260   ssnprintf (callback_name, sizeof (callback_name), "memcached/%s", st->name);
261
262   status = plugin_register_complex_read (/* group = */ "memcached",
263       /* name      = */ callback_name,
264       /* callback  = */ memcached_read,
265       /* interval  = */ NULL,
266       /* user_data = */ &ud);
267   return (status);
268 } /* int memcached_add_read_callback */
269
270 /* Configuration handling functiions
271  * <Plugin memcached>
272  *   <Instance "instance_name">
273  *     Host foo.zomg.com
274  *     Port "1234"
275  *   </Instance>
276  * </Plugin>
277  */
278 static int config_add_instance(oconfig_item_t *ci)
279 {
280   memcached_t *st;
281   int i;
282   int status = 0;
283
284   /* Disable automatic generation of default instance in the init callback. */
285   memcached_have_instances = 1;
286
287   st = malloc (sizeof (*st));
288   if (st == NULL)
289   {
290     ERROR ("memcached plugin: malloc failed.");
291     return (-1);
292   }
293
294   memset (st, 0, sizeof (*st));
295   st->name = NULL;
296   st->socket = NULL;
297   st->host = NULL;
298   st->port = NULL;
299
300   if (strcasecmp (ci->key, "Plugin") == 0) /* default instance */
301     st->name = sstrdup ("default");
302   else /* <Instance /> block */
303     status = cf_util_get_string (ci, &st->name);
304   if (status != 0)
305   {
306     sfree (st);
307     return (status);
308   }
309   assert (st->name != NULL);
310
311   for (i = 0; i < ci->children_num; i++)
312   {
313     oconfig_item_t *child = ci->children + i;
314
315     if (strcasecmp ("Socket", child->key) == 0)
316       status = cf_util_get_string (child, &st->socket);
317     else if (strcasecmp ("Host", child->key) == 0)
318       status = cf_util_get_string (child, &st->host);
319     else if (strcasecmp ("Port", child->key) == 0)
320       status = cf_util_get_service (child, &st->port);
321     else
322     {
323       WARNING ("memcached plugin: Option `%s' not allowed here.",
324           child->key);
325       status = -1;
326     }
327
328     if (status != 0)
329       break;
330   }
331
332   if (status == 0)
333     status = memcached_add_read_callback (st);
334
335   if (status != 0)
336   {
337     memcached_free(st);
338     return (-1);
339   }
340
341   return (0);
342 }
343
344 static int memcached_config (oconfig_item_t *ci)
345 {
346   int status = 0;
347   _Bool have_instance_block = 0;
348   int i;
349
350   for (i = 0; i < ci->children_num; i++)
351   {
352     oconfig_item_t *child = ci->children + i;
353
354     if (strcasecmp ("Instance", child->key) == 0)
355     {
356       config_add_instance (child);
357       have_instance_block = 1;
358     }
359     else if (!have_instance_block)
360     {
361       /* Non-instance option: Assume legacy configuration (without <Instance />
362        * blocks) and call config_add_instance() with the <Plugin /> block. */
363       return (config_add_instance (ci));
364     }
365     else
366       WARNING ("memcached plugin: The configuration option "
367           "\"%s\" is not allowed here. Did you "
368           "forget to add an <Instance /> block "
369           "around the configuration?",
370           child->key);
371   } /* for (ci->children) */
372
373   return (status);
374 }
375
376 static int memcached_init (void)
377 {
378   memcached_t *st;
379   int status;
380
381   if (memcached_have_instances)
382     return (0);
383
384   /* No instances were configured, lets start a default instance. */
385   st = malloc (sizeof (*st));
386   if (st == NULL)
387     return (ENOMEM);
388   memset (st, 0, sizeof (*st));
389   st->name = sstrdup ("default");
390   st->socket = NULL;
391   st->host = NULL;
392   st->port = NULL;
393
394   status = memcached_add_read_callback (st);
395   if (status == 0)
396     memcached_have_instances = 1;
397   else
398     memcached_free (st);
399
400   return (status);
401 } /* int memcached_init */
402
403 static void submit_derive (const char *type, const char *type_inst,
404     derive_t value, memcached_t *st)
405 {
406   value_t values[1];
407   value_list_t vl = VALUE_LIST_INIT;
408
409   values[0].derive = value;
410
411   vl.values = values;
412   vl.values_len = 1;
413   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
414   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
415   if (st->name != NULL)
416     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
417   sstrncpy (vl.type, type, sizeof (vl.type));
418   if (type_inst != NULL)
419     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
420
421   plugin_dispatch_values (&vl);
422 }
423
424 static void submit_derive2 (const char *type, const char *type_inst,
425     derive_t value0, derive_t value1, memcached_t *st)
426 {
427   value_t values[2];
428   value_list_t vl = VALUE_LIST_INIT;
429
430   values[0].derive = value0;
431   values[1].derive = value1;
432
433   vl.values = values;
434   vl.values_len = 2;
435   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
436   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
437   if (st->name != NULL)
438     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
439   sstrncpy (vl.type, type, sizeof (vl.type));
440   if (type_inst != NULL)
441     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
442
443   plugin_dispatch_values (&vl);
444 }
445
446 static void submit_gauge (const char *type, const char *type_inst,
447     gauge_t value, memcached_t *st)
448 {
449   value_t values[1];
450   value_list_t vl = VALUE_LIST_INIT;
451
452   values[0].gauge = value;
453
454   vl.values = values;
455   vl.values_len = 1;
456   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
457   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
458   if (st->name != NULL)
459     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
460   sstrncpy (vl.type, type, sizeof (vl.type));
461   if (type_inst != NULL)
462     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
463
464   plugin_dispatch_values (&vl);
465 }
466
467 static void submit_gauge2 (const char *type, const char *type_inst,
468     gauge_t value0, gauge_t value1, memcached_t *st)
469 {
470   value_t values[2];
471   value_list_t vl = VALUE_LIST_INIT;
472
473   values[0].gauge = value0;
474   values[1].gauge = value1;
475
476   vl.values = values;
477   vl.values_len = 2;
478   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
479   sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
480   if (st->name != NULL)
481     sstrncpy (vl.plugin_instance, st->name,  sizeof (vl.plugin_instance));
482   sstrncpy (vl.type, type, sizeof (vl.type));
483   if (type_inst != NULL)
484     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
485
486   plugin_dispatch_values (&vl);
487 }
488
489 static int memcached_read (user_data_t *user_data)
490 {
491   char buf[4096];
492   char *fields[3];
493   char *ptr;
494   char *line;
495   char *saveptr;
496   int fields_num;
497
498   gauge_t bytes_used = NAN;
499   gauge_t bytes_total = NAN;
500   gauge_t hits = NAN;
501   gauge_t gets = NAN;
502   derive_t rusage_user = 0;
503   derive_t rusage_syst = 0;
504   derive_t octets_rx = 0;
505   derive_t octets_tx = 0;
506
507   memcached_t *st;
508   st = user_data->data;
509
510   /* get data from daemon */
511   if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) {
512     return -1;
513   }
514
515 #define FIELD_IS(cnst) \
516   (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
517
518   ptr = buf;
519   saveptr = NULL;
520   while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
521   {
522     int name_len;
523
524     ptr = NULL;
525
526     fields_num = strsplit(line, fields, 3);
527     if (fields_num != 3)
528       continue;
529
530     name_len = strlen(fields[1]);
531     if (name_len == 0)
532       continue;
533
534     /*
535      * For an explanation on these fields please refer to
536      * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
537      */
538
539     /*
540      * CPU time consumed by the memcached process
541      */
542     if (FIELD_IS ("rusage_user"))
543     {
544       rusage_user = atoll (fields[2]);
545     }
546     else if (FIELD_IS ("rusage_system"))
547     {
548       rusage_syst = atoll(fields[2]);
549     }
550
551     /*
552      * Number of threads of this instance
553      */
554     else if (FIELD_IS ("threads"))
555     {
556       submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
557     }
558
559     /*
560      * Number of items stored
561      */
562     else if (FIELD_IS ("curr_items"))
563     {
564       submit_gauge ("memcached_items", "current", atof (fields[2]), st);
565     }
566
567     /*
568      * Number of bytes used and available (total - used)
569      */
570     else if (FIELD_IS ("bytes"))
571     {
572       bytes_used = atof (fields[2]);
573     }
574     else if (FIELD_IS ("limit_maxbytes"))
575     {
576       bytes_total = atof(fields[2]);
577     }
578
579     /*
580      * Connections
581      */
582     else if (FIELD_IS ("curr_connections"))
583     {
584       submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
585     }
586
587     /*
588      * Commands
589      */
590     else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
591     {
592       const char *name = fields[1] + 4;
593       submit_derive ("memcached_command", name, atoll (fields[2]), st);
594       if (strcmp (name, "get") == 0)
595         gets = atof (fields[2]);
596     }
597
598     /*
599      * Operations on the cache, i. e. cache hits, cache misses and evictions of items
600      */
601     else if (FIELD_IS ("get_hits"))
602     {
603       submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
604       hits = atof (fields[2]);
605     }
606     else if (FIELD_IS ("get_misses"))
607     {
608       submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
609     }
610     else if (FIELD_IS ("evictions"))
611     {
612       submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
613     }
614
615     /*
616      * Network traffic
617      */
618     else if (FIELD_IS ("bytes_read"))
619     {
620       octets_rx = atoll (fields[2]);
621     }
622     else if (FIELD_IS ("bytes_written"))
623     {
624       octets_tx = atoll (fields[2]);
625     }
626   } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
627
628   if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
629     submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
630
631   if ((rusage_user != 0) || (rusage_syst != 0))
632     submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
633
634   if ((octets_rx != 0) || (octets_tx != 0))
635     submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
636
637   if (!isnan (gets) && !isnan (hits))
638   {
639     gauge_t rate = NAN;
640
641     if (gets != 0.0)
642       rate = 100.0 * hits / gets;
643
644     submit_gauge ("percent", "hitratio", rate, st);
645   }
646
647   return 0;
648 }
649
650 void module_register (void)
651 {
652   plugin_register_complex_config ("memcached", memcached_config);
653   plugin_register_init ("memcached", memcached_init);
654 }