2 * collectd - src/memcached.c, based on src/hddtemp.c
3 * Copyright (C) 2007 Antony Dovgal
4 * Copyright (C) 2007-2010 Florian Forster
5 * Copyright (C) 2009 Doug MacEachern
6 * Copyright (C) 2009 Franck Lombardi
8 * ############################
9 * ##### BIG FAT WARNING ######
10 * ############################
11 * Experimental multi instances version by Nicolas Szalay <nico@rottenbytes.info>
12 * Parts for config parsing taken from src/apache.c
16 * This program is free software; you can redistribute it and/or modify it
17 * under the terms of the GNU General Public License as published by the
18 * Free Software Foundation; either version 2 of the License, or (at your
19 * option) any later version.
21 * This program is distributed in the hope that it will be useful, but
22 * WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
24 * General Public License for more details.
26 * You should have received a copy of the GNU General Public License along
27 * with this program; if not, write to the Free Software Foundation, Inc.,
28 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 * Antony Dovgal <tony at daylessday dot org>
32 * Florian octo Forster <octo at collectd.org>
33 * Doug MacEachern <dougm at hyperic.com>
40 #include "configfile.h"
44 # include <sys/socket.h>
46 # include <netinet/in.h>
47 # include <netinet/tcp.h>
49 /* Hack to work around the missing define in AIX */
51 # define MSG_DONTWAIT MSG_NONBLOCK
/* Fallback host and port that memcached_query_daemon() passes to
 * getaddrinfo(). */
54 #define MEMCACHED_DEF_HOST "127.0.0.1"
55 #define MEMCACHED_DEF_PORT "11211"
/* Limit compared against the counter of the recv() loop in
 * memcached_query_daemon(); once exceeded the loop reports a timeout. */
57 #define MEMCACHED_RETRY_COUNT 100
/* Per-instance state. The struct body is not visible to the reviewer; the
 * members referenced elsewhere in this file are name, socket, host and
 * port. */
67 typedef struct memcached_s memcached_t;
69 /* Tina says : "we don't need another hero^W^Wader" */
/* Forward declaration: config_add() hands memcached_read to
 * plugin_register_complex_read() before the definition appears. */
70 static int memcached_read (user_data_t *user_data);
/*
 * memcached_free - release one memcached_t instance.
 *
 * config_add() installs this function as the free_func of the user data it
 * registers together with the read callback.
 *
 * NOTE(review): the body of this function is not visible to the reviewer;
 * which members it releases has to be confirmed against the full file.
 */
72 static void memcached_free (memcached_t *st)
/*
 * memcached_query_daemon - send the stats command to one memcached instance
 * and collect the reply in buffer.
 *
 * buffer       destination for the reply; it is zeroed before receiving.
 * buffer_size  capacity of buffer in bytes.
 * user_data    callback user data; presumably st is taken from
 *              user_data->data as memcached_read() does (the assignment is
 *              not visible here, TODO confirm).
 *
 * NOTE(review): declarations, braces, several conditions and all return
 * statements of this function are not visible to the reviewer. The comments
 * below describe only the statements that are visible.
 *
 * Visible steps:
 *  - When st->socket is set, an AF_UNIX stream socket is prepared for that
 *    path.
 *  - For TCP, getaddrinfo() is restricted to AF_INET and SOCK_STREAM and the
 *    returned addresses are tried in list order with socket() and connect().
 *  - The command is sent with MSG_DONTWAIT; a short send is logged as an
 *    error.
 *  - poll() waits for POLLIN, POLLERR or POLLHUP with a timeout derived from
 *    interval_g.
 *  - recv() is called with MSG_DONTWAIT until the reply ends with the END
 *    marker, the peer closes the connection, or the retry limit is hit.
 */
