enum
{
FLAG_NONE = 0x00,
- FLAG_QUEUED = 0x01
+ FLAG_QUEUED = 0x01,
+ FLAG_FLUSHQ = 0x02
} flags;
};
typedef struct rrd_cache_s rrd_cache_t;
"HeartBeat",
"RRARows",
"RRATimespan",
- "XFF"
+ "XFF",
+ "WritesPerSecond"
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
static int heartbeat = 0;
static int rrarows = 1200;
static double xff = 0.1;
+static double write_rate = 0.0;
/* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
* ALWAYS lock `cache_lock' first! */
static rrd_queue_t *queue_head = NULL;
static rrd_queue_t *queue_tail = NULL;
+static rrd_queue_t *flushq_head = NULL;
+static rrd_queue_t *flushq_tail = NULL;
static pthread_t queue_thread = 0;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static void *rrd_queue_thread (void *data)
{
+ struct timeval tv_next_update;
+ struct timeval tv_now;
+
+ gettimeofday (&tv_next_update, /* timezone = */ NULL);
+
while (42)
{
rrd_queue_t *queue_entry;
int values_num;
int i;
- /* XXX: If you need to lock both, cache_lock and queue_lock, at
- * the same time, ALWAYS lock `cache_lock' first! */
-
- /* wait until an entry is available */
- pthread_mutex_lock (&queue_lock);
- while ((queue_head == NULL) && (do_shutdown == 0))
- pthread_cond_wait (&queue_cond, &queue_lock);
-
- /* We're in the shutdown phase */
- if (queue_head == NULL)
- {
- pthread_mutex_unlock (&queue_lock);
- break;
- }
-
- /* Dequeue the first entry */
- queue_entry = queue_head;
- if (queue_head == queue_tail)
- queue_head = queue_tail = NULL;
- else
- queue_head = queue_head->next;
+ pthread_mutex_lock (&queue_lock);
+ /* Wait for values to arrive */
+ while (true)
+ {
+ struct timespec ts_wait;
+ int status;
+
+ while ((flushq_head == NULL) && (queue_head == NULL)
+ && (do_shutdown == 0))
+ pthread_cond_wait (&queue_cond, &queue_lock);
+
+ if ((flushq_head == NULL) && (queue_head == NULL))
+ break;
+
+ /* Don't delay if there's something to flush */
+ if (flushq_head != NULL)
+ break;
+
+ /* Don't delay if we're shutting down */
+ if (do_shutdown != 0)
+ break;
+
+ /* Don't delay if no delay was configured. */
+ if (write_rate <= 0.0)
+ break;
+
+ gettimeofday (&tv_now, /* timezone = */ NULL);
+ status = timeval_sub_timespec (&tv_next_update, &tv_now,
+ &ts_wait);
+ /* We're good to go */
+ if (status != 0)
+ break;
+
+ /* We're supposed to wait a bit with this update, so we'll
+ * wait for the next addition to the queue or to the end of
+ * the wait period - whichever comes first. */
+ ts_wait.tv_sec = tv_next_update.tv_sec;
+ ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
+
+ status = pthread_cond_timedwait (&queue_cond, &queue_lock,
+ &ts_wait);
+ if (status == ETIMEDOUT)
+ break;
+ } /* while (true) */
+
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+
+ /* We're in the shutdown phase */
+ if ((flushq_head == NULL) && (queue_head == NULL))
+ {
+ pthread_mutex_unlock (&queue_lock);
+ break;
+ }
+
+ if (flushq_head != NULL)
+ {
+ /* Dequeue the first flush entry */
+ queue_entry = flushq_head;
+ if (flushq_head == flushq_tail)
+ flushq_head = flushq_tail = NULL;
+ else
+ flushq_head = flushq_head->next;
+ }
+ else /* if (queue_head != NULL) */
+ {
+ /* Dequeue the first regular entry */
+ queue_entry = queue_head;
+ if (queue_head == queue_tail)
+ queue_head = queue_tail = NULL;
+ else
+ queue_head = queue_head->next;
+ }
/* Unlock the queue again */
pthread_mutex_unlock (&queue_lock);
pthread_mutex_unlock (&cache_lock);
+ /* Update `tv_next_update' */
+ if (write_rate > 0.0)
+ {
+ gettimeofday (&tv_now, /* timezone = */ NULL);
+ tv_next_update.tv_sec = tv_now.tv_sec;
+ tv_next_update.tv_usec = tv_now.tv_usec
+ + ((suseconds_t) (1000000 * write_rate));
+ while (tv_next_update.tv_usec > 1000000)
+ {
+ tv_next_update.tv_sec++;
+ tv_next_update.tv_usec -= 1000000;
+ }
+ }
+
/* Write the values to the RRD-file */
srrd_update (queue_entry->filename, NULL,
values_num, (const char **)values);
return ((void *) 0);
} /* void *rrd_queue_thread */
-static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir)
+static int rrd_queue_enqueue (const char *filename,
+ rrd_queue_t **head, rrd_queue_t **tail)
{
rrd_queue_t *queue_entry;
queue_entry->next = NULL;
pthread_mutex_lock (&queue_lock);
- if (dir == QUEUE_INSERT_FRONT)
- {
- queue_entry->next = queue_head;
- queue_head = queue_entry;
- if (queue_tail == NULL)
- queue_tail = queue_head;
- }
- else /* (dir == QUEUE_INSERT_BACK) */
- {
- if (queue_tail == NULL)
- queue_head = queue_entry;
- else
- queue_tail->next = queue_entry;
- queue_tail = queue_entry;
- }
+
+ if (*tail == NULL)
+ *head = queue_entry;
+ else
+ (*tail)->next = queue_entry;
+ *tail = queue_entry;
+
pthread_cond_signal (&queue_cond);
pthread_mutex_unlock (&queue_lock);
- DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
-
return (0);
-} /* int rrd_queue_cache_entry */
+} /* int rrd_queue_enqueue */
-static int rrd_queue_move_to_front (const char *filename)
+static int rrd_queue_dequeue (const char *filename,
+ rrd_queue_t **head, rrd_queue_t **tail)
{
rrd_queue_t *this;
rrd_queue_t *prev;
- this = NULL;
- prev = NULL;
pthread_mutex_lock (&queue_lock);
- for (this = queue_head; this != NULL; this = this->next)
+
+ prev = NULL;
+ this = *head;
+
+ while (this != NULL)
{
if (strcmp (this->filename, filename) == 0)
break;
+
prev = this;
+ this = this->next;
}
- /* Check if we found the entry and if it is NOT the first entry. */
- if ((this != NULL) && (prev != NULL))
+ if (this == NULL)
{
- prev->next = this->next;
- this->next = queue_head;
- queue_head = this;
+ pthread_mutex_unlock (&queue_lock);
+ return (-1);
}
+
+ if (prev == NULL)
+ *head = this->next;
+ else
+ prev->next = this->next;
+
+ if (this->next == NULL)
+ *tail = prev;
+
pthread_mutex_unlock (&queue_lock);
+ sfree (this->filename);
+ sfree (this);
+
return (0);
-} /* int rrd_queue_move_to_front */
+} /* int rrd_queue_dequeue */
static void rrd_cache_flush (int timeout)
{
iter = c_avl_get_iterator (cache);
while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
{
- if (rc->flags == FLAG_QUEUED)
+ if (rc->flags != FLAG_NONE)
continue;
else if ((now - rc->first_value) < timeout)
continue;
else if (rc->values_num > 0)
{
- if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0)
+ int status;
+
+ status = rrd_queue_enqueue (key, &queue_head, &queue_tail);
+ if (status == 0)
rc->flags = FLAG_QUEUED;
}
else /* ancient and no values -> waste of memory */
now = time (NULL);
if (datadir == NULL)
- snprintf (key, sizeof (key), "%s.rrd",
- identifier);
+ snprintf (key, sizeof (key), "%s.rrd",
+ identifier);
else
- snprintf (key, sizeof (key), "%s/%s.rrd",
- datadir, identifier);
+ snprintf (key, sizeof (key), "%s/%s.rrd",
+ datadir, identifier);
key[sizeof (key) - 1] = 0;
status = c_avl_get (cache, key, (void *) &rc);
if (status != 0)
{
WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
- "c_avl_get (%s) failed. Does that file really exist?",
- key);
+ "c_avl_get (%s) failed. Does that file really exist?",
+ key);
return (status);
}
- if (rc->flags == FLAG_QUEUED)
- status = rrd_queue_move_to_front (key);
+ if (rc->flags == FLAG_FLUSHQ)
+ {
+ status = 0;
+ }
+ else if (rc->flags == FLAG_QUEUED)
+ {
+ rrd_queue_dequeue (key, &queue_head, &queue_tail);
+ status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
+ if (status == 0)
+ rc->flags = FLAG_FLUSHQ;
+ }
else if ((now - rc->first_value) < timeout)
+ {
status = 0;
+ }
else if (rc->values_num > 0)
{
- status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT);
+ status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
if (status == 0)
- rc->flags = FLAG_QUEUED;
+ rc->flags = FLAG_FLUSHQ;
}
return (status);
{
/* XXX: If you need to lock both, cache_lock and queue_lock, at
* the same time, ALWAYS lock `cache_lock' first! */
- if (rc->flags != FLAG_QUEUED)
+ if (rc->flags == FLAG_NONE)
{
- if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0)
+ int status;
+
+ status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
+ if (status == 0)
rc->flags = FLAG_QUEUED;
}
else
((time (NULL) - cache_flush_last) > cache_flush_timeout))
rrd_cache_flush (cache_flush_timeout);
-
pthread_mutex_unlock (&cache_lock);
return (0);
}
xff = tmp;
}
+ else if (strcasecmp ("WritesPerSecond", key) == 0)
+ {
+ double wps = atof (value);
+
+ if (wps < 0.0)
+ {
+ fprintf (stderr, "rrdtool: `WritesPerSecond' must be "
+ "greater than or equal to zero.");
+ return (1);
+ }
+ else if (wps == 0.0)
+ {
+ write_rate = 0.0;
+ }
+ else
+ {
+ write_rate = 1.0 / wps;
+ }
+ }
else
{
return (-1);