2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
24 * Florian Forster <octo at collectd.org>
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
/* Configuration of one <Aggregation> block.
 *
 * The member list is not part of this excerpt. Code elsewhere in this file
 * accesses an "ident" member (host, plugin, plugin_instance, type,
 * type_instance) and the flags calc_num, calc_sum, calc_average, calc_min,
 * calc_max and calc_stddev, which agg_config_aggregation() fills in from the
 * configuration. */
37 struct aggregation_s /* {{{ */
48 typedef struct aggregation_s aggregation_t;
/* One aggregation instance, i.e. the accumulated state for one concrete
 * identifier produced by an aggregation.
 *
 * Only the state_* members are visible in this excerpt. Each one is the
 * rate_to_value() conversion state for one aggregation function; the
 * INIT_STATE macro in agg_instance_create() leaves a pointer NULL unless the
 * matching calc_* flag of the aggregation is set, and agg_instance_read()
 * skips functions whose state pointer is NULL.
 *
 * Other members used in this file but not shown here: lock, ds_type, ident,
 * num, sum, squares_sum, min, max and next. */
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
66 rate_to_value_state_t *state_num;
67 rate_to_value_state_t *state_sum;
68 rate_to_value_state_t *state_average;
69 rate_to_value_state_t *state_min;
70 rate_to_value_state_t *state_max;
71 rate_to_value_state_t *state_stddev;
/* Lookup handle; assigned from lookup_create() in agg_config() and passed to
 * lookup_add() and lookup_search(). */
76 static lookup_t *lookup = NULL;
/* agg_instance_list_lock is held while instances are linked into, unlinked
 * from, or iterated over the singly linked list that starts at
 * agg_instance_list_head (see agg_instance_create(), agg_instance_destroy()
 * and agg_read()). */
78 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
79 static agg_instance_t *agg_instance_list_head = NULL;
/* Releases an aggregation_t. It is invoked through
 * agg_lookup_free_class_callback().
 * NOTE(review): the function body is not visible in this excerpt, so what
 * exactly is released cannot be stated here. */
81 static void agg_destroy (aggregation_t *agg) /* {{{ */
84 } /* }}} void agg_destroy */
86 /* Frees all dynamically allocated memory within the instance. */
/* In the visible code this
 *  1. unlinks "inst" from the global instance list while holding
 *     agg_instance_list_lock,
 *  2. releases the six state_* objects with sfree(), and
 *  3. overwrites the whole struct with zero bytes.
 *
 * NOTE(review): several lines of this function are missing from the excerpt
 * (for example the body of the predecessor search loop and whatever follows
 * the memset). No free() of "inst" itself is visible; whether the struct is
 * released here or by the caller cannot be told from this view. */
87 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
92 /* Remove this instance from the global list of instances. */
93 pthread_mutex_lock (&agg_instance_list_lock);
94 if (agg_instance_list_head == inst)
95 agg_instance_list_head = inst->next;
96 else if (agg_instance_list_head != NULL)
98 agg_instance_t *prev = agg_instance_list_head;
99 while ((prev != NULL) && (prev->next != inst))
102 prev->next = inst->next;
104 pthread_mutex_unlock (&agg_instance_list_lock);
/* Release the per-function rate_to_value() state objects. */
106 sfree (inst->state_num);
107 sfree (inst->state_sum);
108 sfree (inst->state_average);
109 sfree (inst->state_min);
110 sfree (inst->state_max);
111 sfree (inst->state_stddev);
/* Wipe the struct so that no stale pointers or counters remain in it. */
113 memset (inst, 0, sizeof (*inst));
117 } /* }}} void agg_instance_destroy */
119 /* Create a new aggregation instance. */
/* Parameters:
 *   ds   data set of the triggering value list; only ds->ds[0].type is read
 *        in the visible code.
 *   vl   value list that triggered creation; supplies identifier fields
 *        wherever the aggregation pattern is an "any" wildcard.
 *   agg  aggregation configuration (identifier pattern and calc_* flags).
 *
 * Steps visible in this excerpt:
 *  - allocate the instance with malloc(), zero it and initialise its mutex;
 *  - COPY_FIELD: for each identifier field, store vl's value when the
 *    pattern satisfies LU_IS_ANY(), otherwise store the pattern itself;
 *  - INIT_STATE: allocate and zero a rate_to_value_state_t when the matching
 *    calc_* flag is set; on allocation failure call agg_instance_destroy()
 *    and log an error;
 *  - prepend the instance to the global list under agg_instance_list_lock.
 *
 * NOTE(review): the return statements and most COPY_FIELD / INIT_STATE
 * invocations are missing from this excerpt. The result is handed back to
 * utils_vl_lookup by agg_lookup_class_callback(); presumably NULL signals
 * failure -- confirm against the full source. */
