Merge branch 'pr/2056'
authorFlorian Forster <octo@collectd.org>
Mon, 28 Nov 2016 08:03:12 +0000 (09:03 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 28 Nov 2016 08:03:12 +0000 (09:03 +0100)
22 files changed:
src/Makefile.am
src/amqp.c
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/Makefile.am
src/daemon/utils_match.c
src/daemon/utils_match.h
src/daemon/utils_tail_match.c
src/daemon/utils_tail_match.h
src/daemon/utils_time.h
src/tail.c
src/types.db
src/utils_format_graphite.c
src/utils_format_graphite.h
src/utils_format_graphite_test.c
src/utils_latency.c
src/utils_latency.h
src/utils_latency_config.c [new file with mode: 0644]
src/utils_latency_config.h [new file with mode: 0644]
src/utils_latency_test.c
src/write_graphite.c
src/write_kafka.c

index fe2d2a8..a01176d 100644 (file)
@@ -52,7 +52,8 @@ test_format_json_LDADD = libformat_json.la daemon/libmetadata.la daemon/libplugi
 endif
 
 noinst_LTLIBRARIES += liblatency.la
-liblatency_la_SOURCES = utils_latency.c utils_latency.h
+liblatency_la_SOURCES = utils_latency.c utils_latency.h utils_latency_config.c utils_latency_config.h
+liblatency_la_LIBADD = daemon/libcommon.la
 check_PROGRAMS += test_utils_latency
 TESTS += test_utils_latency
 test_utils_latency_SOURCES = utils_latency_test.c testing.h
@@ -1115,6 +1116,7 @@ if BUILD_PLUGIN_TAIL
 pkglib_LTLIBRARIES += tail.la
 tail_la_SOURCES = tail.c
 tail_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+tail_la_LIBADD = liblatency.la
 endif
 
 if BUILD_PLUGIN_TAIL_CSV
index 4089fc3..b237ba3 100644 (file)
@@ -1015,6 +1015,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish)
             status = cf_util_get_flag (child, &conf->graphite_flags,
                     GRAPHITE_ALWAYS_APPEND_DS);
+        else if ((strcasecmp ("GraphitePreserveSeparator", child->key) == 0) && publish)
+            status = cf_util_get_flag (child, &conf->graphite_flags,
+                    GRAPHITE_PRESERVE_SEPARATOR);
         else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
             status = cf_util_get_string (child, &conf->prefix);
         else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
index 8ad24d6..e5b9643 100644 (file)
 #      Instance "local_user"
 #    </Match>
 #  </File>
+#  <File "/var/log/nginx/apache-time.log">
+#    #Use the following log format in nginx:
+#    #log_format response_time '[$host] "$upstream_response_time" ...'
+#    Instance "apache"
+#    <Match>
+#      Regex "^\\S+ \"([0-9.]+)\""
+#      <DSType Distribution>
+#        Percentile 80    # -> latency-foo-80
+#        Percentile 95    # -> latency-foo-95
+#        Percentile 99    # -> latency-foo-99
+#        Bucket 0   0.1   # -> bucket-latency-foo-0_0.1
+#        Bucket 0.1 0.2   # -> bucket-latency-foo-0.1_0.2
+#        Bucket 0.2 0.5   # -> bucket-latency-foo-0.2_0.5
+#        Bucket 0.5 1.0   # -> bucket-latency-foo-0.5_1
+#        Bucket 1.0 2.0   # -> bucket-latency-foo-1_2
+#        Bucket 2.0 0     # -> bucket-latency-foo-2_inf
+#      </DSType>
+#      Type "latency"
+#      Instance "foo"
+#    </Match>
+#  </File>
 #</Plugin>
 
 #<Plugin tail_csv>
 #    AlwaysAppendDS false
 #    EscapeCharacter "_"
 #    SeparateInstances false
+#    PreserveSeparator false
 #    DropDuplicateFields false
 #  </Node>
 #</Plugin>
index ffcddd2..27c4e16 100644 (file)
@@ -555,6 +555,7 @@ B<Synopsis:>
  #   GraphiteEscapeChar "_"
  #   GraphiteSeparateInstances false
  #   GraphiteAlwaysAppendDS false
+ #   GraphitePreserveSeparator false
    </Publish>
 
    # Receive values from an AMQP broker
@@ -729,6 +730,12 @@ If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
 identifier. If set to B<false> (the default), this is only done when there is
 more than one DS.
 
+=item B<GraphitePreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<GraphiteEscapeChar>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
 =back
 
 =head2 Plugin C<apache>
@@ -7082,6 +7089,15 @@ user using (extended) regular expressions, as described in L<regex(7)>.
         Type "counter"
         Instance "local_user"
       </Match>
+      <Match>
+        Regex "l=([0-9]*\\.[0-9]*)"
+        <DSType "Distribution">
+          Percentile 99
+          Bucket 0 100
+        </DSType>
+        Type "latency"
+        Instance "foo"
+      </Match>
     </File>
   </Plugin>
 
@@ -7180,14 +7196,74 @@ Increase the internal counter by one. These B<DSType> are the only ones that do
 not use the matched subexpression, but simply count the number of matched
 lines. Thus, you may use a regular expression without submatch in this case.
 
