static char **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_total = 0;
+static uint64_t stats_values_total = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
/*
* Functions
*/
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
+ int did_insert = 0;
+
RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
ci->file);
if (cache_queue_tail == NULL)
cache_queue_tail = cache_queue_head;
+
+ did_insert = 1;
}
else if (cache_queue_head == ci)
{
else
cache_queue_tail->next = ci;
cache_queue_tail = ci;
+
+ did_insert = 1;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
+ if (did_insert)
+ {
+ pthread_mutex_lock (&stats_lock);
+ stats_queue_length++;
+ pthread_mutex_unlock (&stats_lock);
+ }
+
return (0);
} /* }}} int enqueue_cache_item */
cache_queue_tail = NULL;
ci->next = NULL;
+ pthread_mutex_lock (&stats_lock);
+ assert (stats_queue_length > 0);
+ stats_queue_length--;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_unlock (&cache_lock);
RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
for (i = 0; i < values_num; i++)
free (values[i]);
+ pthread_mutex_lock (&stats_lock);
+ stats_updates_total++;
+ stats_values_total += values_num;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_lock (&cache_lock);
pthread_cond_broadcast (&flush_cond);
} /* while (do_shutdown == 0) */