120 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
121 value_list_t const *vl, aggregation_t *agg)
123 agg_instance_t *inst;
125 DEBUG ("aggregation plugin: Creating new instance.");
127 inst = malloc (sizeof (*inst));
130 ERROR ("aggregation plugin: malloc() failed.");
133 memset (inst, 0, sizeof (*inst));
134 pthread_mutex_init (&inst->lock, /* attr = */ NULL);
/* Only the type of the first data source is recorded. */
136 inst->ds_type = ds->ds[0].type;
138 #define COPY_FIELD(fld) do { \
139 sstrncpy (inst->ident.fld, \
140 LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
141 sizeof (inst->ident.fld)); \
146 COPY_FIELD (plugin_instance);
148 COPY_FIELD (type_instance);
155 #define INIT_STATE(field) do { \
156 inst->state_ ## field = NULL; \
157 if (agg->calc_ ## field) { \
158 inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
159 if (inst->state_ ## field == NULL) { \
160 agg_instance_destroy (inst); \
161 ERROR ("aggregation plugin: malloc() failed."); \
164 memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
170 INIT_STATE (average);
/* Make the new instance reachable from agg_read() by pushing it onto the
 * head of the global list. */
177 pthread_mutex_lock (&agg_instance_list_lock);
178 inst->next = agg_instance_list_head;
179 agg_instance_list_head = inst;
180 pthread_mutex_unlock (&agg_instance_list_lock);
183 } /* }}} agg_instance_t *agg_instance_create */
185 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
186 * the rate of the value list is available. Value lists with more than one data
187 * source are not supported and will return an error. Returns zero on success
188 * and non-zero otherwise. */
/* Visible behaviour: the rate is fetched with uc_get_rate(); a failure is
 * logged. While holding inst->lock, rate[0] is added to inst->sum and its
 * square to inst->squares_sum. The two isnan() tests select the cases in
 * which min / max have no value yet or are exceeded by rate[0].
 *
 * NOTE(review): the lines that update inst->num, assign min / max, check the
 * number of data sources and dispose of "rate" are not part of this
 * excerpt. */
189 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
190 data_set_t const *ds, value_list_t const *vl)
197 rate = uc_get_rate (ds, vl);
200 ERROR ("aggregation plugin: uc_get_rate() failed.");
/* The accumulators are shared with agg_instance_read(), which takes the
 * same lock. */
210 pthread_mutex_lock (&inst->lock);
213 inst->sum += rate[0];
214 inst->squares_sum += (rate[0] * rate[0]);
216 if (isnan (inst->min) || (inst->min > rate[0]))
218 if (isnan (inst->max) || (inst->max < rate[0]))
221 pthread_mutex_unlock (&inst->lock);
225 } /* }}} int agg_instance_update */
/* Dispatches the result of a single aggregation function.
 *
 * Parameters:
 *   inst       instance being read; inst->ds_type selects the value type.
 *   func       name of the aggregation function (the READ_FUNC macro passes
 *              the stringified function name).
 *   rate       aggregated rate to convert and dispatch.
 *   state      rate_to_value() conversion state for this function.
 *   vl         pre-filled value list; its plugin_instance is overwritten here.
 *   pi_prefix  prefix for the plugin instance; may be the empty string.
 *   t          time passed to rate_to_value().
 *
 * When pi_prefix is non-empty the plugin instance is built with the "%s-%s"
 * format, otherwise it is set to "func" alone. The rate is converted with
 * rate_to_value(); a warning is logged if that fails. The value list is
 * dispatched with plugin_dispatch_values_secure().
 *
 * NOTE(review): the format arguments, the assignment of the converted value
 * to "vl" and the return statements are not visible in this excerpt. */