+=item B<Distribution>
+
+Type to do calculations based on the distribution of values, primarily
+calculating percentiles. This is primarily geared towards latency, but can be
+used for other metrics as well. The range of values tracked with this setting
+must be in the range (0–2^34) and can be fractional. Please note that neither
+zero nor 2^34 are inclusive bounds, i.e. zero I<cannot> be handled by a
+distribution.
+
+This option must be used together with the B<Percentile> and/or B<Bucket>
+options.
+
+B<Synopsis:>
+
+  <DSType "Distribution">
+    Percentile 99
+    Bucket 0 100
+  </DSType>
+
+=over 4
+
+=item B<Percentile> I<Percent>
+
+Calculate and dispatch the configured percentile, i.e. compute the value, so
+that I<Percent> of all matched values are smaller than or equal to the computed
+latency.
+
+Metrics are reported with the I<type> B<Type> (the value of the above option)
+and the I<type instance> C<[E<lt>InstanceE<gt>-]E<lt>PercentE<gt>>.
+
+This option may be repeated to calculate more than one percentile.
+
+=item B<Bucket> I<lower_bound> I<upper_bound>
+
+Export the number of values (a C<DERIVE>) falling within the given range. Both,
+I<lower_bound> and I<upper_bound> may be a fractional number, such as B<0.5>.
+Each B<Bucket> option specifies an interval C<(I<lower_bound>,
+I<upper_bound>]>, i.e. the range I<excludes> the lower bound and I<includes>
+the upper bound. I<lower_bound> and I<upper_bound> may be zero, meaning no
+lower/upper bound.
+
+To export the entire (0–inf) range without overlap, use the upper bound of the
+previous range as the lower bound of the following range. In other words, use
+the following schema:
+
+  Bucket   0   1
+  Bucket   1   2
+  Bucket   2   5
+  Bucket   5  10
+  Bucket  10  20
+  Bucket  20  50
+  Bucket  50   0
+
+Metrics are reported with the I<type> C<bucket> and the I<type instance>
+C<E<lt>TypeE<gt>[-E<lt>InstanceE<gt>]-E<lt>lower_boundE<gt>_E<lt>upper_boundE<gt>>.
+
+This option may be repeated to calculate more than one rate.
+
 =back
 
-As you'd expect the B<Gauge*> types interpret the submatch as a floating point
-number, using L<strtod(3)>. The B<Counter*> and B<AbsoluteSet> types interpret
-the submatch as an unsigned integer using L<strtoull(3)>. The B<Derive*> types
-interpret the submatch as a signed integer using L<strtoll(3)>. B<CounterInc>
-and B<DeriveInc> do not use the submatch at all and it may be omitted in this
-case.
+=back
+
+The B<Gauge*> and B<Distribution> types interpret the submatch as a floating
+point number, using L<strtod(3)>. The B<Counter*> and B<AbsoluteSet> types
+interpret the submatch as an unsigned integer using L<strtoull(3)>. The
+B<Derive*> types interpret the submatch as a signed integer using
+L<strtoll(3)>. B<CounterInc> and B<DeriveInc> do not use the submatch at all
+and it may be omitted in this case.
 
 =item B<Type> I<Type>
 
@@ -8021,6 +8097,12 @@ If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
 identifier. If set to B<false> (the default), this is only done when there is
 more than one DS.
 
+=item B<PreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<EscapeCharacter>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
 =item B<DropDuplicateFields> B<false>|B<true>
 
 If set to B<true>, detect and remove duplicate components in Graphite metric
@@ -8423,6 +8505,18 @@ path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
 default), the plugin and plugin instance (and likewise the type and type
 instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
 
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=item B<GraphitePreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<GraphiteEscapeChar>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
 =item B<StoreRates> B<true>|B<false>
 
 If set to B<true> (the default), convert counter values to rates. If set to
index cb62c64..ac8c764 100644 (file)
@@ -67,7 +67,9 @@ collectd_SOURCES = collectd.c collectd.h \
                   utils_tail.c utils_tail.h \
                   utils_time.c utils_time.h \
                   types_list.c types_list.h \
-                  utils_threshold.c utils_threshold.h
+                  utils_threshold.c utils_threshold.h \
+                  ../utils_latency_config.h ../utils_latency_config.c \
+                  ../utils_latency.h ../utils_latency.c
 
 
 collectd_CPPFLAGS =  $(AM_CPPFLAGS) $(LTDLINCL)
index 35102b6..d1be244 100644 (file)
@@ -33,7 +33,6 @@
 
 #include <regex.h>
 
-#define UTILS_MATCH_FLAGS_FREE_USER_DATA 0x01
 #define UTILS_MATCH_FLAGS_EXCLUDE_REGEX 0x02
 
 struct cu_match_s
@@ -45,6 +44,7 @@ struct cu_match_s
   int (*callback) (const char *str, char * const *matches, size_t matches_num,
       void *user_data);
   void *user_data;
+  void (*free) (void *user_data);
 };
 
 /*
@@ -99,6 +99,13 @@ static int default_callback (const char __attribute__((unused)) *str,
     if (matches[1] == endptr)
       return (-1);
 
+    if (data->ds_type & UTILS_MATCH_CF_GAUGE_DIST)
+    {
+      latency_counter_add(data->latency, DOUBLE_TO_CDTIME_T(value));
+      data->values_num++;
+      return (0);
+    }
+
     if ((data->values_num == 0)
        || (data->ds_type & UTILS_MATCH_CF_GAUGE_LAST)
        || (data->ds_type & UTILS_MATCH_CF_GAUGE_PERSIST))
@@ -226,13 +233,22 @@ static int default_callback (const char __attribute__((unused)) *str,
   return (0);
 } /* int default_callback */
 
+static void match_simple_free (void *data)
+{
+  cu_match_value_t *user_data = (cu_match_value_t *) data;
+  if (user_data->latency)
+    latency_counter_destroy(user_data->latency);
+
+  free (data);
+} /* void match_simple_free */
+
 /*
  * Public functions
  */
 cu_match_t *match_create_callback (const char *regex, const char *excluderegex,
                int (*callback) (const char *str,
                  char * const *matches, size_t matches_num, void *user_data),
-               void *user_data)
+               void *user_data, void (*free_user_data) (void *user_data))
 {
   cu_match_t *obj;
   int status;
@@ -266,6 +282,7 @@ cu_match_t *match_create_callback (const char *regex, const char *excluderegex,
 
   obj->callback = callback;
   obj->user_data = user_data;
+  obj->free = free_user_data;
 
   return (obj);
 } /* cu_match_t *match_create_callback */
@@ -281,16 +298,28 @@ cu_match_t *match_create_simple (const char *regex,
     return (NULL);
   user_data->ds_type = match_ds_type;
 
+  if ((match_ds_type & UTILS_MATCH_DS_TYPE_GAUGE)
+      && (match_ds_type & UTILS_MATCH_CF_GAUGE_DIST))
+  {
+    user_data->latency = latency_counter_create();
+    if (user_data->latency == NULL)
+    {
+      ERROR ("match_create_simple(): latency_counter_create() failed.");
+      free (user_data);
+      return (NULL);
+    }
+  }
+
   obj = match_create_callback (regex, excluderegex,
-                              default_callback, user_data);
+                              default_callback, user_data, match_simple_free);
   if (obj == NULL)
   {
+    if (user_data->latency)
+      latency_counter_destroy(user_data->latency);
+
     sfree (user_data);
     return (NULL);
   }
-
-  obj->flags |= UTILS_MATCH_FLAGS_FREE_USER_DATA;
-
   return (obj);
 } /* cu_match_t *match_create_simple */
 
