do_shutdown++;
} /* }}} void sig_term_handler */
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
+{
+ RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
+ ci->file);
+
+ if (ci == NULL)
+ return (-1);
+
+ if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+ return (-1);
+
+ assert (ci->next == NULL);
+
+ if (cache_queue_tail == NULL)
+ cache_queue_head = ci;
+ else
+ cache_queue_tail->next = ci;
+ cache_queue_tail = ci;
+
+ return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+ gpointer data)
+{
+ cache_item_t *ci;
+ time_t now;
+
+ ci = (cache_item_t *) value;
+ now = *((time_t *) data);
+
+ if (((now - ci->last_flush_time) >= config_write_interval)
+ && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
+ enqueue_cache_item (ci);
+
+ return (TRUE);
+} /* }}} gboolean tree_callback_flush */
+
static void *queue_thread_main (void *args) /* {{{ */
{
+ struct timeval now;
+ struct timespec next_flush;
+
+ gettimeofday (&now, NULL);
+ next_flush.tv_sec = now.tv_sec + config_flush_interval;
+ next_flush.tv_nsec = 1000 * now.tv_usec;
+
pthread_mutex_lock (&cache_lock);
while ((do_shutdown == 0) || (cache_queue_head != NULL))
{
cache_item_t *ci;
-
char *file;
char **values;
int values_num;
int status;
int i;
+ /* First, check if it's time to do the cache flush. */
+ gettimeofday (&now, NULL);
+ if ((now.tv_sec > next_flush.tv_sec)
+ || ((now.tv_sec == next_flush.tv_sec)
+ && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+ {
+ time_t time_now;
+
+ /* Pass the current time as user data so that we don't need to call
+ * `time' for each node. */
+ time_now = time (NULL);
+
+ g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+
+ /* Determine the time of the next cache flush. */
+ while (next_flush.tv_sec < now.tv_sec)
+ next_flush.tv_sec += config_flush_interval;
+ }
+
+ /* Now, check if there's something to store away. If not, wait until
+ * something comes in or it's time to do the cache flush. */
if (cache_queue_head == NULL)
- pthread_cond_wait (&cache_cond, &cache_lock);
+ {
+ struct timespec timeout;
+ timeout.tv_sec = next_flush.tv_sec - now.tv_sec;
+ if (next_flush.tv_nsec < (1000 * now.tv_usec))
+ {
+ timeout.tv_sec--;
+ timeout.tv_nsec = 1000000000 + next_flush.tv_nsec
+ - (1000 * now.tv_usec);
+ }
+ else
+ {
+ timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec);
+ }
+
+ pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout);
+ }
+
+ /* Check if a value has arrived. This may be NULL if we timed out or there
+ * was an interrupt such as a signal. */
if (cache_queue_head == NULL)
continue;
if (((now - ci->last_flush_time) >= config_write_interval)
&& ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
{
- RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
- ci->file);
-
- assert (ci->next == NULL);
-
- if (cache_queue_tail == NULL)
- cache_queue_head = ci;
- else
- cache_queue_tail->next = ci;
- cache_queue_tail = ci;
-
+ enqueue_cache_item (ci);
pthread_cond_signal (&cache_cond);
}