From fcb0810fa9eb88c28e63fca46ca9c95dc86d44bb Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Mon, 28 May 2007 12:29:13 +0200 Subject: [PATCH] rrdtool plugin: Queue values to be written, but don't write them synchronously. --- src/rrdtool.c | 425 ++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 277 insertions(+), 148 deletions(-) diff --git a/src/rrdtool.c b/src/rrdtool.c index ea962905..feed01c2 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -37,9 +37,21 @@ struct rrd_cache_s char **values; time_t first_value; time_t last_value; + enum + { + FLAG_NONE = 0x00, + FLAG_QUEUED = 0x01 + } flags; }; typedef struct rrd_cache_s rrd_cache_t; +struct rrd_queue_s +{ + char *filename; + struct rrd_queue_s *next; +}; +typedef struct rrd_queue_s rrd_queue_t; + /* * Private variables */ @@ -83,12 +95,22 @@ static int heartbeat = 0; static int rrarows = 1200; static double xff = 0.1; +/* XXX: If you need to lock both, cache_lock and queue_lock, at the same time, + * ALWAYS lock `cache_lock' first! */ static int cache_timeout = 0; static int cache_flush_timeout = 0; static time_t cache_flush_last; static avl_tree_t *cache = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; +static rrd_queue_t *queue_head = NULL; +static rrd_queue_t *queue_tail = NULL; +static pthread_t queue_thread = 0; +static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; + +static int do_shutdown = 0; + /* * * * * * * * * * * WARNING: Magic * * * * * * * * * * */ @@ -434,86 +456,7 @@ static int value_list_to_filename (char *buffer, int buffer_len, return (0); } /* int value_list_to_filename */ -static rrd_cache_t *rrd_cache_insert (const char *filename, - const char *value, time_t value_time) -{ - rrd_cache_t *rc = NULL; - int new_rc = 0; - - if (cache != NULL) - avl_get (cache, filename, (void *) &rc); - - if (rc == NULL) - { - rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t)); - if (rc == NULL) - return (NULL); - rc->values_num = 0; - rc->values = NULL; - rc->first_value = 0; - rc->last_value = 0; - new_rc = 1; - } - - if (rc->last_value >= value_time) - { - WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", - (unsigned int) rc->last_value, - (unsigned int) value_time); - return (NULL); - } - - rc->values = (char **) realloc ((void *) rc->values, - (rc->values_num + 1) * sizeof (char *)); - if (rc->values == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: realloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - if (cache != NULL) - { - void *cache_key = NULL; - avl_remove (cache, filename, &cache_key, NULL); - sfree (cache_key); - } - free (rc); - return (NULL); - } - - rc->values[rc->values_num] = strdup (value); - if (rc->values[rc->values_num] != NULL) - rc->values_num++; - - if (rc->values_num == 1) - rc->first_value = value_time; - rc->last_value = value_time; - - /* Insert if this is the first value */ - if ((cache != NULL) && (new_rc == 1)) - { - void *cache_key = strdup (filename); - - if (cache_key == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: strdup failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (rc->values[0]); - sfree (rc->values); - sfree (rc); - return (NULL); - } - - avl_insert (cache, cache_key, rc); - } - - DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value, - (unsigned int) value_time, (void *) rc); - - return (rc); -} /* rrd_cache_t *rrd_cache_insert */ - -static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc) +static int rrd_write_to_file (char *filename, char **values, int values_num) { char **argv; int argc; @@ -521,26 +464,17 @@ static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc) char *fn; int status; - int i; - - if (rc->values_num < 1) + if (values_num < 1) return (0); - argc = rc->values_num + 2; + argc = values_num + 2; argv = (char **) malloc ((argc + 1) * sizeof (char *)); if (argv == NULL) return (-1); - fn = strdup (filename); - if (fn == NULL) - { - free (argv); - return (-1); - } - argv[0] = "update"; - argv[1] = fn; - memcpy (argv + 2, rc->values, rc->values_num * sizeof (char *)); + argv[1] = filename; + memcpy (argv + 2, values, values_num * sizeof (char *)); argv[argc] = NULL; DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv); @@ -557,16 +491,110 @@ static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc) free (argv); free (fn); - /* Free the value list of `rc' */ - for (i = 0; i < rc->values_num; i++) - free (rc->values[i]); - free (rc->values); - rc->values = NULL; - rc->values_num = 0; return (status); } /* int rrd_write_cache_entry */ +static void *rrd_queue_thread (void *data) +{ + while (42) + { + rrd_queue_t *queue_entry; + rrd_cache_t *cache_entry; + char **values; + int values_num; + int i; + + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + + /* wait until an entry is available */ + pthread_mutex_lock (&queue_lock); + while ((queue_head == NULL) && (do_shutdown == 0)) + pthread_cond_wait (&queue_cond, &queue_lock); + + /* We're in the shutdown phase */ + if (queue_head == NULL) + { + pthread_mutex_unlock (&queue_lock); + break; + } + + /* Dequeue the first entry */ + queue_entry = queue_head; + if (queue_head == queue_tail) + queue_head = queue_tail = NULL; + else + queue_head = queue_head->next; + + /* Unlock the queue again */ + pthread_mutex_unlock (&queue_lock); + + /* We now need the cache lock so the entry isn't updated while + * we make a copy of it's values */ + pthread_mutex_lock (&cache_lock); + + avl_get (cache, queue_entry->filename, (void *) &cache_entry); + + values = cache_entry->values; + values_num = cache_entry->values_num; + + cache_entry->values = NULL; + cache_entry->values_num = 0; + cache_entry->flags = FLAG_NONE; + + pthread_mutex_unlock (&cache_lock); + + /* Write the values to the RRD-file */ + rrd_write_to_file (queue_entry->filename, values, values_num); + + for (i = 0; i < values_num; i++) + { + sfree (values[i]); + } + sfree (values); + sfree (queue_entry->filename); + sfree (queue_entry); + } /* while (42) */ + + pthread_mutex_lock (&cache_lock); + avl_destroy (cache); + cache = NULL; + pthread_mutex_unlock (&cache_lock); + + pthread_exit ((void *) 0); + return ((void *) 0); +} /* void *rrd_queue_thread */ + +static int rrd_queue_cache_entry (const char *filename) +{ + rrd_queue_t *queue_entry; + + queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); + if (queue_entry == NULL) + return (-1); + + queue_entry->filename = strdup (filename); + if (queue_entry->filename == NULL) + { + free (queue_entry); + return (-1); + } + + queue_entry->next = NULL; + + pthread_mutex_lock (&queue_lock); + if (queue_tail == NULL) + queue_head = queue_entry; + else + queue_tail->next = queue_entry; + queue_tail = queue_entry; + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); + + return (0); +} /* int rrd_queue_cache_entry */ + static void rrd_cache_flush (int timeout) { rrd_cache_t *rc; @@ -579,9 +607,6 @@ static void rrd_cache_flush (int timeout) avl_iterator_t *iter; int i; - if (cache == NULL) - return; - DEBUG ("Flushing cache, timeout = %i", timeout); now = time (NULL); @@ -591,7 +616,17 @@ static void rrd_cache_flush (int timeout) while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { DEBUG ("key = %s; age = %i;", key, now - rc->first_value); - if ((now - rc->first_value) >= timeout) + + if (rc->flags == FLAG_QUEUED) + continue; + else if ((now - rc->first_value) < timeout) + continue; + else if (rc->values_num > 0) + { + if (rrd_queue_cache_entry (key) == 0) + rc->flags = FLAG_QUEUED; + } + else /* ancient and no values -> waste of memory */ { keys = (char **) realloc ((void *) keys, (keys_num + 1) * sizeof (char *)); @@ -619,8 +654,9 @@ static void rrd_cache_flush (int timeout) continue; } - rrd_write_cache_entry (keys[i], rc); - /* rc's value-list is free's by `rrd_write_cache_entry' */ + assert (rc->values == NULL); + assert (rc->values_num == 0); + sfree (rc); sfree (key); keys[i] = NULL; @@ -632,13 +668,118 @@ static void rrd_cache_flush (int timeout) cache_flush_last = now; } /* void rrd_cache_flush */ +static int rrd_cache_insert (const char *filename, + const char *value, time_t value_time) +{ + rrd_cache_t *rc = NULL; + int new_rc = 0; + char **values_new; + + pthread_mutex_lock (&cache_lock); + + avl_get (cache, filename, (void *) &rc); + + if (rc == NULL) + { + rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t)); + if (rc == NULL) + return (-1); + rc->values_num = 0; + rc->values = NULL; + rc->first_value = 0; + rc->last_value = 0; + rc->flags = FLAG_NONE; + new_rc = 1; + } + + if (rc->last_value >= value_time) + { + pthread_mutex_unlock (&cache_lock); + WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", + (unsigned int) rc->last_value, + (unsigned int) value_time); + return (-1); + } + + values_new = (char **) realloc ((void *) rc->values, + (rc->values_num + 1) * sizeof (char *)); + if (values_new == NULL) + { + char errbuf[1024]; + void *cache_key = NULL; + + sstrerror (errno, errbuf, sizeof (errbuf)); + + avl_remove (cache, filename, &cache_key, NULL); + pthread_mutex_unlock (&cache_lock); + + ERROR ("rrdtool plugin: realloc failed: %s", errbuf); + + sfree (cache_key); + sfree (rc->values); + sfree (rc); + return (-1); + } + + rc->values[rc->values_num] = strdup (value); + if (rc->values[rc->values_num] != NULL) + rc->values_num++; + + if (rc->values_num == 1) + rc->first_value = value_time; + rc->last_value = value_time; + + /* Insert if this is the first value */ + if (new_rc == 1) + { + void *cache_key = strdup (filename); + + if (cache_key == NULL) + { + char errbuf[1024]; + sstrerror (errno, errbuf, sizeof (errbuf)); + + pthread_mutex_unlock (&cache_lock); + + ERROR ("rrdtool plugin: strdup failed: %s", errbuf); + + sfree (rc->values[0]); + sfree (rc->values); + sfree (rc); + return (-1); + } + + avl_insert (cache, cache_key, rc); + } + + DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value, + (unsigned int) value_time, (void *) rc); + + if (((rc->last_value - rc->first_value) >= cache_timeout) + && (rc->flags != FLAG_QUEUED)) + { + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + if (rrd_queue_cache_entry (filename) == 0) + rc->flags = FLAG_QUEUED; + } + + if ((cache_timeout > 0) && + ((time (NULL) - cache_flush_last) > cache_flush_timeout)) + rrd_cache_flush (cache_flush_timeout); + + + pthread_mutex_unlock (&cache_lock); + + return (0); +} /* int rrd_cache_insert */ + static int rrd_write (const data_set_t *ds, const value_list_t *vl) { struct stat statbuf; char filename[512]; char values[512]; - rrd_cache_t *rc; - time_t now; + int status; if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0) return (-1); @@ -669,37 +810,9 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (-1); } - pthread_mutex_lock (&cache_lock); - rc = rrd_cache_insert (filename, values, vl->time); - if (rc == NULL) - { - pthread_mutex_unlock (&cache_lock); - return (-1); - } - - if (cache == NULL) - { - rrd_write_cache_entry (filename, rc); - /* rc's value-list is free's by `rrd_write_cache_entry' */ - sfree (rc); - pthread_mutex_unlock (&cache_lock); - return (0); - } - - now = time (NULL); - - DEBUG ("age (%s) = %i", filename, now - rc->first_value); + status = rrd_cache_insert (filename, values, vl->time); - /* `rc' is not free'd here, because we'll likely reuse it. If not, then - * the next flush will remove this entry. */ - if ((now - rc->first_value) >= cache_timeout) - rrd_write_cache_entry (filename, rc); - - if ((now - cache_flush_last) >= cache_flush_timeout) - rrd_cache_flush (cache_flush_timeout); - - pthread_mutex_unlock (&cache_lock); - return (0); + return (status); } /* int rrd_write */ static int rrd_config (const char *key, const char *value) @@ -833,16 +946,20 @@ static int rrd_shutdown (void) { pthread_mutex_lock (&cache_lock); rrd_cache_flush (-1); - if (cache != NULL) - avl_destroy (cache); - cache = NULL; pthread_mutex_unlock (&cache_lock); + pthread_mutex_lock (&queue_lock); + do_shutdown = 1; + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); + return (0); } /* int rrd_shutdown */ static int rrd_init (void) { + int status; + if (stepsize <= 0) stepsize = interval_g; if (heartbeat <= 0) @@ -857,23 +974,34 @@ static int rrd_init (void) "smaller than your `interval'. This will " "create needlessly big RRD-files."); + /* Set the cache up */ pthread_mutex_lock (&cache_lock); + + cache = avl_create ((int (*) (const void *, const void *)) strcmp); + if (cache == NULL) + { + ERROR ("rrdtool plugin: avl_create failed."); + return (-1); + } + + cache_flush_last = time (NULL); if (cache_timeout < 2) { cache_timeout = 0; cache_flush_timeout = 0; } - else - { - if (cache_flush_timeout < cache_timeout) - cache_flush_timeout = 10 * cache_timeout; + else if (cache_flush_timeout < cache_timeout) + cache_flush_timeout = 10 * cache_timeout; - cache = avl_create ((int (*) (const void *, const void *)) strcmp); - cache_flush_last = time (NULL); - plugin_register_shutdown ("rrdtool", rrd_shutdown); - } pthread_mutex_unlock (&cache_lock); + status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL); + if (status != 0) + { + ERROR ("rrdtool plugin: Cannot create queue-thread."); + return (-1); + } + DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;", (datadir == NULL) ? "(null)" : datadir, @@ -888,4 +1016,5 @@ void module_register (void) config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); plugin_register_write ("rrdtool", rrd_write); + plugin_register_shutdown ("rrdtool", rrd_shutdown); } -- 2.11.0