@@ -313,10 +342,8 @@ void match_destroy (cu_match_t *obj)
   if (obj == NULL)
     return;
 
-  if (obj->flags & UTILS_MATCH_FLAGS_FREE_USER_DATA)
-  {
-    sfree (obj->user_data);
-  }
+  if ((obj->user_data != NULL) && (obj->free != NULL))
+      (*obj->free) (obj->user_data);
 
   sfree (obj);
 } /* void match_destroy */
index d43ae3b..7b263d7 100644 (file)
@@ -28,6 +28,7 @@
 #define UTILS_MATCH_H 1
 
 #include "plugin.h"
+#include "utils_latency.h"
 
 /*
  * Each type may have 12 sub-types
@@ -47,6 +48,7 @@
 #define UTILS_MATCH_CF_GAUGE_INC     0x10
 #define UTILS_MATCH_CF_GAUGE_ADD     0x20
 #define UTILS_MATCH_CF_GAUGE_PERSIST 0x40
+#define UTILS_MATCH_CF_GAUGE_DIST    0x80
 
 #define UTILS_MATCH_CF_COUNTER_SET   0x01
 #define UTILS_MATCH_CF_COUNTER_ADD   0x02
@@ -71,6 +73,7 @@ struct cu_match_value_s
   int ds_type;
   value_t value;
   unsigned int values_num;
+  latency_counter_t *latency;
 };
 typedef struct cu_match_value_s cu_match_value_t;
 
@@ -94,11 +97,13 @@ typedef struct cu_match_value_s cu_match_value_t;
  *  callback.
  *  The optional `excluderegex' allows to exclude the line from the match, if
  *  the excluderegex matches.
+ *  When `match_destroy' is called the `user_data' pointer is freed using
+ *  the `free_user_data' callback - if it is not NULL.
  */
 cu_match_t *match_create_callback (const char *regex, const char *excluderegex,
                int (*callback) (const char *str,
                  char * const *matches, size_t matches_num, void *user_data),
-               void *user_data);
+               void *user_data, void (*free_user_data) (void *user_data));
 
 /*
  * NAME
index 99d5dec..a0cbc11 100644 (file)
@@ -36,6 +36,7 @@
 #include "utils_match.h"
 #include "utils_tail.h"
 #include "utils_tail_match.h"
+#include "utils_latency_config.h"
 
 struct cu_tail_match_simple_s
 {
@@ -44,6 +45,7 @@ struct cu_tail_match_simple_s
   char type[DATA_MAX_NAME_LEN];
   char type_instance[DATA_MAX_NAME_LEN];
   cdtime_t interval;
+  latency_config_t latency_config;
 };
 typedef struct cu_tail_match_simple_s cu_tail_match_simple_t;
 
@@ -102,6 +104,77 @@ static int simple_submit_match (cu_match_t *match, void *user_data)
   return (0);
 } /* int simple_submit_match */
 
