aggregation plugin: Make it possible to set parts of the identifier.
authorFlorian Forster <octo@collectd.org>
Mon, 26 Nov 2012 10:30:25 +0000 (11:30 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 26 Nov 2012 10:30:25 +0000 (11:30 +0100)
src/aggregation.c
src/collectd.conf.pod

index 7ca26ca..96dc337 100644 (file)
  **/
 
 #include "collectd.h"
+
+#include <pthread.h>
+
 #include "plugin.h"
 #include "common.h"
 #include "configfile.h"
 #include "meta_data.h"
 #include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_subst.h"
 #include "utils_vl_lookup.h"
 
-#include <pthread.h>
-
 #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
+#define AGG_FUNC_PLACEHOLDER "%{aggregation}"
 
 struct aggregation_s /* {{{ */
 {
   identifier_t ident;
   unsigned int group_by;
 
+  unsigned int regex_fields;
+
+  char *set_host;
+  char *set_plugin;
+  char *set_plugin_instance;
+  char *set_type_instance;
+
   _Bool calc_num;
   _Bool calc_sum;
   _Bool calc_average;
@@ -81,6 +91,23 @@ static lookup_t *lookup = NULL;
 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
 static agg_instance_t *agg_instance_list_head = NULL;
 
+static _Bool agg_is_regex (char const *str) /* {{{ */
+{
+  size_t len;
+
+  if (str == NULL)
+    return (0);
+
+  len = strlen (str);
+  if (len < 3)
+    return (0);
+
+  if ((str[0] == '/') && (str[len - 1] == '/'))
+    return (1);
+  else
+    return (0);
+} /* }}} _Bool agg_is_regex */
+
 static void agg_destroy (aggregation_t *agg) /* {{{ */
 {
   sfree (agg);
@@ -119,6 +146,92 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
   inst->max = NAN;
 } /* }}} void agg_instance_destroy */
 
+static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
+    value_list_t const *vl, aggregation_t const *agg)
+{
+#define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
+  if (agg->set_ ## field != NULL) \
+    sstrncpy (buffer, agg->set_ ## field, buffer_size); \
+  else if ((agg->regex_fields & group_mask) \
+      && (agg->group_by & group_mask)) \
+    sstrncpy (buffer, vl->field, buffer_size); \
+  else if ((agg->regex_fields & group_mask) \
+      && (AGG_MATCHES_ALL (agg->ident.field))) \
+    sstrncpy (buffer, all_value, buffer_size); \
+  else \
+    sstrncpy (buffer, agg->ident.field, buffer_size); \
+} while (0)
+
+  /* Host */
+  COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
+      host, LU_GROUP_BY_HOST, "global");
+
+  /* Plugin */
+  if (agg->set_plugin != NULL)
+    sstrncpy (inst->ident.plugin, agg->set_plugin,
+        sizeof (inst->ident.plugin));
+  else
+    sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
+
+  /* Plugin instance */
+  if (agg->set_plugin_instance != NULL)
+    sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
+        sizeof (inst->ident.plugin_instance));
+  else
+  {
+    char tmp_plugin[DATA_MAX_NAME_LEN];
+    char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
+
+    if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+        && (agg->group_by & LU_GROUP_BY_PLUGIN))
+      sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
+    else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+        && (AGG_MATCHES_ALL (agg->ident.plugin)))
+      sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
+    else
+      sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
+
+    if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+        && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
+      sstrncpy (tmp_plugin_instance, vl->plugin_instance,
+          sizeof (tmp_plugin_instance));
+    else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+        && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
+      sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
+    else
+      sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
+          sizeof (tmp_plugin_instance));
+
+    if ((strcmp ("", tmp_plugin) == 0)
+        && (strcmp ("", tmp_plugin_instance) == 0))
+      sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
+          sizeof (inst->ident.plugin_instance));
+    else if (strcmp ("", tmp_plugin) != 0)
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
+    else if (strcmp ("", tmp_plugin_instance) != 0)
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+    else
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+  }
+
+  /* Type */
+  sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
+
+  /* Type instance */
+  COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
+      type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
+
+#undef COPY_FIELD
+
+  return (0);
+} /* }}} int agg_instance_create_name */
+
 /* Create a new aggregation instance. */
 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
     value_list_t const *vl, aggregation_t *agg)
@@ -138,19 +251,7 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
 
   inst->ds_type = ds->ds[0].type;
 
-#define COPY_FIELD(field, group_mask) do { \
-  sstrncpy (inst->ident.field, \
-      (agg->group_by & group_mask) ? vl->field : agg->ident.field, \
-      sizeof (inst->ident.field)); \
-} while (0)
-
-  COPY_FIELD (host, LU_GROUP_BY_HOST);
-  COPY_FIELD (plugin, LU_GROUP_BY_PLUGIN);
-  COPY_FIELD (plugin_instance, LU_GROUP_BY_PLUGIN_INSTANCE);
-  COPY_FIELD (type, /* group_mask = */ 0);
-  COPY_FIELD (type_instance, LU_GROUP_BY_TYPE_INSTANCE);
-
-#undef COPY_FIELD
+  agg_instance_create_name (inst, vl, agg);
 
   inst->min = NAN;
   inst->max = NAN;
@@ -243,8 +344,8 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
   int status;
 
   if (pi_prefix[0] != 0)
-    ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
-        pi_prefix, func);
+    subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
+        pi_prefix, AGG_FUNC_PLACEHOLDER, func);
   else
     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
 
