From: Florian Forster Date: Fri, 22 Aug 2008 14:12:41 +0000 (+0200) Subject: rrdtool plugin: Implemented the `WritesPerSecond' option. X-Git-Tag: collectd-4.5.0~53 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=ea90b50563a778907ea83ca031b7253905947798;p=collectd.git rrdtool plugin: Implemented the `WritesPerSecond' option. This option lets you slow down the `queue thread' within the rrdtool plugin, so that the system stays responsive while writing all values to disk. When FLUSH'ing values and during shutdown this limit is not in effect. --- diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index d9a4b04c..ea508fbb 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -1449,7 +1449,7 @@ Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option. =item B I -When the C uses a cache (by setting B, see below) +When the C plugin uses a cache (by setting B, see below) it writes all values for a certain RRD-file if the oldest value is older than (or equal to) the number of seconds specified. If some RRD-file is not updated anymore for some reason (the computer was shut down, the network is broken, @@ -1468,6 +1468,30 @@ reduces IO-operations and thus lessens the load produced by updating the files. The trade off is that the graphs kind of "drag behind" and that more memory is used. +=item B B + +When collecting many statistics with collectd and the C plugin, you +will run serious performance problems. The B setting and the +internal update queue assert that collectd continues to work just fine even +under heavy load, but the system may become very unresponsive and slow. This is +a problem especially if create graphs from the RRD files on the same machine, +for example using the C script included in the +C directory. + +This setting is designed for very large setups. Setting this option to a value +between 25 and 80 updates per second, depending on your hardware, will leave +the server responsive enough to draw graphs even while all the cached values +are written to disk. Flushed values, i.Ee. values that are forced to disk +by the B command, are B effected by this limit. They are still +written as fast as possible, so that web frontends have up to date data when +generating graphs. + +For example: If you have 100,000 RRD files and set B to 30 +updates per second, writing all values to disk will take approximately +56Eminutes. Together with the flushing ability that's integrated into +"collection3" you'll end up with a responsive and fast system, up to date +graphs and basically a "backup" of your values every hour. + =back =head2 Plugin C diff --git a/src/rrdtool.c b/src/rrdtool.c index 7cf24367..2e1727a8 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -42,7 +42,8 @@ struct rrd_cache_s enum { FLAG_NONE = 0x00, - FLAG_QUEUED = 0x01 + FLAG_QUEUED = 0x01, + FLAG_FLUSHQ = 0x02 } flags; }; typedef struct rrd_cache_s rrd_cache_t; @@ -94,7 +95,8 @@ static const char *config_keys[] = "HeartBeat", "RRARows", "RRATimespan", - "XFF" + "XFF", + "WritesPerSecond" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); @@ -106,6 +108,7 @@ static int stepsize = 0; static int heartbeat = 0; static int rrarows = 1200; static double xff = 0.1; +static double write_rate = 0.0; /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time, * ALWAYS lock `cache_lock' first! */ @@ -117,6 +120,8 @@ static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; +static rrd_queue_t *flushq_head = NULL; +static rrd_queue_t *flushq_tail = NULL; static pthread_t queue_thread = 0; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -605,6 +610,11 @@ static int value_list_to_filename (char *buffer, int buffer_len, static void *rrd_queue_thread (void *data) { + struct timeval tv_next_update; + struct timeval tv_now; + + gettimeofday (&tv_next_update, /* timezone = */ NULL); + while (42) { rrd_queue_t *queue_entry; @@ -613,27 +623,79 @@ static void *rrd_queue_thread (void *data) int values_num; int i; - /* XXX: If you need to lock both, cache_lock and queue_lock, at - * the same time, ALWAYS lock `cache_lock' first! */ - - /* wait until an entry is available */ - pthread_mutex_lock (&queue_lock); - while ((queue_head == NULL) && (do_shutdown == 0)) - pthread_cond_wait (&queue_cond, &queue_lock); - - /* We're in the shutdown phase */ - if (queue_head == NULL) - { - pthread_mutex_unlock (&queue_lock); - break; - } - - /* Dequeue the first entry */ - queue_entry = queue_head; - if (queue_head == queue_tail) - queue_head = queue_tail = NULL; - else - queue_head = queue_head->next; + pthread_mutex_lock (&queue_lock); + /* Wait for values to arrive */ + while (true) + { + struct timespec ts_wait; + int status; + + while ((flushq_head == NULL) && (queue_head == NULL) + && (do_shutdown == 0)) + pthread_cond_wait (&queue_cond, &queue_lock); + + if ((flushq_head == NULL) && (queue_head == NULL)) + break; + + /* Don't delay if there's something to flush */ + if (flushq_head != NULL) + break; + + /* Don't delay if we're shutting down */ + if (do_shutdown != 0) + break; + + /* Don't delay if no delay was configured. */ + if (write_rate <= 0.0) + break; + + gettimeofday (&tv_now, /* timezone = */ NULL); + status = timeval_sub_timespec (&tv_next_update, &tv_now, + &ts_wait); + /* We're good to go */ + if (status != 0) + break; + + /* We're supposed to wait a bit with this update, so we'll + * wait for the next addition to the queue or to the end of + * the wait period - whichever comes first. */ + ts_wait.tv_sec = tv_next_update.tv_sec; + ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec; + + status = pthread_cond_timedwait (&queue_cond, &queue_lock, + &ts_wait); + if (status == ETIMEDOUT) + break; + } /* while (true) */ + + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + + /* We're in the shutdown phase */ + if ((flushq_head == NULL) && (queue_head == NULL)) + { + pthread_mutex_unlock (&queue_lock); + break; + } + + if (flushq_head != NULL) + { + /* Dequeue the first flush entry */ + queue_entry = flushq_head; + if (flushq_head == flushq_tail) + flushq_head = flushq_tail = NULL; + else + flushq_head = flushq_head->next; + } + else /* if (queue_head != NULL) */ + { + /* Dequeue the first regular entry */ + queue_entry = queue_head; + if (queue_head == queue_tail) + queue_head = queue_tail = NULL; + else + queue_head = queue_head->next; + } /* Unlock the queue again */ pthread_mutex_unlock (&queue_lock); @@ -653,6 +715,20 @@ static void *rrd_queue_thread (void *data) pthread_mutex_unlock (&cache_lock); + /* Update `tv_next_update' */ + if (write_rate > 0.0) + { + gettimeofday (&tv_now, /* timezone = */ NULL); + tv_next_update.tv_sec = tv_now.tv_sec; + tv_next_update.tv_usec = tv_now.tv_usec + + ((suseconds_t) (1000000 * write_rate)); + while (tv_next_update.tv_usec > 1000000) + { + tv_next_update.tv_sec++; + tv_next_update.tv_usec -= 1000000; + } + } + /* Write the values to the RRD-file */ srrd_update (queue_entry->filename, NULL, values_num, (const char **)values); @@ -677,7 +753,8 @@ static void *rrd_queue_thread (void *data) return ((void *) 0); } /* void *rrd_queue_thread */ -static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir) +static int rrd_queue_enqueue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) { rrd_queue_t *queue_entry; @@ -695,55 +772,60 @@ static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir) queue_entry->next = NULL; pthread_mutex_lock (&queue_lock); - if (dir == QUEUE_INSERT_FRONT) - { - queue_entry->next = queue_head; - queue_head = queue_entry; - if (queue_tail == NULL) - queue_tail = queue_head; - } - else /* (dir == QUEUE_INSERT_BACK) */ - { - if (queue_tail == NULL) - queue_head = queue_entry; - else - queue_tail->next = queue_entry; - queue_tail = queue_entry; - } + + if (*tail == NULL) + *head = queue_entry; + else + (*tail)->next = queue_entry; + *tail = queue_entry; + pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); - DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename); - return (0); -} /* int rrd_queue_cache_entry */ +} /* int rrd_queue_enqueue */ -static int rrd_queue_move_to_front (const char *filename) +static int rrd_queue_dequeue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) { rrd_queue_t *this; rrd_queue_t *prev; - this = NULL; - prev = NULL; pthread_mutex_lock (&queue_lock); - for (this = queue_head; this != NULL; this = this->next) + + prev = NULL; + this = *head; + + while (this != NULL) { if (strcmp (this->filename, filename) == 0) break; + prev = this; + this = this->next; } - /* Check if we found the entry and if it is NOT the first entry. */ - if ((this != NULL) && (prev != NULL)) + if (this == NULL) { - prev->next = this->next; - this->next = queue_head; - queue_head = this; + pthread_mutex_unlock (&queue_lock); + return (-1); } + + if (prev == NULL) + *head = this->next; + else + prev->next = this->next; + + if (this->next == NULL) + *tail = prev; + pthread_mutex_unlock (&queue_lock); + sfree (this->filename); + sfree (this); + return (0); -} /* int rrd_queue_move_to_front */ +} /* int rrd_queue_dequeue */ static void rrd_cache_flush (int timeout) { @@ -765,13 +847,16 @@ static void rrd_cache_flush (int timeout) iter = c_avl_get_iterator (cache); while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { - if (rc->flags == FLAG_QUEUED) + if (rc->flags != FLAG_NONE) continue; else if ((now - rc->first_value) < timeout) continue; else if (rc->values_num > 0) { - if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0) + int status; + + status = rrd_queue_enqueue (key, &queue_head, &queue_tail); + if (status == 0) rc->flags = FLAG_QUEUED; } else /* ancient and no values -> waste of memory */ @@ -833,31 +918,42 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier) now = time (NULL); if (datadir == NULL) - snprintf (key, sizeof (key), "%s.rrd", - identifier); + snprintf (key, sizeof (key), "%s.rrd", + identifier); else - snprintf (key, sizeof (key), "%s/%s.rrd", - datadir, identifier); + snprintf (key, sizeof (key), "%s/%s.rrd", + datadir, identifier); key[sizeof (key) - 1] = 0; status = c_avl_get (cache, key, (void *) &rc); if (status != 0) { WARNING ("rrdtool plugin: rrd_cache_flush_identifier: " - "c_avl_get (%s) failed. Does that file really exist?", - key); + "c_avl_get (%s) failed. Does that file really exist?", + key); return (status); } - if (rc->flags == FLAG_QUEUED) - status = rrd_queue_move_to_front (key); + if (rc->flags == FLAG_FLUSHQ) + { + status = 0; + } + else if (rc->flags == FLAG_QUEUED) + { + rrd_queue_dequeue (key, &queue_head, &queue_tail); + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); + if (status == 0) + rc->flags = FLAG_FLUSHQ; + } else if ((now - rc->first_value) < timeout) + { status = 0; + } else if (rc->values_num > 0) { - status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT); + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); if (status == 0) - rc->flags = FLAG_QUEUED; + rc->flags = FLAG_FLUSHQ; } return (status); @@ -957,9 +1053,12 @@ static int rrd_cache_insert (const char *filename, { /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ - if (rc->flags != FLAG_QUEUED) + if (rc->flags == FLAG_NONE) { - if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0) + int status; + + status = rrd_queue_enqueue (filename, &queue_head, &queue_tail); + if (status == 0) rc->flags = FLAG_QUEUED; } else @@ -972,7 +1071,6 @@ static int rrd_cache_insert (const char *filename, ((time (NULL) - cache_flush_last) > cache_flush_timeout)) rrd_cache_flush (cache_flush_timeout); - pthread_mutex_unlock (&cache_lock); return (0); @@ -1168,6 +1266,25 @@ static int rrd_config (const char *key, const char *value) } xff = tmp; } + else if (strcasecmp ("WritesPerSecond", key) == 0) + { + double wps = atof (value); + + if (wps < 0.0) + { + fprintf (stderr, "rrdtool: `WritesPerSecond' must be " + "greater than or equal to zero."); + return (1); + } + else if (wps == 0.0) + { + write_rate = 0.0; + } + else + { + write_rate = 1.0 / wps; + } + } else { return (-1);