+static int latency_submit_match(cu_match_t *match, void *user_data) {
+  cu_tail_match_simple_t *data = (cu_tail_match_simple_t *)user_data;
+  cu_match_value_t *match_value;
+  value_list_t vl = VALUE_LIST_INIT;
+
+  match_value = (cu_match_value_t *)match_get_user_data(match);
+  if (match_value == NULL)
+    return (-1);
+
+  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
+  sstrncpy(vl.plugin, data->plugin, sizeof(vl.plugin));
+  sstrncpy(vl.plugin_instance, data->plugin_instance,
+           sizeof(vl.plugin_instance));
+  vl.interval = data->interval;
+  vl.time = cdtime();
+
+  /* Submit percentiles */
+  sstrncpy(vl.type, data->type, sizeof(vl.type));
+  for (size_t i = 0; i < data->latency_config.percentile_num; i++) {
+    if (strlen(data->type_instance) != 0)
+      ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-%.0f",
+                data->type_instance, data->latency_config.percentile[i]);
+    else
+      ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%.0f",
+                data->latency_config.percentile[i]);
+
+    vl.values = &(value_t){
+        .gauge =
+            (match_value->values_num != 0)
+                ? CDTIME_T_TO_DOUBLE(latency_counter_get_percentile(
+                      match_value->latency, data->latency_config.percentile[i]))
+                : NAN,
+    };
+    vl.values_len = 1;
+
+    plugin_dispatch_values(&vl);
+  }
+
+  /* Submit buckets */
+  sstrncpy(vl.type, "bucket", sizeof(vl.type));
+  for (size_t i = 0; i < data->latency_config.buckets_num; i++) {
+    latency_bucket_t bucket = data->latency_config.buckets[i];
+
+    double lower_bound = CDTIME_T_TO_DOUBLE(bucket.lower_bound);
+    double upper_bound =
+        bucket.upper_bound ? CDTIME_T_TO_DOUBLE(bucket.upper_bound) : INFINITY;
+
+    if (strlen(data->type_instance) != 0)
+      ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-%s-%g_%g",
+                data->type, data->type_instance, lower_bound, upper_bound);
+    else
+      ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-%g_%g",
+                data->type, lower_bound, upper_bound);
+
+    vl.values = &(value_t){
+        .gauge = latency_counter_get_rate(match_value->latency,
+                                          bucket.lower_bound,
+                                          bucket.upper_bound, vl.time),
+    };
+    vl.values_len = 1;
+
+    plugin_dispatch_values(&vl);
+  }
+
+  match_value->value.gauge = NAN;
+  match_value->values_num = 0;
+  latency_counter_reset(match_value->latency);
+
+  return (0);
+} /* int latency_submit_match */
+
 static int tail_callback (void *data, char *buf,
     int __attribute__((unused)) buflen)
 {
@@ -113,6 +186,13 @@ static int tail_callback (void *data, char *buf,
   return (0);
 } /* int tail_callback */
 
+static void tail_match_simple_free (void *data)
+{
+  cu_tail_match_simple_t *user_data = (cu_tail_match_simple_t *) data;
+  latency_config_free(user_data->latency_config);
+  sfree (user_data);
+} /* void tail_match_simple_free */
+
 /*
  * Public functions
  */
@@ -193,7 +273,9 @@ int tail_match_add_match (cu_tail_match_t *obj, cu_match_t *match,
 int tail_match_add_match_simple (cu_tail_match_t *obj,
     const char *regex, const char *excluderegex, int ds_type,
     const char *plugin, const char *plugin_instance,
-    const char *type, const char *type_instance, const cdtime_t interval)
+    const char *type, const char *type_instance,
+    const latency_config_t latency_cfg,
+    const cdtime_t interval)
 {
   cu_match_t *match;
   cu_tail_match_simple_t *user_data;
@@ -213,22 +295,38 @@ int tail_match_add_match_simple (cu_tail_match_t *obj,
   sstrncpy (user_data->plugin, plugin, sizeof (user_data->plugin));
   if (plugin_instance != NULL)
     sstrncpy (user_data->plugin_instance, plugin_instance,
-       sizeof (user_data->plugin_instance));
+        sizeof (user_data->plugin_instance));
 
   sstrncpy (user_data->type, type, sizeof (user_data->type));
   if (type_instance != NULL)
     sstrncpy (user_data->type_instance, type_instance,
-       sizeof (user_data->type_instance));
+        sizeof (user_data->type_instance));
 
   user_data->interval = interval;
 
-  status = tail_match_add_match (obj, match, simple_submit_match,
+  if ((ds_type & UTILS_MATCH_DS_TYPE_GAUGE)
+      && (ds_type & UTILS_MATCH_CF_GAUGE_DIST))
+  {
+    status = latency_config_copy(&user_data->latency_config, latency_cfg);
+    if (status != 0)
+    {
+      ERROR ("tail_match_add_match_simple: latency_config_copy() failed.");
+      status = -1;
+      goto out;
+    }
+
+    status = tail_match_add_match (obj, match, latency_submit_match,
+      user_data, tail_match_simple_free);
+  } else {
+    status = tail_match_add_match (obj, match, simple_submit_match,
       user_data, free);
+  }
 
+out:
   if (status != 0)
   {
+    tail_match_simple_free(user_data);
     match_destroy (match);
-    sfree (user_data);
   }
 
   return (status);
index 0404de2..ffb4999 100644 (file)
@@ -34,6 +34,7 @@
  */
 
 #include "utils_match.h"
+#include "utils_latency_config.h"
 
 struct cu_tail_match_s;
 typedef struct cu_tail_match_s cu_tail_match_t;
@@ -103,6 +104,7 @@ int tail_match_add_match (cu_tail_match_t *obj, cu_match_t *match,
  *  passed `plugin', `plugin_instance', `type', and `type_instance' are
  *  directly used when submitting these values.
  *  With excluderegex it is possible to exlude lines from the match.
+ *  The `latency_cfg' specifies configuration for submitting latency.
  *
  * RETURN VALUE
  *   Zero upon success, non-zero otherwise.
@@ -110,7 +112,8 @@ int tail_match_add_match (cu_tail_match_t *obj, cu_match_t *match,
 int tail_match_add_match_simple (cu_tail_match_t *obj,
     const char *regex, const char *excluderegex, int ds_type,
     const char *plugin, const char *plugin_instance,
-    const char *type, const char *type_instance, const cdtime_t interval);
+    const char *type, const char *type_instance, const latency_config_t latency_cfg,
+    const cdtime_t interval);
 
 /*
  * NAME
index 2d83e0b..db510af 100644 (file)
@@ -88,8 +88,9 @@ extern cdtime_t cdtime_mock;
 
 #define CDTIME_T_TO_DOUBLE(t)                                                  \
   (double) { ((double)(t)) / 1073741824.0 }
+#define DOUBLE_TO_CDTIME_T_STATIC(d) ((cdtime_t)((d)*1073741824.0))
 #define DOUBLE_TO_CDTIME_T(d)                                                  \
-  (cdtime_t) { (cdtime_t)((d)*1073741824.0) }
+  (cdtime_t) { DOUBLE_TO_CDTIME_T_STATIC(d) }
 
 #define CDTIME_T_TO_TIMEVAL(t)                                                 \
   (struct timeval) {                                                           \
index 0ac8be7..ff76b72 100644 (file)
@@ -29,6 +29,7 @@
 #include "common.h"
 #include "plugin.h"
 #include "utils_tail_match.h"
+#include "utils_latency_config.h"
 
 /*
  *  <Plugin tail>
@@ -54,6 +55,7 @@ struct ctail_config_match_s
   char *type;
   char *type_instance;
   cdtime_t interval;
+  latency_config_t latency;
 };
 typedef struct ctail_config_match_s ctail_config_match_t;
 
@@ -70,54 +72,63 @@ static int ctail_config_add_match_dstype (ctail_config_match_t *cm,
     return (-1);
   }
 
-  if (strncasecmp ("Gauge", ci->values[0].value.string, strlen ("Gauge")) == 0)
+  char const *ds_type = ci->values[0].value.string;
+  if (strncasecmp ("Gauge", ds_type, strlen ("Gauge")) == 0)
   {
     cm->flags = UTILS_MATCH_DS_TYPE_GAUGE;
-    if (strcasecmp ("GaugeAverage", ci->values[0].value.string) == 0)
+    if (strcasecmp ("GaugeAverage", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_AVERAGE;
-    else if (strcasecmp ("GaugeMin", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("GaugeMin", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_MIN;
-    else if (strcasecmp ("GaugeMax", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("GaugeMax", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_MAX;
-    else if (strcasecmp ("GaugeLast", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("GaugeLast", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_LAST;
-    else if (strcasecmp ("GaugeInc", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("GaugeInc", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_INC;
-    else if (strcasecmp ("GaugeAdd", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("GaugeAdd", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_ADD;
     else if (strcasecmp ("GaugePersist", ci->values[0].value.string) == 0)
       cm->flags |= UTILS_MATCH_CF_GAUGE_PERSIST;
     else
       cm->flags = 0;
   }
-  else if (strncasecmp ("Counter", ci->values[0].value.string, strlen ("Counter")) == 0)
+  else if (strcasecmp ("Distribution", ds_type) == 0)
+  {
+    cm->flags = UTILS_MATCH_DS_TYPE_GAUGE | UTILS_MATCH_CF_GAUGE_DIST;
+
+    int status = latency_config (&cm->latency, ci, "tail");
+    if (status != 0)
+      return (status);
+  }
+  else if (strncasecmp ("Counter", ds_type, strlen ("Counter")) == 0)
   {
     cm->flags = UTILS_MATCH_DS_TYPE_COUNTER;
-    if (strcasecmp ("CounterSet", ci->values[0].value.string) == 0)
+    if (strcasecmp ("CounterSet", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_COUNTER_SET;
-    else if (strcasecmp ("CounterAdd", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("CounterAdd", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_COUNTER_ADD;
-    else if (strcasecmp ("CounterInc", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("CounterInc", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_COUNTER_INC;
     else
       cm->flags = 0;
   }
-  else if (strncasecmp ("Derive", ci->values[0].value.string, strlen ("Derive")) == 0)
+  else if (strncasecmp ("Derive", ds_type, strlen ("Derive")) == 0)
   {
     cm->flags = UTILS_MATCH_DS_TYPE_DERIVE;
-    if (strcasecmp ("DeriveSet", ci->values[0].value.string) == 0)
+    if (strcasecmp ("DeriveSet", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_DERIVE_SET;
-    else if (strcasecmp ("DeriveAdd", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("DeriveAdd", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_DERIVE_ADD;
-    else if (strcasecmp ("DeriveInc", ci->values[0].value.string) == 0)
+    else if (strcasecmp ("DeriveInc", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_DERIVE_INC;
     else
       cm->flags = 0;
   }
-  else if (strncasecmp ("Absolute", ci->values[0].value.string, strlen ("Absolute")) == 0)
+  else if (strncasecmp ("Absolute", ds_type, strlen ("Absolute")) == 0)
   {
     cm->flags = UTILS_MATCH_DS_TYPE_ABSOLUTE;
-    if (strcasecmp ("AbsoluteSet", ci->values[0].value.string) == 0)
+    if (strcasecmp ("AbsoluteSet", ds_type) == 0)
       cm->flags |= UTILS_MATCH_CF_ABSOLUTE_SET;
     else
       cm->flags = 0;
@@ -201,19 +212,20 @@ static int ctail_config_add_match (cu_tail_match_t *tm,
 
   if (status == 0)
   {
+    // TODO(octo): there's nothing "simple" about the latency stuff …
     status = tail_match_add_match_simple (tm, cm.regex, cm.excluderegex,
-       cm.flags, "tail", plugin_instance, cm.type, cm.type_instance, interval);
+      cm.flags, "tail", plugin_instance, cm.type, cm.type_instance,
+      cm.latency, interval);
 
     if (status != 0)
-    {
       ERROR ("tail plugin: tail_match_add_match_simple failed.");
-    }
   }
 
   sfree (cm.regex);
   sfree (cm.excluderegex);
   sfree (cm.type);
   sfree (cm.type_instance);
+  latency_config_free(cm.latency);
 
   return (status);
 } /* int ctail_config_add_match */
index cb7501e..6855187 100644 (file)
@@ -9,6 +9,7 @@ ath_stat                value:DERIVE:0:U
 backends                value:GAUGE:0:65535
 bitrate                 value:GAUGE:0:4294967295
 blocked_clients         value:GAUGE:0:U
+bucket                  value:GAUGE:0:U
 bytes                   value:GAUGE:0:U
 cache_eviction          value:DERIVE:0:U
 cache_operation         value:DERIVE:0:U
index 69c619f..85f5917 100644 (file)
@@ -83,7 +83,7 @@ static int gr_format_values (char *ret, size_t ret_len,
 }
 
 static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len,
-    char escape_char)
+    char escape_char, _Bool preserve_separator)
 {
     memset (dst, 0, dst_len);
 
@@ -98,7 +98,7 @@ static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len,
             break;
         }
 
-        if ((src[i] == '.')
+        if ((!preserve_separator && (src[i] == '.'))
                 || isspace ((int) src[i])
                 || iscntrl ((int) src[i]))
             dst[i] = escape_char;
@@ -130,16 +130,18 @@ static int gr_format_name (char *ret, int ret_len,
     if (postfix == NULL)
         postfix = "";
 
+    _Bool preserve_separator = (flags & GRAPHITE_PRESERVE_SEPARATOR) ? 1 : 0;
+
     gr_copy_escape_part (n_host, vl->host,
-            sizeof (n_host), escape_char);
+            sizeof (n_host), escape_char, preserve_separator);
     gr_copy_escape_part (n_plugin, vl->plugin,
-            sizeof (n_plugin), escape_char);
+            sizeof (n_plugin), escape_char, preserve_separator);
     gr_copy_escape_part (n_plugin_instance, vl->plugin_instance,
-            sizeof (n_plugin_instance), escape_char);
+            sizeof (n_plugin_instance), escape_char, preserve_separator);
     gr_copy_escape_part (n_type, vl->type,
-            sizeof (n_type), escape_char);
+            sizeof (n_type), escape_char, preserve_separator);
     gr_copy_escape_part (n_type_instance, vl->type_instance,
-            sizeof (n_type_instance), escape_char);
+            sizeof (n_type_instance), escape_char, preserve_separator);
 
     if (n_plugin_instance[0] != '\0')
         ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
