2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55     _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
111 unsigned long *skip_update,
112 int *schedule_smooth);
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
124 static int get_time_from_reading(
128 time_t *current_time,
129 unsigned long *current_time_usec,
132 static int update_pdp_prep(
135 rrd_value_t *pdp_new,
138 static int calculate_elapsed_steps(
140 unsigned long current_time,
141 unsigned long current_time_usec,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
163 unsigned long ds_idx,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
200 static void update_cdp(
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
212 static void initialize_cdp_val(
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
245 static int update_aberrant_cdps(
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
270 static int smooth_all_rras(
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
276 static int write_changes_to_disk(
278 rrd_file_t *rrd_file,
283 * normalize time as returned by gettimeofday. usec part must
286 static inline void normalize_time(
289 if (t->tv_usec < 0) {
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
299 static inline void initialize_time(
300 time_t *current_time,
301 unsigned long *current_time_usec,
304 struct timeval tmp_time; /* used for time conversion */
306 gettimeofday(&tmp_time, 0);
307 normalize_time(&tmp_time);
308 *current_time = tmp_time.tv_sec;
310 *current_time_usec = tmp_time.tv_usec;
312 *current_time_usec = 0;
/* IFDNAN(X, Y): evaluate to X unless X is NaN, in which case evaluate to Y.
 * Fixed: removed the trailing semicolon from the macro definition. A
 * function-like macro must expand to a bare expression so callers can use
 * it inside larger expressions (e.g. f(IFDNAN(a, b))); the semicolon made
 * such uses a syntax error and injected stray empty statements at
 * statement-context call sites. Note X is evaluated twice — do not pass
 * expressions with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
318 rrd_info_t *rrd_update_v(
323 rrd_info_t *result = NULL;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
333 opterr = 0; /* initialize getopt */
336 int option_index = 0;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
388 int option_index = 0;
392 char *opt_daemon = NULL;
395 opterr = 0; /* initialize getopt */
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
405 tmplt = strdup(optarg);
409 if (opt_daemon != NULL)
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
414 rrd_set_error("strdup failed.");
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
438 rrd_set_error("The caching daemon cannot be used together with "
443 if (! rrdc_is_connected(opt_daemon))
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
448 else /* we are connected */
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (void *) (argv + optind + 1)); /* values */
454 rrd_set_error("Failed sending the values to rrdcached: %s",
464 if (opt_daemon != NULL)
473 const char *filename,
478 return _rrd_update(filename, tmplt, argc, argv, NULL);
482 const char *filename,
486 rrd_info_t * pcdp_summary)
491 unsigned long rra_begin; /* byte pointer to the rra
492 * area in the rrd file. this
493 * pointer never changes value */
494 rrd_value_t *pdp_new; /* prepare the incoming data to be added
495 * to the existing entry */
496 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
497 * to the cdp values */
499 long *tmpl_idx; /* index representing the settings
500 * transported by the tmplt index */
501 unsigned long tmpl_cnt = 2; /* time and data */
503 time_t current_time = 0;
504 unsigned long current_time_usec = 0; /* microseconds part of current time */
506 int schedule_smooth = 0;
508 /* number of elapsed PDP steps since last update */
509 unsigned long *rra_step_cnt = NULL;
511 int version; /* rrd version */
512 rrd_file_t *rrd_file;
513 char *arg_copy; /* for processing the argv */
514 unsigned long *skip_update; /* RRAs to advance but not write */
516 /* need at least 1 arguments: data. */
518 rrd_set_error("Not enough arguments");
522 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
525 /* We are now at the beginning of the rra's */
526 rra_begin = rrd_file->header_len;
528 version = atoi(rrd.stat_head->version);
530     initialize_time(&current_time, &current_time_usec, version);
532 /* get exclusive lock to whole file.
533 * lock gets removed when we close the file.
535 if (rrd_lock(rrd_file) != 0) {
536 rrd_set_error("could not lock RRD");
540 if (allocate_data_structures(&rrd, &updvals,
541 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542 &rra_step_cnt, &skip_update,
547 /* loop through the arguments. */
548 for (arg_i = 0; arg_i < argc; arg_i++) {
549 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550 rrd_set_error("failed duplication argv entry");
553 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554                         &current_time, &current_time_usec, pdp_temp, pdp_new,
555 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556 &pcdp_summary, version, skip_update,
557 &schedule_smooth) == -1) {
558 if (rrd_test_error()) { /* Should have error string always here */
561 /* Prepend file name to error message */
562 if ((save_error = strdup(rrd_get_error())) != NULL) {
563 rrd_set_error("%s: %s", filename, save_error);
575 /* if we got here and if there is an error and if the file has not been
576 * written to, then close things up and return. */
577 if (rrd_test_error()) {
578 goto err_free_structures;
581 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582 goto err_free_structures;
586 /* calling the smoothing code here guarantees at most one smoothing
587 * operation per rrd_update call. Unfortunately, it is possible with bulk
588 * updates, or a long-delayed update for smoothing to occur off-schedule.
589 * This really isn't critical except during the burn-in cycles. */
590 if (schedule_smooth) {
591 smooth_all_rras(&rrd, rrd_file, rra_begin);
594 /* rrd_dontneed(rrd_file,&rrd); */
620 * Allocate some important arrays used, and initialize the template.
622 * When it returns, either all of the structures are allocated
623 * or none of them are.
625 * Returns 0 on success, -1 on error.
627 static int allocate_data_structures(
630 rrd_value_t **pdp_temp,
633 unsigned long *tmpl_cnt,
634 unsigned long **rra_step_cnt,
635 unsigned long **skip_update,
636 rrd_value_t **pdp_new)
639 if ((*updvals = (char **) malloc(sizeof(char *)
640 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641 rrd_set_error("allocating updvals pointer array.");
644 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
645 * rrd->stat_head->ds_cnt)) ==
647 rrd_set_error("allocating pdp_temp.");
648 goto err_free_updvals;
650 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
652 rrd->stat_head->rra_cnt)) ==
654 rrd_set_error("allocating skip_update.");
655 goto err_free_pdp_temp;
657 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
658 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
659 rrd_set_error("allocating tmpl_idx.");
660 goto err_free_skip_update;
662 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
665 rra_cnt))) == NULL) {
666 rrd_set_error("allocating rra_step_cnt.");
667 goto err_free_tmpl_idx;
670 /* initialize tmplt redirector */
671 /* default config example (assume DS 1 is a CDEF DS)
672 tmpl_idx[0] -> 0; (time)
673 tmpl_idx[1] -> 1; (DS 0)
674 tmpl_idx[2] -> 3; (DS 2)
675 tmpl_idx[3] -> 4; (DS 3) */
676 (*tmpl_idx)[0] = 0; /* time */
677 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
678 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
679 (*tmpl_idx)[ii++] = i;
684 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
685 goto err_free_rra_step_cnt;
689 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
690 * rrd->stat_head->ds_cnt)) == NULL) {
691 rrd_set_error("allocating pdp_new.");
692 goto err_free_rra_step_cnt;
697 err_free_rra_step_cnt:
701 err_free_skip_update:
711 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
713 * Returns 0 on success.
715 static int parse_template(
718 unsigned long *tmpl_cnt,
721 char *dsname, *tmplt_copy;
722 unsigned int tmpl_len, i;
725 *tmpl_cnt = 1; /* the first entry is the time */
727 /* we should work on a writeable copy here */
728 if ((tmplt_copy = strdup(tmplt)) == NULL) {
729 rrd_set_error("error copying tmplt '%s'", tmplt);
735 tmpl_len = strlen(tmplt_copy);
736 for (i = 0; i <= tmpl_len; i++) {
737 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
738 tmplt_copy[i] = '\0';
739 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
740 rrd_set_error("tmplt contains more DS definitions than RRD");
742 goto out_free_tmpl_copy;
744 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
745 rrd_set_error("unknown DS name '%s'", dsname);
747 goto out_free_tmpl_copy;
749 /* go to the next entry on the tmplt_copy */
751 dsname = &tmplt_copy[i + 1];
761 * Parse an update string, updates the primary data points (PDPs)
762 * and consolidated data points (CDPs), and writes changes to the RRAs.
764 * Returns 0 on success, -1 on error.
766 static int process_arg(
769 rrd_file_t *rrd_file,
770 unsigned long rra_begin,
771 time_t *current_time,
772 unsigned long *current_time_usec,
773 rrd_value_t *pdp_temp,
774 rrd_value_t *pdp_new,
775 unsigned long *rra_step_cnt,
778 unsigned long tmpl_cnt,
779 rrd_info_t ** pcdp_summary,
781 unsigned long *skip_update,
782 int *schedule_smooth)
784 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
786 /* a vector of future Holt-Winters seasonal coefs */
787 unsigned long elapsed_pdp_st;
789 double interval, pre_int, post_int; /* interval between this and
791 unsigned long proc_pdp_cnt;
793 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
794 current_time, current_time_usec, version) == -1) {
798 interval = (double) (*current_time - rrd->live_head->last_up)
799 + (double) ((long) *current_time_usec -
800 (long) rrd->live_head->last_up_usec) / 1e6f;
802 /* process the data sources and update the pdp_prep
803 * area accordingly */
804 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
808 elapsed_pdp_st = calculate_elapsed_steps(rrd,
810 *current_time_usec, interval,
814 /* has a pdp_st moment occurred since the last run ? */
815 if (elapsed_pdp_st == 0) {
816 /* no we have not passed a pdp_st moment. therefore update is simple */
817 simple_update(rrd, interval, pdp_new);
819 /* an pdp_st has occurred. */
820 if (process_all_pdp_st(rrd, interval,
822 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
825 if (update_all_cdp_prep(rrd, rra_step_cnt,
832 skip_update, schedule_smooth) == -1) {
833 goto err_free_coefficients;
835 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
836 elapsed_pdp_st, pdp_temp,
837 &seasonal_coef) == -1) {
838 goto err_free_coefficients;
840 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
841 *current_time, skip_update,
842 pcdp_summary) == -1) {
843 goto err_free_coefficients;
845 } /* endif a pdp_st has occurred */
846 rrd->live_head->last_up = *current_time;
847 rrd->live_head->last_up_usec = *current_time_usec;
850 *rrd->legacy_last_up = rrd->live_head->last_up;
853 free(last_seasonal_coef);
856 err_free_coefficients:
858 free(last_seasonal_coef);
863 * Parse a DS string (time + colon-separated values), storing the
864 * results in current_time, current_time_usec, and updvals.
866 * Returns 0 on success, -1 on error.
873 unsigned long tmpl_cnt,
874 time_t *current_time,
875 unsigned long *current_time_usec,
883 /* initialize all ds input to unknown except the first one
884 which has always got to be set */
885 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
888 /* separate all ds elements; first must be examined separately
889 due to alternate time syntax */
890 if ((p = strchr(input, '@')) != NULL) {
892 } else if ((p = strchr(input, ':')) != NULL) {
895 rrd_set_error("expected timestamp not found in data source from %s",
901 updvals[tmpl_idx[i++]] = p + 1;
906 updvals[tmpl_idx[i++]] = p + 1;
912 rrd_set_error("expected %lu data source readings (got %lu) from %s",
913 tmpl_cnt - 1, i, input);
917 if (get_time_from_reading(rrd, timesyntax, updvals,
918 current_time, current_time_usec,
926 * Parse the time in a DS string, store it in current_time and
927 * current_time_usec and verify that it's later than the last
928 * update for this DS.
930 * Returns 0 on success, -1 on error.
932 static int get_time_from_reading(
936 time_t *current_time,
937 unsigned long *current_time_usec,
941 char *parsetime_error = NULL;
943 rrd_time_value_t ds_tv;
944 struct timeval tmp_time; /* used for time conversion */
946 /* get the time from the reading ... handle N */
947 if (timesyntax == '@') { /* at-style */
948 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
949 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
952 if (ds_tv.type == RELATIVE_TO_END_TIME ||
953 ds_tv.type == RELATIVE_TO_START_TIME) {
954 rrd_set_error("specifying time relative to the 'start' "
955 "or 'end' makes no sense here: %s", updvals[0]);
958 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
959 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
960 } else if (strcmp(updvals[0], "N") == 0) {
961 gettimeofday(&tmp_time, 0);
962 normalize_time(&tmp_time);
963 *current_time = tmp_time.tv_sec;
964 *current_time_usec = tmp_time.tv_usec;
966 old_locale = setlocale(LC_NUMERIC, "C");
967 tmp = strtod(updvals[0], 0);
968 setlocale(LC_NUMERIC, old_locale);
969 *current_time = floor(tmp);
970 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
972 /* dont do any correction for old version RRDs */
974 *current_time_usec = 0;
976 if (*current_time < rrd->live_head->last_up ||
977 (*current_time == rrd->live_head->last_up &&
978 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
979 rrd_set_error("illegal attempt to update using time %ld when "
980 "last update time is %ld (minimum one second step)",
981 *current_time, rrd->live_head->last_up);
988 * Update pdp_new by interpreting the updvals according to the DS type
989 * (COUNTER, GAUGE, etc.).
991 * Returns 0 on success, -1 on error.
993 static int update_pdp_prep(
996 rrd_value_t *pdp_new,
999 unsigned long ds_idx;
1001 char *endptr; /* used in the conversion */
1004 enum dst_en dst_idx;
1006 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1007 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1009 /* make sure we do not build diffs with old last_ds values */
1010 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1011 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1012 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1015 /* NOTE: DST_CDEF should never enter this if block, because
1016 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1017 * accidently specified a value for the DST_CDEF. To handle this case,
1018 * an extra check is required. */
1020 if ((updvals[ds_idx + 1][0] != 'U') &&
1021 (dst_idx != DST_CDEF) &&
1022 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1025 /* pdp_new contains rate * time ... eg the bytes transferred during
1026 * the interval. Doing it this way saves a lot of math operations
1031 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1032 if ((updvals[ds_idx + 1][ii] < '0'
1033 || updvals[ds_idx + 1][ii] > '9')
1034 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1035 rrd_set_error("not a simple integer: '%s'",
1036 updvals[ds_idx + 1]);
1040 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1042 rrd_diff(updvals[ds_idx + 1],
1043 rrd->pdp_prep[ds_idx].last_ds);
1044 if (dst_idx == DST_COUNTER) {
1045 /* simple overflow catcher. This will fail
1046 * terribly for non 32 or 64 bit counters
1047 * ... are there any others in SNMP land?
1049 if (pdp_new[ds_idx] < (double) 0.0)
1050 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1051 if (pdp_new[ds_idx] < (double) 0.0)
1052 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1054 rate = pdp_new[ds_idx] / interval;
1056 pdp_new[ds_idx] = DNAN;
1060 old_locale = setlocale(LC_NUMERIC, "C");
1062 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1063 setlocale(LC_NUMERIC, old_locale);
1065 rrd_set_error("converting '%s' to float: %s",
1066 updvals[ds_idx + 1], rrd_strerror(errno));
1069 if (endptr[0] != '\0') {
1071 ("conversion of '%s' to float not complete: tail '%s'",
1072 updvals[ds_idx + 1], endptr);
1075 rate = pdp_new[ds_idx] / interval;
1079 old_locale = setlocale(LC_NUMERIC, "C");
1081 strtod(updvals[ds_idx + 1], &endptr) * interval;
1082 setlocale(LC_NUMERIC, old_locale);
1084 rrd_set_error("converting '%s' to float: %s",
1085 updvals[ds_idx + 1], rrd_strerror(errno));
1088 if (endptr[0] != '\0') {
1090 ("conversion of '%s' to float not complete: tail '%s'",
1091 updvals[ds_idx + 1], endptr);
1094 rate = pdp_new[ds_idx] / interval;
1097 rrd_set_error("rrd contains unknown DS type : '%s'",
1098 rrd->ds_def[ds_idx].dst);
1101 /* break out of this for loop if the error string is set */
1102 if (rrd_test_error()) {
1105 /* make sure pdp_temp is neither too large or too small
1106 * if any of these occur it becomes unknown ...
1107 * sorry folks ... */
1109 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1110 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1111 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1112 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1113 pdp_new[ds_idx] = DNAN;
1116 /* no news is news all the same */
1117 pdp_new[ds_idx] = DNAN;
1121 /* make a copy of the command line argument for the next run */
1123 fprintf(stderr, "prep ds[%lu]\t"
1127 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1130 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1132 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1138 * How many PDP steps have elapsed since the last update? Returns the answer,
1139 * and stores the time between the last update and the last PDP in pre_time,
1140 * and the time between the last PDP and the current time in post_int.
1142 static int calculate_elapsed_steps(
1144 unsigned long current_time,
1145 unsigned long current_time_usec,
1149 unsigned long *proc_pdp_cnt)
1151 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1152 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1154 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1155 * when it was last updated */
1156 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1158 /* when was the current pdp started */
1159 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1160 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1162 /* when did the last pdp_st occur */
1163 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1164 occu_pdp_st = current_time - occu_pdp_age;
1166 if (occu_pdp_st > proc_pdp_st) {
1167 /* OK we passed the pdp_st moment */
1168 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1169 * occurred before the latest
1171 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1172 *post_int = occu_pdp_age; /* how much after it */
1173 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1175 *pre_int = interval;
1179 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1182 printf("proc_pdp_age %lu\t"
1184 "occu_pfp_age %lu\t"
1188 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1189 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1192 /* compute the number of elapsed pdp_st moments */
1193 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1197 * Increment the PDP values by the values in pdp_new, or else initialize them.
1199 static void simple_update(
1202 rrd_value_t *pdp_new)
1206 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1207 if (isnan(pdp_new[i])) {
1208 /* this is not really accurate if we use subsecond data arrival time
1209 should have thought of it when going subsecond resolution ...
1210 sorry next format change we will have it! */
1211 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1214 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1215 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1217 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1226 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1227 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1233 * Call process_pdp_st for each DS.
1235 * Returns 0 on success, -1 on error.
1237 static int process_all_pdp_st(
1242 unsigned long elapsed_pdp_st,
1243 rrd_value_t *pdp_new,
1244 rrd_value_t *pdp_temp)
1246 unsigned long ds_idx;
1248 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1249 rate*seconds which occurred up to the last run.
1250 pdp_new[] contains rate*seconds from the latest run.
1251 pdp_temp[] will contain the rate for cdp */
1253 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1254 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1255 elapsed_pdp_st * rrd->stat_head->pdp_step,
1256 pdp_new, pdp_temp) == -1) {
1260 fprintf(stderr, "PDP UPD ds[%lu]\t"
1261 "elapsed_pdp_st %lu\t"
1264 "new_unkn_sec %5lu\n",
1268 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1269 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1276 * Process an update that occurs after one of the PDP moments.
1277 * Increments the PDP value, sets NAN if time greater than the
1278 * heartbeats have elapsed, processes CDEFs.
1280 * Returns 0 on success, -1 on error.
1282 static int process_pdp_st(
1284 unsigned long ds_idx,
1288 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1289 rrd_value_t *pdp_new,
1290 rrd_value_t *pdp_temp)
1294 /* update pdp_prep to the current pdp_st. */
1295 double pre_unknown = 0.0;
1296 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1297 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1299 rpnstack_t rpnstack; /* used for COMPUTE DS */
1301 rpnstack_init(&rpnstack);
1304 if (isnan(pdp_new[ds_idx])) {
1305 /* a final bit of unknown to be added before calculation
1306 we use a temporary variable for this so that we
1307 don't have to turn integer lines before using the value */
1308 pre_unknown = pre_int;
1310 if (isnan(scratch[PDP_val].u_val)) {
1311 scratch[PDP_val].u_val = 0;
1313 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1316 /* if too much of the pdp_prep is unknown we dump it */
1317 /* if the interval is larger thatn mrhb we get NAN */
1318 if ((interval > mrhb) ||
1319 (rrd->stat_head->pdp_step / 2.0 <
1320 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1321 pdp_temp[ds_idx] = DNAN;
1323 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1324 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1328 /* process CDEF data sources; remember each CDEF DS can
1329 * only reference other DS with a lower index number */
1330 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1334 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1335 /* substitute data values for OP_VARIABLE nodes */
1336 for (i = 0; rpnp[i].op != OP_END; i++) {
1337 if (rpnp[i].op == OP_VARIABLE) {
1338 rpnp[i].op = OP_NUMBER;
1339 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1342 /* run the rpn calculator */
1343 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1345 rpnstack_free(&rpnstack);
1350 /* make pdp_prep ready for the next run */
1351 if (isnan(pdp_new[ds_idx])) {
1352 /* this is not realy accurate if we use subsecond data arival time
1353 should have thought of it when going subsecond resolution ...
1354 sorry next format change we will have it! */
1355 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1356 scratch[PDP_val].u_val = DNAN;
1358 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1359 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1361 rpnstack_free(&rpnstack);
1366 * Iterate over all the RRAs for a given DS and:
1367 * 1. Decide whether to schedule a smooth later
1368 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1371 * Returns 0 on success, -1 on error
1373 static int update_all_cdp_prep(
1375 unsigned long *rra_step_cnt,
1376 unsigned long rra_begin,
1377 rrd_file_t *rrd_file,
1378 unsigned long elapsed_pdp_st,
1379 unsigned long proc_pdp_cnt,
1380 rrd_value_t **last_seasonal_coef,
1381 rrd_value_t **seasonal_coef,
1382 rrd_value_t *pdp_temp,
1383 unsigned long *skip_update,
1384 int *schedule_smooth)
1386 unsigned long rra_idx;
1388 /* index into the CDP scratch array */
1389 enum cf_en current_cf;
1390 unsigned long rra_start;
1392 /* number of rows to be updated in an RRA for a data value. */
1393 unsigned long start_pdp_offset;
1395 rra_start = rra_begin;
1396 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1397 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1399 rrd->rra_def[rra_idx].pdp_cnt -
1400 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1401 skip_update[rra_idx] = 0;
1402 if (start_pdp_offset <= elapsed_pdp_st) {
1403 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1404 rrd->rra_def[rra_idx].pdp_cnt + 1;
1406 rra_step_cnt[rra_idx] = 0;
1409 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1410 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1411 * so that they will be correct for the next observed value; note that for
1412 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1413 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1414 if (rra_step_cnt[rra_idx] > 1) {
1415 skip_update[rra_idx] = 1;
1416 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1417 elapsed_pdp_st, last_seasonal_coef);
1418 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1419 elapsed_pdp_st + 1, seasonal_coef);
1421 /* periodically run a smoother for seasonal effects */
1422 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1425 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1426 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1427 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1430 *schedule_smooth = 1;
1433 if (rrd_test_error())
1437 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1438 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1439 current_cf) == -1) {
1443 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1444 sizeof(rrd_value_t);
1450 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1452 static int do_schedule_smooth(
1454 unsigned long rra_idx,
1455 unsigned long elapsed_pdp_st)
1457 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1458 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1459 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1460 unsigned long seasonal_smooth_idx =
1461 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1462 unsigned long *init_seasonal =
1463 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1465 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1466 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1467 * really an RRA level, not a data source within RRA level parameter, but
1468 * the rra_def is read only for rrd_update (not flushed to disk). */
1469 if (*init_seasonal > BURNIN_CYCLES) {
1470 /* someone has no doubt invented a trick to deal with this wrap around,
1471 * but at least this code is clear. */
1472 if (seasonal_smooth_idx > cur_row) {
1473 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1474 * between PDP and CDP */
1475 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1477 /* can't rely on negative numbers because we are working with
1478 * unsigned values */
1479 return (cur_row + elapsed_pdp_st >= row_cnt
1480 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1482 /* mark off one of the burn-in cycles */
1483 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1487 * For a given RRA, iterate over the data sources and call the appropriate
1488 * consolidation function.
1490 * Returns 0 on success, -1 on error.
1492 static int update_cdp_prep(
1494 unsigned long elapsed_pdp_st,
1495 unsigned long start_pdp_offset,
1496 unsigned long *rra_step_cnt,
1498 rrd_value_t *pdp_temp,
1499 rrd_value_t *last_seasonal_coef,
1500 rrd_value_t *seasonal_coef,
1503 unsigned long ds_idx, cdp_idx;
1505 /* update CDP_PREP areas */
1506 /* loop over data soures within each RRA */
1507 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1509 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1511 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1512 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1513 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1514 elapsed_pdp_st, start_pdp_offset,
1515 rrd->rra_def[rra_idx].pdp_cnt,
1516 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1519 /* Nothing to consolidate if there's one PDP per CDP. However, if
1520 * we've missed some PDPs, let's update null counters etc. */
1521 if (elapsed_pdp_st > 2) {
1522 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1523 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1528 if (rrd_test_error())
1530 } /* endif data sources loop */
1535 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1536 * primary value, secondary value, and # of unknowns.
1538 static void update_cdp(
1541 rrd_value_t pdp_temp_val,
1542 unsigned long rra_step_cnt,
1543 unsigned long elapsed_pdp_st,
1544 unsigned long start_pdp_offset,
1545 unsigned long pdp_cnt,
1550 /* shorthand variables */
1551 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1552 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1553 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1554 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1557 /* If we are in this block, as least 1 CDP value will be written to
1558 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1559 * to be written, then the "fill in" value is the CDP_secondary_val
1561 if (isnan(pdp_temp_val)) {
1562 *cdp_unkn_pdp_cnt += start_pdp_offset;
1563 *cdp_secondary_val = DNAN;
1565 /* CDP_secondary value is the RRA "fill in" value for intermediary
1566 * CDP data entries. No matter the CF, the value is the same because
1567 * the average, max, min, and last of a list of identical values is
1568 * the same, namely, the value itself. */
1569 *cdp_secondary_val = pdp_temp_val;
1572 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1573 *cdp_primary_val = DNAN;
1574 if (current_cf == CF_AVERAGE) {
1576 initialize_average_carry_over(pdp_temp_val,
1578 start_pdp_offset, pdp_cnt);
1580 *cdp_val = pdp_temp_val;
1583 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1584 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1585 } /* endif meets xff value requirement for a valid value */
1586 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1587 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1588 if (isnan(pdp_temp_val))
1589 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1591 *cdp_unkn_pdp_cnt = 0;
1592 } else { /* rra_step_cnt[i] == 0 */
1595 if (isnan(*cdp_val)) {
1596 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1599 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1603 if (isnan(pdp_temp_val)) {
1604 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1607 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1614 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1615 * on the type of consolidation function.
1617 static void initialize_cdp_val(
1620 rrd_value_t pdp_temp_val,
1621 unsigned long elapsed_pdp_st,
1622 unsigned long start_pdp_offset,
1623 unsigned long pdp_cnt)
1625 rrd_value_t cum_val, cur_val;
1627 switch (current_cf) {
1629 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1630 cur_val = IFDNAN(pdp_temp_val, 0.0);
1631 scratch[CDP_primary_val].u_val =
1632 (cum_val + cur_val * start_pdp_offset) /
1633 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1634 scratch[CDP_val].u_val =
1635 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1636 start_pdp_offset, pdp_cnt);
1639 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1640 cur_val = IFDNAN(pdp_temp_val, -DINF);
1643 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1645 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1651 if (cur_val > cum_val)
1652 scratch[CDP_primary_val].u_val = cur_val;
1654 scratch[CDP_primary_val].u_val = cum_val;
1655 /* initialize carry over value */
1656 scratch[CDP_val].u_val = pdp_temp_val;
1659 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1660 cur_val = IFDNAN(pdp_temp_val, DINF);
1663 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1665 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1671 if (cur_val < cum_val)
1672 scratch[CDP_primary_val].u_val = cur_val;
1674 scratch[CDP_primary_val].u_val = cum_val;
1675 /* initialize carry over value */
1676 scratch[CDP_val].u_val = pdp_temp_val;
1680 scratch[CDP_primary_val].u_val = pdp_temp_val;
1681 /* initialize carry over value */
1682 scratch[CDP_val].u_val = pdp_temp_val;
1688 * Update the consolidation function for Holt-Winters functions as
1689 * well as other functions that don't actually consolidate multiple
1692 static void reset_cdp(
1694 unsigned long elapsed_pdp_st,
1695 rrd_value_t *pdp_temp,
1696 rrd_value_t *last_seasonal_coef,
1697 rrd_value_t *seasonal_coef,
1701 enum cf_en current_cf)
1703 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1705 switch (current_cf) {
1708 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1709 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1712 case CF_DEVSEASONAL:
1713 /* need to update cached seasonal values, so they are consistent
1714 * with the bulk update */
1715 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1716 * CDP_last_deviation are the same. */
1717 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1718 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1722 /* need to update the null_count and last_null_count.
1723 * even do this for non-DNAN pdp_temp because the
1724 * algorithm is not learning from batch updates. */
1725 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1726 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1729 scratch[CDP_primary_val].u_val = DNAN;
1730 scratch[CDP_secondary_val].u_val = DNAN;
1733 /* do not count missed bulk values as failures */
1734 scratch[CDP_primary_val].u_val = 0;
1735 scratch[CDP_secondary_val].u_val = 0;
1736 /* need to reset violations buffer.
1737 * could do this more carefully, but for now, just
1738 * assume a bulk update wipes away all violations. */
1739 erase_violations(rrd, cdp_idx, rra_idx);
1744 static rrd_value_t initialize_average_carry_over(
1745 rrd_value_t pdp_temp_val,
1746 unsigned long elapsed_pdp_st,
1747 unsigned long start_pdp_offset,
1748 unsigned long pdp_cnt)
1750 /* initialize carry over value */
1751 if (isnan(pdp_temp_val)) {
1754 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1758 * Update or initialize a CDP value based on the consolidation
1761 * Returns the new value.
1763 static rrd_value_t calculate_cdp_val(
1764 rrd_value_t cdp_val,
1765 rrd_value_t pdp_temp_val,
1766 unsigned long elapsed_pdp_st,
1777 if (isnan(cdp_val)) {
1778 if (current_cf == CF_AVERAGE) {
1779 pdp_temp_val *= elapsed_pdp_st;
1782 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1783 i, ii, pdp_temp_val);
1785 return pdp_temp_val;
1787 if (current_cf == CF_AVERAGE)
1788 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1789 if (current_cf == CF_MINIMUM)
1790 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1791 if (current_cf == CF_MAXIMUM)
1792 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1794 return pdp_temp_val;
1798 * For each RRA, update the seasonal values and then call update_aberrant_CF
1799 * for each data source.
1801 * Return 0 on success, -1 on error.
1803 static int update_aberrant_cdps(
1805 rrd_file_t *rrd_file,
1806 unsigned long rra_begin,
1807 unsigned long elapsed_pdp_st,
1808 rrd_value_t *pdp_temp,
1809 rrd_value_t **seasonal_coef)
1811 unsigned long rra_idx, ds_idx, j;
1813 /* number of PDP steps since the last update that
1814 * are assigned to the first CDP to be generated
1815 * since the last update. */
1816 unsigned short scratch_idx;
1817 unsigned long rra_start;
1818 enum cf_en current_cf;
1820 /* this loop is only entered if elapsed_pdp_st < 3 */
1821 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1822 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1823 rra_start = rra_begin;
1824 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1825 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1826 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1827 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1828 if (scratch_idx == CDP_primary_val) {
1829 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1830 elapsed_pdp_st + 1, seasonal_coef);
1832 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1833 elapsed_pdp_st + 2, seasonal_coef);
1836 if (rrd_test_error())
1838 /* loop over data soures within each RRA */
1839 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1840 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1841 rra_idx * (rrd->stat_head->ds_cnt) +
1842 ds_idx, rra_idx, ds_idx, scratch_idx,
1846 rra_start += rrd->rra_def[rra_idx].row_cnt
1847 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1854 * Move sequentially through the file, writing one RRA at a time. Note this
1855 * architecture divorces the computation of CDP with flushing updated RRA
1858 * Return 0 on success, -1 on error.
1860 static int write_to_rras(
1862 rrd_file_t *rrd_file,
1863 unsigned long *rra_step_cnt,
1864 unsigned long rra_begin,
1865 time_t current_time,
1866 unsigned long *skip_update,
1867 rrd_info_t ** pcdp_summary)
1869 unsigned long rra_idx;
1870 unsigned long rra_start;
1871 time_t rra_time = 0; /* time of update for a RRA */
1873 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1875 /* Ready to write to disk */
1876 rra_start = rra_begin;
1878 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1879 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1880 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1883 unsigned short scratch_idx;
1884 unsigned long step_subtract;
1886 for (scratch_idx = CDP_primary_val,
1888 rra_step_cnt[rra_idx] > 0;
1889 rra_step_cnt[rra_idx]--,
1890 scratch_idx = CDP_secondary_val,
1891 step_subtract = 2) {
1895 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1897 /* increment, with wrap-around */
1898 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1899 rra_ptr->cur_row = 0;
1901 /* we know what our position should be */
1902 rra_pos_new = rra_start
1903 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1905 /* re-seek if the position is wrong or we wrapped around */
1906 if (rra_pos_new != rrd_file->pos) {
1907 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1908 rrd_set_error("seek error in rrd");
1913 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1916 if (skip_update[rra_idx])
1919 if (*pcdp_summary != NULL) {
1920 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1922 rra_time = (current_time - current_time % step_time)
1923 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1927 (rrd_file, rrd, rra_idx, scratch_idx,
1928 pcdp_summary, rra_time) == -1)
1932 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1939 * Write out one row of values (one value per DS) to the archive.
1941 * Returns 0 on success, -1 on error.
1943 static int write_RRA_row(
1944 rrd_file_t *rrd_file,
1946 unsigned long rra_idx,
1947 unsigned short CDP_scratch_idx,
1948 rrd_info_t ** pcdp_summary,
1951 unsigned long ds_idx, cdp_idx;
1954 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1955 /* compute the cdp index */
1956 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1958 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1959 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1960 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1962 if (*pcdp_summary != NULL) {
1963 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1964 /* append info to the return hash */
1965 *pcdp_summary = rrd_info_push(*pcdp_summary,
1967 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1968 rrd->rra_def[rra_idx].cf_nam,
1969 rrd->rra_def[rra_idx].pdp_cnt,
1970 rrd->ds_def[ds_idx].ds_nam),
1973 if (rrd_write(rrd_file,
1974 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1975 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1976 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1984 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1986 * Returns 0 on success, -1 otherwise
1988 static int smooth_all_rras(
1990 rrd_file_t *rrd_file,
1991 unsigned long rra_begin)
1993 unsigned long rra_start = rra_begin;
1994 unsigned long rra_idx;
1996 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1997 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1998 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2000 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2002 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2003 if (rrd_test_error())
2006 rra_start += rrd->rra_def[rra_idx].row_cnt
2007 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2014 * Flush changes to disk (unless we're using mmap)
2016 * Returns 0 on success, -1 otherwise
2018 static int write_changes_to_disk(
2020 rrd_file_t *rrd_file,
2023 /* we just need to write back the live header portion now */
2024 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2025 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2026 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2028 rrd_set_error("seek rrd for live header writeback");
2032 if (rrd_write(rrd_file, rrd->live_head,
2033 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2034 rrd_set_error("rrd_write live_head to rrd");
2038 if (rrd_write(rrd_file, rrd->legacy_last_up,
2039 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2040 rrd_set_error("rrd_write live_head to rrd");
2046 if (rrd_write(rrd_file, rrd->pdp_prep,
2047 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2048 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2049 rrd_set_error("rrd_write pdp_prep to rrd");
2053 if (rrd_write(rrd_file, rrd->cdp_prep,
2054 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2055 rrd->stat_head->ds_cnt)
2056 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2057 rrd->stat_head->ds_cnt)) {
2059 rrd_set_error("rrd_write cdp_prep to rrd");
2063 if (rrd_write(rrd_file, rrd->rra_ptr,
2064 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2065 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2066 rrd_set_error("rrd_write rra_ptr to rrd");