memcached plugin: Added a plugin to query stats from memcached servers.
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c
3  * Copyright (C) 2007  Antony Dovgal, heavily based on hddtemp.c
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Antony Dovgal <tony at daylessday dot org>
21  *
22  **/
23
24 #include "collectd.h"
25 #include "common.h"
26 #include "plugin.h"
27 #include "configfile.h"
28
29 # include <poll.h>
30 # include <netdb.h>
31 # include <sys/socket.h>
32 # include <netinet/in.h>
33 # include <netinet/tcp.h>
34 # include <libgen.h> /* for basename */
35
36 #if HAVE_LINUX_MAJOR_H
37 # include <linux/major.h>
38 #endif
39
40 #define MEMCACHED_DEF_HOST "127.0.0.1"
41 #define MEMCACHED_DEF_PORT "11211"
42
43 #define MEMCACHED_RETRY_COUNT 100
44
45 static const char *config_keys[] =
46 {
47         "Host",
48         "Port",
49         NULL
50 };
51 static int config_keys_num = 2;
52
53 static char *memcached_host = NULL;
54 static char memcached_port[16];
55
56 static int memcached_query_daemon (char *buffer, int buffer_size) /* {{{ */
57 {
58         int fd;
59         ssize_t status;
60         int buffer_fill;
61
62         const char *host;
63         const char *port;
64
65         struct addrinfo  ai_hints;
66         struct addrinfo *ai_list, *ai_ptr;
67         int              ai_return, i = 0;
68
69         memset (&ai_hints, '\0', sizeof (ai_hints));
70         ai_hints.ai_flags    = 0;
71 #ifdef AI_ADDRCONFIG
72 /*      ai_hints.ai_flags   |= AI_ADDRCONFIG; */
73 #endif
74         ai_hints.ai_family   = AF_INET;
75         ai_hints.ai_socktype = SOCK_STREAM;
76         ai_hints.ai_protocol = 0;
77
78         host = memcached_host;
79         if (host == NULL) {
80                 host = MEMCACHED_DEF_HOST;
81         }
82
83         port = memcached_port;
84         if (strlen (port) == 0) {
85                 port = MEMCACHED_DEF_PORT;
86         }
87
88         if ((ai_return = getaddrinfo (host, port, NULL, &ai_list)) != 0) {
89                 char errbuf[1024];
90                 ERROR ("memcached: getaddrinfo (%s, %s): %s",
91                                 host, port,
92                                 (ai_return == EAI_SYSTEM)
93                                 ? sstrerror (errno, errbuf, sizeof (errbuf))
94                                 : gai_strerror (ai_return));
95                 return -1;
96         }
97
98         fd = -1;
99         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
100                 /* create our socket descriptor */
101                 if ((fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol)) < 0) {
102                         char errbuf[1024];
103                         ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
104                         continue;
105                 }
106
107                 /* connect to the memcached daemon */
108                 if (connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen)) {
109                         char errbuf[1024];
110                         shutdown(fd, SHUT_RDWR);
111                         close(fd);
112                         fd = -1;
113                         continue;
114                 }
115
116                 /* A socket could be opened and connecting succeeded. We're
117                  * done. */
118                 break;
119         }
120
121         freeaddrinfo (ai_list);
122
123         if (fd < 0) {
124                 ERROR ("memcached: Could not connect to daemon.");
125                 return -1;
126         }
127
128         if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
129                 ERROR ("memcached: Could not send command to the memcached daemon.");
130                 return -1;
131         }
132
133         {
134                 struct pollfd p;
135                 int n;
136
137                 p.fd = fd;
138                 p.events = POLLIN|POLLERR|POLLHUP;
139                 p.revents = 0;
140
141                 n = poll(&p, 1, 3);
142
143                 if (n <= 0) {
144                         ERROR ("memcached: poll() failed or timed out");
145                         return -1;
146                 }
147         }
148
149         /* receive data from the memcached daemon */
150         memset (buffer, '\0', buffer_size);
151
152         buffer_fill = 0;
153         while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
154                 if (i > MEMCACHED_RETRY_COUNT) {
155                         ERROR("recv() timed out");
156                         break;
157                 }
158                 i++;
159
160                 if (status == -1) {
161                         char errbuf[1024];
162
163                         if (errno == EAGAIN) {
164                                 continue;
165                         }
166
167                         ERROR ("memcached: Error reading from socket: %s",
168                                         sstrerror (errno, errbuf, sizeof (errbuf)));
169                         shutdown(fd, SHUT_RDWR);
170                         close (fd);
171                         return -1;
172                 }
173                 buffer_fill += status;
174
175                 if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
176                         /* we got all the data */
177                         break;
178                 }
179         }
180
181         if (buffer_fill >= buffer_size) {
182                 buffer[buffer_size - 1] = '\0';
183                 WARNING ("memcached: Message from memcached has been truncated.");
184         } else if (buffer_fill == 0) {
185                 WARNING ("memcached: Peer has unexpectedly shut down the socket. "
186                                 "Buffer: `%s'", buffer);
187                 shutdown(fd, SHUT_RDWR);
188                 close(fd);
189                 return -1;
190         }
191
192         shutdown(fd, SHUT_RDWR);
193         close(fd);
194         return 0;
195 }
196 /* }}} */
197
198 static int memcached_config (const char *key, const char *value) /* {{{ */
199 {
200         if (strcasecmp (key, "Host") == 0) {
201                 if (memcached_host != NULL) {
202                         free (memcached_host);
203                 }
204                 memcached_host = strdup (value);
205         } else if (strcasecmp (key, "Port") == 0) {
206                 int port = (int) (atof (value));
207                 if ((port > 0) && (port <= 65535)) {
208                         snprintf (memcached_port, sizeof (memcached_port), "%i", port);
209                 } else {
210                         strncpy (memcached_port, value, sizeof (memcached_port));
211                 }
212                 memcached_port[sizeof (memcached_port) - 1] = '\0';
213         } else {
214                 return -1;
215         }
216
217         return 0;
218 }
219 /* }}} */
220
221 #if 0
222 static void memcached_submit_items(double curr_items, unsigned long long total_items) /* {{{ */
223 {
224         value_t values[2];
225         value_list_t vl = VALUE_LIST_INIT;
226
227         values[0].gauge = curr_items;
228         values[1].counter = total_items;
229
230         vl.values = values;
231         vl.values_len = 2;
232         vl.time = time (NULL);
233         strcpy (vl.host, hostname_g);
234         strcpy (vl.plugin, "memcached");
235
236         plugin_dispatch_values ("memcached_items", &vl);
237 }
238 /* }}} */
239 #endif
240
241 static void memcached_submit_connections(double curr_connections, unsigned long long total_connections) /* {{{ */
242 {
243         value_t values[2];
244         value_list_t vl = VALUE_LIST_INIT;
245
246         values[0].gauge = curr_connections;
247         values[1].counter = total_connections;
248
249         vl.values = values;
250         vl.values_len = 2;
251         vl.time = time (NULL);
252         strcpy (vl.host, hostname_g);
253         strcpy (vl.plugin, "memcached");
254
255         plugin_dispatch_values ("memcached_connections", &vl);
256 }
257 /* }}} */
258
259 static void memcached_submit_bytes(unsigned long long bytes_read, unsigned long long bytes_written) /* {{{ */
260 {
261         value_t values[2];
262         value_list_t vl = VALUE_LIST_INIT;
263
264         values[0].counter = bytes_read;
265         values[1].counter = bytes_written;
266
267         vl.values = values;
268         vl.values_len = 2;
269         vl.time = time (NULL);
270         strcpy (vl.host, hostname_g);
271         strcpy (vl.plugin, "memcached");
272
273         plugin_dispatch_values ("memcached_bytes", &vl);
274 }
275 /* }}} */
276
277 static void memcached_submit_cmd(unsigned long long cmd_get, unsigned long long cmd_set, unsigned long long get_hits, unsigned long long get_misses) /* {{{ */
278 {
279         value_t values[4];
280         value_list_t vl = VALUE_LIST_INIT;
281
282         values[0].counter = cmd_get;
283         values[1].counter = cmd_set;
284         values[2].counter = get_hits;
285         values[3].counter = get_misses;
286
287         vl.values = values;
288         vl.values_len = 4;
289         vl.time = time (NULL);
290         strcpy (vl.host, hostname_g);
291         strcpy (vl.plugin, "memcached");
292
293         plugin_dispatch_values ("memcached_cmd", &vl);
294 }
295 /* }}} */
296
297 static void memcached_submit_rusage(unsigned long long rusage_user, unsigned long long rusage_system) /* {{{ */
298 {
299         value_t values[2];
300         value_list_t vl = VALUE_LIST_INIT;
301
302         values[0].counter = rusage_user;
303         values[1].counter = rusage_system;
304
305         vl.values = values;
306         vl.values_len = 2;
307         vl.time = time (NULL);
308         strcpy (vl.host, hostname_g);
309         strcpy (vl.plugin, "memcached");
310
311         plugin_dispatch_values ("memcached_rusage", &vl);
312 }
313 /* }}} */
314
315 static int memcached_read (void) /* {{{ */
316 {
317         char buf[1024];
318         char *lines[128];
319         char *fields[3];
320         char *ptr;
321         char *saveptr;
322         int fields_num;
323         int lines_num = 0;
324         int i;
325         unsigned long long total_connections = 0, bytes_read = 0, bytes_written = 0, cmd_get = 0, cmd_set = 0, get_hits = 0, get_misses = 0, rusage_user = 0, rusage_system = 0;
326         double curr_connections = 0;
327
328         /* get data from daemon */
329         if (memcached_query_daemon (buf, sizeof (buf)) < 0) {
330                 return -1;
331         }
332
333     ptr = buf;
334     saveptr = NULL;
335     while ((lines[lines_num] = strtok_r (ptr, "\n\r", &saveptr)) != NULL) {
336         ptr = NULL;
337         lines_num++;
338
339         if (lines_num >= 127) break;
340     }
341
342 #define FIELD_IS(cnst) \
343         (sizeof(cnst) - 1) == name_len && memcmp(cnst, fields[1], sizeof(cnst)) == 0
344
345         for (i = 0; i < lines_num; i++) {
346                 int name_len;
347
348                 fields_num = strsplit(lines[i], fields, 3);
349                 if (fields_num != 3) continue;
350
351                 name_len = strlen(fields[1]);
352                 if (name_len == 0) continue;
353
354                 if (FIELD_IS("rusage_user")) {
355                         rusage_user = atoll(fields[2]);
356                 } else if (FIELD_IS("rusage_system")) {
357                         rusage_system = atoll(fields[2]);
358 /*              } else if (FIELD_IS("curr_items")) {
359                         curr_items = atof(fields[2]);
360                 } else if (FIELD_IS("total_items")) {
361                         total_items = atoll(fields[2]);
362                 } else if (FIELD_IS("bytes")) {
363                          bytes = atof(fields[2]); */
364                 } else if (FIELD_IS("curr_connections")) {
365                         curr_connections = atof(fields[2]);
366                 } else if (FIELD_IS("total_connections")) {
367                         total_connections = atoll(fields[2]);
368 /*              } else if (FIELD_IS("connection_structures")) {
369                         connection_structures = atof(fields[2]); */
370                 } else if (FIELD_IS("cmd_get")) {
371                         cmd_get = atoll(fields[2]);
372                 } else if (FIELD_IS("cmd_set")) {
373                         cmd_set = atoll(fields[2]);
374                 } else if (FIELD_IS("get_hits")) {
375                         get_hits = atoll(fields[2]);
376                 } else if (FIELD_IS("get_misses")) {
377                         get_misses = atoll(fields[2]);
378 /*              } else if (FIELD_IS("evictions")) {
379                         evictions = atoll(fields[2]); */
380                 } else if (FIELD_IS("bytes_read")) {
381                         bytes_read = atoll(fields[2]);
382                 } else if (FIELD_IS("bytes_written")) {
383                         bytes_written = atoll(fields[2]);
384 /*              } else if (FIELD_IS("limit_maxbytes")) {
385                         limit_maxbytes = atof(fields[2]);
386                 } else if (FIELD_IS("threads")) {
387                         threads = atof(fields[2]);  */
388                 }
389         }
390
391 #if 0
392         memcached_submit_items(curr_items, total_items);
393 #endif
394         memcached_submit_connections(curr_connections, total_connections);
395         memcached_submit_bytes(bytes_read, bytes_written);
396         memcached_submit_cmd(cmd_get, cmd_set, get_hits, get_misses);
397         memcached_submit_rusage(rusage_user, rusage_system);
398
399         return 0;
400 }
401 /* }}} */
402
403 void module_register (void) /* {{{ */
404 {
405         plugin_register_config ("memcached", memcached_config, config_keys, config_keys_num);
406         plugin_register_read ("memcached", memcached_read);
407 }
408 /* }}} */
409
410 /*
411  * Local variables:
412  * tab-width: 4
413  * c-basic-offset: 4
414  * End:
415  * vim600: sw=4 ts=4 fdm=marker
416  * vim<600: sw=4 ts=4
417  */
418