227 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
228 char const *func, gauge_t rate, rate_to_value_state_t *state,
229 value_list_t *vl, char const *pi_prefix, cdtime_t t)
234 if (pi_prefix[0] != 0)
235 ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
238 sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
240 memset (&v, 0, sizeof (v));
241 status = rate_to_value (&v, rate, state, inst->ds_type, t);
244 WARNING ("aggregation plugin: rate_to_value failed with status %i.",
252 plugin_dispatch_values_secure (vl);
258 } /* }}} int agg_instance_read_func */
/* Dispatches all configured aggregation functions of one instance and resets
 * its accumulators.
 *
 * Parameters:
 *   inst  instance to read.
 *   t     time forwarded to agg_instance_read_func().
 *
 * Identifier of the dispatched values, as built by the visible code:
 *   host             "global" if the instance host is the "all" wildcard,
 *                    otherwise the instance host.
 *   plugin           always "aggregation".
 *   plugin instance  prefix derived from the instance's plugin and plugin
 *                    instance (wildcard parts are left out); the function
 *                    name is added by agg_instance_read_func().
 *   type             copied from the instance.
 *   type instance    copied unless it is the "all" wildcard.
 *
 * The value list carries the boolean meta data entry "aggregation:created",
 * which agg_write() checks in order to ignore values produced here.
 *
 * Standard deviation is computed as
 *   sqrt (num * squares_sum - sum * sum) / num.
 *
 * NOTE(review): the else branches, the guard around the non-"num" functions,
 * the remaining reset statements and the return statements are missing from
 * this excerpt. */
260 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
262 value_list_t vl = VALUE_LIST_INIT;
263 char pi_prefix[DATA_MAX_NAME_LEN];
265 /* Pre-set all the fields in the value list that will not change per
266 * aggregation type (sum, average, ...). The struct will be re-used and must
267 * therefore be dispatched using the "secure" function. */
272 vl.meta = meta_data_create ();
275 ERROR ("aggregation plugin: meta_data_create failed.");
/* Tag the values so that agg_write() can recognise them. */
278 meta_data_add_boolean (vl.meta, "aggregation:created", 1);
280 if (LU_IS_ALL (inst->ident.host))
281 sstrncpy (vl.host, "global", sizeof (vl.host));
283 sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
285 sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
287 if (LU_IS_ALL (inst->ident.plugin))
289 if (LU_IS_ALL (inst->ident.plugin_instance))
290 sstrncpy (pi_prefix, "", sizeof (pi_prefix));
292 sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
296 if (LU_IS_ALL (inst->ident.plugin_instance))
297 sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
299 ssnprintf (pi_prefix, sizeof (pi_prefix),
300 "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
303 sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
305 if (!LU_IS_ALL (inst->ident.type_instance))
306 sstrncpy (vl.type_instance, inst->ident.type_instance,
307 sizeof (vl.type_instance));
309 #define READ_FUNC(func, rate) do { \
310 if (inst->state_ ## func != NULL) { \
311 agg_instance_read_func (inst, #func, rate, \
312 inst->state_ ## func, &vl, pi_prefix, t); \
316 pthread_mutex_lock (&inst->lock);
318 READ_FUNC (num, (gauge_t) inst->num);
320 /* All other aggregations are only defined when there have been any values
324 READ_FUNC (sum, inst->sum);
325 READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
326 READ_FUNC (min, inst->min);
327 READ_FUNC (max, inst->max);
328 READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
329 - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
332 /* Reset internal state. */
335 inst->squares_sum = 0.0;
339 pthread_mutex_unlock (&inst->lock);
341 meta_data_destroy (vl.meta);
345 } /* }}} int agg_instance_read */
347 /* lookup_class_callback_t for utils_vl_lookup */
/* Creates a new aggregation instance by forwarding to agg_instance_create();
 * "user_class" is the aggregation_t that was registered with lookup_add().
 * The returned pointer is handed back to the lookup code as an untyped
 * object.
 * NOTE(review): "ds" is marked __attribute__((unused)) although it is passed
 * on to agg_instance_create(). */
348 static void *agg_lookup_class_callback ( /* {{{ */
349 __attribute__((unused)) data_set_t const *ds,
350 value_list_t const *vl, void *user_class)
352 return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
353 } /* }}} void *agg_lookup_class_callback */
355 /* lookup_obj_callback_t for utils_vl_lookup */
/* Feeds one value list into an existing aggregation instance by forwarding
 * to agg_instance_update() and returning its status. "user_obj" is cast to
 * agg_instance_t *; "user_class" is not used.
 * NOTE(review): the declaration of the "user_obj" parameter is on a line
 * that is missing from this excerpt. */
356 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
357 value_list_t const *vl,
358 __attribute__((unused)) void *user_class,
361 return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
362 } /* }}} int agg_lookup_obj_callback */
364 /* lookup_free_class_callback_t for utils_vl_lookup */
/* Casts "user_class" to aggregation_t * and passes it to agg_destroy(). */
365 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
367 agg_destroy ((aggregation_t *) user_class);
368 } /* }}} void agg_lookup_free_class_callback */
370 /* lookup_free_obj_callback_t for utils_vl_lookup */
/* Casts "user_obj" to agg_instance_t * and passes it to
 * agg_instance_destroy(), which also unlinks it from the global instance
 * list. */
