2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
39 typedef struct rrd_cache_s {
44 int64_t random_variation;
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
53 struct rrd_queue_s *next;
55 typedef struct rrd_queue_s rrd_queue_t;
60 static const char *config_keys[] = {
61 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
62 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
63 "XFF", "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67 * is zero a default, depending on the `interval' member of the value list is
70 static double write_rate;
71 static rrdcreate_config_t rrdcreate_config = {
77 /* timespans = */ NULL,
78 /* timespans_num = */ 0,
80 /* consolidation_functions = */ NULL,
81 /* consolidation_functions_num = */ 0,
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86 * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout;
88 static cdtime_t cache_flush_timeout;
89 static cdtime_t random_timeout;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
94 static rrd_queue_t *queue_head;
95 static rrd_queue_t *queue_tail;
96 static rrd_queue_t *flushq_head;
97 static rrd_queue_t *flushq_tail;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
107 static int do_shutdown;
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
112 optind = 0; /* bug in librrd? */
115 int status = rrd_update_r(filename, template, argc, (void *)argv);
117 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
125 #else /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
133 assert(template == NULL);
136 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137 if (new_argv == NULL) {
138 ERROR("rrdtool plugin: malloc failed.");
142 new_argv[0] = "update";
143 new_argv[1] = filename;
145 memcpy(new_argv + 2, argv, argc * sizeof(char *));
146 new_argv[new_argc] = NULL;
148 pthread_mutex_lock(&librrd_lock);
149 optind = 0; /* bug in librrd? */
152 status = rrd_update(new_argc, new_argv);
153 pthread_mutex_unlock(&librrd_lock);
156 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167 const data_set_t *ds,
168 const value_list_t *vl) {
173 memset(buffer, '\0', buffer_len);
175 tt = CDTIME_T_TO_TIME_T(vl->time);
176 status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177 if ((status < 1) || (status >= buffer_len))
181 for (size_t i = 0; i < ds->ds_num; i++) {
182 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183 (ds->ds[i].type != DS_TYPE_GAUGE) &&
184 (ds->ds[i].type != DS_TYPE_DERIVE) &&
185 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
188 if (ds->ds[i].type == DS_TYPE_COUNTER)
189 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
190 (uint64_t)vl->values[i].counter);
191 else if (ds->ds[i].type == DS_TYPE_GAUGE)
192 status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193 vl->values[i].gauge);
194 else if (ds->ds[i].type == DS_TYPE_DERIVE)
195 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196 vl->values[i].derive);
197 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199 vl->values[i].absolute);
201 if ((status < 1) || (status >= (buffer_len - offset)))
205 } /* for ds->ds_num */
208 } /* int value_list_to_string_multiple */
210 static int value_list_to_string(char *buffer, int buffer_len,
211 const data_set_t *ds, const value_list_t *vl) {
216 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
218 tt = CDTIME_T_TO_TIME_T(vl->time);
219 switch (ds->ds[0].type) {
221 status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222 vl->values[0].derive);
225 status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226 vl->values[0].gauge);
228 case DS_TYPE_COUNTER:
229 status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
230 (uint64_t)vl->values[0].counter);
232 case DS_TYPE_ABSOLUTE:
233 status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234 vl->values[0].absolute);
240 if ((status < 1) || (status >= buffer_len))
244 } /* int value_list_to_string */
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247 value_list_t const *vl) {
248 char const suffix[] = ".rrd";
252 if (datadir != NULL) {
253 size_t datadir_len = strlen(datadir) + 1;
255 if (datadir_len >= buffer_size)
258 sstrncpy(buffer, datadir, buffer_size);
259 buffer[datadir_len - 1] = '/';
260 buffer[datadir_len] = 0;
262 buffer += datadir_len;
263 buffer_size -= datadir_len;
266 status = FORMAT_VL(buffer, buffer_size, vl);
270 len = strlen(buffer);
271 assert(len < buffer_size);
275 if (buffer_size <= sizeof(suffix))
278 memcpy(buffer, suffix, sizeof(suffix));
280 } /* int value_list_to_filename */
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283 struct timeval tv_next_update;
284 struct timeval tv_now;
286 gettimeofday(&tv_next_update, /* timezone = */ NULL);
289 rrd_queue_t *queue_entry;
290 rrd_cache_t *cache_entry;
298 pthread_mutex_lock(&queue_lock);
299 /* Wait for values to arrive */
301 struct timespec ts_wait;
303 while ((flushq_head == NULL) && (queue_head == NULL) &&
305 pthread_cond_wait(&queue_cond, &queue_lock);
307 if ((flushq_head == NULL) && (queue_head == NULL))
310 /* Don't delay if there's something to flush */
311 if (flushq_head != NULL)
314 /* Don't delay if we're shutting down */
315 if (do_shutdown != 0)
318 /* Don't delay if no delay was configured. */
319 if (write_rate <= 0.0)
322 gettimeofday(&tv_now, /* timezone = */ NULL);
323 status = timeval_cmp(tv_next_update, tv_now, NULL);
324 /* We're good to go */
328 /* We're supposed to wait a bit with this update, so we'll
329 * wait for the next addition to the queue or to the end of
330 * the wait period - whichever comes first. */
331 ts_wait.tv_sec = tv_next_update.tv_sec;
332 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
334 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335 if (status == ETIMEDOUT)
339 /* XXX: If you need to lock both, cache_lock and queue_lock, at
340 * the same time, ALWAYS lock `cache_lock' first! */
342 /* We're in the shutdown phase */
343 if ((flushq_head == NULL) && (queue_head == NULL)) {
344 pthread_mutex_unlock(&queue_lock);
348 if (flushq_head != NULL) {
349 /* Dequeue the first flush entry */
350 queue_entry = flushq_head;
351 if (flushq_head == flushq_tail)
352 flushq_head = flushq_tail = NULL;
354 flushq_head = flushq_head->next;
355 } else /* if (queue_head != NULL) */
357 /* Dequeue the first regular entry */
358 queue_entry = queue_head;
359 if (queue_head == queue_tail)
360 queue_head = queue_tail = NULL;
362 queue_head = queue_head->next;
365 /* Unlock the queue again */
366 pthread_mutex_unlock(&queue_lock);
368 /* We now need the cache lock so the entry isn't updated while
369 * we make a copy of its values */
370 pthread_mutex_lock(&cache_lock);
372 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
375 values = cache_entry->values;
376 values_num = cache_entry->values_num;
378 cache_entry->values = NULL;
379 cache_entry->values_num = 0;
380 cache_entry->flags = FLAG_NONE;
383 pthread_mutex_unlock(&cache_lock);
386 sfree(queue_entry->filename);
391 /* Update `tv_next_update' */
392 if (write_rate > 0.0) {
393 gettimeofday(&tv_now, /* timezone = */ NULL);
394 tv_next_update.tv_sec = tv_now.tv_sec;
395 tv_next_update.tv_usec =
396 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397 while (tv_next_update.tv_usec > 1000000) {
398 tv_next_update.tv_sec++;
399 tv_next_update.tv_usec -= 1000000;
403 /* Write the values to the RRD-file */
404 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406 (values_num == 1) ? "" : "s", queue_entry->filename);
408 for (int i = 0; i < values_num; i++) {
412 sfree(queue_entry->filename);
416 pthread_exit((void *)0);
418 } /* void *rrd_queue_thread */
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421 rrd_queue_t **tail) {
422 rrd_queue_t *queue_entry;
424 queue_entry = malloc(sizeof(*queue_entry));
425 if (queue_entry == NULL)
428 queue_entry->filename = strdup(filename);
429 if (queue_entry->filename == NULL) {
434 queue_entry->next = NULL;
436 pthread_mutex_lock(&queue_lock);
441 (*tail)->next = queue_entry;
444 pthread_cond_signal(&queue_cond);
445 pthread_mutex_unlock(&queue_lock);
448 } /* int rrd_queue_enqueue */
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451 rrd_queue_t **tail) {
455 pthread_mutex_lock(&queue_lock);
460 while (this != NULL) {
461 if (strcmp(this->filename, filename) == 0)
469 pthread_mutex_unlock(&queue_lock);
476 prev->next = this->next;
478 if (this->next == NULL)
481 pthread_mutex_unlock(&queue_lock);
483 sfree(this->filename);
487 } /* int rrd_queue_dequeue */
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
498 c_avl_iterator_t *iter;
500 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501 CDTIME_T_TO_DOUBLE(timeout));
505 /* Build a list of entries to be flushed */
506 iter = c_avl_get_iterator(cache);
507 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508 if (rc->flags != FLAG_NONE)
510 /* timeout == 0 => flush everything */
511 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
513 else if (rc->values_num > 0) {
516 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
518 rc->flags = FLAG_QUEUED;
519 } else /* ancient and no values -> waste of memory */
521 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
523 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
524 c_avl_iterator_destroy(iter);
529 keys[keys_num] = key;
532 } /* while (c_avl_iterator_next) */
533 c_avl_iterator_destroy(iter);
535 for (int i = 0; i < keys_num; i++) {
536 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
537 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
541 assert(rc->values == NULL);
542 assert(rc->values_num == 0);
547 } /* for (i = 0..keys_num) */
551 cache_flush_last = now;
552 } /* void rrd_cache_flush */
554 static int rrd_cache_flush_identifier(cdtime_t timeout,
555 const char *identifier) {
561 if (identifier == NULL) {
562 rrd_cache_flush(timeout);
569 snprintf(key, sizeof(key), "%s.rrd", identifier);
571 snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
572 key[sizeof(key) - 1] = 0;
574 status = c_avl_get(cache, key, (void *)&rc);
576 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
577 "c_avl_get (%s) failed. Does that file really exist?",
582 if (rc->flags == FLAG_FLUSHQ) {
584 } else if (rc->flags == FLAG_QUEUED) {
585 rrd_queue_dequeue(key, &queue_head, &queue_tail);
586 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
588 rc->flags = FLAG_FLUSHQ;
589 } else if ((now - rc->first_value) < timeout) {
591 } else if (rc->values_num > 0) {
592 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
594 rc->flags = FLAG_FLUSHQ;
598 } /* int rrd_cache_flush_identifier */
600 static int64_t rrd_get_random_variation(void) {
601 if (random_timeout == 0)
604 return (int64_t)cdrand_range(-random_timeout, random_timeout);
605 } /* int64_t rrd_get_random_variation */
607 static int rrd_cache_insert(const char *filename, const char *value,
608 cdtime_t value_time) {
609 rrd_cache_t *rc = NULL;
613 pthread_mutex_lock(&cache_lock);
615 /* This shouldn't happen, but it did happen at least once, so we'll be
618 pthread_mutex_unlock(&cache_lock);
619 WARNING("rrdtool plugin: cache == NULL.");
623 int status = c_avl_get(cache, filename, (void *)&rc);
624 if ((status != 0) || (rc == NULL)) {
625 rc = malloc(sizeof(*rc));
627 pthread_mutex_unlock(&cache_lock);
634 rc->random_variation = rrd_get_random_variation();
635 rc->flags = FLAG_NONE;
639 assert(value_time > 0); /* plugin_dispatch() ensures this. */
640 if (rc->last_value >= value_time) {
641 pthread_mutex_unlock(&cache_lock);
642 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
643 ">= (value_time = %" PRIu64 ")",
644 rc->last_value, value_time);
649 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
650 if (values_new == NULL) {
651 void *cache_key = NULL;
653 c_avl_remove(cache, filename, &cache_key, NULL);
654 pthread_mutex_unlock(&cache_lock);
656 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
663 rc->values = values_new;
665 rc->values[rc->values_num] = strdup(value);
666 if (rc->values[rc->values_num] != NULL)
669 if (rc->values_num == 1)
670 rc->first_value = value_time;
671 rc->last_value = value_time;
673 /* Insert if this is the first value */
675 void *cache_key = strdup(filename);
677 if (cache_key == NULL) {
678 pthread_mutex_unlock(&cache_lock);
680 ERROR("rrdtool plugin: strdup failed: %s", STRERRNO);
682 sfree(rc->values[0]);
688 c_avl_insert(cache, cache_key, rc);
691 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
692 "values_num = %i; age = %.3f;",
693 filename, rc->values_num,
694 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
696 if ((rc->last_value - rc->first_value) >=
697 (cache_timeout + rc->random_variation)) {
698 /* XXX: If you need to lock both, cache_lock and queue_lock, at
699 * the same time, ALWAYS lock `cache_lock' first! */
700 if (rc->flags == FLAG_NONE) {
703 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
705 rc->flags = FLAG_QUEUED;
707 rc->random_variation = rrd_get_random_variation();
709 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
713 if ((cache_timeout > 0) &&
714 ((cdtime() - cache_flush_last) > cache_flush_timeout))
715 rrd_cache_flush(cache_timeout + random_timeout);
717 pthread_mutex_unlock(&cache_lock);
720 } /* int rrd_cache_insert */
722 static int rrd_cache_destroy(void) /* {{{ */
729 pthread_mutex_lock(&cache_lock);
732 pthread_mutex_unlock(&cache_lock);
736 while (c_avl_pick(cache, &key, &value) == 0) {
745 if (rc->values_num > 0)
748 for (int i = 0; i < rc->values_num; i++)
749 sfree(rc->values[i]);
754 c_avl_destroy(cache);
758 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
759 non_empty, (non_empty == 1) ? "entry" : "entries");
761 DEBUG("rrdtool plugin: No values have been lost "
762 "when destroying the cache.");
765 pthread_mutex_unlock(&cache_lock);
767 } /* }}} int rrd_cache_destroy */
769 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
770 int a = *((int *)a_ptr);
771 int b = *((int *)b_ptr);
779 } /* int rrd_compare_numeric */
781 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
782 user_data_t __attribute__((unused)) * user_data) {
787 if (0 != strcmp(ds->type, vl->type)) {
788 ERROR("rrdtool plugin: DS type does not match value list type");
792 char filename[PATH_MAX];
793 if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
796 char values[32 * (ds->ds_num + 1)];
797 if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
800 struct stat statbuf = {0};
801 if (stat(filename, &statbuf) == -1) {
802 if (errno == ENOENT) {
803 if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
805 } else if (rrdcreate_config.async) {
809 ERROR("rrdtool plugin: stat(%s) failed: %s", filename, STRERRNO);
812 } else if (!S_ISREG(statbuf.st_mode)) {
813 ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
817 return rrd_cache_insert(filename, values, vl->time);
818 } /* int rrd_write */
820 static int rrd_flush(cdtime_t timeout, const char *identifier,
821 __attribute__((unused)) user_data_t *user_data) {
822 pthread_mutex_lock(&cache_lock);
825 pthread_mutex_unlock(&cache_lock);
829 rrd_cache_flush_identifier(timeout, identifier);
831 pthread_mutex_unlock(&cache_lock);
833 } /* int rrd_flush */
835 static int rrd_config(const char *key, const char *value) {
836 if (strcasecmp("CacheTimeout", key) == 0) {
837 double tmp = atof(value);
839 fprintf(stderr, "rrdtool: `CacheTimeout' must "
840 "be greater than 0.\n");
841 ERROR("rrdtool: `CacheTimeout' must "
842 "be greater than 0.\n");
845 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
846 } else if (strcasecmp("CacheFlush", key) == 0) {
847 double tmp = atof(value);
849 fprintf(stderr, "rrdtool: `CacheFlush' must "
850 "be greater than 0.\n");
851 ERROR("rrdtool: `CacheFlush' must "
852 "be greater than 0.\n");
855 cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
856 } else if (strcasecmp("DataDir", key) == 0) {
862 ERROR("rrdtool plugin: strdup failed.");
867 while ((len > 0) && (tmp[len - 1] == '/')) {
873 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
878 if (datadir != NULL) {
883 } else if (strcasecmp("StepSize", key) == 0) {
884 unsigned long temp = strtoul(value, NULL, 0);
886 rrdcreate_config.stepsize = temp;
887 } else if (strcasecmp("HeartBeat", key) == 0) {
888 int temp = atoi(value);
890 rrdcreate_config.heartbeat = temp;
891 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
893 rrdcreate_config.async = 1;
895 rrdcreate_config.async = 0;
896 } else if (strcasecmp("RRARows", key) == 0) {
897 int tmp = atoi(value);
899 fprintf(stderr, "rrdtool: `RRARows' must "
900 "be greater than 0.\n");
901 ERROR("rrdtool: `RRARows' must "
902 "be greater than 0.\n");
905 rrdcreate_config.rrarows = tmp;
906 } else if (strcasecmp("RRATimespan", key) == 0) {
907 char *saveptr = NULL;
913 value_copy = strdup(value);
914 if (value_copy == NULL)
918 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
921 tmp_alloc = realloc(rrdcreate_config.timespans,
922 sizeof(int) * (rrdcreate_config.timespans_num + 1));
923 if (tmp_alloc == NULL) {
924 fprintf(stderr, "rrdtool: realloc failed.\n");
925 ERROR("rrdtool: realloc failed.\n");
929 rrdcreate_config.timespans = tmp_alloc;
930 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
931 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
932 rrdcreate_config.timespans_num++;
933 } /* while (strtok_r) */
935 qsort(/* base = */ rrdcreate_config.timespans,
936 /* nmemb = */ rrdcreate_config.timespans_num,
937 /* size = */ sizeof(rrdcreate_config.timespans[0]),
938 /* compar = */ rrd_compare_numeric);
941 } else if (strcasecmp("XFF", key) == 0) {
942 double tmp = atof(value);
943 if ((tmp < 0.0) || (tmp >= 1.0)) {
944 fprintf(stderr, "rrdtool: `XFF' must "
945 "be in the range 0 to 1 (exclusive).");
946 ERROR("rrdtool: `XFF' must "
947 "be in the range 0 to 1 (exclusive).");
950 rrdcreate_config.xff = tmp;
951 } else if (strcasecmp("WritesPerSecond", key) == 0) {
952 double wps = atof(value);
955 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
956 "greater than or equal to zero.");
958 } else if (wps == 0.0) {
961 write_rate = 1.0 / wps;
963 } else if (strcasecmp("RandomTimeout", key) == 0) {
968 fprintf(stderr, "rrdtool: `RandomTimeout' must "
969 "be greater than or equal to zero.\n");
970 ERROR("rrdtool: `RandomTimeout' must "
971 "be greater then or equal to zero.");
973 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
979 } /* int rrd_config */
981 static int rrd_shutdown(void) {
982 pthread_mutex_lock(&cache_lock);
984 pthread_mutex_unlock(&cache_lock);
986 pthread_mutex_lock(&queue_lock);
988 pthread_cond_signal(&queue_cond);
989 pthread_mutex_unlock(&queue_lock);
991 if ((queue_thread_running != 0) &&
992 ((queue_head != NULL) || (flushq_head != NULL))) {
993 INFO("rrdtool plugin: Shutting down the queue thread. "
994 "This may take a while.");
995 } else if (queue_thread_running != 0) {
996 INFO("rrdtool plugin: Shutting down the queue thread.");
999 /* Wait for all the values to be written to disk before returning. */
1000 if (queue_thread_running != 0) {
1001 pthread_join(queue_thread, NULL);
1002 memset(&queue_thread, 0, sizeof(queue_thread));
1003 queue_thread_running = 0;
1004 DEBUG("rrdtool plugin: queue_thread exited.");
1007 rrd_cache_destroy();
1010 } /* int rrd_shutdown */
1012 static int rrd_init(void) {
1013 static int init_once;
1019 if (rrdcreate_config.heartbeat <= 0)
1020 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1022 /* Set the cache up */
1023 pthread_mutex_lock(&cache_lock);
1025 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1026 if (cache == NULL) {
1027 pthread_mutex_unlock(&cache_lock);
1028 ERROR("rrdtool plugin: c_avl_create failed.");
1032 cache_flush_last = cdtime();
1033 if (cache_timeout == 0) {
1035 cache_flush_timeout = 0;
1036 } else if (cache_flush_timeout < cache_timeout) {
1037 INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1038 "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1039 CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1040 CDTIME_T_TO_DOUBLE(cache_timeout),
1041 CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1042 cache_flush_timeout = 10 * cache_timeout;
1045 /* Assure that "cache_timeout + random_variation" is never negative. */
1046 if (random_timeout > cache_timeout) {
1047 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1048 CDTIME_T_TO_DOUBLE(cache_timeout));
1049 random_timeout = cache_timeout;
1052 pthread_mutex_unlock(&cache_lock);
1055 plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1056 /* args = */ NULL, "rrdtool queue");
1058 ERROR("rrdtool plugin: Cannot create queue-thread.");
1061 queue_thread_running = 1;
1063 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1064 " heartbeat = %i; rrarows = %i; xff = %lf;",
1065 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1066 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1067 rrdcreate_config.xff);
1070 } /* int rrd_init */
1072 void module_register(void) {
1073 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1074 plugin_register_init("rrdtool", rrd_init);
1075 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1076 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1077 plugin_register_shutdown("rrdtool", rrd_shutdown);