m4_divert_once([HELP_ENABLE], [])
+AC_PLUGIN([aggregation], [yes], [Aggregation plugin])
AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin])
AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics])
AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC])
perl . . . . . . . . $with_perl_bindings
Modules:
+ aggregation . . . . . $enable_aggregation
amqp . . . . . . . $enable_amqp
apache . . . . . . . $enable_apache
apcups . . . . . . . $enable_apcups
BUILT_SOURCES =
CLEANFILES =
+if BUILD_PLUGIN_AGGREGATION
+pkglib_LTLIBRARIES += aggregation.la
+aggregation_la_SOURCES = aggregation.c \
+ utils_vl_lookup.c utils_vl_lookup.h
+aggregation_la_LDFLAGS = -module -avoid-version
+aggregation_la_LIBADD =
+collectd_LDADD += "-dlopen" aggregation.la
+collectd_DEPENDENCIES += aggregation.la
+endif
+
if BUILD_PLUGIN_AMQP
pkglib_LTLIBRARIES += amqp.la
amqp_la_SOURCES = amqp.c \
rm -f $(DESTDIR)$(pkgdatadir)/types.db;
rm -f $(DESTDIR)$(sysconfdir)/collectd.conf
rm -f $(DESTDIR)$(pkgdatadir)/postgresql_default.conf;
+
+if BUILD_FEATURE_DEBUG
+bin_PROGRAMS += utils_vl_lookup_test
+utils_vl_lookup_test_SOURCES = utils_vl_lookup_test.c \
+ utils_vl_lookup.h utils_vl_lookup.c \
+ utils_avltree.c utils_avltree.h \
+ common.h
+
+utils_vl_lookup_test_CPPFLAGS = $(AM_CPPFLAGS) $(LTDLINCL) -DBUILD_TEST=1
+utils_vl_lookup_test_CFLAGS = $(AM_CFLAGS)
+utils_vl_lookup_test_LDFLAGS = -export-dynamic
+utils_vl_lookup_test_LDADD =
+endif
--- /dev/null
+/**
+ * collectd - src/aggregation.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "meta_data.h"
+#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_vl_lookup.h"
+
+#include <pthread.h>
+
+struct aggregation_s /* {{{ */
+{
+ identifier_t ident;
+
+ _Bool calc_num;
+ _Bool calc_sum;
+ _Bool calc_average;
+ _Bool calc_min;
+ _Bool calc_max;
+ _Bool calc_stddev;
+}; /* }}} */
+typedef struct aggregation_s aggregation_t;
+
+struct agg_instance_s;
+typedef struct agg_instance_s agg_instance_t;
+struct agg_instance_s /* {{{ */
+{
+ pthread_mutex_t lock;
+ identifier_t ident;
+
+ int ds_type;
+
+ derive_t num;
+ gauge_t sum;
+ gauge_t squares_sum;
+
+ gauge_t min;
+ gauge_t max;
+
+ rate_to_value_state_t *state_num;
+ rate_to_value_state_t *state_sum;
+ rate_to_value_state_t *state_average;
+ rate_to_value_state_t *state_min;
+ rate_to_value_state_t *state_max;
+ rate_to_value_state_t *state_stddev;
+
+ agg_instance_t *next;
+}; /* }}} */
+
+static lookup_t *lookup = NULL;
+
+static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static agg_instance_t *agg_instance_list_head = NULL;
+
+static void agg_destroy (aggregation_t *agg) /* {{{ */
+{
+ sfree (agg);
+} /* }}} void agg_destroy */
+
+/* Frees all dynamically allocated memory within the instance. */
+static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
+{
+ if (inst == NULL)
+ return;
+
+ /* Remove this instance from the global list of instances. */
+ pthread_mutex_lock (&agg_instance_list_lock);
+ if (agg_instance_list_head == inst)
+ agg_instance_list_head = inst->next;
+ else if (agg_instance_list_head != NULL)
+ {
+ agg_instance_t *prev = agg_instance_list_head;
+ while ((prev != NULL) && (prev->next != inst))
+ prev = prev->next;
+ if (prev != NULL)
+ prev->next = inst->next;
+ }
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ sfree (inst->state_num);
+ sfree (inst->state_sum);
+ sfree (inst->state_average);
+ sfree (inst->state_min);
+ sfree (inst->state_max);
+ sfree (inst->state_stddev);
+
+ memset (inst, 0, sizeof (*inst));
+ inst->ds_type = -1;
+ inst->min = NAN;
+ inst->max = NAN;
+} /* }}} void agg_instance_destroy */
+
+/* Create a new aggregation instance. */
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+ value_list_t const *vl, aggregation_t *agg)
+{
+ agg_instance_t *inst;
+
+ DEBUG ("aggregation plugin: Creating new instance.");
+
+ inst = malloc (sizeof (*inst));
+ if (inst == NULL)
+ {
+ ERROR ("aggregation plugin: malloc() failed.");
+ return (NULL);
+ }
+ memset (inst, 0, sizeof (*inst));
+ pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+ inst->ds_type = ds->ds[0].type;
+
+#define COPY_FIELD(fld) do { \
+ sstrncpy (inst->ident.fld, \
+ LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
+ sizeof (inst->ident.fld)); \
+} while (0)
+
+ COPY_FIELD (host);
+ COPY_FIELD (plugin);
+ COPY_FIELD (plugin_instance);
+ COPY_FIELD (type);
+ COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+ inst->min = NAN;
+ inst->max = NAN;
+
+#define INIT_STATE(field) do { \
+ inst->state_ ## field = NULL; \
+ if (agg->calc_ ## field) { \
+ inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
+ if (inst->state_ ## field == NULL) { \
+ agg_instance_destroy (inst); \
+ ERROR ("aggregation plugin: malloc() failed."); \
+ return (NULL); \
+ } \
+ memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
+ } \
+} while (0)
+
+ INIT_STATE (num);
+ INIT_STATE (sum);
+ INIT_STATE (average);
+ INIT_STATE (min);
+ INIT_STATE (max);
+ INIT_STATE (stddev);
+
+#undef INIT_STATE
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+ inst->next = agg_instance_list_head;
+ agg_instance_list_head = inst;
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return (inst);
+} /* }}} agg_instance_t *agg_instance_create */
+
+/* Update the num, sum, min, max, ... fields of the aggregation instance, if
+ * the rate of the value list is available. Value lists with more than one data
+ * source are not supported and will return an error. Returns zero on success
+ * and non-zero otherwise. */
+static int agg_instance_update (agg_instance_t *inst, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl)
+{
+ gauge_t *rate;
+
+ if (ds->ds_num != 1)
+ return (-1);
+
+ rate = uc_get_rate (ds, vl);
+ if (rate == NULL)
+ {
+ ERROR ("aggregation plugin: uc_get_rate() failed.");
+ return (-1);
+ }
+
+ if (isnan (rate[0]))
+ {
+ sfree (rate);
+ return (0);
+ }
+
+ pthread_mutex_lock (&inst->lock);
+
+ inst->num++;
+ inst->sum += rate[0];
+ inst->squares_sum += (rate[0] * rate[0]);
+
+ if (isnan (inst->min) || (inst->min > rate[0]))
+ inst->min = rate[0];
+ if (isnan (inst->max) || (inst->max < rate[0]))
+ inst->max = rate[0];
+
+ pthread_mutex_unlock (&inst->lock);
+
+ sfree (rate);
+ return (0);
+} /* }}} int agg_instance_update */
+
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+ char const *func, gauge_t rate, rate_to_value_state_t *state,
+ value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+ value_t v;
+ int status;
+
+ if (pi_prefix[0] != 0)
+ ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+ pi_prefix, func);
+ else
+ sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+ memset (&v, 0, sizeof (v));
+ status = rate_to_value (&v, rate, state, inst->ds_type, t);
+ if (status != 0)
+ {
+ /* If this is the first iteration and rate_to_value() was asked to return a
+ * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+ * gracefully. */
+ if (status == EAGAIN)
+ return (0);
+
+ WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+ status);
+ return (-1);
+ }
+
+ vl->values = &v;
+ vl->values_len = 1;
+
+ plugin_dispatch_values_secure (vl);
+
+ vl->values = NULL;
+ vl->values_len = 0;
+
+ return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ char pi_prefix[DATA_MAX_NAME_LEN];
+
+ /* Pre-set all the fields in the value list that will not change per
+ * aggregation type (sum, average, ...). The struct will be re-used and must
+ * therefore be dispatched using the "secure" function. */
+
+ vl.time = t;
+ vl.interval = 0;
+
+ vl.meta = meta_data_create ();
+ if (vl.meta == NULL)
+ {
+ ERROR ("aggregation plugin: meta_data_create failed.");
+ return (-1);
+ }
+ meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+ if (LU_IS_ALL (inst->ident.host))
+ sstrncpy (vl.host, "global", sizeof (vl.host));
+ else
+ sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+ sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
+
+ if (LU_IS_ALL (inst->ident.plugin))
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+ else
+ sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+ }
+ else
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+ else
+ ssnprintf (pi_prefix, sizeof (pi_prefix),
+ "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+ }
+
+ sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+ if (!LU_IS_ALL (inst->ident.type_instance))
+ sstrncpy (vl.type_instance, inst->ident.type_instance,
+ sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+ if (inst->state_ ## func != NULL) { \
+ agg_instance_read_func (inst, #func, rate, \
+ inst->state_ ## func, &vl, pi_prefix, t); \
+ } \
+} while (0)
+
+ pthread_mutex_lock (&inst->lock);
+
+ READ_FUNC (num, (gauge_t) inst->num);
+
+ /* All other aggregations are only defined when there have been any values
+ * at all. */
+ if (inst->num > 0)
+ {
+ READ_FUNC (sum, inst->sum);
+ READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+ READ_FUNC (min, inst->min);
+ READ_FUNC (max, inst->max);
+ READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+ - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+ }
+
+ /* Reset internal state. */
+ inst->num = 0;
+ inst->sum = 0.0;
+ inst->squares_sum = 0.0;
+ inst->min = NAN;
+ inst->max = NAN;
+
+ pthread_mutex_unlock (&inst->lock);
+
+ meta_data_destroy (vl.meta);
+ vl.meta = NULL;
+
+ return (0);
+} /* }}} int agg_instance_read */
+
+/* lookup_class_callback_t for utils_vl_lookup */
+static void *agg_lookup_class_callback ( /* {{{ */
+ __attribute__((unused)) data_set_t const *ds,
+ value_list_t const *vl, void *user_class)
+{
+ return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
+} /* }}} void *agg_class_callback */
+
+/* lookup_obj_callback_t for utils_vl_lookup */
+static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
+ value_list_t const *vl,
+ __attribute__((unused)) void *user_class,
+ void *user_obj)
+{
+ return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
+} /* }}} int agg_lookup_obj_callback */
+
+/* lookup_free_class_callback_t for utils_vl_lookup */
+static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
+{
+ agg_destroy ((aggregation_t *) user_class);
+} /* }}} void agg_lookup_free_class_callback */
+
+/* lookup_free_obj_callback_t for utils_vl_lookup */
+static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
+{
+ agg_instance_destroy ((agg_instance_t *) user_obj);
+} /* }}} void agg_lookup_free_obj_callback */
+
+/*
+ * <Plugin "aggregation">
+ * <Aggregation>
+ * Plugin "cpu"
+ * Type "cpu"
+ *
+ * GroupBy Host
+ * GroupBy TypeInstance
+ *
+ * CalculateNum true
+ * CalculateSum true
+ * CalculateAverage true
+ * CalculateMinimum true
+ * CalculateMaximum true
+ * CalculateStddev true
+ * </Aggregation>
+ * </Plugin>
+ */
+static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
+ aggregation_t *agg)
+{
+ int i;
+
+ for (i = 0; i < ci->values_num; i++)
+ {
+ char const *value;
+
+ if (ci->values[i].type != OCONFIG_TYPE_STRING)
+ {
+ ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
+ "is not a string.", i + 1);
+ continue;
+ }
+
+ value = ci->values[i].value.string;
+
+ if (strcasecmp ("Host", value) == 0)
+ sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+ else if (strcasecmp ("Plugin", value) == 0)
+ sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+ else if (strcasecmp ("PluginInstance", value) == 0)
+ sstrncpy (agg->ident.plugin_instance, LU_ANY,
+ sizeof (agg->ident.plugin_instance));
+ else if (strcasecmp ("TypeInstance", value) == 0)
+ sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("Type", value) == 0)
+ ERROR ("aggregation plugin: Grouping by type is not supported.");
+ else
+ WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
+ "option is invalid and will be ignored.", value);
+ } /* for (ci->values) */
+
+ return (0);
+} /* }}} int agg_config_handle_group_by */
+
+static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
+{
+ aggregation_t *agg;
+ _Bool is_valid;
+ int status;
+ int i;
+
+ agg = malloc (sizeof (*agg));
+ if (agg == NULL)
+ {
+ ERROR ("aggregation plugin: malloc failed.");
+ return (-1);
+ }
+ memset (agg, 0, sizeof (*agg));
+
+ sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
+ sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
+ sstrncpy (agg->ident.plugin_instance, LU_ALL,
+ sizeof (agg->ident.plugin_instance));
+ sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
+ sstrncpy (agg->ident.type_instance, LU_ALL,
+ sizeof (agg->ident.type_instance));
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.host,
+ sizeof (agg->ident.host));
+ else if (strcasecmp ("Plugin", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.plugin,
+ sizeof (agg->ident.plugin));
+ else if (strcasecmp ("PluginInstance", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.plugin_instance,
+ sizeof (agg->ident.plugin_instance));
+ else if (strcasecmp ("Type", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.type,
+ sizeof (agg->ident.type));
+ else if (strcasecmp ("TypeInstance", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.type_instance,
+ sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("GroupBy", child->key) == 0)
+ agg_config_handle_group_by (child, agg);
+ else if (strcasecmp ("CalculateNum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_num);
+ else if (strcasecmp ("CalculateSum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_sum);
+ else if (strcasecmp ("CalculateAverage", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_average);
+ else if (strcasecmp ("CalculateMinimum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_min);
+ else if (strcasecmp ("CalculateMaximum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_max);
+ else if (strcasecmp ("CalculateStddev", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_stddev);
+ else
+ WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+ "<Aggregation /> blocks and will be ignored.", child->key);
+ }
+
+ /* Sanity checking */
+ is_valid = 1;
+ if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+ {
+ ERROR ("aggregation plugin: It appears you did not specify the required "
+ "\"Type\" option in this aggregation. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ }
+ else if (strchr (agg->ident.type, '/') != NULL)
+ {
+ ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
+ "character. Especially, it may not be a wildcard. The current "
+ "value is \"%s\".", agg->ident.type);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!LU_IS_ALL (agg->ident.host) /* {{{ */
+ && !LU_IS_ALL (agg->ident.plugin)
+ && !LU_IS_ALL (agg->ident.plugin_instance)
+ && !LU_IS_ALL (agg->ident.type_instance))
+ {
+ ERROR ("aggregation plugin: An aggregation must contain at least one "
+ "wildcard. This is achieved by leaving at least one of the \"Host\", "
+ "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
+ "and not grouping by that field. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
+ && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
+ {
+ ERROR ("aggregation plugin: No aggregation function has been specified. "
+ "Without this, I don't know what I should be calculating. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!is_valid) /* {{{ */
+ {
+ sfree (agg);
+ return (-1);
+ } /* }}} */
+
+ status = lookup_add (lookup, &agg->ident, agg);
+ if (status != 0)
+ {
+ ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
+ sfree (agg);
+ return (-1);
+ }
+
+ DEBUG ("aggregation plugin: Successfully added aggregation: "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ return (0);
+} /* }}} int agg_config_aggregation */
+
+static int agg_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+
+ if (lookup == NULL)
+ {
+ lookup = lookup_create (agg_lookup_class_callback,
+ agg_lookup_obj_callback,
+ agg_lookup_free_class_callback,
+ agg_lookup_free_obj_callback);
+ if (lookup == NULL)
+ {
+ pthread_mutex_unlock (&agg_instance_list_lock);
+ ERROR ("aggregation plugin: lookup_create failed.");
+ return (-1);
+ }
+ }
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Aggregation", child->key) == 0)
+ agg_config_aggregation (child);
+ else
+ WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+ "<Plugin aggregation /> blocks and will be ignored.", child->key);
+ }
+
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return (0);
+} /* }}} int agg_config */
+
+static int agg_read (void) /* {{{ */
+{
+ agg_instance_t *this;
+ cdtime_t t;
+ int success;
+
+ t = cdtime ();
+ success = 0;
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+
+ /* agg_instance_list_head only holds data, after the "write" callback has
+ * been called with a matching value list at least once. So on startup,
+ * there's a race between the aggregations read() and write() callback. If
+ * the read() callback is called first, agg_instance_list_head is NULL and
+ * "success" may be zero. This is expected and should not result in an error.
+ * Therefore we need to handle this case separately. */
+ if (agg_instance_list_head == NULL)
+ {
+ pthread_mutex_unlock (&agg_instance_list_lock);
+ return (0);
+ }
+
+ for (this = agg_instance_list_head; this != NULL; this = this->next)
+ {
+ int status;
+
+ status = agg_instance_read (this, t);
+ if (status != 0)
+ WARNING ("aggregation plugin: Reading an aggregation instance "
+ "failed with status %i.", status);
+ else
+ success++;
+ }
+
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return ((success > 0) ? 0 : -1);
+} /* }}} int agg_read */
+
+static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
+ __attribute__((unused)) user_data_t *user_data)
+{
+ _Bool created_by_aggregation = 0;
+ int status;
+
+ /* Ignore values that were created by the aggregation plugin to avoid weird
+ * effects. */
+ (void) meta_data_get_boolean (vl->meta, "aggregation:created",
+ &created_by_aggregation);
+ if (created_by_aggregation)
+ return (0);
+
+ if (lookup == NULL)
+ status = ENOENT;
+ else
+ {
+ status = lookup_search (lookup, ds, vl);
+ if (status > 0)
+ status = 0;
+ }
+
+ return (status);
+} /* }}} int agg_write */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("aggregation", agg_config);
+ plugin_register_read ("aggregation", agg_read);
+ plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
+}
+
+/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */
# to missing dependencies or because they have been deactivated explicitly. #
##############################################################################
+#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
#@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
#@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
# ription of those options is available in the collectd.conf(5) manual page. #
##############################################################################
+#<Plugin "aggregation">
+# <Aggregation>
+# #Host "unspecified"
+# Plugin "cpu"
+# #PluginInstance "unspecified"
+# Type "cpu"
+# #TypeInstance "unspecified"
+#
+# GroupBy "Host"
+# GroupBy "TypeInstance"
+#
+# CalculateNum false
+# CalculateSum false
+# CalculateAverage true
+# CalculateMinimum false
+# CalculateMaximum false
+# CalculateStddev false
+# </Aggregation>
+#</Plugin>
+
#<Plugin "amqp">
# <Publish "name">
# Host "localhost"
require external configuration, too. The C<apache plugin>, for example,
required C<mod_status> to be configured in the webserver you're going to
collect data from. These plugins are listed below as well, even if they don't
-require any configuration within collectd's configfile.
+require any configuration within collectd's configuration file.
A list of all plugins and a short summary for each plugin can be found in the
F<README> file shipped with the sourcecode and hopefully binary packets as
well.
+=head2 Plugin C<aggregation>
+
+The I<Aggregation plugin> makes it possible to aggregate several values into
+one using aggregation functions such as I<sum>, I<average>, I<min> and I<max>.
+This can be put to a wide variety of uses, e.g. average and total CPU
+statistics for your entire fleet.
+
+The grouping is powerful but, as with many powerful tools, may be a bit
+difficult to wrap your head around. The grouping will therefore be
+demonstrated using an example: The average and sum of the CPU usage across
+all CPUs of each host is to be calculated.
+
+To select all the affected values for our example, set C<Plugin cpu> and
+C<Type cpu>. The other values are left unspecified, meaning "all values". The
+I<Host>, I<Plugin>, I<PluginInstance>, I<Type> and I<TypeInstance> options
+work as if they were specified in the C<WHERE> clause of an C<SELECT> SQL
+statement.
+
+ Plugin "cpu"
+ Type "cpu"
+
+Although the I<Host>, I<PluginInstance> (CPU number, i.e. 0, 1, 2, ...) and
+I<TypeInstance> (idle, user, system, ...) fields are left unspecified in the
+example, the intention is to have a new value for each host / type instance
+pair. This is achieved by "grouping" the values using the C<GroupBy> option.
+It can be specified multiple times to group by more than one field.
+
+ GroupBy "Host"
+ GroupBy "TypeInstance"
+
+We do neither specify nor group by I<plugin instance> (the CPU number), so all
+metrics that differ in the CPU number only will be aggregated. Each
+aggregation needs I<at least one> such field, otherwise no aggregation would
+take place.
+
+The full example configuration looks like this:
+
+ <Plugin "aggregation">
+ <Aggregation>
+ Plugin "cpu"
+ Type "cpu"
+
+ GroupBy "Host"
+ GroupBy "TypeInstance"
+
+ CalculateSum true
+ CalculateAverage true
+ </Aggregation>
+ </Plugin>
+
+There are a couple of limitations you should be aware of:
+
+=over 4
+
+=item
+
+The I<Type> cannot be left unspecified, because it is not reasonable to add
+apples to oranges. Also, the internal lookup structure won't work if you try
+to group by type.
+
+=item
+
+There must be at least one unspecified, ungrouped field. Otherwise nothing
+will be aggregated.
+
+=back
+
+As you can see in the example above, each aggregation has its own
+B<Aggregation> block. You can have multiple aggregation blocks and aggregation
+blocks may match the same values, i.e. one value list can update multiple
+aggregations. The following options are valid inside B<Aggregation> blocks:
+
+=over 4
+
+=item B<Host> I<Host>
+
+=item B<Plugin> I<Plugin>
+
+=item B<PluginInstance> I<PluginInstance>
+
+=item B<Type> I<Type>
+
+=item B<TypeInstance> I<TypeInstance>
+
+Selects the value lists to be added to this aggregation. B<Type> must be a
+valid data set name, see L<types.db(5)> for details.
+
+=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
+
+Group valued by the specified field. The B<GroupBy> option may be repeated to
+group by multiple fields.
+
+=item B<CalculateNum> B<true>|B<false>
+
+=item B<CalculateSum> B<true>|B<false>
+
+=item B<CalculateAverage> B<true>|B<false>
+
+=item B<CalculateMinimum> B<true>|B<false>
+
+=item B<CalculateMaximum> B<true>|B<false>
+
+=item B<CalculateStddev> B<true>|B<false>
+
+Boolean options for enabling calculation of the number of value lists, their
+sum, average, minimum, maximum andE<nbsp>/ or standard deviation. All options
+are disabled by default.
+
+=back
+
=head2 Plugin C<amqp>
The I<AMQMP plugin> can be used to communicate with other instances of
}
return (diff);
-} /* counter_t counter_to_gauge */
+} /* counter_t counter_diff */
+
+int rate_to_value (value_t *ret_value, gauge_t rate, /* {{{ */
+ rate_to_value_state_t *state,
+ int ds_type, cdtime_t t)
+{
+ gauge_t delta_gauge;
+ cdtime_t delta_t;
+
+ if (ds_type == DS_TYPE_GAUGE)
+ {
+ state->last_value.gauge = rate;
+ state->last_time = t;
+
+ *ret_value = state->last_value;
+ return (0);
+ }
+
+ /* Counter and absolute can't handle negative rates. Reset "last time"
+ * to zero, so that the next valid rate will re-initialize the
+ * structure. */
+ if ((rate < 0.0)
+ && ((ds_type == DS_TYPE_COUNTER)
+ || (ds_type == DS_TYPE_ABSOLUTE)))
+ {
+ memset (state, 0, sizeof (*state));
+ return (EINVAL);
+ }
+
+ /* Another invalid state: The time is not increasing. */
+ if (t <= state->last_time)
+ {
+ memset (state, 0, sizeof (*state));
+ return (EINVAL);
+ }
+
+ delta_t = t - state->last_time;
+ delta_gauge = (rate * CDTIME_T_TO_DOUBLE (delta_t)) + state->residual;
+
+ /* Previous value is invalid. */
+ if (state->last_time == 0) /* {{{ */
+ {
+ if (ds_type == DS_TYPE_DERIVE)
+ {
+ state->last_value.derive = (derive_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.derive);
+ }
+ else if (ds_type == DS_TYPE_COUNTER)
+ {
+ state->last_value.counter = (counter_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.counter);
+ }
+ else if (ds_type == DS_TYPE_ABSOLUTE)
+ {
+ state->last_value.absolute = (absolute_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.absolute);
+ }
+ else
+ {
+ assert (23 == 42);
+ }
+
+ state->last_time = t;
+ return (EAGAIN);
+ } /* }}} */
+
+ if (ds_type == DS_TYPE_DERIVE)
+ {
+ derive_t delta_derive = (derive_t) delta_gauge;
+
+ state->last_value.derive += delta_derive;
+ state->residual = delta_gauge - ((gauge_t) delta_derive);
+ }
+ else if (ds_type == DS_TYPE_COUNTER)
+ {
+ counter_t delta_counter = (counter_t) delta_gauge;
+
+ state->last_value.counter += delta_counter;
+ state->residual = delta_gauge - ((gauge_t) delta_counter);
+ }
+ else if (ds_type == DS_TYPE_ABSOLUTE)
+ {
+ absolute_t delta_absolute = (absolute_t) delta_gauge;
+
+ state->last_value.absolute = delta_absolute;
+ state->residual = delta_gauge - ((gauge_t) delta_absolute);
+ }
+ else
+ {
+ assert (23 == 42);
+ }
+
+ state->last_time = t;
+ *ret_value = state->last_value;
+ return (0);
+} /* }}} value_t rate_to_value */
int service_name_to_port_number (const char *service_name)
{
|| (strcasecmp ("no", (s)) == 0) \
|| (strcasecmp ("off", (s)) == 0))
+struct rate_to_value_state_s
+{
+ value_t last_value;
+ cdtime_t last_time;
+ gauge_t residual;
+};
+typedef struct rate_to_value_state_s rate_to_value_state_t;
+
char *sstrncpy (char *dest, const char *src, size_t n);
int ssnprintf (char *dest, size_t n, const char *format, ...);
char *sstrdup(const char *s);
counter_t counter_diff (counter_t old_value, counter_t new_value);
+/* Convert a rate back to a value_t. When converting to a derive_t, counter_t
+ * or absoltue_t, take fractional residuals into account. This is important
+ * when scaling counters, for example.
+ * Returns zero on success. Returns EAGAIN when called for the first time; in
+ * this case the value_t is invalid and the next call should succeed. Other
+ * return values indicate an error. */
+int rate_to_value (value_t *ret_value, gauge_t rate,
+ rate_to_value_state_t *state, int ds_type, cdtime_t t);
+
/* Converts a service name (a string) to a port number
* (in the range [1-65535]). Returns less than zero on error. */
int service_name_to_port_number (const char *service_name);
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "utils_vl_lookup.h"
+#include "utils_avltree.h"
+
+#if BUILD_TEST
+# define sstrncpy strncpy
+# define plugin_log(s, ...) do { \
+ printf ("[severity %i] ", s); \
+ printf (__VA_ARGS__); \
+ printf ("\n"); \
+} while (0)
+#endif
+
+/*
+ * Types
+ */
+struct lookup_s
+{
+ c_avl_tree_t *by_type_tree;
+
+ lookup_class_callback_t cb_user_class;
+ lookup_obj_callback_t cb_user_obj;
+ lookup_free_class_callback_t cb_free_class;
+ lookup_free_obj_callback_t cb_free_obj;
+};
+
+struct user_obj_s;
+typedef struct user_obj_s user_obj_t;
+struct user_obj_s
+{
+ void *user_obj;
+ identifier_t ident;
+
+ user_obj_t *next;
+};
+
+struct user_class_s
+{
+ void *user_class;
+ identifier_t ident;
+ user_obj_t *user_obj_list; /* list of user_obj */
+};
+typedef struct user_class_s user_class_t;
+
+struct user_class_list_s;
+typedef struct user_class_list_s user_class_list_t;
+struct user_class_list_s
+{
+ user_class_t entry;
+ user_class_list_t *next;
+};
+
+struct by_type_entry_s
+{
+ c_avl_tree_t *by_plugin_tree; /* plugin -> user_class_list_t */
+ user_class_list_t *wildcard_plugin_list;
+};
+typedef struct by_type_entry_s by_type_entry_t;
+
+/*
+ * Private functions
+ */
+static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_t *user_class)
+{
+ user_obj_t *user_obj;
+
+ user_obj = malloc (sizeof (*user_obj));
+ if (user_obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (NULL);
+ }
+ memset (user_obj, 0, sizeof (*user_obj));
+ user_obj->next = NULL;
+
+ user_obj->user_obj = obj->cb_user_class (ds, vl, user_class->user_class);
+ if (user_obj->user_obj == NULL)
+ {
+ sfree (user_obj);
+ WARNING("utils_vl_lookup: User-provided constructor failed.");
+ return (NULL);
+ }
+
+ sstrncpy (user_obj->ident.host,
+ LU_IS_ALL (user_class->ident.host) ? "/all/" : vl->host,
+ sizeof (user_obj->ident.host));
+ sstrncpy (user_obj->ident.plugin,
+ LU_IS_ALL (user_class->ident.plugin) ? "/all/" : vl->plugin,
+ sizeof (user_obj->ident.plugin));
+ sstrncpy (user_obj->ident.plugin_instance,
+ LU_IS_ALL (user_class->ident.plugin_instance) ? "/all/" : vl->plugin_instance,
+ sizeof (user_obj->ident.plugin_instance));
+ sstrncpy (user_obj->ident.type,
+ LU_IS_ALL (user_class->ident.type) ? "/all/" : vl->type,
+ sizeof (user_obj->ident.type));
+ sstrncpy (user_obj->ident.type_instance,
+ LU_IS_ALL (user_class->ident.type_instance) ? "/all/" : vl->type_instance,
+ sizeof (user_obj->ident.type_instance));
+
+ if (user_class->user_obj_list == NULL)
+ {
+ user_class->user_obj_list = user_obj;
+ }
+ else
+ {
+ user_obj_t *last = user_class->user_obj_list;
+ while (last->next != NULL)
+ last = last->next;
+ last->next = user_obj;
+ }
+
+ return (user_obj);
+} /* }}} void *lu_create_user_obj */
+
+static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
+ value_list_t const *vl)
+{
+ user_obj_t *ptr;
+
+ for (ptr = user_class->user_obj_list;
+ ptr != NULL;
+ ptr = ptr->next)
+ {
+ if (!LU_IS_ALL (ptr->ident.host)
+ && (strcmp (ptr->ident.host, vl->host) != 0))
+ continue;
+ if (!LU_IS_ALL (ptr->ident.plugin_instance)
+ && (strcmp (ptr->ident.plugin_instance, vl->plugin_instance) != 0))
+ continue;
+ if (!LU_IS_ALL (ptr->ident.type_instance)
+ && (strcmp (ptr->ident.type_instance, vl->type_instance) != 0))
+ continue;
+
+ return (ptr);
+ }
+
+ return (NULL);
+} /* }}} user_obj_t *lu_find_user_obj */
+
+static int lu_handle_user_class (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_t *user_class)
+{
+ user_obj_t *user_obj;
+ int status;
+
+ assert (strcmp (vl->type, user_class->ident.type) == 0);
+ assert (LU_IS_WILDCARD (user_class->ident.plugin)
+ || (strcmp (vl->plugin, user_class->ident.plugin) == 0));
+
+ /* When we get here, type and plugin already match the user class. Now check
+ * the rest of the fields. */
+ if (!LU_IS_WILDCARD (user_class->ident.type_instance)
+ && (strcmp (vl->type_instance, user_class->ident.type_instance) != 0))
+ return (1);
+ if (!LU_IS_WILDCARD (user_class->ident.plugin_instance)
+ && (strcmp (vl->plugin_instance,
+ user_class->ident.plugin_instance) != 0))
+ return (1);
+ if (!LU_IS_WILDCARD (user_class->ident.host)
+ && (strcmp (vl->host, user_class->ident.host) != 0))
+ return (1);
+
+ user_obj = lu_find_user_obj (user_class, vl);
+ if (user_obj == NULL)
+ {
+ /* call lookup_class_callback_t() and insert into the list of user objects. */
+ user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+ if (user_obj == NULL)
+ return (-1);
+ }
+
+ status = obj->cb_user_obj (ds, vl,
+ user_class->user_class, user_obj->user_obj);
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: The user object callback failed with status %i.",
+ status);
+ /* Returning a negative value means: abort! */
+ if (status < 0)
+ return (status);
+ else
+ return (1);
+ }
+
+ return (0);
+} /* }}} int lu_handle_user_class */
+
+static int lu_handle_user_class_list (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_list_t *user_class_list)
+{
+ user_class_list_t *ptr;
+ int retval = 0;
+
+ for (ptr = user_class_list; ptr != NULL; ptr = ptr->next)
+ {
+ int status;
+
+ status = lu_handle_user_class (obj, ds, vl, &ptr->entry);
+ if (status < 0)
+ return (status);
+ else if (status == 0)
+ retval++;
+ }
+
+ return (retval);
+} /* }}} int lu_handle_user_class_list */
+
+static by_type_entry_t *lu_search_by_type (lookup_t *obj, /* {{{ */
+ char const *type, _Bool allocate_if_missing)
+{
+ by_type_entry_t *by_type;
+ char *type_copy;
+ int status;
+
+ status = c_avl_get (obj->by_type_tree, type, (void *) &by_type);
+ if (status == 0)
+ return (by_type);
+
+ if (!allocate_if_missing)
+ return (NULL);
+
+ type_copy = strdup (type);
+ if (type_copy == NULL)
+ {
+ ERROR ("utils_vl_lookup: strdup failed.");
+ return (NULL);
+ }
+
+ by_type = malloc (sizeof (*by_type));
+ if (by_type == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ sfree (type_copy);
+ return (NULL);
+ }
+ memset (by_type, 0, sizeof (*by_type));
+ by_type->wildcard_plugin_list = NULL;
+
+ by_type->by_plugin_tree = c_avl_create ((void *) strcmp);
+ if (by_type->by_plugin_tree == NULL)
+ {
+ ERROR ("utils_vl_lookup: c_avl_create failed.");
+ sfree (by_type);
+ sfree (type_copy);
+ return (NULL);
+ }
+
+ status = c_avl_insert (obj->by_type_tree,
+ /* key = */ type_copy, /* value = */ by_type);
+ assert (status <= 0); /* >0 => entry exists => race condition. */
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: c_avl_insert failed.");
+ c_avl_destroy (by_type->by_plugin_tree);
+ sfree (by_type);
+ sfree (type_copy);
+ return (NULL);
+ }
+
+ return (by_type);
+} /* }}} by_type_entry_t *lu_search_by_type */
+
+static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
+ identifier_t const *ident, user_class_list_t *user_class_list)
+{
+ user_class_list_t *ptr = NULL;
+
+ /* Lookup user_class_list from the per-plugin structure. If this is the first
+ * user_class to be added, the blocks return immediately. Otherwise they will
+ * set "ptr" to non-NULL. */
+ if (LU_IS_WILDCARD (ident->plugin))
+ {
+ if (by_type->wildcard_plugin_list == NULL)
+ {
+ by_type->wildcard_plugin_list = user_class_list;
+ return (0);
+ }
+
+ ptr = by_type->wildcard_plugin_list;
+ } /* if (plugin is wildcard) */
+ else /* (plugin is not wildcard) */
+ {
+ int status;
+
+ status = c_avl_get (by_type->by_plugin_tree,
+ ident->plugin, (void *) &ptr);
+
+ if (status != 0) /* plugin not yet in tree */
+ {
+ char *plugin_copy = strdup (ident->plugin);
+
+ if (plugin_copy == NULL)
+ {
+ ERROR ("utils_vl_lookup: strdup failed.");
+ sfree (user_class_list);
+ return (ENOMEM);
+ }
+
+ status = c_avl_insert (by_type->by_plugin_tree,
+ plugin_copy, user_class_list);
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: c_avl_insert(\"%s\") failed with status %i.",
+ plugin_copy, status);
+ sfree (plugin_copy);
+ sfree (user_class_list);
+ return (status);
+ }
+ else
+ {
+ return (0);
+ }
+ } /* if (plugin not yet in tree) */
+ } /* if (plugin is not wildcard) */
+
+ assert (ptr != NULL);
+
+ while (ptr->next != NULL)
+ ptr = ptr->next;
+ ptr->next = user_class_list;
+
+ return (0);
+} /* }}} int lu_add_by_plugin */
+
+static void lu_destroy_user_obj (lookup_t *obj, /* {{{ */
+ user_obj_t *user_obj)
+{
+ while (user_obj != NULL)
+ {
+ user_obj_t *next = user_obj->next;
+
+ if (obj->cb_free_obj != NULL)
+ obj->cb_free_obj (user_obj->user_obj);
+ user_obj->user_obj = NULL;
+
+ sfree (user_obj);
+ user_obj = next;
+ }
+} /* }}} void lu_destroy_user_obj */
+
+static void lu_destroy_user_class_list (lookup_t *obj, /* {{{ */
+ user_class_list_t *user_class_list)
+{
+ while (user_class_list != NULL)
+ {
+ user_class_list_t *next = user_class_list->next;
+
+ if (obj->cb_free_class != NULL)
+ obj->cb_free_class (user_class_list->entry.user_class);
+ user_class_list->entry.user_class = NULL;
+
+ lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
+ user_class_list->entry.user_obj_list = NULL;
+
+ sfree (user_class_list);
+ user_class_list = next;
+ }
+} /* }}} void lu_destroy_user_class_list */
+
+static void lu_destroy_by_type (lookup_t *obj, /* {{{ */
+ by_type_entry_t *by_type)
+{
+
+ while (42)
+ {
+ char *plugin = NULL;
+ user_class_list_t *user_class_list = NULL;
+ int status;
+
+ status = c_avl_pick (by_type->by_plugin_tree,
+ (void *) &plugin, (void *) &user_class_list);
+ if (status != 0)
+ break;
+
+ DEBUG ("utils_vl_lookup: lu_destroy_by_type: Destroying plugin \"%s\".",
+ plugin);
+ sfree (plugin);
+ lu_destroy_user_class_list (obj, user_class_list);
+ }
+
+ c_avl_destroy (by_type->by_plugin_tree);
+ by_type->by_plugin_tree = NULL;
+
+ lu_destroy_user_class_list (obj, by_type->wildcard_plugin_list);
+ by_type->wildcard_plugin_list = NULL;
+
+ sfree (by_type);
+} /* }}} int lu_destroy_by_type */
+
+/*
+ * Public functions
+ */
+lookup_t *lookup_create (lookup_class_callback_t cb_user_class, /* {{{ */
+ lookup_obj_callback_t cb_user_obj,
+ lookup_free_class_callback_t cb_free_class,
+ lookup_free_obj_callback_t cb_free_obj)
+{
+ lookup_t *obj = malloc (sizeof (*obj));
+ if (obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (NULL);
+ }
+ memset (obj, 0, sizeof (*obj));
+
+ obj->by_type_tree = c_avl_create ((void *) strcmp);
+ if (obj->by_type_tree == NULL)
+ {
+ ERROR ("utils_vl_lookup: c_avl_create failed.");
+ sfree (obj);
+ return (NULL);
+ }
+
+ obj->cb_user_class = cb_user_class;
+ obj->cb_user_obj = cb_user_obj;
+ obj->cb_free_class = cb_free_class;
+ obj->cb_free_obj = cb_free_obj;
+
+ return (obj);
+} /* }}} lookup_t *lookup_create */
+
+void lookup_destroy (lookup_t *obj) /* {{{ */
+{
+ int status;
+
+ if (obj == NULL)
+ return;
+
+ while (42)
+ {
+ char *type = NULL;
+ by_type_entry_t *by_type = NULL;
+
+ status = c_avl_pick (obj->by_type_tree, (void *) &type, (void *) &by_type);
+ if (status != 0)
+ break;
+
+ DEBUG ("utils_vl_lookup: lookup_destroy: Destroying type \"%s\".", type);
+ sfree (type);
+ lu_destroy_by_type (obj, by_type);
+ }
+
+ c_avl_destroy (obj->by_type_tree);
+ obj->by_type_tree = NULL;
+
+ sfree (obj);
+} /* }}} void lookup_destroy */
+
+int lookup_add (lookup_t *obj, /* {{{ */
+ identifier_t const *ident, void *user_class)
+{
+ by_type_entry_t *by_type = NULL;
+ user_class_list_t *user_class_obj;
+
+ by_type = lu_search_by_type (obj, ident->type, /* allocate = */ 1);
+ if (by_type == NULL)
+ return (-1);
+
+ user_class_obj = malloc (sizeof (*user_class_obj));
+ if (user_class_obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (ENOMEM);
+ }
+ memset (user_class_obj, 0, sizeof (*user_class_obj));
+ user_class_obj->entry.user_class = user_class;
+ memmove (&user_class_obj->entry.ident, ident, sizeof (*ident));
+ user_class_obj->entry.user_obj_list = NULL;
+ user_class_obj->next = NULL;
+
+ return (lu_add_by_plugin (by_type, ident, user_class_obj));
+} /* }}} int lookup_add */
+
+/* returns the number of successful calls to the callback function */
+int lookup_search (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl)
+{
+ by_type_entry_t *by_type = NULL;
+ user_class_list_t *user_class_list = NULL;
+ int retval = 0;
+ int status;
+
+ if ((obj == NULL) || (ds == NULL) || (vl == NULL))
+ return (-EINVAL);
+
+ by_type = lu_search_by_type (obj, vl->type, /* allocate = */ 0);
+ if (by_type == NULL)
+ return (0);
+
+ status = c_avl_get (by_type->by_plugin_tree,
+ vl->plugin, (void *) &user_class_list);
+ if (status == 0)
+ {
+ status = lu_handle_user_class_list (obj, ds, vl, user_class_list);
+ if (status < 0)
+ return (status);
+ retval += status;
+ }
+
+ if (by_type->wildcard_plugin_list != NULL)
+ {
+ status = lu_handle_user_class_list (obj, ds, vl,
+ by_type->wildcard_plugin_list);
+ if (status < 0)
+ return (status);
+ retval += status;
+ }
+
+ return (retval);
+} /* }}} lookup_search */
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup.h
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#ifndef UTILS_VL_LOOKUP_H
+#define UTILS_VL_LOOKUP_H 1
+
+#include "plugin.h"
+
+/*
+ * Types
+ */
+struct lookup_s;
+typedef struct lookup_s lookup_t;
+
+/* Given a user_class, constructs a new user_obj. */
+typedef void *(*lookup_class_callback_t) (data_set_t const *ds,
+ value_list_t const *vl, void *user_class);
+
+/* Given a user_class and a ds/vl combination, does stuff with the data.
+ * This is the main working horse of the module. */
+typedef int (*lookup_obj_callback_t) (data_set_t const *ds,
+ value_list_t const *vl,
+ void *user_class, void *user_obj);
+
+/* Used to free user_class pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_class_callback_t) (void *user_class);
+
+/* Used to free user_obj pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_obj_callback_t) (void *user_obj);
+
+struct identifier_s
+{
+ char host[DATA_MAX_NAME_LEN];
+ char plugin[DATA_MAX_NAME_LEN];
+ char plugin_instance[DATA_MAX_NAME_LEN];
+ char type[DATA_MAX_NAME_LEN];
+ char type_instance[DATA_MAX_NAME_LEN];
+};
+typedef struct identifier_s identifier_t;
+
+#define LU_ANY "/any/"
+#define LU_ALL "/all/"
+
+#define LU_IS_ANY(str) (strcmp (str, LU_ANY) == 0)
+#define LU_IS_ALL(str) (strcmp (str, LU_ALL) == 0)
+#define LU_IS_WILDCARD(str) (LU_IS_ANY(str) || LU_IS_ALL(str))
+
+/*
+ * Functions
+ */
+__attribute__((nonnull(1,2)))
+lookup_t *lookup_create (lookup_class_callback_t,
+ lookup_obj_callback_t,
+ lookup_free_class_callback_t,
+ lookup_free_obj_callback_t);
+void lookup_destroy (lookup_t *obj);
+
+int lookup_add (lookup_t *obj,
+ identifier_t const *ident, void *user_class);
+
+/* TODO(octo): Pass lookup_obj_callback_t to lookup_search()? */
+int lookup_search (lookup_t *obj,
+ data_set_t const *ds, value_list_t const *vl);
+
+#endif /* UTILS_VL_LOOKUP_H */
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup_test.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "utils_vl_lookup.h"
+
+static _Bool expect_new_obj = 0;
+static _Bool have_new_obj = 0;
+
+static identifier_t last_class_ident;
+static identifier_t last_obj_ident;
+
+static data_source_t dsrc_test = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_test = { "test", 1, &dsrc_test };
+
+static data_source_t dsrc_unknown = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_unknown = { "unknown", 1, &dsrc_unknown };
+
+static int lookup_obj_callback (data_set_t const *ds,
+ value_list_t const *vl,
+ void *user_class, void *user_obj)
+{
+ identifier_t *class = user_class;
+ identifier_t *obj = user_obj;
+
+ assert (expect_new_obj == have_new_obj);
+
+ memcpy (&last_class_ident, class, sizeof (last_class_ident));
+ memcpy (&last_obj_ident, obj, sizeof (last_obj_ident));
+
+ if (strcmp (obj->plugin_instance, "failure") == 0)
+ return (-1);
+
+ return (0);
+}
+
+static void *lookup_class_callback (data_set_t const *ds,
+ value_list_t const *vl, void *user_class)
+{
+ identifier_t *class = user_class;
+ identifier_t *obj;
+
+ assert (expect_new_obj);
+
+ memcpy (&last_class_ident, class, sizeof (last_class_ident));
+
+ obj = malloc (sizeof (*obj));
+ strncpy (obj->host, vl->host, sizeof (obj->host));
+ strncpy (obj->plugin, vl->plugin, sizeof (obj->plugin));
+ strncpy (obj->plugin_instance, vl->plugin_instance, sizeof (obj->plugin_instance));
+ strncpy (obj->type, vl->type, sizeof (obj->type));
+ strncpy (obj->type_instance, vl->type_instance, sizeof (obj->type_instance));
+
+ have_new_obj = 1;
+
+ return ((void *) obj);
+}
+
+static void checked_lookup_add (lookup_t *obj, /* {{{ */
+ char const *host,
+ char const *plugin, char const *plugin_instance,
+ char const *type, char const *type_instance)
+{
+ identifier_t ident;
+ void *user_class;
+ int status;
+
+ memset (&ident, 0, sizeof (ident));
+ strncpy (ident.host, host, sizeof (ident.host));
+ strncpy (ident.plugin, plugin, sizeof (ident.plugin));
+ strncpy (ident.plugin_instance, plugin_instance, sizeof (ident.plugin_instance));
+ strncpy (ident.type, type, sizeof (ident.type));
+ strncpy (ident.type_instance, type_instance, sizeof (ident.type_instance));
+
+ user_class = malloc (sizeof (ident));
+ memmove (user_class, &ident, sizeof (ident));
+
+ status = lookup_add (obj, &ident, user_class);
+ assert (status == 0);
+} /* }}} void test_add */
+
+static int checked_lookup_search (lookup_t *obj,
+ char const *host,
+ char const *plugin, char const *plugin_instance,
+ char const *type, char const *type_instance,
+ _Bool expect_new)
+{
+ int status;
+ value_list_t vl = VALUE_LIST_STATIC;
+ data_set_t const *ds = &ds_unknown;
+
+ strncpy (vl.host, host, sizeof (vl.host));
+ strncpy (vl.plugin, plugin, sizeof (vl.plugin));
+ strncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
+ strncpy (vl.type, type, sizeof (vl.type));
+ strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+ if (strcmp (vl.type, "test") == 0)
+ ds = &ds_test;
+
+ expect_new_obj = expect_new;
+ have_new_obj = 0;
+
+ status = lookup_search (obj, ds, &vl);
+ return (status);
+}
+
+static lookup_t *checked_lookup_create (void)
+{
+ lookup_t *obj = lookup_create (
+ lookup_class_callback,
+ lookup_obj_callback,
+ (void *) free,
+ (void *) free);
+ assert (obj != NULL);
+ return (obj);
+}
+
+static void testcase0 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+
+ checked_lookup_add (obj, "/any/", "test", "", "test", "/all/");
+ checked_lookup_search (obj, "host0", "test", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host0", "test", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "test", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host1", "test", "", "test", "1",
+ /* expect new = */ 0);
+
+ lookup_destroy (obj);
+}
+
+static void testcase1 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+
+ checked_lookup_add (obj, "/any/", "/all/", "/all/", "test", "/all/");
+ checked_lookup_search (obj, "host0", "plugin0", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host0", "plugin0", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host0", "plugin1", "", "test", "0",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host0", "plugin1", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin0", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host1", "plugin0", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin1", "", "test", "0",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin1", "", "test", "1",
+ /* expect new = */ 0);
+
+ lookup_destroy (obj);
+}
+
+static void testcase2 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+ int status;
+
+ checked_lookup_add (obj, "/any/", "plugin0", "", "test", "/all/");
+ checked_lookup_add (obj, "/any/", "/all/", "", "test", "ti0");
+
+ status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "",
+ /* expect new = */ 0);
+ assert (status == 0);
+ status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "",
+ /* expect new = */ 1);
+ assert (status == 1);
+ status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "ti0",
+ /* expect new = */ 1);
+ assert (status == 1);
+ status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "ti0",
+ /* expect new = */ 0);
+ assert (status == 2);
+
+ lookup_destroy (obj);
+}
+
+int main (int argc, char **argv) /* {{{ */
+{
+ testcase0 ();
+ testcase1 ();
+ testcase2 ();
+ return (EXIT_SUCCESS);
+} /* }}} int main */