371 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
373 agg_instance_destroy ((agg_instance_t *) user_obj);
374 } /* }}} void agg_lookup_free_obj_callback */
377 * <Plugin "aggregation">
381 * PluginInstance "/all/"
383 * TypeInstance "/any/"
387 * CalculateAverage true
388 * CalculateMinimum true
389 * CalculateMaximum true
390 * CalculateStddev true
/* Parses one <Aggregation> block and registers it with the lookup.
 *
 * Parameter:
 *   ci  configuration item of the <Aggregation> block.
 *
 * Recognised keys (matched case-insensitively with strcasecmp()):
 *   Host, Plugin, PluginInstance, Type, TypeInstance   identifier pattern
 *   CalculateNum, CalculateSum, CalculateAverage,
 *   CalculateMinimum, CalculateMaximum, CalculateStddev  boolean flags
 * Any other key is reported with a warning and ignored.
 *
 * Sanity checks performed after parsing, each logging an error:
 *   - "Type" must not contain a '/' character;
 *   - at least one of Host, Plugin, PluginInstance and TypeInstance must be
 *     the "all" wildcard;
 *   - at least one Calculate* option must be enabled.
 *
 * The return values of the cf_util_get_*() calls are not inspected in the
 * visible code.
 *
 * NOTE(review): the handling of a failed malloc(), of "!is_valid" and of a
 * failed lookup_add(), as well as the return statements, are on lines that
 * are missing from this excerpt. */
394 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
401 agg = malloc (sizeof (*agg));
404 ERROR ("aggregation plugin: malloc failed.");
407 memset (agg, 0, sizeof (*agg));
/* Walk over all options inside the <Aggregation> block. */
409 for (i = 0; i < ci->children_num; i++)
411 oconfig_item_t *child = ci->children + i;
413 if (strcasecmp ("Host", child->key) == 0)
414 cf_util_get_string_buffer (child, agg->ident.host,
415 sizeof (agg->ident.host));
416 else if (strcasecmp ("Plugin", child->key) == 0)
417 cf_util_get_string_buffer (child, agg->ident.plugin,
418 sizeof (agg->ident.plugin));
419 else if (strcasecmp ("PluginInstance", child->key) == 0)
420 cf_util_get_string_buffer (child, agg->ident.plugin_instance,
421 sizeof (agg->ident.plugin_instance));
422 else if (strcasecmp ("Type", child->key) == 0)
423 cf_util_get_string_buffer (child, agg->ident.type,
424 sizeof (agg->ident.type));
425 else if (strcasecmp ("TypeInstance", child->key) == 0)
426 cf_util_get_string_buffer (child, agg->ident.type_instance,
427 sizeof (agg->ident.type_instance));
428 else if (strcasecmp ("CalculateNum", child->key) == 0)
429 cf_util_get_boolean (child, &agg->calc_num);
430 else if (strcasecmp ("CalculateSum", child->key) == 0)
431 cf_util_get_boolean (child, &agg->calc_sum);
432 else if (strcasecmp ("CalculateAverage", child->key) == 0)
433 cf_util_get_boolean (child, &agg->calc_average);
434 else if (strcasecmp ("CalculateMinimum", child->key) == 0)
435 cf_util_get_boolean (child, &agg->calc_min);
436 else if (strcasecmp ("CalculateMaximum", child->key) == 0)
437 cf_util_get_boolean (child, &agg->calc_max);
438 else if (strcasecmp ("CalculateStddev", child->key) == 0)
439 cf_util_get_boolean (child, &agg->calc_stddev);
441 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
442 "<Aggregation /> blocks and will be ignored.", child->key);
445 /* Sanity checking */
447 if (strchr (agg->ident.type, '/') != NULL) /* {{{ */
449 ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
450 "character. Especially, it may not be a wildcard. The current "
451 "value is \"%s\".", agg->ident.type);
455 if (!LU_IS_ALL (agg->ident.host) /* {{{ */
456 && !LU_IS_ALL (agg->ident.plugin)
457 && !LU_IS_ALL (agg->ident.plugin_instance)
458 && !LU_IS_ALL (agg->ident.type_instance))
460 ERROR ("aggregation plugin: An aggregation must contain at least one "
461 "\"/all/\" wildcard. Otherwise, nothing will be aggregated. "
462 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
463 "Type \"%s\", TypeInstance \"%s\")",
464 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
465 agg->ident.type, agg->ident.type_instance);
469 if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
470 && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
472 ERROR ("aggregation plugin: No aggregation function has been specified. "
473 "Without this, I don't know what I should be calculating. "
474 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
475 "Type \"%s\", TypeInstance \"%s\")",
476 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
477 agg->ident.type, agg->ident.type_instance);
481 if (!is_valid) /* {{{ */
/* Register the identifier pattern; "agg" is passed as the user class that
 * agg_lookup_class_callback() later receives. */
