2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
39 typedef struct rrd_cache_s {
44 int64_t random_variation;
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
53 struct rrd_queue_s *next;
55 typedef struct rrd_queue_s rrd_queue_t;
60 static const char *config_keys[] = {
61 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
62 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
63 "XFF", "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67 * is zero a default, depending on the `interval' member of the value list is
69 static char *datadir = NULL;
70 static double write_rate = 0.0;
71 static rrdcreate_config_t rrdcreate_config = {
77 /* timespans = */ NULL,
78 /* timespans_num = */ 0,
80 /* consolidation_functions = */ NULL,
81 /* consolidation_functions_num = */ 0,
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86 * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout = 0;
88 static cdtime_t cache_flush_timeout = 0;
89 static cdtime_t random_timeout = 0;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache = NULL;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
94 static rrd_queue_t *queue_head = NULL;
95 static rrd_queue_t *queue_tail = NULL;
96 static rrd_queue_t *flushq_head = NULL;
97 static rrd_queue_t *flushq_tail = NULL;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
107 static int do_shutdown = 0;
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
112 optind = 0; /* bug in librrd? */
115 int status = rrd_update_r(filename, template, argc, (void *)argv);
117 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
125 #else /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
133 assert(template == NULL);
136 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137 if (new_argv == NULL) {
138 ERROR("rrdtool plugin: malloc failed.");
142 new_argv[0] = "update";
143 new_argv[1] = filename;
145 memcpy(new_argv + 2, argv, argc * sizeof(char *));
146 new_argv[new_argc] = NULL;
148 pthread_mutex_lock(&librrd_lock);
149 optind = 0; /* bug in librrd? */
152 status = rrd_update(new_argc, new_argv);
153 pthread_mutex_unlock(&librrd_lock);
156 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167 const data_set_t *ds,
168 const value_list_t *vl) {
173 memset(buffer, '\0', buffer_len);
175 tt = CDTIME_T_TO_TIME_T(vl->time);
176 status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177 if ((status < 1) || (status >= buffer_len))
181 for (size_t i = 0; i < ds->ds_num; i++) {
182 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183 (ds->ds[i].type != DS_TYPE_GAUGE) &&
184 (ds->ds[i].type != DS_TYPE_DERIVE) &&
185 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
188 if (ds->ds[i].type == DS_TYPE_COUNTER)
189 status = snprintf(buffer + offset, buffer_len - offset, ":%llu",
190 vl->values[i].counter);
191 else if (ds->ds[i].type == DS_TYPE_GAUGE)
192 status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193 vl->values[i].gauge);
194 else if (ds->ds[i].type == DS_TYPE_DERIVE)
195 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196 vl->values[i].derive);
197 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199 vl->values[i].absolute);
201 if ((status < 1) || (status >= (buffer_len - offset)))
205 } /* for ds->ds_num */
208 } /* int value_list_to_string_multiple */
210 static int value_list_to_string(char *buffer, int buffer_len,
211 const data_set_t *ds, const value_list_t *vl) {
216 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
218 tt = CDTIME_T_TO_TIME_T(vl->time);
219 switch (ds->ds[0].type) {
221 status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222 vl->values[0].derive);
225 status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226 vl->values[0].gauge);
228 case DS_TYPE_COUNTER:
229 status = snprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
230 vl->values[0].counter);
232 case DS_TYPE_ABSOLUTE:
233 status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234 vl->values[0].absolute);
240 if ((status < 1) || (status >= buffer_len))
244 } /* int value_list_to_string */
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247 value_list_t const *vl) {
248 char const suffix[] = ".rrd";
252 if (datadir != NULL) {
253 size_t datadir_len = strlen(datadir) + 1;
255 if (datadir_len >= buffer_size)
258 sstrncpy(buffer, datadir, buffer_size);
259 buffer[datadir_len - 1] = '/';
260 buffer[datadir_len] = 0;
262 buffer += datadir_len;
263 buffer_size -= datadir_len;
266 status = FORMAT_VL(buffer, buffer_size, vl);
270 len = strlen(buffer);
271 assert(len < buffer_size);
275 if (buffer_size <= sizeof(suffix))
278 memcpy(buffer, suffix, sizeof(suffix));
280 } /* int value_list_to_filename */
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283 struct timeval tv_next_update;
284 struct timeval tv_now;
286 gettimeofday(&tv_next_update, /* timezone = */ NULL);
289 rrd_queue_t *queue_entry;
290 rrd_cache_t *cache_entry;
298 pthread_mutex_lock(&queue_lock);
299 /* Wait for values to arrive */
301 struct timespec ts_wait;
303 while ((flushq_head == NULL) && (queue_head == NULL) &&
305 pthread_cond_wait(&queue_cond, &queue_lock);
307 if ((flushq_head == NULL) && (queue_head == NULL))
310 /* Don't delay if there's something to flush */
311 if (flushq_head != NULL)
314 /* Don't delay if we're shutting down */
315 if (do_shutdown != 0)
318 /* Don't delay if no delay was configured. */
319 if (write_rate <= 0.0)
322 gettimeofday(&tv_now, /* timezone = */ NULL);
323 status = timeval_cmp(tv_next_update, tv_now, NULL);
324 /* We're good to go */
328 /* We're supposed to wait a bit with this update, so we'll
329 * wait for the next addition to the queue or to the end of
330 * the wait period - whichever comes first. */
331 ts_wait.tv_sec = tv_next_update.tv_sec;
332 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
334 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335 if (status == ETIMEDOUT)
339 /* XXX: If you need to lock both, cache_lock and queue_lock, at
340 * the same time, ALWAYS lock `cache_lock' first! */
342 /* We're in the shutdown phase */
343 if ((flushq_head == NULL) && (queue_head == NULL)) {
344 pthread_mutex_unlock(&queue_lock);
348 if (flushq_head != NULL) {
349 /* Dequeue the first flush entry */
350 queue_entry = flushq_head;
351 if (flushq_head == flushq_tail)
352 flushq_head = flushq_tail = NULL;
354 flushq_head = flushq_head->next;
355 } else /* if (queue_head != NULL) */
357 /* Dequeue the first regular entry */
358 queue_entry = queue_head;
359 if (queue_head == queue_tail)
360 queue_head = queue_tail = NULL;
362 queue_head = queue_head->next;
365 /* Unlock the queue again */
366 pthread_mutex_unlock(&queue_lock);
368 /* We now need the cache lock so the entry isn't updated while
369 * we make a copy of its values */
370 pthread_mutex_lock(&cache_lock);
372 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
375 values = cache_entry->values;
376 values_num = cache_entry->values_num;
378 cache_entry->values = NULL;
379 cache_entry->values_num = 0;
380 cache_entry->flags = FLAG_NONE;
383 pthread_mutex_unlock(&cache_lock);
386 sfree(queue_entry->filename);
391 /* Update `tv_next_update' */
392 if (write_rate > 0.0) {
393 gettimeofday(&tv_now, /* timezone = */ NULL);
394 tv_next_update.tv_sec = tv_now.tv_sec;
395 tv_next_update.tv_usec =
396 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397 while (tv_next_update.tv_usec > 1000000) {
398 tv_next_update.tv_sec++;
399 tv_next_update.tv_usec -= 1000000;
403 /* Write the values to the RRD-file */
404 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406 (values_num == 1) ? "" : "s", queue_entry->filename);
408 for (int i = 0; i < values_num; i++) {
412 sfree(queue_entry->filename);
416 pthread_exit((void *)0);
418 } /* void *rrd_queue_thread */
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421 rrd_queue_t **tail) {
422 rrd_queue_t *queue_entry;
424 queue_entry = malloc(sizeof(*queue_entry));
425 if (queue_entry == NULL)
428 queue_entry->filename = strdup(filename);
429 if (queue_entry->filename == NULL) {
434 queue_entry->next = NULL;
436 pthread_mutex_lock(&queue_lock);
441 (*tail)->next = queue_entry;
444 pthread_cond_signal(&queue_cond);
445 pthread_mutex_unlock(&queue_lock);
448 } /* int rrd_queue_enqueue */
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451 rrd_queue_t **tail) {
455 pthread_mutex_lock(&queue_lock);
460 while (this != NULL) {
461 if (strcmp(this->filename, filename) == 0)
469 pthread_mutex_unlock(&queue_lock);
476 prev->next = this->next;
478 if (this->next == NULL)
481 pthread_mutex_unlock(&queue_lock);
483 sfree(this->filename);
487 } /* int rrd_queue_dequeue */
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
498 c_avl_iterator_t *iter;
500 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501 CDTIME_T_TO_DOUBLE(timeout));
505 /* Build a list of entries to be flushed */
506 iter = c_avl_get_iterator(cache);
507 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508 if (rc->flags != FLAG_NONE)
510 /* timeout == 0 => flush everything */
511 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
513 else if (rc->values_num > 0) {
516 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
518 rc->flags = FLAG_QUEUED;
519 } else /* ancient and no values -> waste of memory */
521 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
524 ERROR("rrdtool plugin: "
525 "realloc failed: %s",
526 sstrerror(errno, errbuf, sizeof(errbuf)));
527 c_avl_iterator_destroy(iter);
532 keys[keys_num] = key;
535 } /* while (c_avl_iterator_next) */
536 c_avl_iterator_destroy(iter);
538 for (int i = 0; i < keys_num; i++) {
539 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
540 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
544 assert(rc->values == NULL);
545 assert(rc->values_num == 0);
550 } /* for (i = 0..keys_num) */
554 cache_flush_last = now;
555 } /* void rrd_cache_flush */
557 static int rrd_cache_flush_identifier(cdtime_t timeout,
558 const char *identifier) {
564 if (identifier == NULL) {
565 rrd_cache_flush(timeout);
572 snprintf(key, sizeof(key), "%s.rrd", identifier);
574 snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
575 key[sizeof(key) - 1] = 0;
577 status = c_avl_get(cache, key, (void *)&rc);
579 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
580 "c_avl_get (%s) failed. Does that file really exist?",
585 if (rc->flags == FLAG_FLUSHQ) {
587 } else if (rc->flags == FLAG_QUEUED) {
588 rrd_queue_dequeue(key, &queue_head, &queue_tail);
589 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
591 rc->flags = FLAG_FLUSHQ;
592 } else if ((now - rc->first_value) < timeout) {
594 } else if (rc->values_num > 0) {
595 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
597 rc->flags = FLAG_FLUSHQ;
601 } /* int rrd_cache_flush_identifier */
603 static int64_t rrd_get_random_variation(void) {
604 if (random_timeout == 0)
607 return (int64_t)cdrand_range(-random_timeout, random_timeout);
608 } /* int64_t rrd_get_random_variation */
610 static int rrd_cache_insert(const char *filename, const char *value,
611 cdtime_t value_time) {
612 rrd_cache_t *rc = NULL;
616 pthread_mutex_lock(&cache_lock);
618 /* This shouldn't happen, but it did happen at least once, so we'll be
621 pthread_mutex_unlock(&cache_lock);
622 WARNING("rrdtool plugin: cache == NULL.");
626 c_avl_get(cache, filename, (void *)&rc);
629 rc = malloc(sizeof(*rc));
631 pthread_mutex_unlock(&cache_lock);
638 rc->random_variation = rrd_get_random_variation();
639 rc->flags = FLAG_NONE;
643 assert(value_time > 0); /* plugin_dispatch() ensures this. */
644 if (rc->last_value >= value_time) {
645 pthread_mutex_unlock(&cache_lock);
646 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
647 ">= (value_time = %" PRIu64 ")",
648 rc->last_value, value_time);
653 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
654 if (values_new == NULL) {
656 void *cache_key = NULL;
658 sstrerror(errno, errbuf, sizeof(errbuf));
660 c_avl_remove(cache, filename, &cache_key, NULL);
661 pthread_mutex_unlock(&cache_lock);
663 ERROR("rrdtool plugin: realloc failed: %s", errbuf);
670 rc->values = values_new;
672 rc->values[rc->values_num] = strdup(value);
673 if (rc->values[rc->values_num] != NULL)
676 if (rc->values_num == 1)
677 rc->first_value = value_time;
678 rc->last_value = value_time;
680 /* Insert if this is the first value */
682 void *cache_key = strdup(filename);
684 if (cache_key == NULL) {
686 sstrerror(errno, errbuf, sizeof(errbuf));
688 pthread_mutex_unlock(&cache_lock);
690 ERROR("rrdtool plugin: strdup failed: %s", errbuf);
692 sfree(rc->values[0]);
698 c_avl_insert(cache, cache_key, rc);
701 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
702 "values_num = %i; age = %.3f;",
703 filename, rc->values_num,
704 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
706 if ((rc->last_value - rc->first_value) >=
707 (cache_timeout + rc->random_variation)) {
708 /* XXX: If you need to lock both, cache_lock and queue_lock, at
709 * the same time, ALWAYS lock `cache_lock' first! */
710 if (rc->flags == FLAG_NONE) {
713 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
715 rc->flags = FLAG_QUEUED;
717 rc->random_variation = rrd_get_random_variation();
719 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
723 if ((cache_timeout > 0) &&
724 ((cdtime() - cache_flush_last) > cache_flush_timeout))
725 rrd_cache_flush(cache_timeout + random_timeout);
727 pthread_mutex_unlock(&cache_lock);
730 } /* int rrd_cache_insert */
732 static int rrd_cache_destroy(void) /* {{{ */
739 pthread_mutex_lock(&cache_lock);
742 pthread_mutex_unlock(&cache_lock);
746 while (c_avl_pick(cache, &key, &value) == 0) {
755 if (rc->values_num > 0)
758 for (int i = 0; i < rc->values_num; i++)
759 sfree(rc->values[i]);
764 c_avl_destroy(cache);
768 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
769 non_empty, (non_empty == 1) ? "entry" : "entries");
771 DEBUG("rrdtool plugin: No values have been lost "
772 "when destroying the cache.");
775 pthread_mutex_unlock(&cache_lock);
777 } /* }}} int rrd_cache_destroy */
779 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
780 int a = *((int *)a_ptr);
781 int b = *((int *)b_ptr);
789 } /* int rrd_compare_numeric */
791 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
792 user_data_t __attribute__((unused)) * user_data) {
797 if (0 != strcmp(ds->type, vl->type)) {
798 ERROR("rrdtool plugin: DS type does not match value list type");
802 char filename[PATH_MAX];
803 if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
806 char values[32 * ds->ds_num];
807 if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
810 struct stat statbuf = {0};
811 if (stat(filename, &statbuf) == -1) {
812 if (errno == ENOENT) {
813 if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
815 } else if (rrdcreate_config.async) {
820 ERROR("rrdtool plugin: stat(%s) failed: %s", filename,
821 sstrerror(errno, errbuf, sizeof(errbuf)));
824 } else if (!S_ISREG(statbuf.st_mode)) {
825 ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
829 return rrd_cache_insert(filename, values, vl->time);
830 } /* int rrd_write */
832 static int rrd_flush(cdtime_t timeout, const char *identifier,
833 __attribute__((unused)) user_data_t *user_data) {
834 pthread_mutex_lock(&cache_lock);
837 pthread_mutex_unlock(&cache_lock);
841 rrd_cache_flush_identifier(timeout, identifier);
843 pthread_mutex_unlock(&cache_lock);
845 } /* int rrd_flush */
847 static int rrd_config(const char *key, const char *value) {
848 if (strcasecmp("CacheTimeout", key) == 0) {
849 double tmp = atof(value);
851 fprintf(stderr, "rrdtool: `CacheTimeout' must "
852 "be greater than 0.\n");
853 ERROR("rrdtool: `CacheTimeout' must "
854 "be greater than 0.\n");
857 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
858 } else if (strcasecmp("CacheFlush", key) == 0) {
859 double tmp = atof(value);
861 fprintf(stderr, "rrdtool: `CacheFlush' must "
862 "be greater than 0.\n");
863 ERROR("rrdtool: `CacheFlush' must "
864 "be greater than 0.\n");
867 cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
868 } else if (strcasecmp("DataDir", key) == 0) {
874 ERROR("rrdtool plugin: strdup failed.");
879 while ((len > 0) && (tmp[len - 1] == '/')) {
885 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
890 if (datadir != NULL) {
895 } else if (strcasecmp("StepSize", key) == 0) {
896 unsigned long temp = strtoul(value, NULL, 0);
898 rrdcreate_config.stepsize = temp;
899 } else if (strcasecmp("HeartBeat", key) == 0) {
900 int temp = atoi(value);
902 rrdcreate_config.heartbeat = temp;
903 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
905 rrdcreate_config.async = 1;
907 rrdcreate_config.async = 0;
908 } else if (strcasecmp("RRARows", key) == 0) {
909 int tmp = atoi(value);
911 fprintf(stderr, "rrdtool: `RRARows' must "
912 "be greater than 0.\n");
913 ERROR("rrdtool: `RRARows' must "
914 "be greater than 0.\n");
917 rrdcreate_config.rrarows = tmp;
918 } else if (strcasecmp("RRATimespan", key) == 0) {
919 char *saveptr = NULL;
925 value_copy = strdup(value);
926 if (value_copy == NULL)
930 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
933 tmp_alloc = realloc(rrdcreate_config.timespans,
934 sizeof(int) * (rrdcreate_config.timespans_num + 1));
935 if (tmp_alloc == NULL) {
936 fprintf(stderr, "rrdtool: realloc failed.\n");
937 ERROR("rrdtool: realloc failed.\n");
941 rrdcreate_config.timespans = tmp_alloc;
942 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
943 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
944 rrdcreate_config.timespans_num++;
945 } /* while (strtok_r) */
947 qsort(/* base = */ rrdcreate_config.timespans,
948 /* nmemb = */ rrdcreate_config.timespans_num,
949 /* size = */ sizeof(rrdcreate_config.timespans[0]),
950 /* compar = */ rrd_compare_numeric);
953 } else if (strcasecmp("XFF", key) == 0) {
954 double tmp = atof(value);
955 if ((tmp < 0.0) || (tmp >= 1.0)) {
956 fprintf(stderr, "rrdtool: `XFF' must "
957 "be in the range 0 to 1 (exclusive).");
958 ERROR("rrdtool: `XFF' must "
959 "be in the range 0 to 1 (exclusive).");
962 rrdcreate_config.xff = tmp;
963 } else if (strcasecmp("WritesPerSecond", key) == 0) {
964 double wps = atof(value);
967 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
968 "greater than or equal to zero.");
970 } else if (wps == 0.0) {
973 write_rate = 1.0 / wps;
975 } else if (strcasecmp("RandomTimeout", key) == 0) {
980 fprintf(stderr, "rrdtool: `RandomTimeout' must "
981 "be greater than or equal to zero.\n");
982 ERROR("rrdtool: `RandomTimeout' must "
983 "be greater then or equal to zero.");
985 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
991 } /* int rrd_config */
993 static int rrd_shutdown(void) {
994 pthread_mutex_lock(&cache_lock);
996 pthread_mutex_unlock(&cache_lock);
998 pthread_mutex_lock(&queue_lock);
1000 pthread_cond_signal(&queue_cond);
1001 pthread_mutex_unlock(&queue_lock);
1003 if ((queue_thread_running != 0) &&
1004 ((queue_head != NULL) || (flushq_head != NULL))) {
1005 INFO("rrdtool plugin: Shutting down the queue thread. "
1006 "This may take a while.");
1007 } else if (queue_thread_running != 0) {
1008 INFO("rrdtool plugin: Shutting down the queue thread.");
1011 /* Wait for all the values to be written to disk before returning. */
1012 if (queue_thread_running != 0) {
1013 pthread_join(queue_thread, NULL);
1014 memset(&queue_thread, 0, sizeof(queue_thread));
1015 queue_thread_running = 0;
1016 DEBUG("rrdtool plugin: queue_thread exited.");
1019 rrd_cache_destroy();
1022 } /* int rrd_shutdown */
1024 static int rrd_init(void) {
1025 static int init_once = 0;
1031 if (rrdcreate_config.heartbeat <= 0)
1032 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1034 /* Set the cache up */
1035 pthread_mutex_lock(&cache_lock);
1037 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1038 if (cache == NULL) {
1039 pthread_mutex_unlock(&cache_lock);
1040 ERROR("rrdtool plugin: c_avl_create failed.");
1044 cache_flush_last = cdtime();
1045 if (cache_timeout == 0) {
1047 cache_flush_timeout = 0;
1048 } else if (cache_flush_timeout < cache_timeout) {
1049 INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1051 "Ajusting \"CacheFlush\" to %.3f seconds.",
1052 CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1053 CDTIME_T_TO_DOUBLE(cache_timeout),
1054 CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1055 cache_flush_timeout = 10 * cache_timeout;
1058 /* Assure that "cache_timeout + random_variation" is never negative. */
1059 if (random_timeout > cache_timeout) {
1060 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1061 CDTIME_T_TO_DOUBLE(cache_timeout));
1062 random_timeout = cache_timeout;
1065 pthread_mutex_unlock(&cache_lock);
1068 plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1069 /* args = */ NULL, "rrdtool queue");
1071 ERROR("rrdtool plugin: Cannot create queue-thread.");
1074 queue_thread_running = 1;
1076 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1077 " heartbeat = %i; rrarows = %i; xff = %lf;",
1078 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1079 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1080 rrdcreate_config.xff);
1083 } /* int rrd_init */
1085 void module_register(void) {
1086 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1087 plugin_register_init("rrdtool", rrd_init);
1088 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1089 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1090 plugin_register_shutdown("rrdtool", rrd_shutdown);