@@ -277,7 +378,6 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
 {
   value_list_t vl = VALUE_LIST_INIT;
-  char pi_prefix[DATA_MAX_NAME_LEN];
 
   /* Pre-set all the fields in the value list that will not change per
    * aggregation type (sum, average, ...). The struct will be re-used and must
@@ -294,39 +394,16 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
   }
   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
 
-  if (AGG_MATCHES_ALL (inst->ident.host))
-    sstrncpy (vl.host, "global", sizeof (vl.host));
-  else
-    sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
-
-  sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
-
-  if (AGG_MATCHES_ALL (inst->ident.plugin))
-  {
-    if (AGG_MATCHES_ALL (inst->ident.plugin_instance))
-      sstrncpy (pi_prefix, "", sizeof (pi_prefix));
-    else
-      sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
-  }
-  else
-  {
-    if (AGG_MATCHES_ALL (inst->ident.plugin_instance))
-      sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
-    else
-      ssnprintf (pi_prefix, sizeof (pi_prefix),
-          "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
-  }
-
+  sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+  sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
-
-  if (!AGG_MATCHES_ALL (inst->ident.type_instance))
-    sstrncpy (vl.type_instance, inst->ident.type_instance,
-        sizeof (vl.type_instance));
+  sstrncpy (vl.type_instance, inst->ident.type_instance,
+      sizeof (vl.type_instance));
 
 #define READ_FUNC(func, rate) do { \
   if (inst->state_ ## func != NULL) { \
     agg_instance_read_func (inst, #func, rate, \
-        inst->state_ ## func, &vl, pi_prefix, t); \
+        inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
   } \
 } while (0)
 
@@ -486,6 +563,14 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp ("TypeInstance", child->key) == 0)
       cf_util_get_string_buffer (child, agg->ident.type_instance,
           sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("SetHost", child->key) == 0)
+      cf_util_get_string (child, &agg->set_host);
+    else if (strcasecmp ("SetPlugin", child->key) == 0)
+      cf_util_get_string (child, &agg->set_plugin);
+    else if (strcasecmp ("SetPluginInstance", child->key) == 0)
+      cf_util_get_string (child, &agg->set_plugin_instance);
+    else if (strcasecmp ("SetTypeInstance", child->key) == 0)
+      cf_util_get_string (child, &agg->set_type_instance);
     else if (strcasecmp ("GroupBy", child->key) == 0)
       agg_config_handle_group_by (child, agg);
     else if (strcasecmp ("CalculateNum", child->key) == 0)
@@ -505,6 +590,15 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
           "<Aggregation /> blocks and will be ignored.", child->key);
   }
 
+  if (agg_is_regex (agg->ident.host))
+    agg->regex_fields |= LU_GROUP_BY_HOST;
+  if (agg_is_regex (agg->ident.plugin))
+    agg->regex_fields |= LU_GROUP_BY_PLUGIN;
+  if (agg_is_regex (agg->ident.plugin_instance))
+    agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
+  if (agg_is_regex (agg->ident.type_instance))
+    agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
+
   /* Sanity checking */
   is_valid = 1;
   if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
@@ -525,15 +619,26 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
     is_valid = 0;
   } /* }}} */
 
-  if (!AGG_MATCHES_ALL (agg->ident.host) /* {{{ */
-      && !AGG_MATCHES_ALL (agg->ident.plugin)
-      && !AGG_MATCHES_ALL (agg->ident.plugin_instance)
-      && !AGG_MATCHES_ALL (agg->ident.type_instance))
+  /* Check that there is at least one regex field without a grouping. {{{ */
+  if ((agg->regex_fields & ~agg->group_by) == 0)
   {
     ERROR ("aggregation plugin: An aggregation must contain at least one "
         "wildcard. This is achieved by leaving at least one of the \"Host\", "
         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
-        "and not grouping by that field. "
+        "or using a regular expression and not grouping by that field. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  /* Check that all grouping fields are regular expressions. {{{ */
+  if (agg->group_by & ~agg->regex_fields)
+  {
+    ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
+        "regular expression is configured or which are left blank) can be "
+        "specified in the \"GroupBy\" option. "
         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
         "Type \"%s\", TypeInstance \"%s\")",
         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
index 1c8b7a4..57dc950 100644 (file)
@@ -308,6 +308,61 @@ extended regular expressions as described in L<regex(7)>. Example usage:
 Group valued by the specified field. The B<GroupBy> option may be repeated to
 group by multiple fields.
 
+=item B<SetHost> I<Host>
+
+=item B<SetPlugin> I<Plugin>
+
+=item B<SetPluginInstance> I<PluginInstance>
+
+=item B<SetTypeInstance> I<TypeInstance>
+
+Sets the appropriate part of the identifier to the provided string.
+
+The I<PluginInstance> should include the placeholder C<%{aggregation}> which
+will be replaced with the aggregation function, e.g. "average". Not including
+the placeholder will result in duplication warnings and/or messed up values if
+more than one aggregation function are enabled.
+
+The following example calculates the average usage of all "even" CPUs:
+
+ <Plugin "aggregation">
+   <Aggregation>
+     Plugin "cpu"
+     PluginInstance "/[0,2,4,6,8]$/"
+     Type "cpu"
+     
+     SetPlugin "cpu"
+     SetPluginInstance "even-%{aggregation}"
+     
+     GroupBy "Host"
+     GroupBy "TypeInstance"
+     
+     CalculateAverage true
+   </Aggregation>
+ </Plugin>
+
+This will create the files:
+
+=over 4
+
+=item
+
+foo.example.com/cpu-even-average/cpu-idle
+
+=item
+
+foo.example.com/cpu-even-average/cpu-system
+
+=item
+
+foo.example.com/cpu-even-average/cpu-user
+
+=item
+
+...
+
+=back
+
 =item B<CalculateNum> B<true>|B<false>
 
 =item B<CalculateSum> B<true>|B<false>