487 status = lookup_add (lookup, &agg->ident, agg);
490 ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
496 } /* }}} int agg_config_aggregation */
/* Complex-config callback of the plugin (registered in module_register()).
 *
 * Creates the global lookup with the four agg_lookup_* callbacks and logs an
 * error if lookup_create() fails. It then walks the children of "ci": each
 * "Aggregation" block is passed to agg_config_aggregation(), whose return
 * value is not inspected here; any other key is reported with a warning.
 *
 * NOTE(review): lines around the lookup_create() call and the return
 * statements are missing from this excerpt, so it cannot be told from here
 * whether the lookup is created unconditionally or only once. */
498 static int agg_config (oconfig_item_t *ci) /* {{{ */
504 lookup = lookup_create (agg_lookup_class_callback,
505 agg_lookup_obj_callback,
506 agg_lookup_free_class_callback,
507 agg_lookup_free_obj_callback);
510 ERROR ("aggregation plugin: lookup_create failed.");
515 for (i = 0; i < ci->children_num; i++)
517 oconfig_item_t *child = ci->children + i;
519 if (strcasecmp ("Aggregation", child->key) == 0)
520 agg_config_aggregation (child);
522 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
523 "<Plugin aggregation /> blocks and will be ignored.", child->key);
527 } /* }}} int agg_config */
/* Read callback of the plugin (registered in module_register()).
 *
 * Holds agg_instance_list_lock while walking the global instance list and
 * calls agg_instance_read() for every instance; a failing instance is
 * reported with a warning. Returns 0 if "success" is greater than zero and
 * -1 otherwise.
 *
 * NOTE(review): the declarations and the lines that set "t" and update
 * "success" are not part of this excerpt; presumably "success" counts the
 * instances that were read without error -- confirm against the full
 * source. */
529 static int agg_read (void) /* {{{ */
531 agg_instance_t *this;
538 pthread_mutex_lock (&agg_instance_list_lock);
540 for (this = agg_instance_list_head; this != NULL; this = this->next)
544 status = agg_instance_read (this, t);
546 WARNING ("aggregation plugin: Reading an aggregation instance "
547 "failed with status %i.", status);
551 pthread_mutex_unlock (&agg_instance_list_lock);
553 return ((success > 0) ? 0 : -1);
554 } /* }}} int agg_read */
/* Write callback of the plugin (registered in module_register()).
 *
 * Reads the boolean meta data entry "aggregation:created" from the value
 * list; the result of meta_data_get_boolean() is deliberately discarded, so
 * a missing entry leaves the flag at its initial value 0. Value lists that
 * do not carry the flag are passed to lookup_search(), which drives the
 * agg_lookup_* callbacks.
 *
 * NOTE(review): what happens when the flag is set, how "status" is
 * evaluated and the return statements are on lines missing from this
 * excerpt. */
556 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
557 __attribute__((unused)) user_data_t *user_data)
559 _Bool created_by_aggregation = 0;
562 /* Ignore values that were created by the aggregation plugin to avoid weird
564 (void) meta_data_get_boolean (vl->meta, "aggregation:created",
565 &created_by_aggregation);
566 if (created_by_aggregation)
573 status = lookup_search (lookup, ds, vl);
579 } /* }}} int agg_write */
/* Plugin entry point: registers the configuration, read and write callbacks
 * under the name "aggregation". The write callback is registered without
 * user data. */
581 void module_register (void)
583 plugin_register_complex_config ("aggregation", agg_config);
584 plugin_register_read ("aggregation", agg_read);
585 plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
588 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */