From d53b119c992b580e7e11cf8d17480b91ab101111 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Mon, 17 Jun 2013 16:47:24 +0200 Subject: [PATCH] stats plugin: Add support for sets. --- src/collectd.conf.in | 5 +- src/collectd.conf.pod | 13 +++-- src/statsd.c | 140 ++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 141 insertions(+), 17 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 515da22a..98a75967 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -932,8 +932,9 @@ # Host "::" # Port "8125" # DeleteCounters false -# DeleteTimers false -# DeleteGauges false +# DeleteTimers false +# DeleteGauges false +# DeleteSets false # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index a16f29a1..39b3792d 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -5109,8 +5109,9 @@ The I listens to a UDP socket, reads "events" in the statsd protocol and dispatches rates or other aggregates of these numbers periodically. -The plugin implements the I, I and I types which are -dispatched as C, C and C respectively. +The plugin implements the I, I, I and I types which +are dispatched as the I types C, C, C and +C respectively. The following configuration options are valid: @@ -5132,11 +5133,13 @@ Defaults to C<8125>. =item B B|B +=item B B|B + These options control what happens if metrics are not updated in an interval. If set to B, the default, metrics are dispatched unchanged, i.e. the -rate of counters will be zero, timers report C and gauges are unchanged. -If set to B, the such metrics are not dispatched and removed from the -internal cache. +rate of counters and size of sets will be zero, timers report C and gauges +are unchanged. If set to B, the such metrics are not dispatched and +removed from the internal cache. =back diff --git a/src/statsd.c b/src/statsd.c index 5457c33f..9443fed4 100644 --- a/src/statsd.c +++ b/src/statsd.c @@ -45,7 +45,8 @@ enum metric_type_e { STATSD_COUNTER, STATSD_TIMER, - STATSD_GAUGE + STATSD_GAUGE, + STATSD_SET }; typedef enum metric_type_e metric_type_t; @@ -53,6 +54,7 @@ struct statsd_metric_s { metric_type_t type; int64_t value; + c_avl_tree_t *set; unsigned long updates_num; }; typedef struct statsd_metric_s statsd_metric_t; @@ -70,6 +72,7 @@ static char *conf_service = NULL; static _Bool conf_delete_counters = 0; static _Bool conf_delete_timers = 0; static _Bool conf_delete_gauges = 0; +static _Bool conf_delete_sets = 0; /* Must hold metrics_lock when calling this function. */ static int statsd_metric_set_unsafe (char const *name, int64_t value, /* {{{ */ @@ -227,14 +230,93 @@ static int statsd_handle_timer (char const *name, /* {{{ */ return (statsd_metric_add (key, (int64_t) value.derive, STATSD_TIMER)); } /* }}} int statsd_handle_timer */ -static int statsd_handle_set (char const *name __attribute__((unused)), /* {{{ */ - char const *value_str __attribute__((unused))) +static int statsd_handle_set (char const *key_orig, /* {{{ */ + char const *name_orig) { - static c_complain_t c = C_COMPLAIN_INIT_STATIC; + char key[DATA_MAX_NAME_LEN + 2]; + char *name; + statsd_metric_t *metric = NULL; + int status; + + ssnprintf (key, sizeof (key), "s:%s", key_orig); + + pthread_mutex_lock (&metrics_lock); + + status = c_avl_get (metrics_tree, key, (void *) &metric); + if (status != 0) /* Create a new metric */ + { + char *key_copy; + + DEBUG ("stats plugin: Adding new metric \"%s\".", key); + key_copy = strdup (key); + if (key_copy == NULL) + { + pthread_mutex_unlock (&metrics_lock); + ERROR ("statsd plugin: strdup failed."); + return (-1); + } + + metric = calloc (1, sizeof (*metric)); + if (metric == NULL) + { + pthread_mutex_unlock (&metrics_lock); + ERROR ("statsd plugin: calloc failed."); + sfree (key_copy); + return (-1); + } + metric->type = STATSD_SET; + metric->set = NULL; + + status = c_avl_insert (metrics_tree, key_copy, metric); + if (status != 0) + { + pthread_mutex_unlock (&metrics_lock); + ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.", + key_copy, status); + sfree (key_copy); + sfree (metric); + return (-1); + } + } + assert (metric != NULL); + + /* Make sure metric->set exists. */ + if (metric->set == NULL) + metric->set = c_avl_create ((void *) strcmp); + + if (metric->set == NULL) + { + pthread_mutex_unlock (&metrics_lock); + ERROR ("statsd plugin: c_avl_create failed."); + return (-1); + } + + name = strdup (name_orig); + if (name == NULL) + { + pthread_mutex_unlock (&metrics_lock); + ERROR ("statsd plugin: strdup failed."); + return (-1); + } - c_complain (LOG_WARNING, &c, - "statsd plugin: Support for sets is not yet implemented."); + status = c_avl_insert (metric->set, name, /* value = */ NULL); + if (status < 0) + { + pthread_mutex_unlock (&metrics_lock); + if (status < 0) + ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.", + name, status); + sfree (name); + return (-1); + } + else if (status > 0) /* key already exists */ + { + sfree (name); + } + + metric->updates_num++; + pthread_mutex_unlock (&metrics_lock); return (0); } /* }}} int statsd_handle_set */ @@ -486,6 +568,8 @@ static int statsd_config (oconfig_item_t *ci) /* {{{ */ cf_util_get_boolean (child, &conf_delete_timers); else if (strcasecmp ("DeleteGauges", child->key) == 0) cf_util_get_boolean (child, &conf_delete_gauges); + else if (strcasecmp ("DeleteSets", child->key) == 0) + cf_util_get_boolean (child, &conf_delete_sets); else ERROR ("statsd plugin: The \"%s\" config option is not valid.", child->key); @@ -524,7 +608,29 @@ static int statsd_init (void) /* {{{ */ return (0); } /* }}} int statsd_init */ -static int statsd_metric_submit (char const *name, /* {{{ */ +/* Must hold metrics_lock when calling this function. */ +static int statsd_metric_clear_set_unsafe (statsd_metric_t *metric) /* {{{ */ +{ + void *key; + void *value; + + if ((metric == NULL) || (metric->type != STATSD_SET)) + return (EINVAL); + + if (metric->set == NULL) + return (0); + + while (c_avl_pick (metric->set, &key, &value) == 0) + { + sfree (key); + sfree (value); + } + + return (0); +} /* }}} int statsd_metric_clear_set_unsafe */ + +/* Must hold metrics_lock when calling this function. */ +static int statsd_metric_submit_unsafe (char const *name, /* {{{ */ statsd_metric_t const *metric) { value_t values[1]; @@ -540,6 +646,13 @@ static int statsd_metric_submit (char const *name, /* {{{ */ values[0].gauge = ((gauge_t) metric->value) / ((gauge_t) metric->updates_num); } + else if (metric->type == STATSD_SET) + { + if (metric->set == NULL) + values[0].gauge = 0.0; + else + values[0].gauge = (gauge_t) c_avl_size (metric->set); + } else values[0].derive = (derive_t) metric->value; @@ -552,13 +665,15 @@ static int statsd_metric_submit (char const *name, /* {{{ */ sstrncpy (vl.type, "gauge", sizeof (vl.type)); else if (metric->type == STATSD_TIMER) sstrncpy (vl.type, "latency", sizeof (vl.type)); + else if (metric->type == STATSD_SET) + sstrncpy (vl.type, "objects", sizeof (vl.type)); else /* if (metric->type == STATSD_COUNTER) */ sstrncpy (vl.type, "derive", sizeof (vl.type)); sstrncpy (vl.type_instance, name, sizeof (vl.type_instance)); return (plugin_dispatch_values (&vl)); -} /* }}} int statsd_metric_submit */ +} /* }}} int statsd_metric_submit_unsafe */ static int statsd_read (void) /* {{{ */ { @@ -584,7 +699,8 @@ static int statsd_read (void) /* {{{ */ if ((metric->updates_num == 0) && ((conf_delete_counters && (metric->type == STATSD_COUNTER)) || (conf_delete_timers && (metric->type == STATSD_TIMER)) - || (conf_delete_gauges && (metric->type == STATSD_GAUGE)))) + || (conf_delete_gauges && (metric->type == STATSD_GAUGE)) + || (conf_delete_sets && (metric->type == STATSD_SET)))) { DEBUG ("statsd plugin: Deleting metric \"%s\".", name); strarray_add (&to_be_deleted, &to_be_deleted_num, name); @@ -593,8 +709,12 @@ static int statsd_read (void) /* {{{ */ /* Names have a prefix, e.g. "c:", which determines the (statsd) type. * Remove this here. */ - statsd_metric_submit (name + 2, metric); + statsd_metric_submit_unsafe (name + 2, metric); + + /* Reset the metric. */ metric->updates_num = 0; + if (metric->type == STATSD_SET) + statsd_metric_clear_set_unsafe (metric); } c_avl_iterator_destroy (iter); -- 2.11.0