index 5165f9e..ebc5080 100644 (file)
@@ -30,6 +30,7 @@
 #define GRAPHITE_SEPARATE_INSTANCES 0x02
 #define GRAPHITE_ALWAYS_APPEND_DS   0x04
 #define GRAPHITE_DROP_DUPE_FIELDS   0x08
+#define GRAPHITE_PRESERVE_SEPARATOR 0x10
 
 int format_graphite (char *buffer,
     size_t buffer_size, const data_set_t *ds,
index 30cdd7a..f349db1 100644 (file)
@@ -96,6 +96,19 @@ DEF_TEST(metric_name)
       .flags = GRAPHITE_ALWAYS_APPEND_DS,
       .want_name = "example@com.test-foo.single-bar.value",
     },
+    /* flag GRAPHITE_PRESERVE_SEPARATOR */
+    {
+      .plugin_instance = "f.o.o",
+      .type_instance = "b.a.r",
+      .flags = 0,
+      .want_name = "example@com.test-f@o@o.single-b@a@r",
+    },
+    {
+      .plugin_instance = "f.o.o",
+      .type_instance = "b.a.r",
+      .flags = GRAPHITE_PRESERVE_SEPARATOR,
+      .want_name = "example.com.test-f.o.o.single-b.a.r",
+    },
     /* prefix and suffix */
     {
       .prefix = "foo.",
index c67752a..5749f10 100644 (file)
 # define LLONG_MAX 9223372036854775807LL
 #endif
 
-#ifndef HISTOGRAM_NUM_BINS
-# define HISTOGRAM_NUM_BINS 1000
-#endif
-
 #ifndef HISTOGRAM_DEFAULT_BIN_WIDTH
 /* 1048576 = 2^20 ^= 1/1024 s */
 # define HISTOGRAM_DEFAULT_BIN_WIDTH 1048576
@@ -152,7 +148,7 @@ void latency_counter_add (latency_counter_t *lc, cdtime_t latency) /* {{{ */
   if (lc->max < latency)
     lc->max = latency;
 
-  /* A latency of _exactly_ 1.0 ms should be stored in the buffer 0, so
+  /* A latency of _exactly_ 1.0 ms is stored in the buffer 0, so
    * subtract one from the cdtime_t value so that exactly 1.0 ms get sorted
    * accordingly. */
   bin = (latency - 1) / lc->bin_width;
@@ -294,4 +290,64 @@ cdtime_t latency_counter_get_percentile (latency_counter_t *lc, /* {{{ */
   return (latency_interpolated);
 } /* }}} cdtime_t latency_counter_get_percentile */
 
+double latency_counter_get_rate(const latency_counter_t *lc, /* {{{ */
+                                cdtime_t lower, cdtime_t upper,
+                                const cdtime_t now) {
+  if ((lc == NULL) || (lc->num == 0))
+    return (NAN);
+
+  if (upper && (upper < lower))
+    return (NAN);
+  if (lower == upper)
+    return (0);
+
+  /* Buckets have an exclusive lower bound and an inclusive upper bound. That
+   * means that the first bucket, index 0, represents (0-bin_width]. That means
+   * that latency==bin_width needs to result in bin=0, that's why we need to
+   * subtract one before dividing by bin_width. */
+  cdtime_t lower_bin = 0;
+  if (lower)
+    /* lower is *exclusive* => determine bucket for lower+1 */
+    lower_bin = ((lower + 1) - 1) / lc->bin_width;
+
+  /* lower is greater than the longest latency observed => rate is zero. */
+  if (lower_bin >= HISTOGRAM_NUM_BINS)
+    return (0);
+
+  cdtime_t upper_bin = HISTOGRAM_NUM_BINS - 1;
+  if (upper)
+    upper_bin = (upper - 1) / lc->bin_width;
+
+  if (upper_bin >= HISTOGRAM_NUM_BINS) {
+    upper_bin = HISTOGRAM_NUM_BINS - 1;
+    upper = 0;
+  }
+
+  double sum = 0;
+  for (size_t i = lower_bin; i <= upper_bin; i++)
+    sum += lc->histogram[i];
+
+  if (lower) {
+    /* Approximate ratio of requests in lower_bin, that fall between
+     * lower_bin_boundary and lower. This ratio is then subtracted from sum to
+     * increase accuracy. */
+    cdtime_t lower_bin_boundary = lower_bin * lc->bin_width;
+    assert(lower >= lower_bin_boundary);
+    double lower_ratio =
+        (double)(lower - lower_bin_boundary) / ((double)lc->bin_width);
+    sum -= lower_ratio * lc->histogram[lower_bin];
+  }
+
+  if (upper) {
+    /* As above: approximate ratio of requests in upper_bin, that fall between
+     * upper and upper_bin_boundary. */
+    cdtime_t upper_bin_boundary = (upper_bin + 1) * lc->bin_width;
+    assert(upper <= upper_bin_boundary);
+    double ratio = (double)(upper_bin_boundary - upper) / (double)lc->bin_width;
+    sum -= ratio * lc->histogram[upper_bin];
+  }
+
+  return sum / (CDTIME_T_TO_DOUBLE(now - lc->start_time));
+} /* }}} double latency_counter_get_rate */
+
 /* vim: set sw=2 sts=2 et fdm=marker : */
index cb56b08..8fdf024 100644 (file)
 
 #include "utils_time.h"
 
+#ifndef HISTOGRAM_NUM_BINS
+# define HISTOGRAM_NUM_BINS 1000
+#endif
+
 struct latency_counter_s;
 typedef struct latency_counter_s latency_counter_t;
 
@@ -45,4 +49,18 @@ cdtime_t latency_counter_get_average (latency_counter_t *lc);
 cdtime_t latency_counter_get_percentile (latency_counter_t *lc,
     double percent);
 
+/*
+ * NAME
+ *  latency_counter_get_rate(counter,lower,upper,now)
+ *
+ * DESCRIPTION
+ *   Calculates rate of latency values fall within requested interval.
+ *   Interval specified as (lower,upper], i.e. the lower boundary is exclusive,
+ *   the upper boundary is inclusive.
+ *   When lower is zero, then the interval is (0, upper].
+ *   When upper is zero, then the interval is (lower, infinity).
+ */
+double latency_counter_get_rate (const latency_counter_t *lc,
+    cdtime_t lower, cdtime_t upper, const cdtime_t now);
+
 /* vim: set sw=2 sts=2 et : */
diff --git a/src/utils_latency_config.c b/src/utils_latency_config.c
new file mode 100644 (file)
index 0000000..133678e
--- /dev/null
@@ -0,0 +1,150 @@
+/**
+ * collectd - src/utils_latency_config.c
+ * Copyright (C) 2013-2016   Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Pavel Rochnyack <pavel2000 at ngs.ru>
+ */
+
+#include "utils_latency_config.h"
+#include "collectd.h"
+#include "common.h"
+
+static int latency_config_add_percentile(latency_config_t *conf,
+                                         oconfig_item_t *ci,
+                                         const char *plugin) {
+  double percent;
+  int status = cf_util_get_double(ci, &percent);
+  if (status != 0)
+    return status;
+
+  if ((percent <= 0.0) || (percent >= 100)) {
+    ERROR("%s plugin: The value for \"%s\" must be between 0 and 100, "
+          "exclusively.",
+          plugin, ci->key);
+    return ERANGE;
+  }
+
+  double *tmp = realloc(conf->percentile,
+                        sizeof(*conf->percentile) * (conf->percentile_num + 1));
+  if (tmp == NULL) {
+    ERROR("%s plugin: realloc failed.", plugin);
+    return ENOMEM;
+  }
+  conf->percentile = tmp;
+  conf->percentile[conf->percentile_num] = percent;
+  conf->percentile_num++;
+
+  return 0;
+} /* int latency_config_add_percentile */
+
+static int latency_config_add_bucket(latency_config_t *conf, oconfig_item_t *ci,
+                                     const char *plugin) {
+  if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_NUMBER) ||
+      (ci->values[1].type != OCONFIG_TYPE_NUMBER)) {
+    ERROR("%s plugin: \"%s\" requires exactly two numeric arguments.", plugin,
+          ci->key);
+    return EINVAL;
+  }
+
+  if (ci->values[1].value.number &&
+      ci->values[1].value.number <= ci->values[0].value.number) {
+    ERROR("%s plugin: MIN must be less than MAX in \"%s\".", plugin, ci->key);
+    return ERANGE;
+  }
+
+  if (ci->values[0].value.number < 0) {
+    ERROR("%s plugin: MIN must be greater then or equal to zero in \"%s\".",
+          plugin, ci->key);
+    return ERANGE;
+  }
+
+  latency_bucket_t *tmp =
+      realloc(conf->buckets, sizeof(*conf->buckets) * (conf->buckets_num + 1));
+  if (tmp == NULL) {
+    ERROR("%s plugin: realloc failed.", plugin);
+    return ENOMEM;
+  }
+  conf->buckets = tmp;
+  conf->buckets[conf->buckets_num].lower_bound =
+      DOUBLE_TO_CDTIME_T(ci->values[0].value.number);
+  conf->buckets[conf->buckets_num].upper_bound =
+      DOUBLE_TO_CDTIME_T(ci->values[1].value.number);
+  conf->buckets_num++;
+
+  return (0);
+} /* int latency_config_add_bucket */
+
+int latency_config(latency_config_t *conf, oconfig_item_t *ci,
+                   char const *plugin) {
+  int status = 0;
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Percentile", child->key) == 0)
+      status = latency_config_add_percentile(conf, child, plugin);
+    else if (strcasecmp("Bucket", child->key) == 0)
+      status = latency_config_add_bucket(conf, child, plugin);
+    else
+      WARNING("%s plugin: \"%s\" is not a valid option within a \"%s\" block.",
+              plugin, child->key, ci->key);
+
+    if (status != 0)
+      return status;
+  }
+
+  if ((status == 0) && (conf->percentile_num == 0) &&
+      (conf->buckets_num == 0)) {
+    ERROR("%s plugin: The \"%s\" block must contain at least one "
+          "\"Percentile\" or \"Bucket\" option.",
+          plugin, ci->key);
+    return EINVAL;
+  }
+
+  return 0;
+}
+
+int latency_config_copy(latency_config_t *dst, const latency_config_t src) {
+  *dst = (latency_config_t){
+      .percentile_num = src.percentile_num, .buckets_num = src.buckets_num,
+  };
+
+  dst->percentile = calloc(dst->percentile_num, sizeof(*dst->percentile));
+  dst->buckets = calloc(dst->buckets_num, sizeof(*dst->buckets));
+
+  if ((dst->percentile == NULL) || (dst->buckets == NULL)) {
+    latency_config_free(*dst);
+    return ENOMEM;
+  }
+
+  memmove(dst->percentile, src.percentile,
+          dst->percentile_num * sizeof(*dst->percentile));
+  memmove(dst->buckets, src.buckets, dst->buckets_num * sizeof(*dst->buckets));
+
+  return 0;
+} /* int latency_config_copy */
+
+void latency_config_free(latency_config_t conf) {
+  sfree(conf.percentile);
+  sfree(conf.buckets);
+} /* void latency_config_free */
diff --git a/src/utils_latency_config.h b/src/utils_latency_config.h
new file mode 100644 (file)
index 0000000..9a7a11a
--- /dev/null
@@ -0,0 +1,62 @@
+/**
+ * collectd - src/utils_latency_config.c
+ * Copyright (C) 2013-2016   Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Pavel Rochnyack <pavel2000 at ngs.ru>
+ */
+
+#ifndef UTILS_LATENCY_CONFIG_H
+#define UTILS_LATENCY_CONFIG_H 1
+
+#include "collectd.h"
+
+#include "liboconfig/oconfig.h"
+#include "utils_time.h"
+
+typedef struct {
+  cdtime_t lower_bound;
+  cdtime_t upper_bound;
+} latency_bucket_t;
+
+typedef struct {
+  double *percentile;
+  size_t percentile_num;
+
+  latency_bucket_t *buckets;
+  size_t buckets_num;
+
+  /*
+  _Bool lower;
+  _Bool upper;
+  _Bool avg;
+  */
+} latency_config_t;
+
+int latency_config(latency_config_t *conf, oconfig_item_t *ci,
+                   char const *plugin);
+
+int latency_config_copy(latency_config_t *dst, const latency_config_t src);
+
+void latency_config_free(latency_config_t conf);
+
+#endif /* UTILS_LATENCY_CONFIG_H */
index 9c3b0ad..01f194f 100644 (file)
@@ -24,7 +24,7 @@
  *   Florian octo Forster <octo at collectd.org>
  */
 
