2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
/* Declarations for cache entries (rrd_cache_s) and queue entries
 * (rrd_queue_s).
 * NOTE(review): the struct headers and most members are absent from this
 * listing; later code also reads members such as values, values_num,
 * first_value, last_value and filename whose declarations are not shown. */
/* Per-entry offset that rrd_cache_insert() adds to cache_timeout when
 * deciding whether the entry is due for queueing. */
44 int64_t random_variation;
/* Queue state of the entry: FLAG_QUEUED is stored after enqueueing on the
 * regular queue, FLAG_FLUSHQ after enqueueing on the flush queue, and the
 * queue thread resets it to FLAG_NONE when it takes the values. */
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
47 typedef struct rrd_cache_s rrd_cache_t;
/* NOTE(review): no use of these two enumerators is visible in this listing. */
49 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
50 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
/* Singly linked list link; rrd_queue_enqueue() stores NULL here for a newly
 * appended entry. */
54 struct rrd_queue_s *next;
56 typedef struct rrd_queue_s rrd_queue_t;
/* Option names of this plugin. The array and its element count are handed to
 * plugin_register_config() in module_register(); rrd_config() matches incoming
 * keys against these names with strcasecmp(). */
61 static const char *config_keys[] = {
62 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
63 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
64 "XFF", "WritesPerSecond", "RandomTimeout"};
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/* Plugin-wide settings: the data directory prefix (datadir), the spacing
 * between queued writes in seconds (write_rate; rrd_config() stores 1.0 / wps
 * and the queue thread scales it by 1000000 to get microseconds), and the RRD
 * creation parameters that rrd_config() fills in.
 * NOTE(review): the closing line of the original comment below and several
 * initializer lines of rrdcreate_config are missing from this listing. */
67 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
68 * is zero a default, depending on the `interval' member of the value list is
70 static char *datadir = NULL;
71 static double write_rate = 0.0;
72 static rrdcreate_config_t rrdcreate_config = {
78 /* timespans = */ NULL,
79 /* timespans_num = */ 0,
81 /* consolidation_functions = */ NULL,
82 /* consolidation_functions_num = */ 0,
/* Shared state of the value cache, the two write queues and the queue
 * thread. */
86 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
87 * ALWAYS lock `cache_lock' first! */
88 static cdtime_t cache_timeout = 0;
89 static cdtime_t cache_flush_timeout = 0;
/* Initialised from the constant 1 via TIME_T_TO_CDTIME_T_STATIC (presumably
 * one second); rrd_get_random_variation() lowers it to cache_timeout when it
 * is larger. */
90 static cdtime_t random_timeout = TIME_T_TO_CDTIME_T_STATIC(1);
/* Time of the last full cache walk, updated at the end of rrd_cache_flush(). */
91 static cdtime_t cache_flush_last;
/* AVL tree created in rrd_init() with strcmp as comparator; looked up by file
 * name and accessed with cache_lock held. */
92 static c_avl_tree_t *cache = NULL;
93 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
/* Regular queue and flush queue. The queue thread takes from the flush queue
 * first. Both lists are modified with queue_lock held. */
95 static rrd_queue_t *queue_head = NULL;
96 static rrd_queue_t *queue_tail = NULL;
97 static rrd_queue_t *flushq_head = NULL;
98 static rrd_queue_t *flushq_tail = NULL;
99 static pthread_t queue_thread;
/* NOTE(review): starts out as 1 before any thread has been created; rrd_init()
 * stores 1 again after thread creation and rrd_shutdown() clears it after
 * joining the thread. */
100 static int queue_thread_running = 1;
101 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by rrd_queue_enqueue() and rrd_shutdown(); the queue thread waits
 * on it. */
102 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
/* Serialises librrd calls in the srrd_update() variant that uses rrd_update().
 * NOTE(review): the matching #endif is not present in this listing. */
104 #if !HAVE_THREADSAFE_LIBRRD
105 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
/* Read by the queue thread to skip the write delay; presumably set by
 * rrd_shutdown() (the assignment is not shown in this listing). */
108 static int do_shutdown = 0;
/* srrd_update, variant used when HAVE_THREADSAFE_LIBRRD is set: forwards the
 * update to rrd_update_r() and emits a WARNING that names the file when
 * reporting a failure.
 * NOTE(review): the rest of the parameter list, the local declarations, the
 * status check and the return statement are missing from this listing. */