83 static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data)
92 if (st->socket != NULL) {
93 struct sockaddr_un serv_addr;
95 memset (&serv_addr, 0, sizeof (serv_addr));
96 serv_addr.sun_family = AF_UNIX;
97 sstrncpy (serv_addr.sun_path, st->socket,
98 sizeof (serv_addr.sun_path));
100 /* create our socket descriptor */
101 fd = socket (AF_UNIX, SOCK_STREAM, 0);
104 ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
111 if (st->port != NULL) {
115 struct addrinfo ai_hints;
116 struct addrinfo *ai_list, *ai_ptr;
/* Name resolution is limited to IPv4 stream sockets. */
119 memset (&ai_hints, '\0', sizeof (ai_hints));
120 ai_hints.ai_flags = 0;
122 /* ai_hints.ai_flags |= AI_ADDRCONFIG; */
124 ai_hints.ai_family = AF_INET;
125 ai_hints.ai_socktype = SOCK_STREAM;
126 ai_hints.ai_protocol = 0;
130 host = MEMCACHED_DEF_HOST;
/* An empty port string falls back to the default port. */
134 if (strlen (port) == 0) {
135 port = MEMCACHED_DEF_PORT;
/* EAI_SYSTEM means the real cause is in errno, so it is reported through
 * sstrerror() instead of gai_strerror(). */
138 if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
140 ERROR ("memcached: getaddrinfo (%s, %s): %s",
142 (ai_return == EAI_SYSTEM)
143 ? sstrerror (errno, errbuf, sizeof (errbuf))
144 : gai_strerror (ai_return));
/* Try every address returned by getaddrinfo() in list order. */
149 for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
150 /* create our socket descriptor */
151 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
154 ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
158 /* connect to the memcached daemon */
159 status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen);
161 shutdown (fd, SHUT_RDWR);
167 /* A socket could be opened and connecting succeeded. We're
172 freeaddrinfo (ai_list);
177 ERROR ("memcached: Could not connect to daemon.");
181 if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
182 ERROR ("memcached: Could not send command to the memcached daemon.");
190 memset (&p, 0, sizeof (p));
192 p.events = POLLIN | POLLERR | POLLHUP;
195 status = poll (&p, /* nfds = */ 1,
196 /* timeout = */ CDTIME_T_TO_MS (interval_g));
201 ERROR ("memcached: poll(2) timed out after %.3f seconds.",
202 CDTIME_T_TO_DOUBLE (interval_g));
207 ERROR ("memcached: poll(2) failed: %s",
208 sstrerror (errno, errbuf, sizeof (errbuf)));
210 shutdown (fd, SHUT_RDWR);
216 /* receive data from the memcached daemon */
217 memset (buffer, '\0', buffer_size);
/* Non-blocking receive loop: each pass appends at offset buffer_fill and the
 * loop ends when recv() returns 0. The counter i is compared against
 * MEMCACHED_RETRY_COUNT; its increment is not visible to the reviewer. */
220 while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
221 if (i > MEMCACHED_RETRY_COUNT) {
222 ERROR("recv() timed out");
230 if (errno == EAGAIN) {
234 ERROR ("memcached: Error reading from socket: %s",
235 sstrerror (errno, errbuf, sizeof (errbuf)));
236 shutdown(fd, SHUT_RDWR);
240 buffer_fill += status;
/* The reply is complete when the last five bytes are END followed by two
 * more bytes (presumably CR LF).
 * NOTE(review): the guard buffer_fill > 3 also admits buffer_fill == 4, in
 * which case buffer[buffer_fill-5] reads index -1. The guard probably needs
 * to be buffer_fill >= 5; confirm and fix. */
242 if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
243 /* we got all the data */
/* A completely filled buffer is terminated in its last byte and reported as
 * truncated; an empty buffer means the peer closed without sending data. */
248 if (buffer_fill >= buffer_size) {
249 buffer[buffer_size - 1] = '\0';
250 WARNING ("memcached: Message from memcached has been truncated.");
251 } else if (buffer_fill == 0) {
252 WARNING ("memcached: Peer has unexpectedly shut down the socket. "
253 "Buffer: `%s'", buffer);
254 shutdown(fd, SHUT_RDWR);
259 shutdown(fd, SHUT_RDWR);
264 /* Configuration handling functions
266 * <Instance "instance_name">
/*
 * config_set_string - store a copy of the single string argument of a config
 * item.
 *
 * ret_string  location that receives the strdup()ed copy.
 * ci          config item; it must carry exactly one value of type
 *             OCONFIG_TYPE_STRING, otherwise a warning naming ci->key is
 *             logged.
 *
 * A failing strdup() is logged as an error.
 *
 * NOTE(review): the return statements and the statement guarded by the
 * check on the previous value of *ret_string are not visible to the
 * reviewer; presumably the old string is freed there (TODO confirm).
 */
272 static int config_set_string (char **ret_string, oconfig_item_t *ci)
276 if ((ci->values_num != 1)
277 || (ci->values[0].type != OCONFIG_TYPE_STRING))
279 WARNING ("memcached plugin: The `%s' config option "
280 "needs exactly one string argument.", ci->key);
284 string = strdup (ci->values[0].value.string);
287 ERROR ("memcached plugin: strdup failed.");
291 if (*ret_string != NULL)
293 *ret_string = string;
/*
 * config_add - create one memcached instance from a configuration block and
 * register a read callback for it.
 *
 * ci  config item of the block; its single string argument becomes the
 *     instance name, and the child options Socket, Host and Port (matched
 *     case-insensitively) fill the corresponding members. Any other child
 *     key is reported with a warning.
 *
 * The instance is allocated with malloc() and zeroed, so unset members stay
 * NULL. memcached_read is registered through plugin_register_complex_read()
 * under a name built from the host (or hostname_g) and the port (or the
 * literal default), with memcached_free installed as the free function of
 * the user data.
 *
 * NOTE(review): declarations, error handling, the format string of the
 * callback name and the return statements are not visible to the reviewer.
 * Presumably ud.data is set to st on one of those lines (TODO confirm).
 * Only host and port appear among the visible name arguments, so whether
 * the socket path contributes to the callback name cannot be seen here.
 */
298 static int config_add (oconfig_item_t *ci)
304 if ((ci->values_num != 1)
305 || (ci->values[0].type != OCONFIG_TYPE_STRING))
307 WARNING ("memcached plugin: The `%s' config option "
308 "needs exactly one string argument.", ci->key);
312 st = (memcached_t *) malloc (sizeof (*st));
315 ERROR ("memcached plugin: malloc failed.");
319 memset (st, 0, sizeof (*st));
/* The block argument itself is stored as the instance name. */
321 status = config_set_string (&st->name, ci);
327 assert (st->name != NULL);
329 for (i = 0; i < ci->children_num; i++)
331 oconfig_item_t *child = ci->children + i;
333 if (strcasecmp ("Socket", child->key) == 0)
334 status = config_set_string (&st->socket, child);
335 else if (strcasecmp ("Host", child->key) == 0)
336 status = config_set_string (&st->host, child);
337 else if (strcasecmp ("Port", child->key) == 0)
338 status = config_set_string (&st->port, child);
341 WARNING ("memcached plugin: Option `%s' not allowed here.",
353 char callback_name[3*DATA_MAX_NAME_LEN];
355 memset (&ud, 0, sizeof (ud));
357 ud.free_func = (void *) memcached_free;
359 memset (callback_name, 0, sizeof (callback_name));
/* NOTE(review): the ssnprintf call below ends with a comma instead of a
 * semicolon. Unless a line not visible here intervenes, it forms one comma
 * expression with the following assignment to status; this looks like a
 * typo that should be a semicolon. */
360 ssnprintf (callback_name, sizeof (callback_name),
362 (st->host != NULL) ? st->host : hostname_g,
363 (st->port != NULL) ? st->port : "default"),
365 status = plugin_register_complex_read (/* group = */ NULL,
366 /* name = */ callback_name,
367 /* callback = */ memcached_read,
368 /* interval = */ NULL,
369 /* user_data = */ &ud);
/*
 * config - top-level configuration callback of the plugin.
 *
 * ci  root config item of the plugin block.
 *
 * Walks all children of ci. Children whose key equals Instance (compared
 * case-insensitively) are accepted; every other key triggers a warning that
 * suggests wrapping the options in an Instance block.
 *
 * NOTE(review): the statement executed for an Instance child and the return
 * statement are not visible to the reviewer; presumably config_add() is
 * called for each Instance child (TODO confirm).
 */
381 static int config (oconfig_item_t *ci)
386 for (i = 0; i < ci->children_num; i++)
388 oconfig_item_t *child = ci->children + i;
390 if (strcasecmp ("Instance", child->key) == 0)
393 WARNING ("memcached plugin: The configuration option "
394 "\"%s\" is not allowed here. Did you "
395 "forget to add an <Instance /> block "
396 "around the configuration?",
398 } /* for (ci->children) */
/*
 * submit_derive - dispatch a single derive value for one instance.
 *
 * type       value for vl.type.
 * type_inst  value for vl.type_instance; skipped when NULL.
 * value      the derive value, stored in values[0].
 * st         instance; st->name becomes the plugin instance when non-NULL.
 *
 * The host is taken from hostname_g and the plugin name is memcached.
 *
 * NOTE(review): the declaration of values and the lines attaching it to vl
 * are not visible to the reviewer.
 */
403 static void submit_derive (const char *type, const char *type_inst,
404 derive_t value, memcached_t *st)
407 value_list_t vl = VALUE_LIST_INIT;
409 values[0].derive = value;
413 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
414 sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
415 if (st->name != NULL)
416 sstrncpy (vl.plugin_instance, st->name, sizeof (vl.plugin_instance));
417 sstrncpy (vl.type, type, sizeof (vl.type));
418 if (type_inst != NULL)
419 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
421 plugin_dispatch_values (&vl);
/*
 * submit_derive2 - dispatch a pair of derive values for one instance.
 *
 * type       value for vl.type.
 * type_inst  value for vl.type_instance; skipped when NULL.
 * value0     first derive value, stored in values[0].
 * value1     second derive value, stored in values[1].
 * st         instance; st->name becomes the plugin instance when non-NULL.
 *
 * The host is taken from hostname_g and the plugin name is memcached.
 *
 * NOTE(review): the declaration of values and the lines attaching it to vl
 * are not visible to the reviewer.
 */
424 static void submit_derive2 (const char *type, const char *type_inst,
425 derive_t value0, derive_t value1, memcached_t *st)
428 value_list_t vl = VALUE_LIST_INIT;
430 values[0].derive = value0;
431 values[1].derive = value1;
435 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
436 sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
437 if (st->name != NULL)
438 sstrncpy (vl.plugin_instance, st->name, sizeof (vl.plugin_instance));
439 sstrncpy (vl.type, type, sizeof (vl.type));
440 if (type_inst != NULL)
441 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
443 plugin_dispatch_values (&vl);
/*
 * submit_gauge - dispatch a single gauge value for one instance.
 *
 * type       value for vl.type.
 * type_inst  value for vl.type_instance; skipped when NULL.
 * value      the gauge value, stored in values[0].
 * st         instance; st->name becomes the plugin instance when non-NULL.
 *
 * The host is taken from hostname_g and the plugin name is memcached.
 *
 * NOTE(review): the declaration of values and the lines attaching it to vl
 * are not visible to the reviewer.
 */
446 static void submit_gauge (const char *type, const char *type_inst,
447 gauge_t value, memcached_t *st)
450 value_list_t vl = VALUE_LIST_INIT;
452 values[0].gauge = value;
456 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
457 sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
458 if (st->name != NULL)
459 sstrncpy (vl.plugin_instance, st->name, sizeof (vl.plugin_instance));
460 sstrncpy (vl.type, type, sizeof (vl.type));
461 if (type_inst != NULL)
462 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
464 plugin_dispatch_values (&vl);
/*
 * submit_gauge2 - dispatch a pair of gauge values for one instance.
 *
 * type       value for vl.type.
 * type_inst  value for vl.type_instance; skipped when NULL.
 * value0     first gauge value, stored in values[0].
 * value1     second gauge value, stored in values[1].
 * st         instance; st->name becomes the plugin instance when non-NULL.
 *
 * The host is taken from hostname_g and the plugin name is memcached.
 *
 * NOTE(review): the declaration of values and the lines attaching it to vl
 * are not visible to the reviewer.
 */
467 static void submit_gauge2 (const char *type, const char *type_inst,
468 gauge_t value0, gauge_t value1, memcached_t *st)
471 value_list_t vl = VALUE_LIST_INIT;
473 values[0].gauge = value0;
474 values[1].gauge = value1;
478 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
479 sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
480 if (st->name != NULL)
481 sstrncpy (vl.plugin_instance, st->name, sizeof (vl.plugin_instance));
482 sstrncpy (vl.type, type, sizeof (vl.type));
483 if (type_inst != NULL)
484 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
486 plugin_dispatch_values (&vl);
/*
 * memcached_read - read callback: query one instance and dispatch its
 * statistics.
 *
 * user_data  callback user data; user_data->data is used as the
 *            memcached_t instance.
 *
 * The reply obtained from memcached_query_daemon() is split into lines and
 * each line into at most three fields. fields[1] is matched as the statistic
 * name and fields[2] is converted as its value. Some statistics are
 * dispatched immediately, others are collected and dispatched after the
 * loop (cache usage, CPU time, traffic, hit ratio).
 *
 * NOTE(review): several declarations (for example buf, fields, gets, hits,
 * rate), braces, the handling of short lines and the return statements are
 * not visible to the reviewer.
 */
489 static int memcached_read (user_data_t *user_data)
498 gauge_t bytes_used = NAN;
499 gauge_t bytes_total = NAN;
502 derive_t rusage_user = 0;
503 derive_t rusage_syst = 0;
504 derive_t octets_rx = 0;
505 derive_t octets_tx = 0;
508 st = user_data->data;
510 /* get data from daemon */
511 if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) {
/* FIELD_IS(cnst) is true when the current statistic name (fields[1], whose
 * length is name_len) equals cnst. The length test uses sizeof, so cnst has
 * to be a string literal. */
515 #define FIELD_IS(cnst) \
516 (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
/* Walk the reply line by line; CR and LF both act as separators. */
520 while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
526 fields_num = strsplit(line, fields, 3);
530 name_len = strlen(fields[1]);
535 * For an explanation on these fields please refer to
536 * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
540 * CPU time consumed by the memcached process
/* NOTE(review): atoll keeps only the leading integer part of the value. If
 * the daemon reports rusage values as seconds with a fractional part, the
 * fraction is dropped here; confirm against the protocol description. */
542 if (FIELD_IS ("rusage_user"))
544 rusage_user = atoll (fields[2]);
546 else if (FIELD_IS ("rusage_system"))
548 rusage_syst = atoll(fields[2]);
552 * Number of threads of this instance
/* ps_count is sent as a pair: the first value is NAN and the thread count
 * is the second value. */
554 else if (FIELD_IS ("threads"))
556 submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
560 * Number of items stored
562 else if (FIELD_IS ("curr_items"))
564 submit_gauge ("memcached_items", "current", atof (fields[2]), st);
568 * Number of bytes used and available (total - used)
570 else if (FIELD_IS ("bytes"))
572 bytes_used = atof (fields[2]);
574 else if (FIELD_IS ("limit_maxbytes"))
576 bytes_total = atof(fields[2]);
582 else if (FIELD_IS ("curr_connections"))
584 submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
/* Every name longer than four characters that starts with cmd_ is sent as
 * memcached_command, using the rest of the name as type instance. The value
 * of cmd_get is additionally kept in gets for the hit ratio. */
590 else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
592 const char *name = fields[1] + 4;
593 submit_derive ("memcached_command", name, atoll (fields[2]), st);
594 if (strcmp (name, "get") == 0)
595 gets = atof (fields[2]);
599 * Operations on the cache, i. e. cache hits, cache misses and evictions of items
601 else if (FIELD_IS ("get_hits"))
603 submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
604 hits = atof (fields[2]);
606 else if (FIELD_IS ("get_misses"))
608 submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
610 else if (FIELD_IS ("evictions"))
612 submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
618 else if (FIELD_IS ("bytes_read"))
620 octets_rx = atoll (fields[2]);
622 else if (FIELD_IS ("bytes_written"))
624 octets_tx = atoll (fields[2]);
626 } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
/* Cache usage is only sent when both byte counters were seen and used does
 * not exceed total; the pair is used bytes and total minus used. */
628 if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
629 submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
/* CPU time and traffic are only sent when at least one value of the pair is
 * non-zero. */
631 if ((rusage_user != 0) || (rusage_syst != 0))
632 submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
634 if ((octets_rx != 0) || (octets_tx != 0))
635 submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
/* Hit ratio in percent, computed only when both gets and hits were seen.
 * NOTE(review): the lines between the isnan check and the division are not
 * visible to the reviewer; confirm that gets == 0 is handled before the
 * division. */
637 if (!isnan (gets) && !isnan (hits))
642 rate = 100.0 * hits / gets;
644 submit_gauge ("percent", "hitratio", rate, st);
/*
 * module_register - plugin entry point.
 *
 * Registers config() as the complex configuration callback under the plugin
 * name memcached. No other registration is visible in this function; read
 * callbacks are registered per instance by config_add().
 */
650 void module_register (void)
652 plugin_register_complex_config ("memcached", config);