-#define DBL_PRECISION 1e-9
+#define DBL_PRECISION 1e-6
 
 #include "common.h" /* for STATIC_ARRAY_SIZE */
 #include "collectd.h"
@@ -96,10 +96,139 @@ DEF_TEST(percentile)
   return 0;
 }
 
+DEF_TEST (get_rate) {
+  /* We re-declare the struct here so we can inspect its content. */
+  struct {
+    cdtime_t start_time;
+    cdtime_t sum;
+    size_t num;
+    cdtime_t min;
+    cdtime_t max;
+    cdtime_t bin_width;
+    int histogram[HISTOGRAM_NUM_BINS];
+  } *peek;
+  latency_counter_t *l;
+
+  CHECK_NOT_NULL (l = latency_counter_create ());
+  peek = (void *) l;
+
+  for (time_t i = 1; i <= 125; i++) {
+    latency_counter_add (l, TIME_T_TO_CDTIME_T(i));
+  }
+
+  /* We expect a bucket width of 125ms. */
+  EXPECT_EQ_UINT64 (DOUBLE_TO_CDTIME_T(0.125), peek->bin_width);
+
+  struct {
+    size_t index;
+    int want;
+  } bucket_cases[] = {
+    { 0, 0}, /* (0.000-0.125] */
+    { 1, 0}, /* (0.125-0.250] */
+    { 2, 0}, /* (0.250-0.375] */
+    { 3, 0}, /* (0.375-0.500] */
+    { 4, 0}, /* (0.500-0.625] */
+    { 5, 0}, /* (0.625-0.750] */
+    { 6, 0}, /* (0.750-0.875] */
+    { 7, 1}, /* (0.875-1.000] */
+    { 8, 0}, /* (1.000-1.125] */
+    { 9, 0}, /* (1.125-1.250] */
+    {10, 0}, /* (1.250-1.375] */
+    {11, 0}, /* (1.375-1.500] */
+    {12, 0}, /* (1.500-1.625] */
+    {13, 0}, /* (1.625-1.750] */
+    {14, 0}, /* (1.750-1.875] */
+    {15, 1}, /* (1.875-2.000] */
+    {16, 0}, /* (2.000-2.125] */
+  };
+
+  for (size_t i = 0; i < STATIC_ARRAY_SIZE(bucket_cases); i++) {
+    size_t index = bucket_cases[i].index;
+    EXPECT_EQ_INT(bucket_cases[i].want, peek->histogram[index]);
+  }
+
+  struct {
+    cdtime_t lower_bound;
+    cdtime_t upper_bound;
+    double want;
+  } cases[] = {
+    { // bucket 6 is zero
+      DOUBLE_TO_CDTIME_T_STATIC(0.750),
+      DOUBLE_TO_CDTIME_T_STATIC(0.875),
+      0.00,
+    },
+    { // bucket 7 contains the t=1 update
+      DOUBLE_TO_CDTIME_T_STATIC(0.875),
+      DOUBLE_TO_CDTIME_T_STATIC(1.000),
+      1.00,
+    },
+    { // range: bucket 7 - bucket 15; contains the t=1 and t=2 updates
+      DOUBLE_TO_CDTIME_T_STATIC(0.875),
+      DOUBLE_TO_CDTIME_T_STATIC(2.000),
+      2.00,
+    },
+    { // lower bucket is only partially applied
+      DOUBLE_TO_CDTIME_T_STATIC(0.875 + (0.125 / 4)),
+      DOUBLE_TO_CDTIME_T_STATIC(2.000),
+      1.75,
+    },
+    { // upper bucket is only partially applied
+      DOUBLE_TO_CDTIME_T_STATIC(0.875),
+      DOUBLE_TO_CDTIME_T_STATIC(2.000 - (0.125 / 4)),
+      1.75,
+    },
+    { // both buckets are only partially applied
+      DOUBLE_TO_CDTIME_T_STATIC(0.875 + (0.125 / 4)),
+      DOUBLE_TO_CDTIME_T_STATIC(2.000 - (0.125 / 4)),
+      1.50,
+    },
+    { // lower bound is unspecified
+      0,
+      DOUBLE_TO_CDTIME_T_STATIC(2.000),
+      2.00,
+    },
+    { // upper bound is unspecified
+      DOUBLE_TO_CDTIME_T_STATIC(125.000 - 0.125),
+      0,
+      1.00,
+    },
+    { // overflow test: upper >> longest latency
+      DOUBLE_TO_CDTIME_T_STATIC(1.000),
+      DOUBLE_TO_CDTIME_T_STATIC(999999),
+      124.00,
+    },
+    { // overflow test: lower > longest latency
+      DOUBLE_TO_CDTIME_T_STATIC(130),
+      0,
+      0.00,
+    },
+    { // lower > upper => error
+      DOUBLE_TO_CDTIME_T_STATIC(10),
+      DOUBLE_TO_CDTIME_T_STATIC(9),
+      NAN,
+    },
+    { // lower == upper => zero
+      DOUBLE_TO_CDTIME_T_STATIC(9),
+      DOUBLE_TO_CDTIME_T_STATIC(9),
+      0.00,
+    },
+  };
+
+  for (size_t i = 0; i < STATIC_ARRAY_SIZE(cases); i++) {
+    cdtime_t now = peek->start_time + TIME_T_TO_CDTIME_T(1);
+    EXPECT_EQ_DOUBLE (cases[i].want,
+                      latency_counter_get_rate (l, cases[i].lower_bound, cases[i].upper_bound, now));
+  }
+
+  latency_counter_destroy (l);
+  return 0;
+}
+
 int main (void)
 {
   RUN_TEST(simple);
   RUN_TEST(percentile);
+  RUN_TEST(get_rate);
 
   END_TEST;
 }