110 #if HAVE_THREADSAFE_LIBRRD
111 static int srrd_update(char *filename, char *template, int argc,
115 optind = 0; /* bug in librrd? */
/* argv is cast to a void pointer before being handed to librrd. */
118 status = rrd_update_r(filename, template, argc, (void *)argv);
121 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
126 } /* int srrd_update */
127 /* #endif HAVE_THREADSAFE_LIBRRD */
/* srrd_update, variant used when librrd is not thread-safe: builds a new
 * argument vector consisting of the word update, the file name and the
 * caller-supplied values, then calls rrd_update() while holding librrd_lock.
 * The template argument must be NULL in this variant (asserted).
 * NOTE(review): the rest of the parameter list, the computation of new_argc,
 * the status check, any cleanup of new_argv and the return statement are
 * missing from this listing. */
129 #else /* !HAVE_THREADSAFE_LIBRRD */
130 static int srrd_update(char *filename, char *template, int argc,
137 assert(template == NULL);
/* One extra slot is reserved for the terminating NULL pointer. */
140 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
141 if (new_argv == NULL) {
142 ERROR("rrdtool plugin: malloc failed.");
146 new_argv[0] = "update";
147 new_argv[1] = filename;
/* The caller-supplied values follow the two fixed leading arguments. */
149 memcpy(new_argv + 2, argv, argc * sizeof(char *));
150 new_argv[new_argc] = NULL;
/* Only one thread at a time may be inside librrd in this build. */
152 pthread_mutex_lock(&librrd_lock);
153 optind = 0; /* bug in librrd? */
156 status = rrd_update(new_argc, new_argv);
157 pthread_mutex_unlock(&librrd_lock);
/* NOTE(review): the message names rrd_update_r although this variant calls
 * rrd_update(). */
160 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
167 } /* int srrd_update */
168 #endif /* !HAVE_THREADSAFE_LIBRRD */
/* Formats a value list with several data sources into buffer: first the
 * timestamp of vl as an unsigned integer, then one colon-prefixed value per
 * data source, using a format chosen by the data source type (counter, gauge,
 * derive or absolute).
 * Every ssnprintf() result is checked against the remaining buffer space.
 * NOTE(review): the local declarations, the offset bookkeeping and all return
 * statements are missing from this listing; data source types other than the
 * four listed presumably cause an error return. */
170 static int value_list_to_string_multiple(char *buffer, int buffer_len,
171 const data_set_t *ds,
172 const value_list_t *vl) {
/* Clear the whole buffer before writing into it. */
177 memset(buffer, '\0', buffer_len);
179 tt = CDTIME_T_TO_TIME_T(vl->time);
180 status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
/* A result below 1 or not below the buffer length is treated as a failure. */
181 if ((status < 1) || (status >= buffer_len))
185 for (size_t i = 0; i < ds->ds_num; i++) {
/* Guard against data source types this function has no format for. */
186 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
187 (ds->ds[i].type != DS_TYPE_GAUGE) &&
188 (ds->ds[i].type != DS_TYPE_DERIVE) &&
189 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
192 if (ds->ds[i].type == DS_TYPE_COUNTER)
193 status = ssnprintf(buffer + offset, buffer_len - offset, ":%llu",
194 vl->values[i].counter);
195 else if (ds->ds[i].type == DS_TYPE_GAUGE)
196 status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
197 vl->values[i].gauge);
198 else if (ds->ds[i].type == DS_TYPE_DERIVE)
199 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
200 vl->values[i].derive);
201 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
202 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
203 vl->values[i].absolute);
/* Same check as above, relative to the space left after offset. */
205 if ((status < 1) || (status >= (buffer_len - offset)))
209 } /* for ds->ds_num */
212 } /* int value_list_to_string_multiple */
/* Formats a value list as timestamp, colon, value for the first data source,
 * choosing the value format by the type of ds->ds[0]. One path delegates to
 * value_list_to_string_multiple() instead.
 * NOTE(review): the condition that selects the delegation (presumably a data
 * source count other than one), two case labels, the default branch and the
 * return statements are missing from this listing. */
214 static int value_list_to_string(char *buffer, int buffer_len,
215 const data_set_t *ds, const value_list_t *vl) {
220 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
222 tt = CDTIME_T_TO_TIME_T(vl->time);
223 switch (ds->ds[0].type) {
225 status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
226 vl->values[0].derive);
229 status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
230 vl->values[0].gauge);
232 case DS_TYPE_COUNTER:
233 status = ssnprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
234 vl->values[0].counter);
236 case DS_TYPE_ABSOLUTE:
237 status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
238 vl->values[0].absolute);
/* A result below 1 or not below the buffer length is treated as a failure. */
244 if ((status < 1) || (status >= buffer_len))
248 } /* int value_list_to_string */
/* Builds the RRD file name for a value list in buffer: an optional datadir
 * prefix followed by a slash, the identifier produced by FORMAT_VL, and the
 * suffix .rrd.
 * NOTE(review): the local declarations, the status check after FORMAT_VL, the
 * step that advances buffer past the identifier and the return statements are
 * missing from this listing. */
