static long write_limit_high = 0;
static long write_limit_low = 0;
-cdtime_t last_drop_time = 0;
/*
* Static functions
return (0);
} /* int plugin_dispatch_values_internal */
-static _Bool drop_metric(void) {
- _Bool drop = 0;
- int wq_len = write_queue_length;
- /* We store write_queue_length in a local variable because other threads may update write_queue_length.
- * Having this in a local variable (like a cache) is better : we do not need a lock */
-
- if(wq_len < write_limit_low) return(0);
-
- if((write_limit_high > 0) && (wq_len > write_limit_low)) {
- if(wq_len >= write_limit_high) {
- /* if high == low, we come here too */
- drop = 1;
- } else {
- /* here, high != low */
- long probability_to_drop;
- long n;
-
- probability_to_drop = (wq_len - write_limit_low);
-
- n = cdrand_range(write_limit_low, write_limit_high);
-
- /* Let's have X = high - low.
- * n is in range [0..X]
- * probability_to_drop is in range [1..X[
- * probability_to_drop gets bigger when wq_len gets bigger.
- */
- if(n <= probability_to_drop) {
- drop = 1;
- }
- }
- }
- if(drop) {
- cdtime_t now = cdtime();
- if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) {
- last_drop_time = now;
- /* If you want to count dropped metrics, don't forget to add a lock here */
- /* dropped_metrics++; */
- ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric");
+static double get_drop_probability (void) /* {{{ */
+{
+ long pos;
+ long size;
+ long wql;
+
+ pthread_mutex_lock (&write_lock);
+ wql = write_queue_length;
+ pthread_mutex_unlock (&write_lock);
+
+ if (wql < write_limit_low)
+ return (0.0);
+ if (wql >= write_limit_high)
+ return (1.0);
+
+ pos = 1 + wql - write_limit_low;
+ size = 1 + write_limit_high - write_limit_low;
+
+ return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+ static cdtime_t last_message_time = 0;
+ static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ double p;
+ double q;
+ int status;
+
+ if (write_limit_high == 0)
+ return (0);
+
+ p = get_drop_probability ();
+ if (p == 0.0)
+ return (0);
+
+ status = pthread_mutex_trylock (&last_message_lock);
+ if (status == 0)
+ {
+ cdtime_t now;
+
+ now = cdtime ();
+ if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+ {
+ last_message_time = now;
+ ERROR ("plugin_dispatch_values: Low water mark "
+ "reached. Dropping %.0f%% of metrics.",
+ 100.0 * p);
}
+ pthread_mutex_unlock (&last_message_lock);
}
- return(drop);
-}
+
+ if (p == 1.0)
+ return (1);
+
+ q = cdrand_d ();
+ if (q > p)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool check_drop_value */
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
- if(drop_metric ()) return(0);
+ if (check_drop_value ())
+ return (0);
status = plugin_write_enqueue (vl);
if (status != 0)