2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
24 * Florian Forster <octo at collectd.org>
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
37 struct aggregation_s /* {{{ */
48 typedef struct aggregation_s aggregation_t;
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
65 rate_to_value_state_t *state_num;
66 rate_to_value_state_t *state_sum;
67 rate_to_value_state_t *state_average;
68 rate_to_value_state_t *state_min;
69 rate_to_value_state_t *state_max;
70 rate_to_value_state_t *state_stddev;
75 static lookup_t *lookup = NULL;
77 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
78 static agg_instance_t *agg_instance_list_head = NULL;
80 static void agg_destroy (aggregation_t *agg) /* {{{ */
83 } /* }}} void agg_destroy */
85 /* Frees all dynamically allocated memory within the instance. */
86 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
91 /* Remove this instance from the global list of instances. */
92 pthread_mutex_lock (&agg_instance_list_lock);
93 if (agg_instance_list_head == inst)
94 agg_instance_list_head = inst->next;
95 else if (agg_instance_list_head != NULL)
97 agg_instance_t *prev = agg_instance_list_head;
98 while ((prev != NULL) && (prev->next != inst))
101 prev->next = inst->next;
103 pthread_mutex_unlock (&agg_instance_list_lock);
105 sfree (inst->state_num);
106 sfree (inst->state_sum);
107 sfree (inst->state_average);
108 sfree (inst->state_min);
109 sfree (inst->state_max);
110 sfree (inst->state_stddev);
112 memset (inst, 0, sizeof (*inst));
116 } /* }}} void agg_instance_destroy */
118 /* Create a new aggregation instance. */
119 static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
122 agg_instance_t *inst;
124 DEBUG ("aggregation plugin: Creating new instance.");
126 inst = malloc (sizeof (*inst));
129 ERROR ("aggregation plugin: malloc() failed.");
132 memset (inst, 0, sizeof (*inst));
134 #define COPY_FIELD(fld) do { \
135 sstrncpy (inst->ident.fld, \
136 LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
137 sizeof (inst->ident.fld)); \
142 COPY_FIELD (plugin_instance);
144 COPY_FIELD (type_instance);
152 #define INIT_STATE(field) do { \
153 inst->state_ ## field = NULL; \
154 if (agg->calc_ ## field) { \
155 inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
156 if (inst->state_ ## field == NULL) { \
157 agg_instance_destroy (inst); \
158 ERROR ("aggregation plugin: malloc() failed."); \
161 memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
167 INIT_STATE (average);
174 pthread_mutex_lock (&agg_instance_list_lock);
175 inst->next = agg_instance_list_head;
176 agg_instance_list_head = inst;
177 pthread_mutex_unlock (&agg_instance_list_lock);
180 } /* }}} agg_instance_t *agg_instance_create */
182 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
183 * the rate of the value list is available. Value lists with more than one data
184 * source are not supported and will return an error. Returns zero on success
185 * and non-zero otherwise. */
186 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
187 data_set_t const *ds, value_list_t const *vl)
194 rate = uc_get_rate (ds, vl);
197 ERROR ("aggregation plugin: uc_get_rate() failed.");
208 inst->sum += rate[0];
209 inst->squares_sum += (rate[0] * rate[0]);
211 if (isnan (inst->min) || (inst->min > rate[0]))
213 if (isnan (inst->max) || (inst->max < rate[0]))
218 } /* }}} int agg_instance_update */
220 /* lookup_class_callback_t for utils_vl_lookup */
221 static void *agg_lookup_class_callback ( /* {{{ */
222 __attribute__((unused)) data_set_t const *ds,
223 value_list_t const *vl, void *user_class)
225 return (agg_instance_create (vl, (aggregation_t *) user_class));
226 } /* }}} void *agg_class_callback */
228 /* lookup_obj_callback_t for utils_vl_lookup */
229 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
230 value_list_t const *vl,
231 __attribute__((unused)) void *user_class,
234 return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
235 } /* }}} int agg_lookup_obj_callback */
237 /* lookup_free_class_callback_t for utils_vl_lookup */
238 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
240 agg_destroy ((aggregation_t *) user_class);
241 } /* }}} void agg_lookup_free_class_callback */
243 /* lookup_free_obj_callback_t for utils_vl_lookup */
244 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
246 agg_instance_destroy ((agg_instance_t *) user_obj);
247 } /* }}} void agg_lookup_free_obj_callback */
250 * <Plugin "aggregation">
254 * PluginInstance "/all/"
256 * TypeInstance "/any/"
260 * CalculateAverage true
261 * CalculateMinimum true
262 * CalculateMaximum true
263 * CalculateStddev true
267 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
273 agg = malloc (sizeof (*agg));
276 ERROR ("aggregation plugin: malloc failed.");
279 memset (agg, 0, sizeof (*agg));
281 for (i = 0; i < ci->children_num; i++)
283 oconfig_item_t *child = ci->children + i;
285 if (strcasecmp ("Host", child->key) == 0)
286 cf_util_get_string_buffer (child, agg->ident.host,
287 sizeof (agg->ident.host));
288 else if (strcasecmp ("Plugin", child->key) == 0)
289 cf_util_get_string_buffer (child, agg->ident.plugin,
290 sizeof (agg->ident.plugin));
291 else if (strcasecmp ("PluginInstance", child->key) == 0)
292 cf_util_get_string_buffer (child, agg->ident.plugin_instance,
293 sizeof (agg->ident.plugin_instance));
294 else if (strcasecmp ("Type", child->key) == 0)
295 cf_util_get_string_buffer (child, agg->ident.type,
296 sizeof (agg->ident.type));
297 else if (strcasecmp ("TypeInstance", child->key) == 0)
298 cf_util_get_string_buffer (child, agg->ident.type_instance,
299 sizeof (agg->ident.type_instance));
300 else if (strcasecmp ("CalculateNum", child->key) == 0)
301 cf_util_get_boolean (child, &agg->calc_num);
302 else if (strcasecmp ("CalculateSum", child->key) == 0)
303 cf_util_get_boolean (child, &agg->calc_sum);
304 else if (strcasecmp ("CalculateAverage", child->key) == 0)
305 cf_util_get_boolean (child, &agg->calc_average);
306 else if (strcasecmp ("CalculateMinimum", child->key) == 0)
307 cf_util_get_boolean (child, &agg->calc_min);
308 else if (strcasecmp ("CalculateMaximum", child->key) == 0)
309 cf_util_get_boolean (child, &agg->calc_max);
310 else if (strcasecmp ("CalculateStddev", child->key) == 0)
311 cf_util_get_boolean (child, &agg->calc_stddev);
313 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
314 "<Aggregation /> blocks and will be ignored.", child->key);
317 /* TODO(octo): Check identifier:
318 * - At least one wildcard.
322 status = lookup_add (lookup, &agg->ident, agg);
325 ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
331 } /* }}} int agg_config_aggregation */
333 static int agg_config (oconfig_item_t *ci) /* {{{ */
339 lookup = lookup_create (agg_lookup_class_callback,
340 agg_lookup_obj_callback,
341 agg_lookup_free_class_callback,
342 agg_lookup_free_obj_callback);
345 ERROR ("aggregation plugin: lookup_create failed.");
350 for (i = 0; i < ci->children_num; i++)
352 oconfig_item_t *child = ci->children + i;
354 if (strcasecmp ("Aggregation", child->key) == 0)
355 agg_config_aggregation (child);
357 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
358 "<Plugin aggregation /> blocks and will be ignored.", child->key);
362 } /* }}} int agg_config */
364 static int agg_read (void) /* {{{ */
366 agg_instance_t *this;
369 pthread_mutex_lock (&agg_instance_list_lock);
370 for (this = agg_instance_list_head; this != NULL; this = this->next)
372 DEBUG ("aggregation plugin: Handling instance: host = \"%s\", "
373 "plugin = \"%s\", plugin_instance = \"%s\", "
374 "type = \"%s\", type_instance = \"%s\"",
376 this->ident.plugin, this->ident.plugin_instance,
377 this->ident.type, this->ident.type_instance);
380 pthread_mutex_unlock (&agg_instance_list_lock);
382 DEBUG ("aggregation plugin: There are currently %zu instances.", i);
385 } /* }}} int agg_read */
387 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
388 __attribute__((unused)) user_data_t *user_data)
390 _Bool created_by_aggregation = 0;
393 /* Ignore values that were created by the aggregation plugin to avoid weird
395 (void) meta_data_get_boolean (vl->meta, "aggregation:created",
396 &created_by_aggregation);
397 if (created_by_aggregation)
404 status = lookup_search (lookup, ds, vl);
410 } /* }}} int agg_write */
412 void module_register (void)
414 plugin_register_complex_config ("aggregation", agg_config);
415 plugin_register_read ("aggregation", agg_read);
416 plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
419 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */