aggregation plugin: Change the reported plugin name to "aggregation".
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
34
35 #include <pthread.h>
36
37 struct aggregation_s /* {{{ */
38 {
39   identifier_t ident;
40
41   _Bool calc_num;
42   _Bool calc_sum;
43   _Bool calc_average;
44   _Bool calc_min;
45   _Bool calc_max;
46   _Bool calc_stddev;
47 }; /* }}} */
48 typedef struct aggregation_s aggregation_t;
49
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
53 {
54   pthread_mutex_t lock;
55   identifier_t ident;
56
57   int ds_type;
58
59   derive_t num;
60   gauge_t sum;
61   gauge_t squares_sum;
62
63   gauge_t min;
64   gauge_t max;
65
66   rate_to_value_state_t *state_num;
67   rate_to_value_state_t *state_sum;
68   rate_to_value_state_t *state_average;
69   rate_to_value_state_t *state_min;
70   rate_to_value_state_t *state_max;
71   rate_to_value_state_t *state_stddev;
72
73   agg_instance_t *next;
74 }; /* }}} */
75
76 static lookup_t *lookup = NULL;
77
78 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
79 static agg_instance_t *agg_instance_list_head = NULL;
80
81 static void agg_destroy (aggregation_t *agg) /* {{{ */
82 {
83   sfree (agg);
84 } /* }}} void agg_destroy */
85
86 /* Frees all dynamically allocated memory within the instance. */
87 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
88 {
89   if (inst == NULL)
90     return;
91
92   /* Remove this instance from the global list of instances. */
93   pthread_mutex_lock (&agg_instance_list_lock);
94   if (agg_instance_list_head == inst)
95     agg_instance_list_head = inst->next;
96   else if (agg_instance_list_head != NULL)
97   {
98     agg_instance_t *prev = agg_instance_list_head;
99     while ((prev != NULL) && (prev->next != inst))
100       prev = prev->next;
101     if (prev != NULL)
102       prev->next = inst->next;
103   }
104   pthread_mutex_unlock (&agg_instance_list_lock);
105
106   sfree (inst->state_num);
107   sfree (inst->state_sum);
108   sfree (inst->state_average);
109   sfree (inst->state_min);
110   sfree (inst->state_max);
111   sfree (inst->state_stddev);
112
113   memset (inst, 0, sizeof (*inst));
114   inst->ds_type = -1;
115   inst->min = NAN;
116   inst->max = NAN;
117 } /* }}} void agg_instance_destroy */
118
119 /* Create a new aggregation instance. */
120 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
121     value_list_t const *vl, aggregation_t *agg)
122 {
123   agg_instance_t *inst;
124
125   DEBUG ("aggregation plugin: Creating new instance.");
126
127   inst = malloc (sizeof (*inst));
128   if (inst == NULL)
129   {
130     ERROR ("aggregation plugin: malloc() failed.");
131     return (NULL);
132   }
133   memset (inst, 0, sizeof (*inst));
134   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
135
136   inst->ds_type = ds->ds[0].type;
137
138 #define COPY_FIELD(fld) do { \
139   sstrncpy (inst->ident.fld, \
140       LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
141       sizeof (inst->ident.fld)); \
142 } while (0)
143
144   COPY_FIELD (host);
145   COPY_FIELD (plugin);
146   COPY_FIELD (plugin_instance);
147   COPY_FIELD (type);
148   COPY_FIELD (type_instance);
149
150 #undef COPY_FIELD
151
152   inst->min = NAN;
153   inst->max = NAN;
154
155 #define INIT_STATE(field) do { \
156   inst->state_ ## field = NULL; \
157   if (agg->calc_ ## field) { \
158     inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
159     if (inst->state_ ## field == NULL) { \
160       agg_instance_destroy (inst); \
161       ERROR ("aggregation plugin: malloc() failed."); \
162       return (NULL); \
163     } \
164     memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
165   } \
166 } while (0)
167
168   INIT_STATE (num);
169   INIT_STATE (sum);
170   INIT_STATE (average);
171   INIT_STATE (min);
172   INIT_STATE (max);
173   INIT_STATE (stddev);
174
175 #undef INIT_STATE
176
177   pthread_mutex_lock (&agg_instance_list_lock);
178   inst->next = agg_instance_list_head;
179   agg_instance_list_head = inst;
180   pthread_mutex_unlock (&agg_instance_list_lock);
181
182   return (inst);
183 } /* }}} agg_instance_t *agg_instance_create */
184
185 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
186  * the rate of the value list is available. Value lists with more than one data
187  * source are not supported and will return an error. Returns zero on success
188  * and non-zero otherwise. */
189 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
190     data_set_t const *ds, value_list_t const *vl)
191 {
192   gauge_t *rate;
193
194   if (ds->ds_num != 1)
195     return (-1);
196
197   rate = uc_get_rate (ds, vl);
198   if (rate == NULL)
199   {
200     ERROR ("aggregation plugin: uc_get_rate() failed.");
201     return (-1);
202   }
203
204   if (isnan (rate[0]))
205   {
206     sfree (rate);
207     return (0);
208   }
209
210   pthread_mutex_lock (&inst->lock);
211
212   inst->num++;
213   inst->sum += rate[0];
214   inst->squares_sum += (rate[0] * rate[0]);
215
216   if (isnan (inst->min) || (inst->min > rate[0]))
217     inst->min = rate[0];
218   if (isnan (inst->max) || (inst->max < rate[0]))
219     inst->max = rate[0];
220
221   pthread_mutex_unlock (&inst->lock);
222
223   sfree (rate);
224   return (0);
225 } /* }}} int agg_instance_update */
226
227 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
228   char const *func, gauge_t rate, rate_to_value_state_t *state,
229   value_list_t *vl, char const *pi_prefix, cdtime_t t)
230 {
231   value_t v;
232   int status;
233
234   if (pi_prefix[0] != 0)
235     ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
236         pi_prefix, func);
237   else
238     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
239
240   memset (&v, 0, sizeof (v));
241   status = rate_to_value (&v, rate, state, inst->ds_type, t);
242   if (status != 0)
243   {
244     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
245         status);
246     return (-1);
247   }
248
249   vl->values = &v;
250   vl->values_len = 1;
251
252   plugin_dispatch_values_secure (vl);
253
254   vl->values = NULL;
255   vl->values_len = 0;
256
257   return (0);
258 } /* }}} int agg_instance_read_func */
259
260 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
261 {
262   value_list_t vl = VALUE_LIST_INIT;
263   char pi_prefix[DATA_MAX_NAME_LEN];
264
265   /* Pre-set all the fields in the value list that will not change per
266    * aggregation type (sum, average, ...). The struct will be re-used and must
267    * therefore be dispatched using the "secure" function. */
268
269   vl.time = t;
270   vl.interval = 0;
271
272   vl.meta = meta_data_create ();
273   if (vl.meta == NULL)
274   {
275     ERROR ("aggregation plugin: meta_data_create failed.");
276     return (-1);
277   }
278   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
279
280   if (LU_IS_ALL (inst->ident.host))
281     sstrncpy (vl.host, "global", sizeof (vl.host));
282   else
283     sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
284
285   sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
286
287   if (LU_IS_ALL (inst->ident.plugin))
288   {
289     if (LU_IS_ALL (inst->ident.plugin_instance))
290       sstrncpy (pi_prefix, "", sizeof (pi_prefix));
291     else
292       sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
293   }
294   else
295   {
296     if (LU_IS_ALL (inst->ident.plugin_instance))
297       sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
298     else
299       ssnprintf (pi_prefix, sizeof (pi_prefix),
300           "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
301   }
302
303   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
304
305   if (!LU_IS_ALL (inst->ident.type_instance))
306     sstrncpy (vl.type_instance, inst->ident.type_instance,
307         sizeof (vl.type_instance));
308
309 #define READ_FUNC(func, rate) do { \
310   if (inst->state_ ## func != NULL) { \
311     agg_instance_read_func (inst, #func, rate, \
312         inst->state_ ## func, &vl, pi_prefix, t); \
313   } \
314 } while (0)
315
316   pthread_mutex_lock (&inst->lock);
317
318   READ_FUNC (num, (gauge_t) inst->num);
319
320   /* All other aggregations are only defined when there have been any values
321    * at all. */
322   if (inst->num > 0)
323   {
324     READ_FUNC (sum, inst->sum);
325     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
326     READ_FUNC (min, inst->min);
327     READ_FUNC (max, inst->max);
328     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
329           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
330   }
331
332   /* Reset internal state. */
333   inst->num = 0;
334   inst->sum = 0.0;
335   inst->squares_sum = 0.0;
336   inst->min = NAN;
337   inst->max = NAN;
338
339   pthread_mutex_unlock (&inst->lock);
340
341   meta_data_destroy (vl.meta);
342   vl.meta = NULL;
343
344   return (0);
345 } /* }}} int agg_instance_read */
346
347 /* lookup_class_callback_t for utils_vl_lookup */
348 static void *agg_lookup_class_callback ( /* {{{ */
349     __attribute__((unused)) data_set_t const *ds,
350     value_list_t const *vl, void *user_class)
351 {
352   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
353 } /* }}} void *agg_class_callback */
354
355 /* lookup_obj_callback_t for utils_vl_lookup */
356 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
357     value_list_t const *vl,
358     __attribute__((unused)) void *user_class,
359     void *user_obj)
360 {
361   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
362 } /* }}} int agg_lookup_obj_callback */
363
364 /* lookup_free_class_callback_t for utils_vl_lookup */
365 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
366 {
367   agg_destroy ((aggregation_t *) user_class);
368 } /* }}} void agg_lookup_free_class_callback */
369
370 /* lookup_free_obj_callback_t for utils_vl_lookup */
371 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
372 {
373   agg_instance_destroy ((agg_instance_t *) user_obj);
374 } /* }}} void agg_lookup_free_obj_callback */
375
376 /*
377  * <Plugin "aggregation">
378  *   <Aggregation>
379  *     Host "/any/"
380  *     Plugin "cpu"
381  *     PluginInstance "/all/"
382  *     Type "cpu"
383  *     TypeInstance "/any/"
384  *
385  *     CalculateNum true
386  *     CalculateSum true
387  *     CalculateAverage true
388  *     CalculateMinimum true
389  *     CalculateMaximum true
390  *     CalculateStddev true
391  *   </Aggregation>
392  * </Plugin>
393  */
394 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
395 {
396   aggregation_t *agg;
397   int status;
398   int i;
399
400   agg = malloc (sizeof (*agg));
401   if (agg == NULL)
402   {
403     ERROR ("aggregation plugin: malloc failed.");
404     return (-1);
405   }
406   memset (agg, 0, sizeof (*agg));
407
408   for (i = 0; i < ci->children_num; i++)
409   {
410     oconfig_item_t *child = ci->children + i;
411
412     if (strcasecmp ("Host", child->key) == 0)
413       cf_util_get_string_buffer (child, agg->ident.host,
414           sizeof (agg->ident.host));
415     else if (strcasecmp ("Plugin", child->key) == 0)
416       cf_util_get_string_buffer (child, agg->ident.plugin,
417           sizeof (agg->ident.plugin));
418     else if (strcasecmp ("PluginInstance", child->key) == 0)
419       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
420           sizeof (agg->ident.plugin_instance));
421     else if (strcasecmp ("Type", child->key) == 0)
422       cf_util_get_string_buffer (child, agg->ident.type,
423           sizeof (agg->ident.type));
424     else if (strcasecmp ("TypeInstance", child->key) == 0)
425       cf_util_get_string_buffer (child, agg->ident.type_instance,
426           sizeof (agg->ident.type_instance));
427     else if (strcasecmp ("CalculateNum", child->key) == 0)
428       cf_util_get_boolean (child, &agg->calc_num);
429     else if (strcasecmp ("CalculateSum", child->key) == 0)
430       cf_util_get_boolean (child, &agg->calc_sum);
431     else if (strcasecmp ("CalculateAverage", child->key) == 0)
432       cf_util_get_boolean (child, &agg->calc_average);
433     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
434       cf_util_get_boolean (child, &agg->calc_min);
435     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
436       cf_util_get_boolean (child, &agg->calc_max);
437     else if (strcasecmp ("CalculateStddev", child->key) == 0)
438       cf_util_get_boolean (child, &agg->calc_stddev);
439     else
440       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
441           "<Aggregation /> blocks and will be ignored.", child->key);
442   }
443
444   /* TODO(octo): Check identifier:
445    * - At least one wildcard.
446    * - Type is set.
447    */
448
449   status = lookup_add (lookup, &agg->ident, agg);
450   if (status != 0)
451   {
452     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
453     sfree (agg);
454     return (-1);
455   }
456
457   return (0);
458 } /* }}} int agg_config_aggregation */
459
460 static int agg_config (oconfig_item_t *ci) /* {{{ */
461 {
462   int i;
463
464   if (lookup == NULL)
465   {
466     lookup = lookup_create (agg_lookup_class_callback,
467         agg_lookup_obj_callback,
468         agg_lookup_free_class_callback,
469         agg_lookup_free_obj_callback);
470     if (lookup == NULL)
471     {
472       ERROR ("aggregation plugin: lookup_create failed.");
473       return (-1);
474     }
475   }
476
477   for (i = 0; i < ci->children_num; i++)
478   {
479     oconfig_item_t *child = ci->children + i;
480
481     if (strcasecmp ("Aggregation", child->key) == 0)
482       agg_config_aggregation (child);
483     else
484       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
485           "<Plugin aggregation /> blocks and will be ignored.", child->key);
486   }
487
488   return (0);
489 } /* }}} int agg_config */
490
491 static int agg_read (void) /* {{{ */
492 {
493   agg_instance_t *this;
494   cdtime_t t;
495   int success;
496
497   t = cdtime ();
498   success = 0;
499
500   pthread_mutex_lock (&agg_instance_list_lock);
501
502   for (this = agg_instance_list_head; this != NULL; this = this->next)
503   {
504     int status;
505
506     status = agg_instance_read (this, t);
507     if (status != 0)
508       WARNING ("aggregation plugin: Reading an aggregation instance "
509           "failed with status %i.", status);
510     else
511       success++;
512   }
513   pthread_mutex_unlock (&agg_instance_list_lock);
514
515   return ((success > 0) ? 0 : -1);
516 } /* }}} int agg_read */
517
518 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
519     __attribute__((unused)) user_data_t *user_data)
520 {
521   _Bool created_by_aggregation = 0;
522   int status;
523
524   /* Ignore values that were created by the aggregation plugin to avoid weird
525    * effects. */
526   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
527       &created_by_aggregation);
528   if (created_by_aggregation)
529     return (0);
530
531   if (lookup == NULL)
532     status = ENOENT;
533   else
534   {
535     status = lookup_search (lookup, ds, vl);
536     if (status > 0)
537       status = 0;
538   }
539
540   return (status);
541 } /* }}} int agg_write */
542
543 void module_register (void)
544 {
545   plugin_register_complex_config ("aggregation", agg_config);
546   plugin_register_read ("aggregation", agg_read);
547   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
548 }
549
550 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */