2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
44 int64_t random_variation;
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
47 typedef struct rrd_cache_s rrd_cache_t;
49 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
50 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
54 struct rrd_queue_s *next;
56 typedef struct rrd_queue_s rrd_queue_t;
61 static const char *config_keys[] = {
62 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
63 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
64 "XFF", "WritesPerSecond", "RandomTimeout"};
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/* If datadir is NULL, the daemon's basedir is used. If stepsize or heartbeat
 * is zero, a default that depends on the `interval' member of the value list
 * is used. */
70 static char *datadir = NULL;
71 static double write_rate = 0.0;
72 static rrdcreate_config_t rrdcreate_config = {
78 /* timespans = */ NULL,
79 /* timespans_num = */ 0,
81 /* consolidation_functions = */ NULL,
82 /* consolidation_functions_num = */ 0,
86 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
87 * ALWAYS lock `cache_lock' first! */
88 static cdtime_t cache_timeout = 0;
89 static cdtime_t cache_flush_timeout = 0;
90 static cdtime_t random_timeout = 0;
91 static cdtime_t cache_flush_last;
92 static c_avl_tree_t *cache = NULL;
93 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
95 static rrd_queue_t *queue_head = NULL;
96 static rrd_queue_t *queue_tail = NULL;
97 static rrd_queue_t *flushq_head = NULL;
98 static rrd_queue_t *flushq_tail = NULL;
99 static pthread_t queue_thread;
100 static int queue_thread_running = 1;
101 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
104 #if !HAVE_THREADSAFE_LIBRRD
105 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
108 static int do_shutdown = 0;
110 #if HAVE_THREADSAFE_LIBRRD
111 static int srrd_update(char *filename, char *template, int argc,
115 optind = 0; /* bug in librrd? */
118 status = rrd_update_r(filename, template, argc, (void *)argv);
121 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
126 } /* int srrd_update */
127 /* #endif HAVE_THREADSAFE_LIBRRD */
129 #else /* !HAVE_THREADSAFE_LIBRRD */
130 static int srrd_update(char *filename, char *template, int argc,
137 assert(template == NULL);
140 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
141 if (new_argv == NULL) {
142 ERROR("rrdtool plugin: malloc failed.");
146 new_argv[0] = "update";
147 new_argv[1] = filename;
149 memcpy(new_argv + 2, argv, argc * sizeof(char *));
150 new_argv[new_argc] = NULL;
152 pthread_mutex_lock(&librrd_lock);
153 optind = 0; /* bug in librrd? */
156 status = rrd_update(new_argc, new_argv);
157 pthread_mutex_unlock(&librrd_lock);
160 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
167 } /* int srrd_update */
168 #endif /* !HAVE_THREADSAFE_LIBRRD */
170 static int value_list_to_string_multiple(char *buffer, int buffer_len,
171 const data_set_t *ds,
172 const value_list_t *vl) {
177 memset(buffer, '\0', buffer_len);
179 tt = CDTIME_T_TO_TIME_T(vl->time);
180 status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
181 if ((status < 1) || (status >= buffer_len))
185 for (size_t i = 0; i < ds->ds_num; i++) {
186 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
187 (ds->ds[i].type != DS_TYPE_GAUGE) &&
188 (ds->ds[i].type != DS_TYPE_DERIVE) &&
189 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
192 if (ds->ds[i].type == DS_TYPE_COUNTER)
193 status = snprintf(buffer + offset, buffer_len - offset, ":%llu",
194 vl->values[i].counter);
195 else if (ds->ds[i].type == DS_TYPE_GAUGE)
196 status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
197 vl->values[i].gauge);
198 else if (ds->ds[i].type == DS_TYPE_DERIVE)
199 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
200 vl->values[i].derive);
201 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
202 status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
203 vl->values[i].absolute);
205 if ((status < 1) || (status >= (buffer_len - offset)))
209 } /* for ds->ds_num */
212 } /* int value_list_to_string_multiple */
214 static int value_list_to_string(char *buffer, int buffer_len,
215 const data_set_t *ds, const value_list_t *vl) {
220 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
222 tt = CDTIME_T_TO_TIME_T(vl->time);
223 switch (ds->ds[0].type) {
225 status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
226 vl->values[0].derive);
229 status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
230 vl->values[0].gauge);
232 case DS_TYPE_COUNTER:
233 status = snprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
234 vl->values[0].counter);
236 case DS_TYPE_ABSOLUTE:
237 status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
238 vl->values[0].absolute);
244 if ((status < 1) || (status >= buffer_len))
248 } /* int value_list_to_string */
250 static int value_list_to_filename(char *buffer, size_t buffer_size,
251 value_list_t const *vl) {
252 char const suffix[] = ".rrd";
256 if (datadir != NULL) {
257 size_t datadir_len = strlen(datadir) + 1;
259 if (datadir_len >= buffer_size)
262 sstrncpy(buffer, datadir, buffer_size);
263 buffer[datadir_len - 1] = '/';
264 buffer[datadir_len] = 0;
266 buffer += datadir_len;
267 buffer_size -= datadir_len;
270 status = FORMAT_VL(buffer, buffer_size, vl);
274 len = strlen(buffer);
275 assert(len < buffer_size);
279 if (buffer_size <= sizeof(suffix))
282 memcpy(buffer, suffix, sizeof(suffix));
284 } /* int value_list_to_filename */
286 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
287 struct timeval tv_next_update;
288 struct timeval tv_now;
290 gettimeofday(&tv_next_update, /* timezone = */ NULL);
293 rrd_queue_t *queue_entry;
294 rrd_cache_t *cache_entry;
302 pthread_mutex_lock(&queue_lock);
303 /* Wait for values to arrive */
305 struct timespec ts_wait;
307 while ((flushq_head == NULL) && (queue_head == NULL) &&
309 pthread_cond_wait(&queue_cond, &queue_lock);
311 if ((flushq_head == NULL) && (queue_head == NULL))
314 /* Don't delay if there's something to flush */
315 if (flushq_head != NULL)
318 /* Don't delay if we're shutting down */
319 if (do_shutdown != 0)
322 /* Don't delay if no delay was configured. */
323 if (write_rate <= 0.0)
326 gettimeofday(&tv_now, /* timezone = */ NULL);
327 status = timeval_cmp(tv_next_update, tv_now, NULL);
328 /* We're good to go */
332 /* We're supposed to wait a bit with this update, so we'll
333 * wait for the next addition to the queue or to the end of
334 * the wait period - whichever comes first. */
335 ts_wait.tv_sec = tv_next_update.tv_sec;
336 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
338 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
339 if (status == ETIMEDOUT)
343 /* XXX: If you need to lock both, cache_lock and queue_lock, at
344 * the same time, ALWAYS lock `cache_lock' first! */
346 /* We're in the shutdown phase */
347 if ((flushq_head == NULL) && (queue_head == NULL)) {
348 pthread_mutex_unlock(&queue_lock);
352 if (flushq_head != NULL) {
353 /* Dequeue the first flush entry */
354 queue_entry = flushq_head;
355 if (flushq_head == flushq_tail)
356 flushq_head = flushq_tail = NULL;
358 flushq_head = flushq_head->next;
359 } else /* if (queue_head != NULL) */
361 /* Dequeue the first regular entry */
362 queue_entry = queue_head;
363 if (queue_head == queue_tail)
364 queue_head = queue_tail = NULL;
366 queue_head = queue_head->next;
369 /* Unlock the queue again */
370 pthread_mutex_unlock(&queue_lock);
372 /* We now need the cache lock so the entry isn't updated while
373 * we make a copy of its values */
374 pthread_mutex_lock(&cache_lock);
376 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
379 values = cache_entry->values;
380 values_num = cache_entry->values_num;
382 cache_entry->values = NULL;
383 cache_entry->values_num = 0;
384 cache_entry->flags = FLAG_NONE;
387 pthread_mutex_unlock(&cache_lock);
390 sfree(queue_entry->filename);
395 /* Update `tv_next_update' */
396 if (write_rate > 0.0) {
397 gettimeofday(&tv_now, /* timezone = */ NULL);
398 tv_next_update.tv_sec = tv_now.tv_sec;
399 tv_next_update.tv_usec =
400 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
401 while (tv_next_update.tv_usec > 1000000) {
402 tv_next_update.tv_sec++;
403 tv_next_update.tv_usec -= 1000000;
407 /* Write the values to the RRD-file */
408 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
409 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
410 (values_num == 1) ? "" : "s", queue_entry->filename);
412 for (int i = 0; i < values_num; i++) {
416 sfree(queue_entry->filename);
420 pthread_exit((void *)0);
422 } /* void *rrd_queue_thread */
424 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
425 rrd_queue_t **tail) {
426 rrd_queue_t *queue_entry;
428 queue_entry = malloc(sizeof(*queue_entry));
429 if (queue_entry == NULL)
432 queue_entry->filename = strdup(filename);
433 if (queue_entry->filename == NULL) {
438 queue_entry->next = NULL;
440 pthread_mutex_lock(&queue_lock);
445 (*tail)->next = queue_entry;
448 pthread_cond_signal(&queue_cond);
449 pthread_mutex_unlock(&queue_lock);
452 } /* int rrd_queue_enqueue */
454 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
455 rrd_queue_t **tail) {
459 pthread_mutex_lock(&queue_lock);
464 while (this != NULL) {
465 if (strcmp(this->filename, filename) == 0)
473 pthread_mutex_unlock(&queue_lock);
480 prev->next = this->next;
482 if (this->next == NULL)
485 pthread_mutex_unlock(&queue_lock);
487 sfree(this->filename);
491 } /* int rrd_queue_dequeue */
493 /* XXX: You must hold "cache_lock" when calling this function! */
494 static void rrd_cache_flush(cdtime_t timeout) {
502 c_avl_iterator_t *iter;
504 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
505 CDTIME_T_TO_DOUBLE(timeout));
509 /* Build a list of entries to be flushed */
510 iter = c_avl_get_iterator(cache);
511 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
512 if (rc->flags != FLAG_NONE)
514 /* timeout == 0 => flush everything */
515 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
517 else if (rc->values_num > 0) {
520 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
522 rc->flags = FLAG_QUEUED;
523 } else /* ancient and no values -> waste of memory */
525 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
528 ERROR("rrdtool plugin: "
529 "realloc failed: %s",
530 sstrerror(errno, errbuf, sizeof(errbuf)));
531 c_avl_iterator_destroy(iter);
536 keys[keys_num] = key;
539 } /* while (c_avl_iterator_next) */
540 c_avl_iterator_destroy(iter);
542 for (int i = 0; i < keys_num; i++) {
543 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
544 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
548 assert(rc->values == NULL);
549 assert(rc->values_num == 0);
554 } /* for (i = 0..keys_num) */
558 cache_flush_last = now;
559 } /* void rrd_cache_flush */
561 static int rrd_cache_flush_identifier(cdtime_t timeout,
562 const char *identifier) {
568 if (identifier == NULL) {
569 rrd_cache_flush(timeout);
576 snprintf(key, sizeof(key), "%s.rrd", identifier);
578 snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
579 key[sizeof(key) - 1] = 0;
581 status = c_avl_get(cache, key, (void *)&rc);
583 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
584 "c_avl_get (%s) failed. Does that file really exist?",
589 if (rc->flags == FLAG_FLUSHQ) {
591 } else if (rc->flags == FLAG_QUEUED) {
592 rrd_queue_dequeue(key, &queue_head, &queue_tail);
593 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
595 rc->flags = FLAG_FLUSHQ;
596 } else if ((now - rc->first_value) < timeout) {
598 } else if (rc->values_num > 0) {
599 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
601 rc->flags = FLAG_FLUSHQ;
605 } /* int rrd_cache_flush_identifier */
607 static int64_t rrd_get_random_variation(void) {
608 if (random_timeout == 0)
611 return (int64_t)cdrand_range(-random_timeout, random_timeout);
612 } /* int64_t rrd_get_random_variation */
614 static int rrd_cache_insert(const char *filename, const char *value,
615 cdtime_t value_time) {
616 rrd_cache_t *rc = NULL;
620 pthread_mutex_lock(&cache_lock);
622 /* This shouldn't happen, but it did happen at least once, so we'll be
625 pthread_mutex_unlock(&cache_lock);
626 WARNING("rrdtool plugin: cache == NULL.");
630 c_avl_get(cache, filename, (void *)&rc);
633 rc = malloc(sizeof(*rc));
635 pthread_mutex_unlock(&cache_lock);
642 rc->random_variation = rrd_get_random_variation();
643 rc->flags = FLAG_NONE;
647 assert(value_time > 0); /* plugin_dispatch() ensures this. */
648 if (rc->last_value >= value_time) {
649 pthread_mutex_unlock(&cache_lock);
650 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
651 ">= (value_time = %" PRIu64 ")",
652 rc->last_value, value_time);
657 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
658 if (values_new == NULL) {
660 void *cache_key = NULL;
662 sstrerror(errno, errbuf, sizeof(errbuf));
664 c_avl_remove(cache, filename, &cache_key, NULL);
665 pthread_mutex_unlock(&cache_lock);
667 ERROR("rrdtool plugin: realloc failed: %s", errbuf);
674 rc->values = values_new;
676 rc->values[rc->values_num] = strdup(value);
677 if (rc->values[rc->values_num] != NULL)
680 if (rc->values_num == 1)
681 rc->first_value = value_time;
682 rc->last_value = value_time;
684 /* Insert if this is the first value */
686 void *cache_key = strdup(filename);
688 if (cache_key == NULL) {
690 sstrerror(errno, errbuf, sizeof(errbuf));
692 pthread_mutex_unlock(&cache_lock);
694 ERROR("rrdtool plugin: strdup failed: %s", errbuf);
696 sfree(rc->values[0]);
702 c_avl_insert(cache, cache_key, rc);
705 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
706 "values_num = %i; age = %.3f;",
707 filename, rc->values_num,
708 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
710 if ((rc->last_value - rc->first_value) >=
711 (cache_timeout + rc->random_variation)) {
712 /* XXX: If you need to lock both, cache_lock and queue_lock, at
713 * the same time, ALWAYS lock `cache_lock' first! */
714 if (rc->flags == FLAG_NONE) {
717 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
719 rc->flags = FLAG_QUEUED;
721 rc->random_variation = rrd_get_random_variation();
723 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
727 if ((cache_timeout > 0) &&
728 ((cdtime() - cache_flush_last) > cache_flush_timeout))
729 rrd_cache_flush(cache_timeout + random_timeout);
731 pthread_mutex_unlock(&cache_lock);
734 } /* int rrd_cache_insert */
736 static int rrd_cache_destroy(void) /* {{{ */
743 pthread_mutex_lock(&cache_lock);
746 pthread_mutex_unlock(&cache_lock);
750 while (c_avl_pick(cache, &key, &value) == 0) {
759 if (rc->values_num > 0)
762 for (int i = 0; i < rc->values_num; i++)
763 sfree(rc->values[i]);
768 c_avl_destroy(cache);
772 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
773 non_empty, (non_empty == 1) ? "entry" : "entries");
775 DEBUG("rrdtool plugin: No values have been lost "
776 "when destroying the cache.");
779 pthread_mutex_unlock(&cache_lock);
781 } /* }}} int rrd_cache_destroy */
/*
 * qsort() comparison callback for arrays of `int'.
 *
 * Returns a negative value, zero or a positive value if the integer behind
 * `a_ptr' is less than, equal to or greater than the one behind `b_ptr'.
 * The values are compared rather than subtracted, so the result cannot
 * overflow.
 */
static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
  int a = *((const int *)a_ptr);
  int b = *((const int *)b_ptr);

  if (a < b)
    return -1;
  else if (a > b)
    return 1;
  else
    return 0;
} /* int rrd_compare_numeric */
795 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
796 user_data_t __attribute__((unused)) * user_data) {
805 if (0 != strcmp(ds->type, vl->type)) {
806 ERROR("rrdtool plugin: DS type does not match value list type");
810 if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
813 if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
816 if (stat(filename, &statbuf) == -1) {
817 if (errno == ENOENT) {
818 status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config);
821 else if (rrdcreate_config.async)
825 ERROR("stat(%s) failed: %s", filename,
826 sstrerror(errno, errbuf, sizeof(errbuf)));
829 } else if (!S_ISREG(statbuf.st_mode)) {
830 ERROR("stat(%s): Not a regular file!", filename);
834 status = rrd_cache_insert(filename, values, vl->time);
837 } /* int rrd_write */
839 static int rrd_flush(cdtime_t timeout, const char *identifier,
840 __attribute__((unused)) user_data_t *user_data) {
841 pthread_mutex_lock(&cache_lock);
844 pthread_mutex_unlock(&cache_lock);
848 rrd_cache_flush_identifier(timeout, identifier);
850 pthread_mutex_unlock(&cache_lock);
852 } /* int rrd_flush */
854 static int rrd_config(const char *key, const char *value) {
855 if (strcasecmp("CacheTimeout", key) == 0) {
856 double tmp = atof(value);
858 fprintf(stderr, "rrdtool: `CacheTimeout' must "
859 "be greater than 0.\n");
860 ERROR("rrdtool: `CacheTimeout' must "
861 "be greater than 0.\n");
864 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
865 } else if (strcasecmp("CacheFlush", key) == 0) {
866 double tmp = atof(value);
868 fprintf(stderr, "rrdtool: `CacheFlush' must "
869 "be greater than 0.\n");
870 ERROR("rrdtool: `CacheFlush' must "
871 "be greater than 0.\n");
874 cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
875 } else if (strcasecmp("DataDir", key) == 0) {
881 ERROR("rrdtool plugin: strdup failed.");
886 while ((len > 0) && (tmp[len - 1] == '/')) {
892 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
897 if (datadir != NULL) {
902 } else if (strcasecmp("StepSize", key) == 0) {
903 unsigned long temp = strtoul(value, NULL, 0);
905 rrdcreate_config.stepsize = temp;
906 } else if (strcasecmp("HeartBeat", key) == 0) {
907 int temp = atoi(value);
909 rrdcreate_config.heartbeat = temp;
910 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
912 rrdcreate_config.async = 1;
914 rrdcreate_config.async = 0;
915 } else if (strcasecmp("RRARows", key) == 0) {
916 int tmp = atoi(value);
918 fprintf(stderr, "rrdtool: `RRARows' must "
919 "be greater than 0.\n");
920 ERROR("rrdtool: `RRARows' must "
921 "be greater than 0.\n");
924 rrdcreate_config.rrarows = tmp;
925 } else if (strcasecmp("RRATimespan", key) == 0) {
926 char *saveptr = NULL;
932 value_copy = strdup(value);
933 if (value_copy == NULL)
937 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
940 tmp_alloc = realloc(rrdcreate_config.timespans,
941 sizeof(int) * (rrdcreate_config.timespans_num + 1));
942 if (tmp_alloc == NULL) {
943 fprintf(stderr, "rrdtool: realloc failed.\n");
944 ERROR("rrdtool: realloc failed.\n");
948 rrdcreate_config.timespans = tmp_alloc;
949 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
950 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
951 rrdcreate_config.timespans_num++;
952 } /* while (strtok_r) */
954 qsort(/* base = */ rrdcreate_config.timespans,
955 /* nmemb = */ rrdcreate_config.timespans_num,
956 /* size = */ sizeof(rrdcreate_config.timespans[0]),
957 /* compar = */ rrd_compare_numeric);
960 } else if (strcasecmp("XFF", key) == 0) {
961 double tmp = atof(value);
962 if ((tmp < 0.0) || (tmp >= 1.0)) {
963 fprintf(stderr, "rrdtool: `XFF' must "
964 "be in the range 0 to 1 (exclusive).");
965 ERROR("rrdtool: `XFF' must "
966 "be in the range 0 to 1 (exclusive).");
969 rrdcreate_config.xff = tmp;
970 } else if (strcasecmp("WritesPerSecond", key) == 0) {
971 double wps = atof(value);
974 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
975 "greater than or equal to zero.");
977 } else if (wps == 0.0) {
980 write_rate = 1.0 / wps;
982 } else if (strcasecmp("RandomTimeout", key) == 0) {
987 fprintf(stderr, "rrdtool: `RandomTimeout' must "
988 "be greater than or equal to zero.\n");
989 ERROR("rrdtool: `RandomTimeout' must "
990 "be greater then or equal to zero.");
992 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
998 } /* int rrd_config */
1000 static int rrd_shutdown(void) {
1001 pthread_mutex_lock(&cache_lock);
1003 pthread_mutex_unlock(&cache_lock);
1005 pthread_mutex_lock(&queue_lock);
1007 pthread_cond_signal(&queue_cond);
1008 pthread_mutex_unlock(&queue_lock);
1010 if ((queue_thread_running != 0) &&
1011 ((queue_head != NULL) || (flushq_head != NULL))) {
1012 INFO("rrdtool plugin: Shutting down the queue thread. "
1013 "This may take a while.");
1014 } else if (queue_thread_running != 0) {
1015 INFO("rrdtool plugin: Shutting down the queue thread.");
1018 /* Wait for all the values to be written to disk before returning. */
1019 if (queue_thread_running != 0) {
1020 pthread_join(queue_thread, NULL);
1021 memset(&queue_thread, 0, sizeof(queue_thread));
1022 queue_thread_running = 0;
1023 DEBUG("rrdtool plugin: queue_thread exited.");
1026 rrd_cache_destroy();
1029 } /* int rrd_shutdown */
1031 static int rrd_init(void) {
1032 static int init_once = 0;
1039 if (rrdcreate_config.heartbeat <= 0)
1040 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1042 /* Set the cache up */
1043 pthread_mutex_lock(&cache_lock);
1045 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1046 if (cache == NULL) {
1047 pthread_mutex_unlock(&cache_lock);
1048 ERROR("rrdtool plugin: c_avl_create failed.");
1052 cache_flush_last = cdtime();
1053 if (cache_timeout == 0) {
1055 cache_flush_timeout = 0;
1056 } else if (cache_flush_timeout < cache_timeout) {
1057 INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout %.3f\". "
1058 "Ajusting \"CacheFlush\" to %.3f seconds.",
1059 CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1060 CDTIME_T_TO_DOUBLE(cache_timeout),
1061 CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1062 cache_flush_timeout = 10 * cache_timeout;
1065 /* Assure that "cache_timeout + random_variation" is never negative. */
1066 if (random_timeout > cache_timeout) {
1067 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1068 CDTIME_T_TO_DOUBLE(cache_timeout));
1069 random_timeout = cache_timeout;
1072 pthread_mutex_unlock(&cache_lock);
1075 plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1076 /* args = */ NULL, "rrdtool queue");
1078 ERROR("rrdtool plugin: Cannot create queue-thread.");
1081 queue_thread_running = 1;
1083 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1084 " heartbeat = %i; rrarows = %i; xff = %lf;",
1085 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1086 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1087 rrdcreate_config.xff);
1090 } /* int rrd_init */
1092 void module_register(void) {
1093 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1094 plugin_register_init("rrdtool", rrd_init);
1095 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1096 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1097 plugin_register_shutdown("rrdtool", rrd_shutdown);