250 static int value_list_to_filename(char *buffer, size_t buffer_size,
251 value_list_t const *vl) {
252 char const suffix[] = ".rrd";
256 if (datadir != NULL) {
/* The extra byte accounts for the slash appended after datadir. */
257 size_t datadir_len = strlen(datadir) + 1;
259 if (datadir_len >= buffer_size)
262 sstrncpy(buffer, datadir, buffer_size);
263 buffer[datadir_len - 1] = '/';
264 buffer[datadir_len] = 0;
/* Continue writing right after the directory prefix. */
266 buffer += datadir_len;
267 buffer_size -= datadir_len;
270 status = FORMAT_VL(buffer, buffer_size, vl);
274 len = strlen(buffer);
275 assert(len < buffer_size);
/* sizeof(suffix) includes the terminating NUL byte, which memcpy copies too. */
279 if (buffer_size <= sizeof(suffix))
282 memcpy(buffer, suffix, sizeof(suffix));
284 } /* int value_list_to_filename */
/* Worker thread that drains the write queues. It waits on queue_cond until an
 * entry is available, takes one entry (flush queue first, otherwise the
 * regular queue), detaches the cached value strings of that file while
 * holding cache_lock, and writes them with srrd_update().
 * When write_rate is positive, writes are spaced out using tv_next_update; the
 * delay is skipped when the flush queue is non-empty and when do_shutdown is
 * set.
 * NOTE(review): the enclosing loop, several conditions, jumps and local
 * declarations are missing from this listing, so the exact control flow cannot
 * be confirmed from here. */
286 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
287 struct timeval tv_next_update;
288 struct timeval tv_now;
290 gettimeofday(&tv_next_update, /* timezone = */ NULL);
293 rrd_queue_t *queue_entry;
294 rrd_cache_t *cache_entry;
302 pthread_mutex_lock(&queue_lock);
303 /* Wait for values to arrive */
305 struct timespec ts_wait;
/* Sleep while both queues are empty (the remainder of this loop condition is
 * not shown in this listing). */
307 while ((flushq_head == NULL) && (queue_head == NULL) &&
309 pthread_cond_wait(&queue_cond, &queue_lock);
311 if ((flushq_head == NULL) && (queue_head == NULL))
314 /* Don't delay if there's something to flush */
315 if (flushq_head != NULL)
318 /* Don't delay if we're shutting down */
319 if (do_shutdown != 0)
322 /* Don't delay if no delay was configured. */
323 if (write_rate <= 0.0)
326 gettimeofday(&tv_now, /* timezone = */ NULL);
327 status = timeval_cmp(tv_next_update, tv_now, NULL);
328 /* We're good to go */
332 /* We're supposed to wait a bit with this update, so we'll
333 * wait for the next addition to the queue or to the end of
334 * the wait period - whichever comes first. */
335 ts_wait.tv_sec = tv_next_update.tv_sec;
/* struct timeval carries microseconds, struct timespec takes nanoseconds. */
336 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
338 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
339 if (status == ETIMEDOUT)
343 /* XXX: If you need to lock both, cache_lock and queue_lock, at
344 * the same time, ALWAYS lock `cache_lock' first! */
346 /* We're in the shutdown phase */
347 if ((flushq_head == NULL) && (queue_head == NULL)) {
348 pthread_mutex_unlock(&queue_lock);
352 if (flushq_head != NULL) {
353 /* Dequeue the first flush entry */
354 queue_entry = flushq_head;
355 if (flushq_head == flushq_tail)
356 flushq_head = flushq_tail = NULL;
358 flushq_head = flushq_head->next;
359 } else /* if (queue_head != NULL) */
361 /* Dequeue the first regular entry */
362 queue_entry = queue_head;
363 if (queue_head == queue_tail)
364 queue_head = queue_tail = NULL;
366 queue_head = queue_head->next;
369 /* Unlock the queue again */
370 pthread_mutex_unlock(&queue_lock);
372 /* We now need the cache lock so the entry isn't updated while
373 * we make a copy of its values */
374 pthread_mutex_lock(&cache_lock);
376 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
/* Take over the value array and reset the cache entry, so the strings can be
 * written after cache_lock has been released. */
379 values = cache_entry->values;
380 values_num = cache_entry->values_num;
382 cache_entry->values = NULL;
383 cache_entry->values_num = 0;
384 cache_entry->flags = FLAG_NONE;
387 pthread_mutex_unlock(&cache_lock);
390 sfree(queue_entry->filename);
395 /* Update `tv_next_update' */
396 if (write_rate > 0.0) {
397 gettimeofday(&tv_now, /* timezone = */ NULL);
398 tv_next_update.tv_sec = tv_now.tv_sec;
399 tv_next_update.tv_usec =
400 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
/* NOTE(review): the comparison is strict, so a tv_usec of exactly 1000000 is
 * not carried into tv_sec and would later yield a tv_nsec of one full second;
 * confirm whether >= was intended. */
401 while (tv_next_update.tv_usec > 1000000) {
402 tv_next_update.tv_sec++;
403 tv_next_update.tv_usec -= 1000000;
407 /* Write the values to the RRD-file */
408 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
409 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
410 (values_num == 1) ? "" : "s", queue_entry->filename);
/* Presumably releases the value strings after the update (the loop body is
 * not shown in this listing). */
412 for (int i = 0; i < values_num; i++) {
416 sfree(queue_entry->filename);
420 pthread_exit((void *)0);
422 } /* void *rrd_queue_thread */
/* Appends a newly allocated entry holding a copy of filename to the queue
 * described by *head and *tail, then signals queue_cond, on which the queue
 * thread waits. The list is modified with queue_lock held.
 * NOTE(review): the error returns, the handling of an empty queue, the update
 * of *tail and the final return are missing from this listing. */
424 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
425 rrd_queue_t **tail) {
426 rrd_queue_t *queue_entry;
428 queue_entry = malloc(sizeof(*queue_entry));
429 if (queue_entry == NULL)
/* The entry keeps its own copy of the name. */
432 queue_entry->filename = strdup(filename);
433 if (queue_entry->filename == NULL) {
438 queue_entry->next = NULL;
440 pthread_mutex_lock(&queue_lock);
445 (*tail)->next = queue_entry;
448 pthread_cond_signal(&queue_cond);
449 pthread_mutex_unlock(&queue_lock);
452 } /* int rrd_queue_enqueue */
/* Searches the queue described by *head and *tail for the entry whose
 * filename equals the argument (compared with strcmp), unlinks it while
 * holding queue_lock and frees its filename.
 * NOTE(review): the local declarations, the list traversal steps, the
 * not-found handling, the updates of *head and *tail and the return statements
 * are missing from this listing. */
454 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
455 rrd_queue_t **tail) {
459 pthread_mutex_lock(&queue_lock);
464 while (this != NULL) {
465 if (strcmp(this->filename, filename) == 0)
473 pthread_mutex_unlock(&queue_lock);
/* Bypass the matched entry in the list. */
480 prev->next = this->next;
/* The matched entry was the last one in the list. */
482 if (this->next == NULL)
485 pthread_mutex_unlock(&queue_lock);
487 sfree(this->filename);
491 } /* int rrd_queue_dequeue */
/* Walks the whole cache. Entries whose flags are FLAG_NONE and whose first
 * value is at least timeout old (or any age when timeout is 0) are handled:
 * those with pending values are put on the regular queue and marked
 * FLAG_QUEUED, those without values are collected in keys and removed from
 * the tree after the iteration. Finally the flush time is stored in
 * cache_flush_last.
 * NOTE(review): the local declarations, the branch bodies that presumably skip
 * an entry, the status checks and the release of the removed entries are
 * missing from this listing. */
493 /* XXX: You must hold "cache_lock" when calling this function! */
494 static void rrd_cache_flush(cdtime_t timeout) {
502 c_avl_iterator_t *iter;
504 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
505 CDTIME_T_TO_DOUBLE(timeout));
/* NOTE(review): timeout is declared as cdtime_t, yet it is passed through
 * TIME_T_TO_CDTIME_T here; confirm which unit callers are expected to pass. */
508 timeout = TIME_T_TO_CDTIME_T(timeout);
510 /* Build a list of entries to be flushed */
511 iter = c_avl_get_iterator(cache);
512 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
513 if (rc->flags != FLAG_NONE)
515 /* timeout == 0 => flush everything */
516 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
518 else if (rc->values_num > 0) {
521 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
523 rc->flags = FLAG_QUEUED;
524 } else /* ancient and no values -> waste of memory */
/* Grow the list of keys to remove by one slot. */
526 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
529 ERROR("rrdtool plugin: "
530 "realloc failed: %s",
531 sstrerror(errno, errbuf, sizeof(errbuf)));
532 c_avl_iterator_destroy(iter);
537 keys[keys_num] = key;
540 } /* while (c_avl_iterator_next) */
541 c_avl_iterator_destroy(iter);
/* Removal happens in this second pass, after the iterator has been
 * destroyed. */
543 for (int i = 0; i < keys_num; i++) {
544 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
545 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
/* Only entries without values were selected for removal above. */
549 assert(rc->values == NULL);
550 assert(rc->values_num == 0);
555 } /* for (i = 0..keys_num) */
559 cache_flush_last = now;
560 } /* void rrd_cache_flush */
/* Flushes a single cache entry, or the whole cache via rrd_cache_flush() when
 * identifier is NULL. The lookup key is the identifier with the suffix .rrd,
 * prefixed with datadir in one of the two snprintf branches.
 * An entry already on the regular queue is moved to the flush queue; an entry
 * that is not queued, is at least timeout old and has pending values is put
 * on the flush queue directly. In both cases the entry is marked FLAG_FLUSHQ.
 * The visible caller, rrd_flush(), holds cache_lock around this call.
 * NOTE(review): the local declarations, the condition that selects the key
 * format, the status checks and the return statements are missing from this
 * listing. */
562 static int rrd_cache_flush_identifier(cdtime_t timeout,
563 const char *identifier) {
569 if (identifier == NULL) {
570 rrd_cache_flush(timeout);
577 snprintf(key, sizeof(key), "%s.rrd", identifier);
579 snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
/* Force termination regardless of what snprintf wrote. */
580 key[sizeof(key) - 1] = 0;
582 status = c_avl_get(cache, key, (void *)&rc);
584 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
585 "c_avl_get (%s) failed. Does that file really exist?",
590 if (rc->flags == FLAG_FLUSHQ) {
592 } else if (rc->flags == FLAG_QUEUED) {
/* Move the entry from the regular queue to the flush queue. */
593 rrd_queue_dequeue(key, &queue_head, &queue_tail);
594 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
596 rc->flags = FLAG_FLUSHQ;
597 } else if ((now - rc->first_value) < timeout) {
599 } else if (rc->values_num > 0) {
600 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
602 rc->flags = FLAG_FLUSHQ;
606 } /* int rrd_cache_flush_identifier */
/* Returns a value obtained from cdrand_range(min, max), where max is half of
 * random_timeout and min is max minus random_timeout. Before that,
 * random_timeout is lowered to cache_timeout when it is larger, which changes
 * the file-scope variable for all later calls.
 * NOTE(review): the local declarations and the statement executed when
 * random_timeout is 0 (presumably an early return) are missing from this
 * listing. */
608 static int64_t rrd_get_random_variation(void) {
612 if (random_timeout == 0)
615 /* Assure that "cache_timeout + random_variation" is never negative. */
616 if (random_timeout > cache_timeout) {
617 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
618 CDTIME_T_TO_DOUBLE(cache_timeout));
619 random_timeout = cache_timeout;
622 max = (long)(random_timeout / 2);
623 min = max - ((long)random_timeout);
625 return (int64_t)cdrand_range(min, max);
626 } /* int64_t rrd_get_random_variation */
/* Adds one formatted value string to the cache entry of filename, creating
 * the entry when the lookup does not find one, and holds cache_lock for the
 * whole operation.
 * A value whose time is not newer than the last stored value of the entry is
 * rejected. Once the time span between first and last value reaches
 * cache_timeout plus the per-entry random variation, an entry whose flags are
 * FLAG_NONE is put on the regular queue and marked FLAG_QUEUED. When
 * cache_timeout is positive and more than cache_flush_timeout has passed
 * since cache_flush_last, rrd_cache_flush() is run as well.
 * NOTE(review): the local declarations, several conditions, the cleanup in
 * the error paths, the increment of values_num and the return statements are
 * missing from this listing. */
628 static int rrd_cache_insert(const char *filename, const char *value,
629 cdtime_t value_time) {
630 rrd_cache_t *rc = NULL;
634 pthread_mutex_lock(&cache_lock);
636 /* This shouldn't happen, but it did happen at least once, so we'll be
639 pthread_mutex_unlock(&cache_lock);
640 WARNING("rrdtool plugin: cache == NULL.");
644 c_avl_get(cache, filename, (void *)&rc);
647 rc = malloc(sizeof(*rc));
649 pthread_mutex_unlock(&cache_lock);
656 rc->random_variation = rrd_get_random_variation();
657 rc->flags = FLAG_NONE;
661 assert(value_time > 0); /* plugin_dispatch() ensures this. */
/* Values must arrive with strictly increasing timestamps per file. */
662 if (rc->last_value >= value_time) {
663 pthread_mutex_unlock(&cache_lock);
664 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
665 ">= (value_time = %" PRIu64 ")",
666 rc->last_value, value_time);
/* Grow the value array by one slot; the result is only stored in rc->values
 * after the NULL check below. */
671 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
672 if (values_new == NULL) {
674 void *cache_key = NULL;
676 sstrerror(errno, errbuf, sizeof(errbuf));
/* On allocation failure the entry is taken out of the tree. */
678 c_avl_remove(cache, filename, &cache_key, NULL);
679 pthread_mutex_unlock(&cache_lock);
681 ERROR("rrdtool plugin: realloc failed: %s", errbuf);
688 rc->values = values_new;
690 rc->values[rc->values_num] = strdup(value);
691 if (rc->values[rc->values_num] != NULL)
/* The first stored value defines the start of the age window of the entry. */
694 if (rc->values_num == 1)
695 rc->first_value = value_time;
696 rc->last_value = value_time;
698 /* Insert if this is the first value */
/* The tree gets its own copy of the file name as key. */
700 void *cache_key = strdup(filename);
702 if (cache_key == NULL) {
704 sstrerror(errno, errbuf, sizeof(errbuf));
706 pthread_mutex_unlock(&cache_lock);
708 ERROR("rrdtool plugin: strdup failed: %s", errbuf);
710 sfree(rc->values[0]);
716 c_avl_insert(cache, cache_key, rc);
719 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
720 "values_num = %i; age = %.3f;",
721 filename, rc->values_num,
722 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
724 if ((rc->last_value - rc->first_value) >=
725 (cache_timeout + rc->random_variation)) {
726 /* XXX: If you need to lock both, cache_lock and queue_lock, at
727 * the same time, ALWAYS lock `cache_lock' first! */
728 if (rc->flags == FLAG_NONE) {
731 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
733 rc->flags = FLAG_QUEUED;
/* Draw a fresh variation for the next round of this entry. */
735 rc->random_variation = rrd_get_random_variation();
737 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
741 if ((cache_timeout > 0) &&
742 ((cdtime() - cache_flush_last) > cache_flush_timeout))
743 rrd_cache_flush(cache_flush_timeout);
745 pthread_mutex_unlock(&cache_lock);
748 } /* int rrd_cache_insert */
/* Empties and destroys the cache tree while holding cache_lock: entries are
 * taken out one at a time with c_avl_pick(), their value strings are freed,
 * and the tree is destroyed afterwards. Entries that still had values are
 * reported with an INFO message; the DEBUG message covers the other case.
 * NOTE(review): the local declarations, the early-exit condition, the counting
 * of non-empty entries, the release of keys and entries and the return
 * statements are missing from this listing. */
750 static int rrd_cache_destroy(void) /* {{{ */
757 pthread_mutex_lock(&cache_lock);
760 pthread_mutex_unlock(&cache_lock);
764 while (c_avl_pick(cache, &key, &value) == 0) {
773 if (rc->values_num > 0)
776 for (int i = 0; i < rc->values_num; i++)
777 sfree(rc->values[i]);
782 c_avl_destroy(cache);
786 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
787 non_empty, (non_empty == 1) ? "entry" : "entries");
789 DEBUG("rrdtool plugin: No values have been lost "
790 "when destroying the cache.");
793 pthread_mutex_unlock(&cache_lock);
795 } /* }}} int rrd_cache_destroy */
/* Comparison callback for two int values, passed to qsort() by rrd_config()
 * to sort the configured RRA timespans.
 * NOTE(review): the comparison itself and the return statements are missing
 * from this listing, so the resulting order cannot be confirmed from here. */
797 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
798 int a = *((int *)a_ptr);
799 int b = *((int *)b_ptr);
807 } /* int rrd_compare_numeric */
/* Write callback registered in module_register(). Verifies that the type of
 * the data set matches the type of the value list, derives the file name and
 * the value string, makes sure the target exists as a regular file (creating
 * it through cu_rrd_create_file() when stat() reports ENOENT) and finally
 * stores the value with rrd_cache_insert().
 * NOTE(review): the local declarations, the status checks after file
 * creation, the statement for the asynchronous-creation case and all return
 * statements are missing from this listing. */
809 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
810 user_data_t __attribute__((unused)) * user_data) {
819 if (0 != strcmp(ds->type, vl->type)) {
820 ERROR("rrdtool plugin: DS type does not match value list type");
824 if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
827 if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
830 if (stat(filename, &statbuf) == -1) {
/* A missing file is created; other stat() errors are reported below. */
831 if (errno == ENOENT) {
832 status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config);
835 else if (rrdcreate_config.async)
839 ERROR("stat(%s) failed: %s", filename,
840 sstrerror(errno, errbuf, sizeof(errbuf)));
843 } else if (!S_ISREG(statbuf.st_mode)) {
844 ERROR("stat(%s): Not a regular file!", filename);
848 status = rrd_cache_insert(filename, values, vl->time);
851 } /* int rrd_write */
/* Flush callback registered in module_register(). Takes cache_lock and
 * forwards timeout and identifier to rrd_cache_flush_identifier().
 * NOTE(review): the condition guarding the early unlock (presumably a check
 * for a missing cache) and the return statements are missing from this
 * listing. */
853 static int rrd_flush(cdtime_t timeout, const char *identifier,
854 __attribute__((unused)) user_data_t *user_data) {
855 pthread_mutex_lock(&cache_lock);
858 pthread_mutex_unlock(&cache_lock);
862 rrd_cache_flush_identifier(timeout, identifier);
864 pthread_mutex_unlock(&cache_lock);
866 } /* int rrd_flush */
/* Configuration callback registered in module_register(). Compares key
 * case-insensitively against the supported option names and stores the
 * parsed value in the matching file-scope variable or in rrdcreate_config.
 * Out-of-range values are reported on stderr and, in most branches, through
 * ERROR as well.
 * NOTE(review): most range conditions, the DataDir handling, the boolean
 * parsing for CreateFilesAsync and all return statements are missing from
 * this listing. */
868 static int rrd_config(const char *key, const char *value) {
869 if (strcasecmp("CacheTimeout", key) == 0) {
870 double tmp = atof(value);
872 fprintf(stderr, "rrdtool: `CacheTimeout' must "
873 "be greater than 0.\n");
874 ERROR("rrdtool: `CacheTimeout' must "
875 "be greater than 0.\n");
878 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
879 } else if (strcasecmp("CacheFlush", key) == 0) {
880 int tmp = atoi(value);
882 fprintf(stderr, "rrdtool: `CacheFlush' must "
883 "be greater than 0.\n");
884 ERROR("rrdtool: `CacheFlush' must "
885 "be greater than 0.\n");
/* NOTE(review): the integer is stored without conversion, whereas
 * CacheTimeout goes through DOUBLE_TO_CDTIME_T; rrd_init() may later replace
 * this value with a multiple of cache_timeout. Confirm the intended unit of
 * cache_flush_timeout. */
888 cache_flush_timeout = tmp;
889 } else if (strcasecmp("DataDir", key) == 0) {
895 ERROR("rrdtool plugin: strdup failed.");
/* Trailing slashes are examined here; the loop body is not shown. */
900 while ((len > 0) && (tmp[len - 1] == '/')) {
906 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
911 if (datadir != NULL) {
916 } else if (strcasecmp("StepSize", key) == 0) {
/* Base 0 lets strtoul accept decimal, octal and hexadecimal notation. */
917 unsigned long temp = strtoul(value, NULL, 0);
919 rrdcreate_config.stepsize = temp;
920 } else if (strcasecmp("HeartBeat", key) == 0) {
921 int temp = atoi(value);
923 rrdcreate_config.heartbeat = temp;
924 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
926 rrdcreate_config.async = 1;
928 rrdcreate_config.async = 0;
929 } else if (strcasecmp("RRARows", key) == 0) {
930 int tmp = atoi(value);
932 fprintf(stderr, "rrdtool: `RRARows' must "
933 "be greater than 0.\n");
934 ERROR("rrdtool: `RRARows' must "
935 "be greater than 0.\n");
938 rrdcreate_config.rrarows = tmp;
939 } else if (strcasecmp("RRATimespan", key) == 0) {
940 char *saveptr = NULL;
/* strtok_r modifies its input, so a private copy of value is tokenised. */
946 value_copy = strdup(value);
947 if (value_copy == NULL)
/* Tokens are separated by commas, spaces or tabs. */
951 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
954 tmp_alloc = realloc(rrdcreate_config.timespans,
955 sizeof(int) * (rrdcreate_config.timespans_num + 1));
956 if (tmp_alloc == NULL) {
957 fprintf(stderr, "rrdtool: realloc failed.\n");
958 ERROR("rrdtool: realloc failed.\n");
962 rrdcreate_config.timespans = tmp_alloc;
963 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
/* A token that atoi turns into 0 is not counted, so its slot is reused. */
964 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
965 rrdcreate_config.timespans_num++;
966 } /* while (strtok_r) */
968 qsort(/* base = */ rrdcreate_config.timespans,
969 /* nmemb = */ rrdcreate_config.timespans_num,
970 /* size = */ sizeof(rrdcreate_config.timespans[0]),
971 /* compar = */ rrd_compare_numeric);
974 } else if (strcasecmp("XFF", key) == 0) {
975 double tmp = atof(value);
/* Accepted range is 0.0 inclusive up to 1.0 exclusive. */
976 if ((tmp < 0.0) || (tmp >= 1.0)) {
977 fprintf(stderr, "rrdtool: `XFF' must "
978 "be in the range 0 to 1 (exclusive).");
979 ERROR("rrdtool: `XFF' must "
980 "be in the range 0 to 1 (exclusive).");
983 rrdcreate_config.xff = tmp;
984 } else if (strcasecmp("WritesPerSecond", key) == 0) {
985 double wps = atof(value);
988 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
989 "greater than or equal to zero.");
991 } else if (wps == 0.0) {
/* write_rate is the interval between writes, the inverse of the rate. */
994 write_rate = 1.0 / wps;
996 } else if (strcasecmp("RandomTimeout", key) == 0) {
1001 fprintf(stderr, "rrdtool: `RandomTimeout' must "
1002 "be greater than or equal to zero.\n");
/* NOTE(review): the ERROR text below reads then where the stderr text above
 * reads than; left unchanged because it is a runtime string. */
1003 ERROR("rrdtool: `RandomTimeout' must "
1004 "be greater then or equal to zero.");
1006 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
1012 } /* int rrd_config */
/* Shutdown callback registered in module_register(). Signals queue_cond under
 * queue_lock, logs whether the queues still hold entries, joins the queue
 * thread when it is marked as running, and then destroys the cache.
 * NOTE(review): the statement executed under cache_lock (presumably a full
 * cache flush), the statement executed under queue_lock before the signal
 * (presumably setting do_shutdown) and the return statement are missing from
 * this listing. */
1014 static int rrd_shutdown(void) {
1015 pthread_mutex_lock(&cache_lock);
1017 pthread_mutex_unlock(&cache_lock);
1019 pthread_mutex_lock(&queue_lock);
1021 pthread_cond_signal(&queue_cond);
1022 pthread_mutex_unlock(&queue_lock);
1024 if ((queue_thread_running != 0) &&
1025 ((queue_head != NULL) || (flushq_head != NULL))) {
1026 INFO("rrdtool plugin: Shutting down the queue thread. "
1027 "This may take a while.");
1028 } else if (queue_thread_running != 0) {
1029 INFO("rrdtool plugin: Shutting down the queue thread.");
1032 /* Wait for all the values to be written to disk before returning. */
1033 if (queue_thread_running != 0) {
1034 pthread_join(queue_thread, NULL);
/* Clear the handle and the flag so a second call does not join again. */
1035 memset(&queue_thread, 0, sizeof(queue_thread));
1036 queue_thread_running = 0;
1037 DEBUG("rrdtool plugin: queue_thread exited.");
1040 rrd_cache_destroy();
1043 } /* int rrd_shutdown */
/* Init callback registered in module_register(). Derives a heartbeat from the
 * step size when none is set, creates the cache tree with strcmp as key
 * comparator, initialises the flush bookkeeping and starts the queue thread.
 * NOTE(review): the use of init_once, the status check after thread creation
 * and the return statements are missing from this listing. */
1045 static int rrd_init(void) {
1046 static int init_once = 0;
/* Default heartbeat is twice the step size. */
1053 if (rrdcreate_config.heartbeat <= 0)
1054 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1056 /* Set the cache up */
1057 pthread_mutex_lock(&cache_lock);
1059 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1060 if (cache == NULL) {
1061 pthread_mutex_unlock(&cache_lock);
1062 ERROR("rrdtool plugin: c_avl_create failed.");
1066 cache_flush_last = cdtime();
/* Without a cache timeout there is no periodic flush; otherwise a flush
 * timeout below the cache timeout is replaced by ten times the cache
 * timeout. */
1067 if (cache_timeout == 0) {
1068 cache_flush_timeout = 0;
1069 } else if (cache_flush_timeout < cache_timeout)
1070 cache_flush_timeout = 10 * cache_timeout;
1072 pthread_mutex_unlock(&cache_lock);
1075 plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1076 /* args = */ NULL, "rrdtool queue");
1078 ERROR("rrdtool plugin: Cannot create queue-thread.");
1081 queue_thread_running = 1;
1083 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1084 " heartbeat = %i; rrarows = %i; xff = %lf;",
1085 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1086 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1087 rrdcreate_config.xff);
1090 } /* int rrd_init */
/* Plugin entry point: registers the config, init, write, flush and shutdown
 * callbacks of this file under the name rrdtool. The write and flush
 * callbacks are registered without user data.
 * NOTE(review): the closing brace of this function is not present in this
 * listing. */
1092 void module_register(void) {
1093 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1094 plugin_register_init("rrdtool", rrd_init);
1095 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1096 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1097 plugin_register_shutdown("rrdtool", rrd_shutdown);