rrdtool plugin: Use the AVL-tree to implement the cache.
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26 #include "utils_debug.h"
27
28 /*
29  * This weird macro cascade forces the glibc to define `NAN'. I don't know
30  * another way to solve this, so more intelligent solutions are welcome. -octo
31  */
32 #ifndef __USE_ISOC99
33 # define DISABLE__USE_ISOC99 1
34 # define __USE_ISOC99 1
35 #endif
36 #include <math.h>
37 #ifdef DISABLE__USE_ISOC99
38 # undef DISABLE__USE_ISOC99
39 # undef __USE_ISOC99
40 #endif
41
42 /*
43  * Private types
44  */
45 struct rrd_cache_s
46 {
47         int    values_num;
48         char **values;
49         time_t first_value;
50 };
51 typedef struct rrd_cache_s rrd_cache_t;
52
53 /*
54  * Private variables
55  */
56 static int rra_timespans[] =
57 {
58         3600,
59         86400,
60         604800,
61         2678400,
62         31622400,
63         0
64 };
65 static int rra_timespans_num = 5;
66
67 static char *rra_types[] =
68 {
69         "AVERAGE",
70         "MIN",
71         "MAX",
72         NULL
73 };
74 static int rra_types_num = 3;
75
76 static const char *config_keys[] =
77 {
78         "CacheTimeout",
79         "CacheFlush",
80         "DataDir",
81         NULL
82 };
83 static int config_keys_num = 3;
84
85 static char *datadir = NULL;
86
87 static int         cache_timeout = 0;
88 static int         cache_flush_timeout = 0;
89 static time_t      cache_flush_last;
90 static avl_tree_t *cache = NULL;
91
92 /* * * * * * * * * *
93  * WARNING:  Magic *
94  * * * * * * * * * */
95 static int rra_get (char ***ret)
96 {
97         static char **rra_def = NULL;
98         static int rra_num = 0;
99
100         int rra_max = rra_timespans_num * rra_types_num;
101
102         int step;
103         int rows;
104         int span;
105
106         int cdp_num;
107         int cdp_len;
108         int i, j;
109
110         char buffer[64];
111
112         if ((rra_num != 0) && (rra_def != NULL))
113         {
114                 *ret = rra_def;
115                 return (rra_num);
116         }
117
118         if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
119                 return (-1);
120         memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
121
122         step = atoi (COLLECTD_STEP);
123         rows = atoi (COLLECTD_ROWS);
124
125         if ((step <= 0) || (rows <= 0))
126         {
127                 *ret = NULL;
128                 return (-1);
129         }
130
131         cdp_len = 0;
132         for (i = 0; i < rra_timespans_num; i++)
133         {
134                 span = rra_timespans[i];
135
136                 if ((span / step) < rows)
137                         continue;
138
139                 if (cdp_len == 0)
140                         cdp_len = 1;
141                 else
142                         cdp_len = (int) floor (((double) span) / ((double) (rows * step)));
143
144                 cdp_num = (int) ceil (((double) span) / ((double) (cdp_len * step)));
145
146                 for (j = 0; j < rra_types_num; j++)
147                 {
148                         if (rra_num >= rra_max)
149                                 break;
150
151                         if (snprintf (buffer, sizeof(buffer), "RRA:%s:%3.1f:%u:%u",
152                                                 rra_types[j], COLLECTD_XFF,
153                                                 cdp_len, cdp_num) >= sizeof (buffer))
154                         {
155                                 syslog (LOG_ERR, "rra_get: Buffer would have been truncated.");
156                                 continue;
157                         }
158
159                         rra_def[rra_num++] = sstrdup (buffer);
160                 }
161         }
162
163 #if COLLECT_DEBUG
164         DBG ("rra_num = %i", rra_num);
165         for (i = 0; i < rra_num; i++)
166                 DBG ("  %s", rra_def[i]);
167 #endif
168
169         *ret = rra_def;
170         return (rra_num);
171 }
172
173 static void ds_free (int ds_num, char **ds_def)
174 {
175         int i;
176
177         for (i = 0; i < ds_num; i++)
178                 if (ds_def[i] != NULL)
179                         free (ds_def[i]);
180         free (ds_def);
181 }
182
183 static int ds_get (char ***ret, const data_set_t *ds)
184 {
185         char **ds_def;
186         int ds_num;
187
188         char min[32];
189         char max[32];
190         char buffer[128];
191
192         DBG ("ds->ds_num = %i", ds->ds_num);
193
194         ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
195         if (ds_def == NULL)
196         {
197                 syslog (LOG_ERR, "rrdtool plugin: malloc failed: %s",
198                                 strerror (errno));
199                 return (-1);
200         }
201         memset (ds_def, '\0', ds->ds_num * sizeof (char *));
202
203         for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
204         {
205                 data_source_t *d = ds->ds + ds_num;
206                 char *type;
207                 int status;
208
209                 ds_def[ds_num] = NULL;
210
211                 if (d->type == DS_TYPE_COUNTER)
212                         type = "COUNTER";
213                 else if (d->type == DS_TYPE_GAUGE)
214                         type = "GAUGE";
215                 else
216                 {
217                         syslog (LOG_ERR, "rrdtool plugin: Unknown DS type: %i",
218                                         d->type);
219                         break;
220                 }
221
222                 if (d->min == NAN)
223                 {
224                         strcpy (min, "U");
225                 }
226                 else
227                 {
228                         snprintf (min, sizeof (min), "%lf", d->min);
229                         min[sizeof (min) - 1] = '\0';
230                 }
231
232                 if (d->max == NAN)
233                 {
234                         strcpy (max, "U");
235                 }
236                 else
237                 {
238                         snprintf (max, sizeof (max), "%lf", d->max);
239                         max[sizeof (max) - 1] = '\0';
240                 }
241
242                 status = snprintf (buffer, sizeof (buffer),
243                                 "DS:%s:%s:%s:%s:%s",
244                                 d->name, type, COLLECTD_HEARTBEAT,
245                                 min, max);
246                 if ((status < 1) || (status >= sizeof (buffer)))
247                         break;
248
249                 ds_def[ds_num] = sstrdup (buffer);
250         } /* for ds_num = 0 .. ds->ds_num */
251
252 #if COLLECT_DEBUG
253 {
254         int i;
255         DBG ("ds_num = %i", ds_num);
256         for (i = 0; i < ds_num; i++)
257                 DBG ("  %s", ds_def[i]);
258 }
259 #endif
260
261         if (ds_num != ds->ds_num)
262         {
263                 ds_free (ds_num, ds_def);
264                 return (-1);
265         }
266
267         *ret = ds_def;
268         return (ds_num);
269 }
270
271 static int rrd_create_file (char *filename, const data_set_t *ds)
272 {
273         char **argv;
274         int argc;
275         char **rra_def;
276         int rra_num;
277         char **ds_def;
278         int ds_num;
279         int i, j;
280         int status = 0;
281
282         if (check_create_dir (filename))
283                 return (-1);
284
285         if ((rra_num = rra_get (&rra_def)) < 1)
286         {
287                 syslog (LOG_ERR, "rrd_create_file failed: Could not calculate RRAs");
288                 return (-1);
289         }
290
291         if ((ds_num = ds_get (&ds_def, ds)) < 1)
292         {
293                 syslog (LOG_ERR, "rrd_create_file failed: Could not calculate DSes");
294                 return (-1);
295         }
296
297         argc = ds_num + rra_num + 4;
298
299         if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
300         {
301                 syslog (LOG_ERR, "rrd_create failed: %s", strerror (errno));
302                 return (-1);
303         }
304
305         argv[0] = "create";
306         argv[1] = filename;
307         argv[2] = "-s";
308         argv[3] = COLLECTD_STEP;
309
310         j = 4;
311         for (i = 0; i < ds_num; i++)
312                 argv[j++] = ds_def[i];
313         for (i = 0; i < rra_num; i++)
314                 argv[j++] = rra_def[i];
315         argv[j] = NULL;
316
317         optind = 0; /* bug in librrd? */
318         rrd_clear_error ();
319         if (rrd_create (argc, argv) == -1)
320         {
321                 syslog (LOG_ERR, "rrd_create failed: %s: %s", filename, rrd_get_error ());
322                 status = -1;
323         }
324
325         free (argv);
326         ds_free (ds_num, ds_def);
327
328         return (status);
329 }
330
331 static int value_list_to_string (char *buffer, int buffer_len,
332                 const data_set_t *ds, const value_list_t *vl)
333 {
334         int offset;
335         int status;
336         int i;
337
338         memset (buffer, '\0', sizeof (buffer_len));
339
340         status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
341         if ((status < 1) || (status >= buffer_len))
342                 return (-1);
343         offset = status;
344
345         for (i = 0; i < ds->ds_num; i++)
346         {
347                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
348                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
349                         return (-1);
350
351                 if (ds->ds[i].type == DS_TYPE_COUNTER)
352                         status = snprintf (buffer + offset, buffer_len - offset,
353                                         ":%llu", vl->values[i].counter);
354                 else
355                         status = snprintf (buffer + offset, buffer_len - offset,
356                                         ":%lf", vl->values[i].gauge);
357
358                 if ((status < 1) || (status >= (buffer_len - offset)))
359                         return (-1);
360
361                 offset += status;
362         } /* for ds->ds_num */
363
364         return (0);
365 } /* int value_list_to_string */
366
367 static int value_list_to_filename (char *buffer, int buffer_len,
368                 const data_set_t *ds, const value_list_t *vl)
369 {
370         int offset = 0;
371         int status;
372
373         if (datadir != NULL)
374         {
375                 status = snprintf (buffer + offset, buffer_len - offset,
376                                 "%s/", datadir);
377                 if ((status < 1) || (status >= buffer_len - offset))
378                         return (-1);
379                 offset += status;
380         }
381
382         status = snprintf (buffer + offset, buffer_len - offset,
383                         "%s/", vl->host);
384         if ((status < 1) || (status >= buffer_len - offset))
385                 return (-1);
386         offset += status;
387
388         if (strlen (vl->plugin_instance) > 0)
389                 status = snprintf (buffer + offset, buffer_len - offset,
390                                 "%s-%s/", vl->plugin, vl->plugin_instance);
391         else
392                 status = snprintf (buffer + offset, buffer_len - offset,
393                                 "%s/", vl->plugin);
394         if ((status < 1) || (status >= buffer_len - offset))
395                 return (-1);
396         offset += status;
397
398         if (strlen (vl->type_instance) > 0)
399                 status = snprintf (buffer + offset, buffer_len - offset,
400                                 "%s-%s.rrd", ds->type, vl->type_instance);
401         else
402                 status = snprintf (buffer + offset, buffer_len - offset,
403                                 "%s.rrd", ds->type);
404         if ((status < 1) || (status >= buffer_len - offset))
405                 return (-1);
406         offset += status;
407
408         return (0);
409 } /* int value_list_to_filename */
410
411 static rrd_cache_t *rrd_cache_insert (const char *filename,
412                 const char *value)
413 {
414         rrd_cache_t *rc = NULL;
415
416         if (cache != NULL)
417                 avl_get (cache, filename, (void *) &rc);
418
419         if (rc == NULL)
420         {
421                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
422                 if (rc == NULL)
423                         return (NULL);
424                 rc->values_num = 0;
425                 rc->values = NULL;
426                 rc->first_value = 0;
427         }
428
429         rc->values = (char **) realloc ((void *) rc->values,
430                         (rc->values_num + 1) * sizeof (char *));
431         if (rc->values == NULL)
432         {
433                 syslog (LOG_ERR, "rrdtool plugin: realloc failed: %s",
434                                 strerror (errno));
435                 if (cache != NULL)
436                 {
437                         void *cache_key = NULL;
438                         avl_remove (cache, filename, &cache_key, NULL);
439                         sfree (cache_key);
440                 }
441                 free (rc);
442                 return (NULL);
443         }
444
445         rc->values[rc->values_num] = strdup (value);
446         if (rc->values[rc->values_num] != NULL)
447                 rc->values_num++;
448
449         if (rc->values_num == 1)
450                 rc->first_value = time (NULL);
451
452         /* Insert if this is the first value */
453         if ((cache != NULL) && (rc->values_num == 1))
454         {
455                 void *cache_key = strdup (filename);
456
457                 if (cache_key == NULL)
458                 {
459                         syslog (LOG_ERR, "rrdtool plugin: strdup failed: %s",
460                                         strerror (errno));
461                         sfree (rc->values);
462                         sfree (rc);
463                         return (NULL);
464                 }
465
466                 avl_insert (cache, cache_key, rc);
467         }
468
469         DBG ("rrd_cache_insert (%s, %s) = %p", filename, value, (void *) rc);
470
471         return (rc);
472 } /* rrd_cache_t *rrd_cache_insert */
473
474 static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc)
475 {
476         char **argv;
477         int    argc;
478
479         char *fn;
480         int status;
481
482         argc = rc->values_num + 2;
483         argv = (char **) malloc ((argc + 1) * sizeof (char *));
484         if (argv == NULL)
485                 return (-1);
486
487         fn = strdup (filename);
488         if (fn == NULL)
489         {
490                 free (argv);
491                 return (-1);
492         }
493
494         argv[0] = "update";
495         argv[1] = fn;
496         memcpy (argv + 2, rc->values, rc->values_num * sizeof (char *));
497         argv[argc] = NULL;
498
499         DBG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
500
501         optind = 0; /* bug in librrd? */
502         rrd_clear_error ();
503         status = rrd_update (argc, argv);
504
505         free (argv);
506         free (fn);
507
508         free (rc->values);
509         rc->values = NULL;
510         rc->values_num = 0;
511
512         if (status != 0)
513         {
514                 syslog (LOG_WARNING, "rrd_update failed: %s: %s",
515                                 filename, rrd_get_error ());
516                 return (-1);
517         }
518
519         return (0);
520 } /* int rrd_update_file */
521
522 static void rrd_cache_flush (int timeout)
523 {
524         rrd_cache_t *rc;
525         time_t       now;
526
527         char **keys = NULL;
528         int    keys_num = 0;
529
530         char *key;
531         avl_iterator_t *iter;
532         int i;
533
534         if (cache == NULL)
535                 return;
536
537         DBG ("Flushing cache, timeout = %i", timeout);
538
539         now = time (NULL);
540
541         /* Build a list of entries to be flushed */
542         iter = avl_get_iterator (cache);
543         while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
544         {
545                 DBG ("key = %s; age = %i;", key, now - rc->first_value);
546                 if ((now - rc->first_value) >= timeout)
547                 {
548                         keys = (char **) realloc ((void *) keys,
549                                         (keys_num + 1) * sizeof (char *));
550                         if (keys == NULL)
551                         {
552                                 DBG ("realloc failed: %s", strerror (errno));
553                                 syslog (LOG_ERR, "rrdtool plugin: "
554                                                 "realloc failed: %s",
555                                                 strerror (errno));
556                                 avl_iterator_destroy (iter);
557                                 return;
558                         }
559                         keys[keys_num] = key;
560                         keys_num++;
561                 }
562         } /* while (avl_iterator_next) */
563         avl_iterator_destroy (iter);
564         
565         for (i = 0; i < keys_num; i++)
566         {
567                 if (avl_remove (cache, keys[i], NULL, (void *) &rc) != 0)
568                 {
569                         DBG ("avl_remove (%s) failed.", keys[i]);
570                         continue;
571                 }
572
573                 rrd_write_cache_entry (keys[i], rc);
574                 sfree (keys[i]); keys[i] = NULL;
575                 sfree (rc->values);
576                 sfree (rc);
577         } /* for (i = 0..keys_num) */
578
579         free (keys);
580         DBG ("Flushed %i value(s)", keys_num);
581
582         cache_flush_last = now;
583 } /* void rrd_cache_flush */
584
585 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
586 {
587         struct stat  statbuf;
588         char         filename[512];
589         char         values[512];
590         rrd_cache_t *rc;
591         time_t       now;
592
593         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
594                 return (-1);
595
596         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
597                 return (-1);
598
599         if (stat (filename, &statbuf) == -1)
600         {
601                 if (errno == ENOENT)
602                 {
603                         if (rrd_create_file (filename, ds))
604                                 return (-1);
605                 }
606                 else
607                 {
608                         syslog (LOG_ERR, "stat(%s) failed: %s",
609                                         filename, strerror (errno));
610                         return (-1);
611                 }
612         }
613         else if (!S_ISREG (statbuf.st_mode))
614         {
615                 syslog (LOG_ERR, "stat(%s): Not a regular file!",
616                                 filename);
617                 return (-1);
618         }
619
620         rc = rrd_cache_insert (filename, values);
621         if (rc == NULL)
622                 return (-1);
623
624         if (cache == NULL)
625         {
626                 rrd_write_cache_entry (filename, rc);
627                 free (rc->values);
628                 free (rc);
629                 return (0);
630         }
631
632         now = time (NULL);
633
634         DBG ("age (%s) = %i", filename, now - rc->first_value);
635
636         if ((now - rc->first_value) >= cache_timeout)
637                 rrd_write_cache_entry (filename, rc);
638
639         if ((now - cache_flush_last) >= cache_flush_timeout)
640         {
641                 rrd_cache_flush (cache_flush_timeout);
642         }
643
644         return (0);
645 } /* int rrd_write */
646
647 static int rrd_config (const char *key, const char *val)
648 {
649         if (strcasecmp ("CacheTimeout", key) == 0)
650         {
651                 int tmp = atoi (val);
652                 if (tmp < 0)
653                 {
654                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
655                                         "be greater than 0.\n");
656                         return (1);
657                 }
658                 cache_timeout = tmp;
659         }
660         else if (strcasecmp ("CacheFlush", key) == 0)
661         {
662                 int tmp = atoi (val);
663                 if (tmp < 0)
664                 {
665                         fprintf (stderr, "rrdtool: `CacheFlush' must "
666                                         "be greater than 0.\n");
667                         return (1);
668                 }
669                 cache_flush_timeout = tmp;
670         }
671         else if (strcasecmp ("DataDir", key) == 0)
672         {
673                 if (datadir != NULL)
674                         free (datadir);
675                 datadir = strdup (val);
676                 if (datadir != NULL)
677                 {
678                         int len = strlen (datadir);
679                         while ((len > 0) && (datadir[len - 1] == '/'))
680                         {
681                                 len--;
682                                 datadir[len] = '\0';
683                         }
684                         if (len <= 0)
685                         {
686                                 free (datadir);
687                                 datadir = NULL;
688                         }
689                 }
690         }
691         else
692         {
693                 return (-1);
694         }
695         return (0);
696 } /* int rrd_config */
697
698 static int rrd_shutdown (void)
699 {
700         rrd_cache_flush (-1);
701
702         return (0);
703 } /* int rrd_shutdown */
704
705 static int rrd_init (void)
706 {
707         if (cache_timeout < 2)
708         {
709                 cache_timeout = 0;
710                 cache_flush_timeout = 0;
711         }
712         else
713         {
714                 if (cache_flush_timeout < cache_timeout)
715                         cache_flush_timeout = 10 * cache_timeout;
716
717                 cache = avl_create ((int (*) (const void *, const void *)) strcmp);
718                 cache_flush_last = time (NULL);
719                 plugin_register_shutdown ("rrdtool", rrd_shutdown);
720         }
721         return (0);
722 } /* int rrd_init */
723
724 void module_register (void)
725 {
726         plugin_register_config ("rrdtool", rrd_config,
727                         config_keys, config_keys_num);
728         plugin_register_init ("rrdtool", rrd_init);
729         plugin_register_write ("rrdtool", rrd_write);
730 }