src/plugin.c: Some fixes for write limits.
authorFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:19:54 +0000 (11:19 +0200)
committerFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:19:54 +0000 (11:19 +0200)
* Log an error once per second.
* Coding style fixes.
* Separate function for calculating drop probability.

src/plugin.c

index 7ccfc4d..12f002c 100644 (file)
@@ -121,7 +121,6 @@ static _Bool           plugin_ctx_key_initialized = 0;
 
 static long            write_limit_high = 0;
 static long            write_limit_low = 0;
-cdtime_t               last_drop_time = 0;
 
 /*
  * Static functions
@@ -1985,54 +1984,75 @@ static int plugin_dispatch_values_internal (value_list_t *vl)
        return (0);
 } /* int plugin_dispatch_values_internal */
 
-static _Bool drop_metric(void) {
-       _Bool drop = 0;
-       int wq_len = write_queue_length;
-       /* We store write_queue_length in a local variable because other threads may update write_queue_length.
-        * Having this in a local variable (like a cache) is better : we do not need a lock */
-
-       if(wq_len < write_limit_low) return(0);
-
-       if((write_limit_high > 0) && (wq_len > write_limit_low)) {
-               if(wq_len >= write_limit_high) {
-                       /* if high == low, we come here too */
-                       drop = 1;
-               } else {
-                       /* here, high != low */
-                       long probability_to_drop;
-                       long n;
-
-                       probability_to_drop = (wq_len - write_limit_low);
-
-                       n = cdrand_range(write_limit_low, write_limit_high);
-
-                       /* Let's have X = high - low.
-                        *   n is in range [0..X]
-                        *   probability_to_drop is in range [1..X[
-                        *   probability_to_drop gets bigger when wq_len gets bigger.
-                        */
-                       if(n <= probability_to_drop) {
-                               drop = 1;
-                       }
-               }
-       }
-       if(drop) {
-               cdtime_t now = cdtime();
-               if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) {
-                       last_drop_time = now;
-                       /* If you want to count dropped metrics, don't forget to add a lock here */
-                       /* dropped_metrics++; */
-                       ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric");
+static double get_drop_probability (void) /* {{{ */
+{
+       long pos;
+       long size;
+       long wql;
+
+       pthread_mutex_lock (&write_lock);
+       wql = write_queue_length;
+       pthread_mutex_unlock (&write_lock);
+
+       if (wql < write_limit_low)
+               return (0.0);
+       if (wql >= write_limit_high)
+               return (1.0);
+
+       pos = 1 + wql - write_limit_low;
+       size = 1 + write_limit_high - write_limit_low;
+
+       return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+       static cdtime_t last_message_time = 0;
+       static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+       double p;
+       double q;
+       int status;
+
+       if (write_limit_high == 0)
+               return (0);
+
+       p = get_drop_probability ();
+       if (p == 0.0)
+               return (0);
+
+       status = pthread_mutex_trylock (&last_message_lock);
+       if (status == 0)
+       {
+               cdtime_t now;
+
+               now = cdtime ();
+               if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+               {
+                       last_message_time = now;
+                       ERROR ("plugin_dispatch_values: Low water mark "
+                                       "reached. Dropping %.0f%% of metrics.",
+                                       100.0 * p);
                }
+               pthread_mutex_unlock (&last_message_lock);
        }
-       return(drop);
-}
+
+       if (p == 1.0)
+               return (1);
+
+       q = cdrand_d ();
+       if (q > p)
+               return (1);
+       else
+               return (0);
+} /* }}} _Bool check_drop_value */
 
 int plugin_dispatch_values (value_list_t const *vl)
 {
        int status;
 
-       if(drop_metric ()) return(0);
+       if (check_drop_value ())
+               return (0);
 
        status = plugin_write_enqueue (vl);
        if (status != 0)