Merge branch 'ff/aggregate'
authorFlorian Forster <octo@collectd.org>
Sun, 11 Nov 2012 07:58:18 +0000 (08:58 +0100)
committerFlorian Forster <octo@collectd.org>
Sun, 11 Nov 2012 07:58:18 +0000 (08:58 +0100)
Conflicts:
src/Makefile.am

configure.in
src/Makefile.am
src/aggregation.c [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod
src/common.c
src/common.h
src/utils_vl_lookup.c [new file with mode: 0644]
src/utils_vl_lookup.h [new file with mode: 0644]
src/utils_vl_lookup_test.c [new file with mode: 0644]

index 2a187b2..8355107 100644 (file)
@@ -4853,6 +4853,7 @@ AC_ARG_ENABLE([all-plugins],
 
 m4_divert_once([HELP_ENABLE], [])
 
+AC_PLUGIN([aggregation], [yes],                [Aggregation plugin])
 AC_PLUGIN([amqp],        [$with_librabbitmq],  [AMQP output plugin])
 AC_PLUGIN([apache],      [$with_libcurl],      [Apache httpd statistics])
 AC_PLUGIN([apcups],      [yes],                [Statistics of UPSes by APC])
@@ -5185,6 +5186,7 @@ Configuration:
     perl  . . . . . . . . $with_perl_bindings
 
   Modules:
+    aggregation . . . . . $enable_aggregation
     amqp    . . . . . . . $enable_amqp
     apache  . . . . . . . $enable_apache
     apcups  . . . . . . . $enable_apcups
index 5bfe930..f4c699a 100644 (file)
@@ -119,6 +119,16 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES = 
 CLEANFILES = 
 
+if BUILD_PLUGIN_AGGREGATION
+pkglib_LTLIBRARIES += aggregation.la
+aggregation_la_SOURCES = aggregation.c \
+                         utils_vl_lookup.c utils_vl_lookup.h
+aggregation_la_LDFLAGS = -module -avoid-version
+aggregation_la_LIBADD =
+collectd_LDADD += "-dlopen" aggregation.la
+collectd_DEPENDENCIES += aggregation.la
+endif
+
 if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
@@ -1397,3 +1407,16 @@ uninstall-hook:
        rm -f $(DESTDIR)$(pkgdatadir)/types.db;
        rm -f $(DESTDIR)$(sysconfdir)/collectd.conf
        rm -f $(DESTDIR)$(pkgdatadir)/postgresql_default.conf;
+
+if BUILD_FEATURE_DEBUG
+bin_PROGRAMS += utils_vl_lookup_test
+utils_vl_lookup_test_SOURCES = utils_vl_lookup_test.c \
+                               utils_vl_lookup.h utils_vl_lookup.c \
+                               utils_avltree.c utils_avltree.h \
+                               common.h
+
+utils_vl_lookup_test_CPPFLAGS =  $(AM_CPPFLAGS) $(LTDLINCL) -DBUILD_TEST=1
+utils_vl_lookup_test_CFLAGS = $(AM_CFLAGS)
+utils_vl_lookup_test_LDFLAGS = -export-dynamic
+utils_vl_lookup_test_LDADD =
+endif
diff --git a/src/aggregation.c b/src/aggregation.c
new file mode 100644 (file)
index 0000000..e50557d
--- /dev/null
@@ -0,0 +1,676 @@
+/**
+ * collectd - src/aggregation.c
+ * Copyright (C) 2012       Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "meta_data.h"
+#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_vl_lookup.h"
+
+#include <pthread.h>
+
+struct aggregation_s /* {{{ */
+{
+  identifier_t ident;
+
+  _Bool calc_num;
+  _Bool calc_sum;
+  _Bool calc_average;
+  _Bool calc_min;
+  _Bool calc_max;
+  _Bool calc_stddev;
+}; /* }}} */
+typedef struct aggregation_s aggregation_t;
+
+struct agg_instance_s;
+typedef struct agg_instance_s agg_instance_t;
+struct agg_instance_s /* {{{ */
+{
+  pthread_mutex_t lock;
+  identifier_t ident;
+
+  int ds_type;
+
+  derive_t num;
+  gauge_t sum;
+  gauge_t squares_sum;
+
+  gauge_t min;
+  gauge_t max;
+
+  rate_to_value_state_t *state_num;
+  rate_to_value_state_t *state_sum;
+  rate_to_value_state_t *state_average;
+  rate_to_value_state_t *state_min;
+  rate_to_value_state_t *state_max;
+  rate_to_value_state_t *state_stddev;
+
+  agg_instance_t *next;
+}; /* }}} */
+
+static lookup_t *lookup = NULL;
+
+static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static agg_instance_t *agg_instance_list_head = NULL;
+
+static void agg_destroy (aggregation_t *agg) /* {{{ */
+{
+  sfree (agg);
+} /* }}} void agg_destroy */
+
+/* Frees all dynamically allocated memory within the instance. */
+static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
+{
+  if (inst == NULL)
+    return;
+
+  /* Remove this instance from the global list of instances. */
+  pthread_mutex_lock (&agg_instance_list_lock);
+  if (agg_instance_list_head == inst)
+    agg_instance_list_head = inst->next;
+  else if (agg_instance_list_head != NULL)
+  {
+    agg_instance_t *prev = agg_instance_list_head;
+    while ((prev != NULL) && (prev->next != inst))
+      prev = prev->next;
+    if (prev != NULL)
+      prev->next = inst->next;
+  }
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  sfree (inst->state_num);
+  sfree (inst->state_sum);
+  sfree (inst->state_average);
+  sfree (inst->state_min);
+  sfree (inst->state_max);
+  sfree (inst->state_stddev);
+
+  memset (inst, 0, sizeof (*inst));
+  inst->ds_type = -1;
+  inst->min = NAN;
+  inst->max = NAN;
+} /* }}} void agg_instance_destroy */
+
+/* Create a new aggregation instance. */
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl, aggregation_t *agg)
+{
+  agg_instance_t *inst;
+
+  DEBUG ("aggregation plugin: Creating new instance.");
+
+  inst = malloc (sizeof (*inst));
+  if (inst == NULL)
+  {
+    ERROR ("aggregation plugin: malloc() failed.");
+    return (NULL);
+  }
+  memset (inst, 0, sizeof (*inst));
+  pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+  inst->ds_type = ds->ds[0].type;
+
+#define COPY_FIELD(fld) do { \
+  sstrncpy (inst->ident.fld, \
+      LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
+      sizeof (inst->ident.fld)); \
+} while (0)
+
+  COPY_FIELD (host);
+  COPY_FIELD (plugin);
+  COPY_FIELD (plugin_instance);
+  COPY_FIELD (type);
+  COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+  inst->min = NAN;
+  inst->max = NAN;
+
+#define INIT_STATE(field) do { \
+  inst->state_ ## field = NULL; \
+  if (agg->calc_ ## field) { \
+    inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
+    if (inst->state_ ## field == NULL) { \
+      agg_instance_destroy (inst); \
+      ERROR ("aggregation plugin: malloc() failed."); \
+      return (NULL); \
+    } \
+    memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
+  } \
+} while (0)
+
+  INIT_STATE (num);
+  INIT_STATE (sum);
+  INIT_STATE (average);
+  INIT_STATE (min);
+  INIT_STATE (max);
+  INIT_STATE (stddev);
+
+#undef INIT_STATE
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+  inst->next = agg_instance_list_head;
+  agg_instance_list_head = inst;
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return (inst);
+} /* }}} agg_instance_t *agg_instance_create */
+
+/* Update the num, sum, min, max, ... fields of the aggregation instance, if
+ * the rate of the value list is available. Value lists with more than one data
+ * source are not supported and will return an error. Returns zero on success
+ * and non-zero otherwise. */
+static int agg_instance_update (agg_instance_t *inst, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl)
+{
+  gauge_t *rate;
+
+  if (ds->ds_num != 1)
+    return (-1);
+
+  rate = uc_get_rate (ds, vl);
+  if (rate == NULL)
+  {
+    ERROR ("aggregation plugin: uc_get_rate() failed.");
+    return (-1);
+  }
+
+  if (isnan (rate[0]))
+  {
+    sfree (rate);
+    return (0);
+  }
+
+  pthread_mutex_lock (&inst->lock);
+
+  inst->num++;
+  inst->sum += rate[0];
+  inst->squares_sum += (rate[0] * rate[0]);
+
+  if (isnan (inst->min) || (inst->min > rate[0]))
+    inst->min = rate[0];
+  if (isnan (inst->max) || (inst->max < rate[0]))
+    inst->max = rate[0];
+
+  pthread_mutex_unlock (&inst->lock);
+
+  sfree (rate);
+  return (0);
+} /* }}} int agg_instance_update */
+
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+  char const *func, gauge_t rate, rate_to_value_state_t *state,
+  value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+  value_t v;
+  int status;
+
+  if (pi_prefix[0] != 0)
+    ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+        pi_prefix, func);
+  else
+    sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+  memset (&v, 0, sizeof (v));
+  status = rate_to_value (&v, rate, state, inst->ds_type, t);
+  if (status != 0)
+  {
+    /* If this is the first iteration and rate_to_value() was asked to return a
+     * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+     * gracefully. */
+    if (status == EAGAIN)
+      return (0);
+
+    WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+        status);
+    return (-1);
+  }
+
+  vl->values = &v;
+  vl->values_len = 1;
+
+  plugin_dispatch_values_secure (vl);
+
+  vl->values = NULL;
+  vl->values_len = 0;
+
+  return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+  value_list_t vl = VALUE_LIST_INIT;
+  char pi_prefix[DATA_MAX_NAME_LEN];
+
+  /* Pre-set all the fields in the value list that will not change per
+   * aggregation type (sum, average, ...). The struct will be re-used and must
+   * therefore be dispatched using the "secure" function. */
+
+  vl.time = t;
+  vl.interval = 0;
+
+  vl.meta = meta_data_create ();
+  if (vl.meta == NULL)
+  {
+    ERROR ("aggregation plugin: meta_data_create failed.");
+    return (-1);
+  }
+  meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+  if (LU_IS_ALL (inst->ident.host))
+    sstrncpy (vl.host, "global", sizeof (vl.host));
+  else
+    sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+  sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
+
+  if (LU_IS_ALL (inst->ident.plugin))
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+    else
+      sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+  }
+  else
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+    else
+      ssnprintf (pi_prefix, sizeof (pi_prefix),
+          "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+  }
+
+  sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+  if (!LU_IS_ALL (inst->ident.type_instance))
+    sstrncpy (vl.type_instance, inst->ident.type_instance,
+        sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+  if (inst->state_ ## func != NULL) { \
+    agg_instance_read_func (inst, #func, rate, \
+        inst->state_ ## func, &vl, pi_prefix, t); \
+  } \
+} while (0)
+
+  pthread_mutex_lock (&inst->lock);
+
+  READ_FUNC (num, (gauge_t) inst->num);
+
+  /* All other aggregations are only defined when there have been any values
+   * at all. */
+  if (inst->num > 0)
+  {
+    READ_FUNC (sum, inst->sum);
+    READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+    READ_FUNC (min, inst->min);
+    READ_FUNC (max, inst->max);
+    READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+          - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+  }
+
+  /* Reset internal state. */
+  inst->num = 0;
+  inst->sum = 0.0;
+  inst->squares_sum = 0.0;
+  inst->min = NAN;
+  inst->max = NAN;
+
+  pthread_mutex_unlock (&inst->lock);
+
+  meta_data_destroy (vl.meta);
+  vl.meta = NULL;
+
+  return (0);
+} /* }}} int agg_instance_read */
+
+/* lookup_class_callback_t for utils_vl_lookup */
+static void *agg_lookup_class_callback ( /* {{{ */
+    __attribute__((unused)) data_set_t const *ds,
+    value_list_t const *vl, void *user_class)
+{
+  return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
+} /* }}} void *agg_class_callback */
+
+/* lookup_obj_callback_t for utils_vl_lookup */
+static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl,
+    __attribute__((unused)) void *user_class,
+    void *user_obj)
+{
+  return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
+} /* }}} int agg_lookup_obj_callback */
+
+/* lookup_free_class_callback_t for utils_vl_lookup */
+static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
+{
+  agg_destroy ((aggregation_t *) user_class);
+} /* }}} void agg_lookup_free_class_callback */
+
+/* lookup_free_obj_callback_t for utils_vl_lookup */
+static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
+{
+  agg_instance_destroy ((agg_instance_t *) user_obj);
+} /* }}} void agg_lookup_free_obj_callback */
+
+/*
+ * <Plugin "aggregation">
+ *   <Aggregation>
+ *     Plugin "cpu"
+ *     Type "cpu"
+ *
+ *     GroupBy Host
+ *     GroupBy TypeInstance
+ *
+ *     CalculateNum true
+ *     CalculateSum true
+ *     CalculateAverage true
+ *     CalculateMinimum true
+ *     CalculateMaximum true
+ *     CalculateStddev true
+ *   </Aggregation>
+ * </Plugin>
+ */
+static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
+    aggregation_t *agg)
+{
+  int i;
+
+  for (i = 0; i < ci->values_num; i++)
+  {
+    char const *value;
+
+    if (ci->values[i].type != OCONFIG_TYPE_STRING)
+    {
+      ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
+          "is not a string.", i + 1);
+      continue;
+    }
+
+    value = ci->values[i].value.string;
+
+    if (strcasecmp ("Host", value) == 0)
+      sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+    else if (strcasecmp ("Plugin", value) == 0)
+      sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+    else if (strcasecmp ("PluginInstance", value) == 0)
+      sstrncpy (agg->ident.plugin_instance, LU_ANY,
+          sizeof (agg->ident.plugin_instance));
+    else if (strcasecmp ("TypeInstance", value) == 0)
+      sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("Type", value) == 0)
+      ERROR ("aggregation plugin: Grouping by type is not supported.");
+    else
+      WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
+          "option is invalid and will be ignored.", value);
+  } /* for (ci->values) */
+
+  return (0);
+} /* }}} int agg_config_handle_group_by */
+
+static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
+{
+  aggregation_t *agg;
+  _Bool is_valid;
+  int status;
+  int i;
+
+  agg = malloc (sizeof (*agg));
+  if (agg == NULL)
+  {
+    ERROR ("aggregation plugin: malloc failed.");
+    return (-1);
+  }
+  memset (agg, 0, sizeof (*agg));
+
+  sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
+  sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
+  sstrncpy (agg->ident.plugin_instance, LU_ALL,
+      sizeof (agg->ident.plugin_instance));
+  sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
+  sstrncpy (agg->ident.type_instance, LU_ALL,
+      sizeof (agg->ident.type_instance));
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Host", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.host,
+          sizeof (agg->ident.host));
+    else if (strcasecmp ("Plugin", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin,
+          sizeof (agg->ident.plugin));
+    else if (strcasecmp ("PluginInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin_instance,
+          sizeof (agg->ident.plugin_instance));
+    else if (strcasecmp ("Type", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type,
+          sizeof (agg->ident.type));
+    else if (strcasecmp ("TypeInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type_instance,
+          sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("GroupBy", child->key) == 0)
+      agg_config_handle_group_by (child, agg);
+    else if (strcasecmp ("CalculateNum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_num);
+    else if (strcasecmp ("CalculateSum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_sum);
+    else if (strcasecmp ("CalculateAverage", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_average);
+    else if (strcasecmp ("CalculateMinimum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_min);
+    else if (strcasecmp ("CalculateMaximum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_max);
+    else if (strcasecmp ("CalculateStddev", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_stddev);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  /* Sanity checking */
+  is_valid = 1;
+  if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+  {
+    ERROR ("aggregation plugin: It appears you did not specify the required "
+        "\"Type\" option in this aggregation. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  }
+  else if (strchr (agg->ident.type, '/') != NULL)
+  {
+    ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
+        "character. Especially, it may not be a wildcard. The current "
+        "value is \"%s\".", agg->ident.type);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!LU_IS_ALL (agg->ident.host) /* {{{ */
+      && !LU_IS_ALL (agg->ident.plugin)
+      && !LU_IS_ALL (agg->ident.plugin_instance)
+      && !LU_IS_ALL (agg->ident.type_instance))
+  {
+    ERROR ("aggregation plugin: An aggregation must contain at least one "
+        "wildcard. This is achieved by leaving at least one of the \"Host\", "
+        "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
+        "and not grouping by that field. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
+      && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
+  {
+    ERROR ("aggregation plugin: No aggregation function has been specified. "
+        "Without this, I don't know what I should be calculating. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!is_valid) /* {{{ */
+  {
+    sfree (agg);
+    return (-1);
+  } /* }}} */
+
+  status = lookup_add (lookup, &agg->ident, agg);
+  if (status != 0)
+  {
+    ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
+    sfree (agg);
+    return (-1);
+  }
+
+  DEBUG ("aggregation plugin: Successfully added aggregation: "
+      "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+      "Type \"%s\", TypeInstance \"%s\")",
+      agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+      agg->ident.type, agg->ident.type_instance);
+  return (0);
+} /* }}} int agg_config_aggregation */
+
+static int agg_config (oconfig_item_t *ci) /* {{{ */
+{
+  int i;
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+
+  if (lookup == NULL)
+  {
+    lookup = lookup_create (agg_lookup_class_callback,
+        agg_lookup_obj_callback,
+        agg_lookup_free_class_callback,
+        agg_lookup_free_obj_callback);
+    if (lookup == NULL)
+    {
+      pthread_mutex_unlock (&agg_instance_list_lock);
+      ERROR ("aggregation plugin: lookup_create failed.");
+      return (-1);
+    }
+  }
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Aggregation", child->key) == 0)
+      agg_config_aggregation (child);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Plugin aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return (0);
+} /* }}} int agg_config */
+
+static int agg_read (void) /* {{{ */
+{
+  agg_instance_t *this;
+  cdtime_t t;
+  int success;
+
+  t = cdtime ();
+  success = 0;
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+
+  /* agg_instance_list_head only holds data, after the "write" callback has
+   * been called with a matching value list at least once. So on startup,
+   * there's a race between the aggregations read() and write() callback. If
+   * the read() callback is called first, agg_instance_list_head is NULL and
+   * "success" may be zero. This is expected and should not result in an error.
+   * Therefore we need to handle this case separately. */
+  if (agg_instance_list_head == NULL)
+  {
+    pthread_mutex_unlock (&agg_instance_list_lock);
+    return (0);
+  }
+
+  for (this = agg_instance_list_head; this != NULL; this = this->next)
+  {
+    int status;
+
+    status = agg_instance_read (this, t);
+    if (status != 0)
+      WARNING ("aggregation plugin: Reading an aggregation instance "
+          "failed with status %i.", status);
+    else
+      success++;
+  }
+
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return ((success > 0) ? 0 : -1);
+} /* }}} int agg_read */
+
+static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
+    __attribute__((unused)) user_data_t *user_data)
+{
+  _Bool created_by_aggregation = 0;
+  int status;
+
+  /* Ignore values that were created by the aggregation plugin to avoid weird
+   * effects. */
+  (void) meta_data_get_boolean (vl->meta, "aggregation:created",
+      &created_by_aggregation);
+  if (created_by_aggregation)
+    return (0);
+
+  if (lookup == NULL)
+    status = ENOENT;
+  else
+  {
+    status = lookup_search (lookup, ds, vl);
+    if (status > 0)
+      status = 0;
+  }
+
+  return (status);
+} /* }}} int agg_write */
+
+void module_register (void)
+{
+  plugin_register_complex_config ("aggregation", agg_config);
+  plugin_register_read ("aggregation", agg_read);
+  plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
+}
+
+/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */
index 37aceaf..9f5dedc 100644 (file)
@@ -52,6 +52,7 @@
 # to missing dependencies or because they have been deactivated explicitly.  #
 ##############################################################################
 
+#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
 #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 # ription of those options is available in the collectd.conf(5) manual page. #
 ##############################################################################
 
+#<Plugin "aggregation">
+#  <Aggregation>
+#    #Host "unspecified"
+#    Plugin "cpu"
+#    #PluginInstance "unspecified"
+#    Type "cpu"
+#    #TypeInstance "unspecified"
+#
+#    GroupBy "Host"
+#    GroupBy "TypeInstance"
+#
+#    CalculateNum false
+#    CalculateSum false
+#    CalculateAverage true
+#    CalculateMinimum false
+#    CalculateMaximum false
+#    CalculateStddev false
+#  </Aggregation>
+#</Plugin>
+
 #<Plugin "amqp">
 #  <Publish "name">
 #    Host "localhost"
index d7aba7b..f6f61c7 100644 (file)
@@ -193,12 +193,122 @@ C<Plugin>-Section. Which options exist depends on the plugin used. Some plugins
 require external configuration, too. The C<apache plugin>, for example,
 required C<mod_status> to be configured in the webserver you're going to
 collect data from. These plugins are listed below as well, even if they don't
-require any configuration within collectd's configfile.
+require any configuration within collectd's configuration file.
 
 A list of all plugins and a short summary for each plugin can be found in the
 F<README> file shipped with the sourcecode and hopefully binary packets as
 well.
 
+=head2 Plugin C<aggregation>
+
+The I<Aggregation plugin> makes it possible to aggregate several values into
+one using aggregation functions such as I<sum>, I<average>, I<min> and I<max>.
+This can be put to a wide variety of uses, e.g. average and total CPU
+statistics for your entire fleet.
+
+The grouping is powerful but, as with many powerful tools, may be a bit
+difficult to wrap your head around. The grouping will therefore be
+demonstrated using an example: The average and sum of the CPU usage across
+all CPUs of each host is to be calculated.
+
+To select all the affected values for our example, set C<Plugin cpu> and
+C<Type cpu>. The other values are left unspecified, meaning "all values". The
+I<Host>, I<Plugin>, I<PluginInstance>, I<Type> and I<TypeInstance> options
+work as if they were specified in the C<WHERE> clause of an C<SELECT> SQL
+statement.
+
+  Plugin "cpu"
+  Type "cpu"
+
+Although the I<Host>, I<PluginInstance> (CPU number, i.e. 0, 1, 2, ...)  and
+I<TypeInstance> (idle, user, system, ...) fields are left unspecified in the
+example, the intention is to have a new value for each host / type instance
+pair. This is achieved by "grouping" the values using the C<GroupBy> option.
+It can be specified multiple times to group by more than one field.
+
+  GroupBy "Host"
+  GroupBy "TypeInstance"
+
+We do neither specify nor group by I<plugin instance> (the CPU number), so all
+metrics that differ in the CPU number only will be aggregated. Each
+aggregation needs I<at least one> such field, otherwise no aggregation would
+take place.
+
+The full example configuration looks like this:
+
+ <Plugin "aggregation">
+   <Aggregation>
+     Plugin "cpu"
+     Type "cpu"
+     
+     GroupBy "Host"
+     GroupBy "TypeInstance"
+     
+     CalculateSum true
+     CalculateAverage true
+   </Aggregation>
+ </Plugin>
+
+There are a couple of limitations you should be aware of:
+
+=over 4
+
+=item
+
+The I<Type> cannot be left unspecified, because it is not reasonable to add
+apples to oranges. Also, the internal lookup structure won't work if you try
+to group by type.
+
+=item
+
+There must be at least one unspecified, ungrouped field. Otherwise nothing
+will be aggregated.
+
+=back
+
+As you can see in the example above, each aggregation has its own
+B<Aggregation> block. You can have multiple aggregation blocks and aggregation
+blocks may match the same values, i.e. one value list can update multiple
+aggregations. The following options are valid inside B<Aggregation> blocks:
+
+=over 4
+
+=item B<Host> I<Host>
+
+=item B<Plugin> I<Plugin>
+
+=item B<PluginInstance> I<PluginInstance>
+
+=item B<Type> I<Type>
+
+=item B<TypeInstance> I<TypeInstance>
+
+Selects the value lists to be added to this aggregation. B<Type> must be a
+valid data set name, see L<types.db(5)> for details.
+
+=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
+
+Group valued by the specified field. The B<GroupBy> option may be repeated to
+group by multiple fields.
+
+=item B<CalculateNum> B<true>|B<false>
+
+=item B<CalculateSum> B<true>|B<false>
+
+=item B<CalculateAverage> B<true>|B<false>
+
+=item B<CalculateMinimum> B<true>|B<false>
+
+=item B<CalculateMaximum> B<true>|B<false>
+
+=item B<CalculateStddev> B<true>|B<false>
+
+Boolean options for enabling calculation of the number of value lists, their
+sum, average, minimum, maximum andE<nbsp>/ or standard deviation. All options
+are disabled by default.
+
+=back
+
 =head2 Plugin C<amqp>
 
 The I<AMQMP plugin> can be used to communicate with other instances of
index 0bd9f68..b679bf7 100644 (file)
@@ -1229,7 +1229,102 @@ counter_t counter_diff (counter_t old_value, counter_t new_value)
        }
 
        return (diff);
-} /* counter_t counter_to_gauge */
+} /* counter_t counter_diff */
+
+int rate_to_value (value_t *ret_value, gauge_t rate, /* {{{ */
+               rate_to_value_state_t *state,
+               int ds_type, cdtime_t t)
+{
+       gauge_t delta_gauge;
+       cdtime_t delta_t;
+
+       if (ds_type == DS_TYPE_GAUGE)
+       {
+               state->last_value.gauge = rate;
+               state->last_time = t;
+
+               *ret_value = state->last_value;
+               return (0);
+       }
+
+       /* Counter and absolute can't handle negative rates. Reset "last time"
+        * to zero, so that the next valid rate will re-initialize the
+        * structure. */
+       if ((rate < 0.0)
+                       && ((ds_type == DS_TYPE_COUNTER)
+                               || (ds_type == DS_TYPE_ABSOLUTE)))
+       {
+               memset (state, 0, sizeof (*state));
+               return (EINVAL);
+       }
+
+       /* Another invalid state: The time is not increasing. */
+       if (t <= state->last_time)
+       {
+               memset (state, 0, sizeof (*state));
+               return (EINVAL);
+       }
+
+       delta_t = t - state->last_time;
+       delta_gauge = (rate * CDTIME_T_TO_DOUBLE (delta_t)) + state->residual;
+
+       /* Previous value is invalid. */
+       if (state->last_time == 0) /* {{{ */
+       {
+               if (ds_type == DS_TYPE_DERIVE)
+               {
+                       state->last_value.derive = (derive_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.derive);
+               }
+               else if (ds_type == DS_TYPE_COUNTER)
+               {
+                       state->last_value.counter = (counter_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.counter);
+               }
+               else if (ds_type == DS_TYPE_ABSOLUTE)
+               {
+                       state->last_value.absolute = (absolute_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.absolute);
+               }
+               else
+               {
+                       assert (23 == 42);
+               }
+
+               state->last_time = t;
+               return (EAGAIN);
+       } /* }}} */
+
+       if (ds_type == DS_TYPE_DERIVE)
+       {
+               derive_t delta_derive = (derive_t) delta_gauge;
+
+               state->last_value.derive += delta_derive;
+               state->residual = delta_gauge - ((gauge_t) delta_derive);
+       }
+       else if (ds_type == DS_TYPE_COUNTER)
+       {
+               counter_t delta_counter = (counter_t) delta_gauge;
+
+               state->last_value.counter += delta_counter;
+               state->residual = delta_gauge - ((gauge_t) delta_counter);
+       }
+       else if (ds_type == DS_TYPE_ABSOLUTE)
+       {
+               absolute_t delta_absolute = (absolute_t) delta_gauge;
+
+               state->last_value.absolute = delta_absolute;
+               state->residual = delta_gauge - ((gauge_t) delta_absolute);
+       }
+       else
+       {
+               assert (23 == 42);
+       }
+
+        state->last_time = t;
+       *ret_value = state->last_value;
+       return (0);
+} /* }}} value_t rate_to_value */
 
 int service_name_to_port_number (const char *service_name)
 {
index 6b11b53..8a7d986 100644 (file)
                || (strcasecmp ("no", (s)) == 0) \
                || (strcasecmp ("off", (s)) == 0))
 
+struct rate_to_value_state_s
+{
+  value_t last_value;
+  cdtime_t last_time;
+  gauge_t residual;
+};
+typedef struct rate_to_value_state_s rate_to_value_state_t;
+
 char *sstrncpy (char *dest, const char *src, size_t n);
 int ssnprintf (char *dest, size_t n, const char *format, ...);
 char *sstrdup(const char *s);
@@ -292,6 +300,15 @@ int read_file_contents (const char *filename, char *buf, int bufsize);
 
 counter_t counter_diff (counter_t old_value, counter_t new_value);
 
+/* Convert a rate back to a value_t. When converting to a derive_t, counter_t
+ * or absoltue_t, take fractional residuals into account. This is important
+ * when scaling counters, for example.
+ * Returns zero on success. Returns EAGAIN when called for the first time; in
+ * this case the value_t is invalid and the next call should succeed. Other
+ * return values indicate an error. */
+int rate_to_value (value_t *ret_value, gauge_t rate,
+               rate_to_value_state_t *state, int ds_type, cdtime_t t);
+
 /* Converts a service name (a string) to a port number
  * (in the range [1-65535]). Returns less than zero on error. */
 int service_name_to_port_number (const char *service_name);
diff --git a/src/utils_vl_lookup.c b/src/utils_vl_lookup.c
new file mode 100644 (file)
index 0000000..2dada24
--- /dev/null
@@ -0,0 +1,541 @@
+/**
+ * collectd - src/utils_vl_lookup.c
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "utils_vl_lookup.h"
+#include "utils_avltree.h"
+
+#if BUILD_TEST
+# define sstrncpy strncpy
+# define plugin_log(s, ...) do { \
+  printf ("[severity %i] ", s); \
+  printf (__VA_ARGS__); \
+  printf ("\n"); \
+} while (0)
+#endif
+
+/*
+ * Types
+ */
+struct lookup_s
+{
+  c_avl_tree_t *by_type_tree;
+
+  lookup_class_callback_t cb_user_class;
+  lookup_obj_callback_t cb_user_obj;
+  lookup_free_class_callback_t cb_free_class;
+  lookup_free_obj_callback_t cb_free_obj;
+};
+
+struct user_obj_s;
+typedef struct user_obj_s user_obj_t;
+struct user_obj_s
+{
+  void *user_obj;
+  identifier_t ident;
+
+  user_obj_t *next;
+};
+
+struct user_class_s
+{
+  void *user_class;
+  identifier_t ident;
+  user_obj_t *user_obj_list; /* list of user_obj */
+};
+typedef struct user_class_s user_class_t;
+
+struct user_class_list_s;
+typedef struct user_class_list_s user_class_list_t;
+struct user_class_list_s
+{
+  user_class_t entry;
+  user_class_list_t *next;
+};
+
+struct by_type_entry_s
+{
+  c_avl_tree_t *by_plugin_tree; /* plugin -> user_class_list_t */
+  user_class_list_t *wildcard_plugin_list;
+};
+typedef struct by_type_entry_s by_type_entry_t;
+
+/*
+ * Private functions
+ */
+static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_t *user_class)
+{
+  user_obj_t *user_obj;
+
+  user_obj = malloc (sizeof (*user_obj));
+  if (user_obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (NULL);
+  }
+  memset (user_obj, 0, sizeof (*user_obj));
+  user_obj->next = NULL;
+
+  user_obj->user_obj = obj->cb_user_class (ds, vl, user_class->user_class);
+  if (user_obj->user_obj == NULL)
+  {
+    sfree (user_obj);
+    WARNING("utils_vl_lookup: User-provided constructor failed.");
+    return (NULL);
+  }
+
+  sstrncpy (user_obj->ident.host,
+    LU_IS_ALL (user_class->ident.host) ?  "/all/" : vl->host,
+    sizeof (user_obj->ident.host));
+  sstrncpy (user_obj->ident.plugin,
+    LU_IS_ALL (user_class->ident.plugin) ?  "/all/" : vl->plugin,
+    sizeof (user_obj->ident.plugin));
+  sstrncpy (user_obj->ident.plugin_instance,
+    LU_IS_ALL (user_class->ident.plugin_instance) ?  "/all/" : vl->plugin_instance,
+    sizeof (user_obj->ident.plugin_instance));
+  sstrncpy (user_obj->ident.type,
+    LU_IS_ALL (user_class->ident.type) ?  "/all/" : vl->type,
+    sizeof (user_obj->ident.type));
+  sstrncpy (user_obj->ident.type_instance,
+    LU_IS_ALL (user_class->ident.type_instance) ?  "/all/" : vl->type_instance,
+    sizeof (user_obj->ident.type_instance));
+
+  if (user_class->user_obj_list == NULL)
+  {
+    user_class->user_obj_list = user_obj;
+  }
+  else
+  {
+    user_obj_t *last = user_class->user_obj_list;
+    while (last->next != NULL)
+      last = last->next;
+    last->next = user_obj;
+  }
+
+  return (user_obj);
+} /* }}} void *lu_create_user_obj */
+
+static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
+    value_list_t const *vl)
+{
+  user_obj_t *ptr;
+
+  for (ptr = user_class->user_obj_list;
+      ptr != NULL;
+      ptr = ptr->next)
+  {
+    if (!LU_IS_ALL (ptr->ident.host)
+        && (strcmp (ptr->ident.host, vl->host) != 0))
+      continue;
+    if (!LU_IS_ALL (ptr->ident.plugin_instance)
+        && (strcmp (ptr->ident.plugin_instance, vl->plugin_instance) != 0))
+      continue;
+    if (!LU_IS_ALL (ptr->ident.type_instance)
+        && (strcmp (ptr->ident.type_instance, vl->type_instance) != 0))
+      continue;
+
+    return (ptr);
+  }
+
+  return (NULL);
+} /* }}} user_obj_t *lu_find_user_obj */
+
+static int lu_handle_user_class (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_t *user_class)
+{
+  user_obj_t *user_obj;
+  int status;
+
+  assert (strcmp (vl->type, user_class->ident.type) == 0);
+  assert (LU_IS_WILDCARD (user_class->ident.plugin)
+      || (strcmp (vl->plugin, user_class->ident.plugin) == 0));
+
+  /* When we get here, type and plugin already match the user class. Now check
+   * the rest of the fields. */
+  if (!LU_IS_WILDCARD (user_class->ident.type_instance)
+      && (strcmp (vl->type_instance, user_class->ident.type_instance) != 0))
+    return (1);
+  if (!LU_IS_WILDCARD (user_class->ident.plugin_instance)
+      && (strcmp (vl->plugin_instance,
+          user_class->ident.plugin_instance) != 0))
+    return (1);
+  if (!LU_IS_WILDCARD (user_class->ident.host)
+      && (strcmp (vl->host, user_class->ident.host) != 0))
+    return (1);
+
+  user_obj = lu_find_user_obj (user_class, vl);
+  if (user_obj == NULL)
+  {
+    /* call lookup_class_callback_t() and insert into the list of user objects. */
+    user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+    if (user_obj == NULL)
+      return (-1);
+  }
+
+  status = obj->cb_user_obj (ds, vl,
+      user_class->user_class, user_obj->user_obj);
+  if (status != 0)
+  {
+    ERROR ("utils_vl_lookup: The user object callback failed with status %i.",
+        status);
+    /* Returning a negative value means: abort! */
+    if (status < 0)
+      return (status);
+    else
+      return (1);
+  }
+
+  return (0);
+} /* }}} int lu_handle_user_class */
+
+static int lu_handle_user_class_list (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_list_t *user_class_list)
+{
+  user_class_list_t *ptr;
+  int retval = 0;
+  
+  for (ptr = user_class_list; ptr != NULL; ptr = ptr->next)
+  {
+    int status;
+
+    status = lu_handle_user_class (obj, ds, vl, &ptr->entry);
+    if (status < 0)
+      return (status);
+    else if (status == 0)
+      retval++;
+  }
+
+  return (retval);
+} /* }}} int lu_handle_user_class_list */
+
+static by_type_entry_t *lu_search_by_type (lookup_t *obj, /* {{{ */
+    char const *type, _Bool allocate_if_missing)
+{
+  by_type_entry_t *by_type;
+  char *type_copy;
+  int status;
+
+  status = c_avl_get (obj->by_type_tree, type, (void *) &by_type);
+  if (status == 0)
+    return (by_type);
+
+  if (!allocate_if_missing)
+    return (NULL);
+
+  type_copy = strdup (type);
+  if (type_copy == NULL)
+  {
+    ERROR ("utils_vl_lookup: strdup failed.");
+    return (NULL);
+  }
+
+  by_type = malloc (sizeof (*by_type));
+  if (by_type == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    sfree (type_copy);
+    return (NULL);
+  }
+  memset (by_type, 0, sizeof (*by_type));
+  by_type->wildcard_plugin_list = NULL;
+  
+  by_type->by_plugin_tree = c_avl_create ((void *) strcmp);
+  if (by_type->by_plugin_tree == NULL)
+  {
+    ERROR ("utils_vl_lookup: c_avl_create failed.");
+    sfree (by_type);
+    sfree (type_copy);
+    return (NULL);
+  }
+
+  status = c_avl_insert (obj->by_type_tree,
+      /* key = */ type_copy, /* value = */ by_type);
+  assert (status <= 0); /* >0 => entry exists => race condition. */
+  if (status != 0)
+  {
+    ERROR ("utils_vl_lookup: c_avl_insert failed.");
+    c_avl_destroy (by_type->by_plugin_tree);
+    sfree (by_type);
+    sfree (type_copy);
+    return (NULL);
+  }
+  
+  return (by_type);
+} /* }}} by_type_entry_t *lu_search_by_type */
+
+static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
+    identifier_t const *ident, user_class_list_t *user_class_list)
+{
+  user_class_list_t *ptr = NULL;
+
+  /* Lookup user_class_list from the per-plugin structure. If this is the first
+   * user_class to be added, the blocks return immediately. Otherwise they will
+   * set "ptr" to non-NULL. */
+  if (LU_IS_WILDCARD (ident->plugin))
+  {
+    if (by_type->wildcard_plugin_list == NULL)
+    {
+      by_type->wildcard_plugin_list = user_class_list;
+      return (0);
+    }
+
+    ptr = by_type->wildcard_plugin_list;
+  } /* if (plugin is wildcard) */
+  else /* (plugin is not wildcard) */
+  {
+    int status;
+
+    status = c_avl_get (by_type->by_plugin_tree,
+        ident->plugin, (void *) &ptr);
+
+    if (status != 0) /* plugin not yet in tree */
+    {
+      char *plugin_copy = strdup (ident->plugin);
+
+      if (plugin_copy == NULL)
+      {
+        ERROR ("utils_vl_lookup: strdup failed.");
+        sfree (user_class_list);
+        return (ENOMEM);
+      }
+
+      status = c_avl_insert (by_type->by_plugin_tree,
+          plugin_copy, user_class_list);
+      if (status != 0)
+      {
+        ERROR ("utils_vl_lookup: c_avl_insert(\"%s\") failed with status %i.",
+            plugin_copy, status);
+        sfree (plugin_copy);
+        sfree (user_class_list);
+        return (status);
+      }
+      else
+      {
+        return (0);
+      }
+    } /* if (plugin not yet in tree) */
+  } /* if (plugin is not wildcard) */
+
+  assert (ptr != NULL);
+
+  while (ptr->next != NULL)
+    ptr = ptr->next;
+  ptr->next = user_class_list;
+
+  return (0);
+} /* }}} int lu_add_by_plugin */
+
+static void lu_destroy_user_obj (lookup_t *obj, /* {{{ */
+    user_obj_t *user_obj)
+{
+  while (user_obj != NULL)
+  {
+    user_obj_t *next = user_obj->next;
+
+    if (obj->cb_free_obj != NULL)
+      obj->cb_free_obj (user_obj->user_obj);
+    user_obj->user_obj = NULL;
+
+    sfree (user_obj);
+    user_obj = next;
+  }
+} /* }}} void lu_destroy_user_obj */
+
+static void lu_destroy_user_class_list (lookup_t *obj, /* {{{ */
+    user_class_list_t *user_class_list)
+{
+  while (user_class_list != NULL)
+  {
+    user_class_list_t *next = user_class_list->next;
+
+    if (obj->cb_free_class != NULL)
+      obj->cb_free_class (user_class_list->entry.user_class);
+    user_class_list->entry.user_class = NULL;
+
+    lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
+    user_class_list->entry.user_obj_list = NULL;
+
+    sfree (user_class_list);
+    user_class_list = next;
+  }
+} /* }}} void lu_destroy_user_class_list */
+
+static void lu_destroy_by_type (lookup_t *obj, /* {{{ */
+    by_type_entry_t *by_type)
+{
+  
+  while (42)
+  {
+    char *plugin = NULL;
+    user_class_list_t *user_class_list = NULL;
+    int status;
+
+    status = c_avl_pick (by_type->by_plugin_tree,
+        (void *) &plugin, (void *) &user_class_list);
+    if (status != 0)
+      break;
+
+    DEBUG ("utils_vl_lookup: lu_destroy_by_type: Destroying plugin \"%s\".",
+        plugin);
+    sfree (plugin);
+    lu_destroy_user_class_list (obj, user_class_list);
+  }
+
+  c_avl_destroy (by_type->by_plugin_tree);
+  by_type->by_plugin_tree = NULL;
+
+  lu_destroy_user_class_list (obj, by_type->wildcard_plugin_list);
+  by_type->wildcard_plugin_list = NULL;
+
+  sfree (by_type);
+} /* }}} int lu_destroy_by_type */
+
+/*
+ * Public functions
+ */
+lookup_t *lookup_create (lookup_class_callback_t cb_user_class, /* {{{ */
+    lookup_obj_callback_t cb_user_obj,
+    lookup_free_class_callback_t cb_free_class,
+    lookup_free_obj_callback_t cb_free_obj)
+{
+  lookup_t *obj = malloc (sizeof (*obj));
+  if (obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (NULL);
+  }
+  memset (obj, 0, sizeof (*obj));
+
+  obj->by_type_tree = c_avl_create ((void *) strcmp);
+  if (obj->by_type_tree == NULL)
+  {
+    ERROR ("utils_vl_lookup: c_avl_create failed.");
+    sfree (obj);
+    return (NULL);
+  }
+
+  obj->cb_user_class = cb_user_class;
+  obj->cb_user_obj = cb_user_obj;
+  obj->cb_free_class = cb_free_class;
+  obj->cb_free_obj = cb_free_obj;
+
+  return (obj);
+} /* }}} lookup_t *lookup_create */
+
+void lookup_destroy (lookup_t *obj) /* {{{ */
+{
+  int status;
+
+  if (obj == NULL)
+    return;
+
+  while (42)
+  {
+    char *type = NULL;
+    by_type_entry_t *by_type = NULL;
+
+    status = c_avl_pick (obj->by_type_tree, (void *) &type, (void *) &by_type);
+    if (status != 0)
+      break;
+
+    DEBUG ("utils_vl_lookup: lookup_destroy: Destroying type \"%s\".", type);
+    sfree (type);
+    lu_destroy_by_type (obj, by_type);
+  }
+
+  c_avl_destroy (obj->by_type_tree);
+  obj->by_type_tree = NULL;
+
+  sfree (obj);
+} /* }}} void lookup_destroy */
+
+int lookup_add (lookup_t *obj, /* {{{ */
+    identifier_t const *ident, void *user_class)
+{
+  by_type_entry_t *by_type = NULL;
+  user_class_list_t *user_class_obj;
+
+  by_type = lu_search_by_type (obj, ident->type, /* allocate = */ 1);
+  if (by_type == NULL)
+    return (-1);
+
+  user_class_obj = malloc (sizeof (*user_class_obj));
+  if (user_class_obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (ENOMEM);
+  }
+  memset (user_class_obj, 0, sizeof (*user_class_obj));
+  user_class_obj->entry.user_class = user_class;
+  memmove (&user_class_obj->entry.ident, ident, sizeof (*ident));
+  user_class_obj->entry.user_obj_list = NULL;
+  user_class_obj->next = NULL;
+
+  return (lu_add_by_plugin (by_type, ident, user_class_obj));
+} /* }}} int lookup_add */
+
+/* returns the number of successful calls to the callback function */
+int lookup_search (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl)
+{
+  by_type_entry_t *by_type = NULL;
+  user_class_list_t *user_class_list = NULL;
+  int retval = 0;
+  int status;
+
+  if ((obj == NULL) || (ds == NULL) || (vl == NULL))
+    return (-EINVAL);
+
+  by_type = lu_search_by_type (obj, vl->type, /* allocate = */ 0);
+  if (by_type == NULL)
+    return (0);
+
+  status = c_avl_get (by_type->by_plugin_tree,
+      vl->plugin, (void *) &user_class_list);
+  if (status == 0)
+  {
+    status = lu_handle_user_class_list (obj, ds, vl, user_class_list);
+    if (status < 0)
+      return (status);
+    retval += status;
+  }
+
+  if (by_type->wildcard_plugin_list != NULL)
+  {
+    status = lu_handle_user_class_list (obj, ds, vl,
+        by_type->wildcard_plugin_list);
+    if (status < 0)
+      return (status);
+    retval += status;
+  }
+    
+  return (retval);
+} /* }}} lookup_search */
diff --git a/src/utils_vl_lookup.h b/src/utils_vl_lookup.h
new file mode 100644 (file)
index 0000000..c006fc7
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * collectd - src/utils_vl_lookup.h
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#ifndef UTILS_VL_LOOKUP_H
+#define UTILS_VL_LOOKUP_H 1
+
+#include "plugin.h"
+
+/*
+ * Types
+ */
+struct lookup_s;
+typedef struct lookup_s lookup_t;
+
+/* Given a user_class, constructs a new user_obj. */
+typedef void *(*lookup_class_callback_t) (data_set_t const *ds,
+    value_list_t const *vl, void *user_class);
+
+/* Given a user_class and a ds/vl combination, does stuff with the data.
+ * This is the main working horse of the module. */
+typedef int (*lookup_obj_callback_t) (data_set_t const *ds,
+    value_list_t const *vl,
+    void *user_class, void *user_obj);
+
+/* Used to free user_class pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_class_callback_t) (void *user_class);
+
+/* Used to free user_obj pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_obj_callback_t) (void *user_obj);
+
+struct identifier_s
+{
+  char host[DATA_MAX_NAME_LEN];
+  char plugin[DATA_MAX_NAME_LEN];
+  char plugin_instance[DATA_MAX_NAME_LEN];
+  char type[DATA_MAX_NAME_LEN];
+  char type_instance[DATA_MAX_NAME_LEN];
+};
+typedef struct identifier_s identifier_t;
+
+#define LU_ANY "/any/"
+#define LU_ALL "/all/"
+
+#define LU_IS_ANY(str) (strcmp (str, LU_ANY) == 0)
+#define LU_IS_ALL(str) (strcmp (str, LU_ALL) == 0)
+#define LU_IS_WILDCARD(str) (LU_IS_ANY(str) || LU_IS_ALL(str))
+
+/*
+ * Functions
+ */
+__attribute__((nonnull(1,2)))
+lookup_t *lookup_create (lookup_class_callback_t,
+    lookup_obj_callback_t,
+    lookup_free_class_callback_t,
+    lookup_free_obj_callback_t);
+void lookup_destroy (lookup_t *obj);
+
+int lookup_add (lookup_t *obj,
+    identifier_t const *ident, void *user_class);
+
+/* TODO(octo): Pass lookup_obj_callback_t to lookup_search()? */
+int lookup_search (lookup_t *obj,
+    data_set_t const *ds, value_list_t const *vl);
+
+#endif /* UTILS_VL_LOOKUP_H */
diff --git a/src/utils_vl_lookup_test.c b/src/utils_vl_lookup_test.c
new file mode 100644 (file)
index 0000000..6265b32
--- /dev/null
@@ -0,0 +1,214 @@
+/**
+ * collectd - src/utils_vl_lookup_test.c
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "utils_vl_lookup.h"
+
+static _Bool expect_new_obj = 0;
+static _Bool have_new_obj = 0;
+
+static identifier_t last_class_ident;
+static identifier_t last_obj_ident;
+
+static data_source_t dsrc_test = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_test = { "test", 1, &dsrc_test };
+
+static data_source_t dsrc_unknown = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_unknown = { "unknown", 1, &dsrc_unknown };
+
+static int lookup_obj_callback (data_set_t const *ds,
+    value_list_t const *vl,
+    void *user_class, void *user_obj)
+{
+  identifier_t *class = user_class;
+  identifier_t *obj = user_obj;
+
+  assert (expect_new_obj == have_new_obj);
+
+  memcpy (&last_class_ident, class, sizeof (last_class_ident));
+  memcpy (&last_obj_ident, obj, sizeof (last_obj_ident));
+
+  if (strcmp (obj->plugin_instance, "failure") == 0)
+    return (-1);
+
+  return (0);
+}
+
+static void *lookup_class_callback (data_set_t const *ds,
+    value_list_t const *vl, void *user_class)
+{
+  identifier_t *class = user_class;
+  identifier_t *obj;
+
+  assert (expect_new_obj);
+
+  memcpy (&last_class_ident, class, sizeof (last_class_ident));
+  
+  obj = malloc (sizeof (*obj));
+  strncpy (obj->host, vl->host, sizeof (obj->host));
+  strncpy (obj->plugin, vl->plugin, sizeof (obj->plugin));
+  strncpy (obj->plugin_instance, vl->plugin_instance, sizeof (obj->plugin_instance));
+  strncpy (obj->type, vl->type, sizeof (obj->type));
+  strncpy (obj->type_instance, vl->type_instance, sizeof (obj->type_instance));
+
+  have_new_obj = 1;
+
+  return ((void *) obj);
+}
+
+static void checked_lookup_add (lookup_t *obj, /* {{{ */
+    char const *host,
+    char const *plugin, char const *plugin_instance,
+    char const *type, char const *type_instance)
+{
+  identifier_t ident;
+  void *user_class;
+  int status;
+
+  memset (&ident, 0, sizeof (ident));
+  strncpy (ident.host, host, sizeof (ident.host));
+  strncpy (ident.plugin, plugin, sizeof (ident.plugin));
+  strncpy (ident.plugin_instance, plugin_instance, sizeof (ident.plugin_instance));
+  strncpy (ident.type, type, sizeof (ident.type));
+  strncpy (ident.type_instance, type_instance, sizeof (ident.type_instance));
+
+  user_class = malloc (sizeof (ident));
+  memmove (user_class, &ident, sizeof (ident));
+
+  status = lookup_add (obj, &ident, user_class);
+  assert (status == 0);
+} /* }}} void test_add */
+
+static int checked_lookup_search (lookup_t *obj,
+    char const *host,
+    char const *plugin, char const *plugin_instance,
+    char const *type, char const *type_instance,
+    _Bool expect_new)
+{
+  int status;
+  value_list_t vl = VALUE_LIST_STATIC;
+  data_set_t const *ds = &ds_unknown;
+
+  strncpy (vl.host, host, sizeof (vl.host));
+  strncpy (vl.plugin, plugin, sizeof (vl.plugin));
+  strncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
+  strncpy (vl.type, type, sizeof (vl.type));
+  strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+  if (strcmp (vl.type, "test") == 0)
+    ds = &ds_test;
+
+  expect_new_obj = expect_new;
+  have_new_obj = 0;
+
+  status = lookup_search (obj, ds, &vl);
+  return (status);
+}
+
+static lookup_t *checked_lookup_create (void)
+{
+  lookup_t *obj = lookup_create (
+      lookup_class_callback,
+      lookup_obj_callback,
+      (void *) free,
+      (void *) free);
+  assert (obj != NULL);
+  return (obj);
+}
+
+static void testcase0 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+
+  checked_lookup_add (obj, "/any/", "test", "", "test", "/all/");
+  checked_lookup_search (obj, "host0", "test", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host0", "test", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "test", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host1", "test", "", "test", "1",
+      /* expect new = */ 0);
+
+  lookup_destroy (obj);
+}
+
+static void testcase1 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+
+  checked_lookup_add (obj, "/any/", "/all/", "/all/", "test", "/all/");
+  checked_lookup_search (obj, "host0", "plugin0", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host0", "plugin0", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host0", "plugin1", "", "test", "0",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host0", "plugin1", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin0", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host1", "plugin0", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin1", "", "test", "0",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin1", "", "test", "1",
+      /* expect new = */ 0);
+
+  lookup_destroy (obj);
+}
+
+static void testcase2 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+  int status;
+
+  checked_lookup_add (obj, "/any/", "plugin0", "", "test", "/all/");
+  checked_lookup_add (obj, "/any/", "/all/", "", "test", "ti0");
+
+  status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "",
+      /* expect new = */ 0);
+  assert (status == 0);
+  status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "",
+      /* expect new = */ 1);
+  assert (status == 1);
+  status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "ti0",
+      /* expect new = */ 1);
+  assert (status == 1);
+  status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "ti0",
+      /* expect new = */ 0);
+  assert (status == 2);
+
+  lookup_destroy (obj);
+}
+
+int main (int argc, char **argv) /* {{{ */
+{
+  testcase0 ();
+  testcase1 ();
+  testcase2 ();
+  return (EXIT_SUCCESS);
+} /* }}} int main */