Merge branch 'collectd-4.2'
[collectd.git] / src / unixsock.c
1 /**
2  * collectd - src/unixsock.c
3  * Copyright (C) 2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25 #include "configfile.h"
26 #include "utils_cmd_putval.h"
27
28 /* Folks without pthread will need to disable this plugin. */
29 #include <pthread.h>
30
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <sys/un.h>
34
35 #include <grp.h>
36
37 #ifndef UNIX_PATH_MAX
38 # define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path)
39 #endif
40
41 #define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock"
42
43 /*
44  * Private data structures
45  */
46 /* linked list of cached values */
47 typedef struct value_cache_s
48 {
49         char       name[4*DATA_MAX_NAME_LEN];
50         int        values_num;
51         gauge_t   *gauge;
52         counter_t *counter;
53         const data_set_t *ds;
54         time_t     time;
55         struct value_cache_s *next;
56 } value_cache_t;
57
58 /*
59  * Private variables
60  */
61 /* valid configuration file keys */
62 static const char *config_keys[] =
63 {
64         "SocketFile",
65         "SocketGroup",
66         "SocketPerms",
67         NULL
68 };
69 static int config_keys_num = 3;
70
71 static int loop = 0;
72
73 /* socket configuration */
74 static int   sock_fd    = -1;
75 static char *sock_file  = NULL;
76 static char *sock_group = NULL;
77 static int   sock_perms = S_IRWXU | S_IRWXG;
78
79 static pthread_t listen_thread = (pthread_t) 0;
80
81 /* Linked list and auxilliary variables for saving values */
82 static value_cache_t   *cache_head = NULL;
83 static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
84 static unsigned int     cache_oldest = UINT_MAX;
85
86 /*
87  * Functions
88  */
89 static value_cache_t *cache_search (const char *name)
90 {
91         value_cache_t *vc;
92
93         for (vc = cache_head; vc != NULL; vc = vc->next)
94         {
95                 if (strcmp (vc->name, name) == 0)
96                         break;
97         } /* for vc = cache_head .. NULL */
98
99         return (vc);
100 } /* value_cache_t *cache_search */
101
102 static int cache_insert (const data_set_t *ds, const value_list_t *vl)
103 {
104         /* We're called from `cache_update' so we don't need to lock the mutex */
105         value_cache_t *vc;
106         int i;
107
108         DEBUG ("unixsock plugin: cache_insert: ds->type = %s; ds->ds_num = %i;"
109                         " vl->values_len = %i;",
110                         ds->type, ds->ds_num, vl->values_len);
111 #if COLLECT_DEBUG
112         assert (ds->ds_num == vl->values_len);
113 #else
114         if (ds->ds_num != vl->values_len)
115         {
116                 ERROR ("unixsock plugin: ds->type = %s: (ds->ds_num = %i) != "
117                                 "(vl->values_len = %i)",
118                                 ds->type, ds->ds_num, vl->values_len);
119                 return (-1);
120         }
121 #endif
122
123         vc = (value_cache_t *) malloc (sizeof (value_cache_t));
124         if (vc == NULL)
125         {
126                 char errbuf[1024];
127                 pthread_mutex_unlock (&cache_lock);
128                 ERROR ("unixsock plugin: malloc failed: %s",
129                                 sstrerror (errno, errbuf, sizeof (errbuf)));
130                 return (-1);
131         }
132
133         vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len);
134         if (vc->gauge == NULL)
135         {
136                 char errbuf[1024];
137                 pthread_mutex_unlock (&cache_lock);
138                 ERROR ("unixsock plugin: malloc failed: %s",
139                                 sstrerror (errno, errbuf, sizeof (errbuf)));
140                 free (vc);
141                 return (-1);
142         }
143
144         vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len);
145         if (vc->counter == NULL)
146         {
147                 char errbuf[1024];
148                 pthread_mutex_unlock (&cache_lock);
149                 ERROR ("unixsock plugin: malloc failed: %s",
150                                 sstrerror (errno, errbuf, sizeof (errbuf)));
151                 free (vc->gauge);
152                 free (vc);
153                 return (-1);
154         }
155
156         if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds))
157         {
158                 pthread_mutex_unlock (&cache_lock);
159                 ERROR ("unixsock plugin: FORMAT_VL failed.");
160                 free (vc->counter);
161                 free (vc->gauge);
162                 free (vc);
163                 return (-1);
164         }
165
166         for (i = 0; i < ds->ds_num; i++)
167         {
168                 if (ds->ds[i].type == DS_TYPE_COUNTER)
169                 {
170                         vc->gauge[i] = 0.0;
171                         vc->counter[i] = vl->values[i].counter;
172                 }
173                 else if (ds->ds[i].type == DS_TYPE_GAUGE)
174                 {
175                         vc->gauge[i] = vl->values[i].gauge;
176                         vc->counter[i] = 0;
177                 }
178                 else
179                 {
180                         vc->gauge[i] = 0.0;
181                         vc->counter[i] = 0;
182                 }
183         }
184         vc->values_num = ds->ds_num;
185         vc->ds = ds;
186
187         vc->next = cache_head;
188         cache_head = vc;
189
190         vc->time = vl->time;
191         if (vc->time < cache_oldest)
192                 cache_oldest = vc->time;
193
194         pthread_mutex_unlock (&cache_lock);
195         return (0);
196 } /* int cache_insert */
197
198 static int cache_update (const data_set_t *ds, const value_list_t *vl)
199 {
200         char name[4*DATA_MAX_NAME_LEN];;
201         value_cache_t *vc;
202         int i;
203
204         if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
205                 return (-1);
206
207         pthread_mutex_lock (&cache_lock);
208
209         vc = cache_search (name);
210
211         /* pthread_mutex_lock is called by cache_insert. */
212         if (vc == NULL)
213                 return (cache_insert (ds, vl));
214
215         assert (vc->values_num == ds->ds_num);
216         assert (vc->values_num == vl->values_len);
217
218         /* Avoid floating-point exceptions due to division by zero. */
219         if (vc->time >= vl->time)
220         {
221                 pthread_mutex_unlock (&cache_lock);
222                 ERROR ("unixsock plugin: vc->time >= vl->time. vc->time = %u; "
223                                 "vl->time = %u; vl = %s;",
224                                 (unsigned int) vc->time, (unsigned int) vl->time,
225                                 name);
226                 return (-1);
227         } /* if (vc->time >= vl->time) */
228
229         /*
230          * Update the values. This is possibly a lot more that you'd expect
231          * because we honor min and max values and handle counter overflows here.
232          */
233         for (i = 0; i < ds->ds_num; i++)
234         {
235                 if (ds->ds[i].type == DS_TYPE_COUNTER)
236                 {
237                         if (vl->values[i].counter < vc->counter[i])
238                         {
239                                 if (vl->values[i].counter <= 4294967295U)
240                                 {
241                                         vc->gauge[i] = ((4294967295U - vl->values[i].counter)
242                                                         + vc->counter[i]) / (vl->time - vc->time);
243                                 }
244                                 else
245                                 {
246                                         vc->gauge[i] = ((18446744073709551615ULL - vl->values[i].counter)
247                                                 + vc->counter[i]) / (vl->time - vc->time);
248                                 }
249                         }
250                         else
251                         {
252                                 vc->gauge[i] = (vl->values[i].counter - vc->counter[i])
253                                         / (vl->time - vc->time);
254                         }
255
256                         vc->counter[i] = vl->values[i].counter;
257                 }
258                 else if (ds->ds[i].type == DS_TYPE_GAUGE)
259                 {
260                         vc->gauge[i] = vl->values[i].gauge;
261                         vc->counter[i] = 0;
262                 }
263                 else
264                 {
265                         vc->gauge[i] = NAN;
266                         vc->counter[i] = 0;
267                 }
268
269                 if (isnan (vc->gauge[i])
270                                 || (!isnan (ds->ds[i].min) && (vc->gauge[i] < ds->ds[i].min))
271                                 || (!isnan (ds->ds[i].max) && (vc->gauge[i] > ds->ds[i].max)))
272                         vc->gauge[i] = NAN;
273         } /* for i = 0 .. ds->ds_num */
274
275         vc->ds = ds;
276         vc->time = vl->time;
277
278         if (vc->time < cache_oldest)
279                 cache_oldest = vc->time;
280
281         pthread_mutex_unlock (&cache_lock);
282         return (0);
283 } /* int cache_update */
284
285 static void cache_flush (int max_age)
286 {
287         value_cache_t *this;
288         value_cache_t *prev;
289         time_t now;
290
291         pthread_mutex_lock (&cache_lock);
292
293         now = time (NULL);
294
295         if ((now - cache_oldest) <= max_age)
296         {
297                 pthread_mutex_unlock (&cache_lock);
298                 return;
299         }
300         
301         cache_oldest = now;
302
303         prev = NULL;
304         this = cache_head;
305
306         while (this != NULL)
307         {
308                 if ((now - this->time) <= max_age)
309                 {
310                         if (this->time < cache_oldest)
311                                 cache_oldest = this->time;
312
313                         prev = this;
314                         this = this->next;
315                         continue;
316                 }
317
318                 if (prev == NULL)
319                         cache_head = this->next;
320                 else
321                         prev->next = this->next;
322
323                 free (this->gauge);
324                 free (this->counter);
325                 free (this);
326
327                 if (prev == NULL)
328                         this = cache_head;
329                 else
330                         this = prev->next;
331         } /* while (this != NULL) */
332
333         pthread_mutex_unlock (&cache_lock);
334 } /* void cache_flush */
335
336 static int us_open_socket (void)
337 {
338         struct sockaddr_un sa;
339         int status;
340
341         sock_fd = socket (PF_UNIX, SOCK_STREAM, 0);
342         if (sock_fd < 0)
343         {
344                 char errbuf[1024];
345                 ERROR ("unixsock plugin: socket failed: %s",
346                                 sstrerror (errno, errbuf, sizeof (errbuf)));
347                 return (-1);
348         }
349
350         memset (&sa, '\0', sizeof (sa));
351         sa.sun_family = AF_UNIX;
352         strncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
353                         sizeof (sa.sun_path) - 1);
354         /* unlink (sa.sun_path); */
355
356         DEBUG ("unixsock plugin: socket path = %s", sa.sun_path);
357
358         status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa));
359         if (status != 0)
360         {
361                 char errbuf[1024];
362                 sstrerror (errno, errbuf, sizeof (errbuf));
363                 ERROR ("unixsock plugin: bind failed: %s", errbuf);
364                 close (sock_fd);
365                 sock_fd = -1;
366                 return (-1);
367         }
368
369         chmod (sa.sun_path, sock_perms);
370
371         status = listen (sock_fd, 8);
372         if (status != 0)
373         {
374                 char errbuf[1024];
375                 ERROR ("unixsock plugin: listen failed: %s",
376                                 sstrerror (errno, errbuf, sizeof (errbuf)));
377                 close (sock_fd);
378                 sock_fd = -1;
379                 return (-1);
380         }
381
382         do
383         {
384                 char *grpname;
385                 struct group *g;
386                 struct group sg;
387                 char grbuf[2048];
388
389                 grpname = (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME;
390                 g = NULL;
391
392                 status = getgrnam_r (grpname, &sg, grbuf, sizeof (grbuf), &g);
393                 if (status != 0)
394                 {
395                         char errbuf[1024];
396                         WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", grpname,
397                                         sstrerror (errno, errbuf, sizeof (errbuf)));
398                         break;
399                 }
400                 if (g == NULL)
401                 {
402                         WARNING ("unixsock plugin: No such group: `%s'",
403                                         grpname);
404                         break;
405                 }
406
407                 if (chown ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
408                                         (uid_t) -1, g->gr_gid) != 0)
409                 {
410                         char errbuf[1024];
411                         WARNING ("unixsock plugin: chown (%s, -1, %i) failed: %s",
412                                         (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
413                                         (int) g->gr_gid,
414                                         sstrerror (errno, errbuf, sizeof (errbuf)));
415                 }
416         } while (0);
417
418         return (0);
419 } /* int us_open_socket */
420
421 static int us_handle_getval (FILE *fh, char **fields, int fields_num)
422 {
423         char *hostname;
424         char *plugin;
425         char *plugin_instance;
426         char *type;
427         char *type_instance;
428         char  name[4*DATA_MAX_NAME_LEN];
429         value_cache_t *vc;
430         int   status;
431         int   i;
432
433         if (fields_num != 2)
434         {
435                 DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num);
436                 fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n",
437                                 fields_num);
438                 fflush (fh);
439                 return (-1);
440         }
441         DEBUG ("unixsock plugin: Got query for `%s'", fields[1]);
442
443         status = parse_identifier (fields[1], &hostname,
444                         &plugin, &plugin_instance,
445                         &type, &type_instance);
446         if (status != 0)
447         {
448                 DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]);
449                 fprintf (fh, "-1 Cannot parse identifier.\n");
450                 fflush (fh);
451                 return (-1);
452         }
453
454         status = format_name (name, sizeof (name),
455                         hostname, plugin, plugin_instance, type, type_instance);
456         if (status != 0)
457         {
458                 fprintf (fh, "-1 format_name failed.\n");
459                 return (-1);
460         }
461
462         pthread_mutex_lock (&cache_lock);
463
464         DEBUG ("vc = cache_search (%s)", name);
465         vc = cache_search (name);
466
467         if (vc == NULL)
468         {
469                 DEBUG ("Did not find cache entry.");
470                 fprintf (fh, "-1 No such value");
471         }
472         else
473         {
474                 DEBUG ("Found cache entry.");
475                 fprintf (fh, "%i", vc->values_num);
476                 for (i = 0; i < vc->values_num; i++)
477                 {
478                         fprintf (fh, " %s=", vc->ds->ds[i].name);
479                         if (isnan (vc->gauge[i]))
480                                 fprintf (fh, "NaN");
481                         else
482                                 fprintf (fh, "%12e", vc->gauge[i]);
483                 }
484         }
485
486         /* Free the mutex as soon as possible and definitely before flushing */
487         pthread_mutex_unlock (&cache_lock);
488
489         fprintf (fh, "\n");
490         fflush (fh);
491
492         return (0);
493 } /* int us_handle_getval */
494
495 static int us_handle_listval (FILE *fh, char **fields, int fields_num)
496 {
497         char buffer[1024];
498         char **value_list = NULL;
499         int value_list_len = 0;
500         value_cache_t *entry;
501         int i;
502
503         if (fields_num != 1)
504         {
505                 DEBUG ("unixsock plugin: us_handle_listval: "
506                                 "Wrong number of fields: %i", fields_num);
507                 fprintf (fh, "-1 Wrong number of fields: Got %i, expected 1.\n",
508                                 fields_num);
509                 fflush (fh);
510                 return (-1);
511         }
512
513         pthread_mutex_lock (&cache_lock);
514
515         for (entry = cache_head; entry != NULL; entry = entry->next)
516         {
517                 char **tmp;
518
519                 snprintf (buffer, sizeof (buffer), "%u %s\n",
520                                 (unsigned int) entry->time, entry->name);
521                 buffer[sizeof (buffer) - 1] = '\0';
522                 
523                 tmp = realloc (value_list, sizeof (char *) * (value_list_len + 1));
524                 if (tmp == NULL)
525                         continue;
526                 value_list = tmp;
527
528                 value_list[value_list_len] = strdup (buffer);
529
530                 if (value_list[value_list_len] != NULL)
531                         value_list_len++;
532         } /* for (entry) */
533
534         pthread_mutex_unlock (&cache_lock);
535
536         DEBUG ("unixsock plugin: us_handle_listval: value_list_len = %i", value_list_len);
537         fprintf (fh, "%i Values found\n", value_list_len);
538         for (i = 0; i < value_list_len; i++)
539                 fputs (value_list[i], fh);
540         fflush (fh);
541
542         return (0);
543 } /* int us_handle_listval */
544
545 static void *us_handle_client (void *arg)
546 {
547         int fd;
548         FILE *fh;
549         char buffer[1024];
550         char *fields[128];
551         int   fields_num;
552
553         fd = *((int *) arg);
554         free (arg);
555         arg = NULL;
556
557         DEBUG ("Reading from fd #%i", fd);
558
559         fh = fdopen (fd, "r+");
560         if (fh == NULL)
561         {
562                 char errbuf[1024];
563                 ERROR ("unixsock plugin: fdopen failed: %s",
564                                 sstrerror (errno, errbuf, sizeof (errbuf)));
565                 close (fd);
566                 pthread_exit ((void *) 1);
567         }
568
569         while (fgets (buffer, sizeof (buffer), fh) != NULL)
570         {
571                 int len;
572
573                 len = strlen (buffer);
574                 while ((len > 0)
575                                 && ((buffer[len - 1] == '\n') || (buffer[len - 1] == '\r')))
576                         buffer[--len] = '\0';
577
578                 if (len == 0)
579                         continue;
580
581                 DEBUG ("fgets -> buffer = %s; len = %i;", buffer, len);
582
583                 fields_num = strsplit (buffer, fields,
584                                 sizeof (fields) / sizeof (fields[0]));
585
586                 if (fields_num < 1)
587                 {
588                         close (fd);
589                         break;
590                 }
591
592                 if (strcasecmp (fields[0], "getval") == 0)
593                 {
594                         us_handle_getval (fh, fields, fields_num);
595                 }
596                 else if (strcasecmp (fields[0], "putval") == 0)
597                 {
598                         handle_putval (fh, fields, fields_num);
599                 }
600                 else if (strcasecmp (fields[0], "listval") == 0)
601                 {
602                         us_handle_listval (fh, fields, fields_num);
603                 }
604                 else
605                 {
606                         fprintf (fh, "-1 Unknown command: %s\n", fields[0]);
607                         fflush (fh);
608                 }
609         } /* while (fgets) */
610
611         DEBUG ("Exiting..");
612         close (fd);
613
614         pthread_exit ((void *) 0);
615 } /* void *us_handle_client */
616
617 static void *us_server_thread (void *arg)
618 {
619         int  status;
620         int *remote_fd;
621         pthread_t th;
622         pthread_attr_t th_attr;
623
624         if (us_open_socket () != 0)
625                 pthread_exit ((void *) 1);
626
627         while (loop != 0)
628         {
629                 DEBUG ("unixsock plugin: Calling accept..");
630                 status = accept (sock_fd, NULL, NULL);
631                 if (status < 0)
632                 {
633                         char errbuf[1024];
634
635                         if (errno == EINTR)
636                                 continue;
637
638                         ERROR ("unixsock plugin: accept failed: %s",
639                                         sstrerror (errno, errbuf, sizeof (errbuf)));
640                         close (sock_fd);
641                         sock_fd = -1;
642                         pthread_exit ((void *) 1);
643                 }
644
645                 remote_fd = (int *) malloc (sizeof (int));
646                 if (remote_fd == NULL)
647                 {
648                         char errbuf[1024];
649                         WARNING ("unixsock plugin: malloc failed: %s",
650                                         sstrerror (errno, errbuf, sizeof (errbuf)));
651                         close (status);
652                         continue;
653                 }
654                 *remote_fd = status;
655
656                 DEBUG ("Spawning child to handle connection on fd #%i", *remote_fd);
657
658                 pthread_attr_init (&th_attr);
659                 pthread_attr_setdetachstate (&th_attr, PTHREAD_CREATE_DETACHED);
660
661                 status = pthread_create (&th, &th_attr, us_handle_client, (void *) remote_fd);
662                 if (status != 0)
663                 {
664                         char errbuf[1024];
665                         WARNING ("unixsock plugin: pthread_create failed: %s",
666                                         sstrerror (errno, errbuf, sizeof (errbuf)));
667                         close (*remote_fd);
668                         free (remote_fd);
669                         continue;
670                 }
671         } /* while (loop) */
672
673         close (sock_fd);
674         sock_fd = -1;
675
676         status = unlink ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH);
677         if (status != 0)
678         {
679                 char errbuf[1024];
680                 NOTICE ("unixsock plugin: unlink (%s) failed: %s",
681                                 (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
682                                 sstrerror (errno, errbuf, sizeof (errbuf)));
683         }
684
685         return ((void *) 0);
686 } /* void *us_server_thread */
687
688 static int us_config (const char *key, const char *val)
689 {
690         if (strcasecmp (key, "SocketFile") == 0)
691         {
692                 sfree (sock_file);
693                 sock_file = strdup (val);
694         }
695         else if (strcasecmp (key, "SocketGroup") == 0)
696         {
697                 sfree (sock_group);
698                 sock_group = strdup (val);
699         }
700         else if (strcasecmp (key, "SocketPerms") == 0)
701         {
702                 sock_perms = (int) strtol (val, NULL, 8);
703         }
704         else
705         {
706                 return (-1);
707         }
708
709         return (0);
710 } /* int us_config */
711
712 static int us_init (void)
713 {
714         int status;
715
716         loop = 1;
717
718         status = pthread_create (&listen_thread, NULL, us_server_thread, NULL);
719         if (status != 0)
720         {
721                 char errbuf[1024];
722                 ERROR ("unixsock plugin: pthread_create failed: %s",
723                                 sstrerror (errno, errbuf, sizeof (errbuf)));
724                 return (-1);
725         }
726
727         return (0);
728 } /* int us_init */
729
730 static int us_shutdown (void)
731 {
732         void *ret;
733
734         loop = 0;
735
736         if (listen_thread != (pthread_t) 0)
737         {
738                 pthread_kill (listen_thread, SIGTERM);
739                 pthread_join (listen_thread, &ret);
740                 listen_thread = (pthread_t) 0;
741         }
742
743         plugin_unregister_init ("unixsock");
744         plugin_unregister_write ("unixsock");
745         plugin_unregister_shutdown ("unixsock");
746
747         return (0);
748 } /* int us_shutdown */
749
750 static int us_write (const data_set_t *ds, const value_list_t *vl)
751 {
752         cache_update (ds, vl);
753         cache_flush (2 * interval_g);
754
755         return (0);
756 }
757
758 void module_register (void)
759 {
760         plugin_register_config ("unixsock", us_config,
761                         config_keys, config_keys_num);
762         plugin_register_init ("unixsock", us_init);
763         plugin_register_write ("unixsock", us_write);
764         plugin_register_shutdown ("unixsock", us_shutdown);
765 } /* void module_register (void) */
766
767 /* vim: set sw=4 ts=4 sts=4 tw=78 : */