index ad8dfce..9feb6b3 100644 (file)
@@ -566,6 +566,9 @@ static int wg_config_node (oconfig_item_t *ci)
         else if (strcasecmp ("AlwaysAppendDS", child->key) == 0)
             cf_util_get_flag (child, &cb->format_flags,
                     GRAPHITE_ALWAYS_APPEND_DS);
+        else if (strcasecmp ("PreserveSeparator", child->key) == 0)
+            cf_util_get_flag (child, &cb->format_flags,
+                    GRAPHITE_PRESERVE_SEPARATOR);
         else if (strcasecmp ("DropDuplicateFields", child->key) == 0)
             cf_util_get_flag (child, &cb->format_flags,
                     GRAPHITE_DROP_DUPE_FIELDS);
index 6018fea..a719cd3 100644 (file)
@@ -376,6 +376,10 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             status = cf_util_get_flag (child, &tctx->graphite_flags,
                                        GRAPHITE_ALWAYS_APPEND_DS);
 
+        } else if (strcasecmp ("GraphitePreserveSeparator", child->key) == 0) {
+            status = cf_util_get_flag (child, &tctx->graphite_flags,
+                                       GRAPHITE_PRESERVE_SEPARATOR);
+
         } else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
             status = cf_util_get_string (child, &tctx->prefix);
         } else if (strcasecmp ("GraphitePostfix", child->key) == 0) {