2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time); /* fixed mojibake: '&current_time' had been corrupted to the '¤' HTML entity */
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
111 unsigned long *skip_update,
112 int *schedule_smooth);
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
124 static int get_time_from_reading(
128 time_t *current_time,
129 unsigned long *current_time_usec,
132 static int update_pdp_prep(
135 rrd_value_t *pdp_new,
138 static int calculate_elapsed_steps(
140 unsigned long current_time,
141 unsigned long current_time_usec,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
163 unsigned long ds_idx,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
200 static void update_cdp(
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
212 static void initialize_cdp_val(
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
245 static int update_aberrant_cdps(
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
270 static int smooth_all_rras(
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
276 static int write_changes_to_disk(
278 rrd_file_t *rrd_file,
283 * normalize time as returned by gettimeofday. usec part must
286 static inline void normalize_time(
289 if (t->tv_usec < 0) {
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
299 static inline void initialize_time(
300 time_t *current_time,
301 unsigned long *current_time_usec,
304 struct timeval tmp_time; /* used for time conversion */
306 gettimeofday(&tmp_time, 0);
307 normalize_time(&tmp_time);
308 *current_time = tmp_time.tv_sec;
310 *current_time_usec = tmp_time.tv_usec;
312 *current_time_usec = 0;
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X)); /* NOTE(review): returns Y when X is NaN, else X. The trailing ';' expands into every call site -- likely a wart, but verify all callers before removing it (a caller written without its own ';' would break). */
318 rrd_info_t *rrd_update_v(
323 rrd_info_t *result = NULL;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
333 opterr = 0; /* initialize getopt */
336 int option_index = 0;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
388 int option_index = 0;
392 char *opt_daemon = NULL;
395 opterr = 0; /* initialize getopt */
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
405 tmplt = strdup(optarg);
409 if (opt_daemon != NULL)
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
414 rrd_set_error("strdup failed.");
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
438 rrd_set_error("The caching daemon cannot be used together with "
443 if (! rrdc_is_connected(opt_daemon))
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
448 else /* we are connected */
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (const char *const *) (argv + optind + 1)); /* values */
454 rrd_set_error("Failed sending the values to rrdcached: %s",
464 if (opt_daemon != NULL)
473 const char *filename,
478 return _rrd_update(filename, tmplt, argc, argv, NULL);
482 const char *filename,
486 rrd_info_t * pcdp_summary)
491 unsigned long rra_begin; /* byte pointer to the rra
492 * area in the rrd file. this
493 * pointer never changes value */
494 rrd_value_t *pdp_new; /* prepare the incoming data to be added
495 * to the existing entry */
496 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
497 * to the cdp values */
499 long *tmpl_idx; /* index representing the settings
500 * transported by the tmplt index */
501 unsigned long tmpl_cnt = 2; /* time and data */
503 time_t current_time = 0;
504 unsigned long current_time_usec = 0; /* microseconds part of current time */
506 int schedule_smooth = 0;
508 /* number of elapsed PDP steps since last update */
509 unsigned long *rra_step_cnt = NULL;
511 int version; /* rrd version */
512 rrd_file_t *rrd_file;
513 char *arg_copy; /* for processing the argv */
514 unsigned long *skip_update; /* RRAs to advance but not write */
516 /* need at least 1 arguments: data. */
518 rrd_set_error("Not enough arguments");
523 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
526 /* We are now at the beginning of the rra's */
527 rra_begin = rrd_file->header_len;
529 version = atoi(rrd.stat_head->version);
531 initialize_time(&current_time, &current_time_usec, version); /* fixed mojibake: '&curr' had been corrupted to the '¤' HTML entity */
533 /* get exclusive lock to whole file.
534 * lock gets removed when we close the file.
536 if (rrd_lock(rrd_file) != 0) {
537 rrd_set_error("could not lock RRD");
541 if (allocate_data_structures(&rrd, &updvals,
542 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
543 &rra_step_cnt, &skip_update,
548 /* loop through the arguments. */
549 for (arg_i = 0; arg_i < argc; arg_i++) {
550 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
551 rrd_set_error("failed duplication argv entry");
554 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
555 &current_time, &current_time_usec, pdp_temp, pdp_new, /* fixed mojibake: '&' restored from '¤' entity corruption */
556 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
557 &pcdp_summary, version, skip_update,
558 &schedule_smooth) == -1) {
559 if (rrd_test_error()) { /* Should have error string always here */
562 /* Prepend file name to error message */
563 if ((save_error = strdup(rrd_get_error())) != NULL) {
564 rrd_set_error("%s: %s", filename, save_error);
576 /* if we got here and if there is an error and if the file has not been
577 * written to, then close things up and return. */
578 if (rrd_test_error()) {
579 goto err_free_structures;
582 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
583 goto err_free_structures;
587 /* calling the smoothing code here guarantees at most one smoothing
588 * operation per rrd_update call. Unfortunately, it is possible with bulk
589 * updates, or a long-delayed update for smoothing to occur off-schedule.
590 * This really isn't critical except during the burn-in cycles. */
591 if (schedule_smooth) {
592 smooth_all_rras(&rrd, rrd_file, rra_begin);
595 /* rrd_dontneed(rrd_file,&rrd); */
621 * Allocate some important arrays used, and initialize the template.
623 * When it returns, either all of the structures are allocated
624 * or none of them are.
626 * Returns 0 on success, -1 on error.
628 static int allocate_data_structures(
631 rrd_value_t **pdp_temp,
634 unsigned long *tmpl_cnt,
635 unsigned long **rra_step_cnt,
636 unsigned long **skip_update,
637 rrd_value_t **pdp_new)
640 if ((*updvals = (char **) malloc(sizeof(char *)
641 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
642 rrd_set_error("allocating updvals pointer array.");
645 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
646 * rrd->stat_head->ds_cnt)) ==
648 rrd_set_error("allocating pdp_temp.");
649 goto err_free_updvals;
651 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
653 rrd->stat_head->rra_cnt)) ==
655 rrd_set_error("allocating skip_update.");
656 goto err_free_pdp_temp;
658 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
659 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
660 rrd_set_error("allocating tmpl_idx.");
661 goto err_free_skip_update;
663 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
666 rra_cnt))) == NULL) {
667 rrd_set_error("allocating rra_step_cnt.");
668 goto err_free_tmpl_idx;
671 /* initialize tmplt redirector */
672 /* default config example (assume DS 1 is a CDEF DS)
673 tmpl_idx[0] -> 0; (time)
674 tmpl_idx[1] -> 1; (DS 0)
675 tmpl_idx[2] -> 3; (DS 2)
676 tmpl_idx[3] -> 4; (DS 3) */
677 (*tmpl_idx)[0] = 0; /* time */
678 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
679 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
680 (*tmpl_idx)[ii++] = i;
685 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
686 goto err_free_rra_step_cnt;
690 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
691 * rrd->stat_head->ds_cnt)) == NULL) {
692 rrd_set_error("allocating pdp_new.");
693 goto err_free_rra_step_cnt;
698 err_free_rra_step_cnt:
702 err_free_skip_update:
712 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
714 * Returns 0 on success.
716 static int parse_template(
719 unsigned long *tmpl_cnt,
722 char *dsname, *tmplt_copy;
723 unsigned int tmpl_len, i;
726 *tmpl_cnt = 1; /* the first entry is the time */
728 /* we should work on a writeable copy here */
729 if ((tmplt_copy = strdup(tmplt)) == NULL) {
730 rrd_set_error("error copying tmplt '%s'", tmplt);
736 tmpl_len = strlen(tmplt_copy);
737 for (i = 0; i <= tmpl_len; i++) {
738 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
739 tmplt_copy[i] = '\0';
740 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
741 rrd_set_error("tmplt contains more DS definitions than RRD");
743 goto out_free_tmpl_copy;
745 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
746 rrd_set_error("unknown DS name '%s'", dsname);
748 goto out_free_tmpl_copy;
750 /* go to the next entry on the tmplt_copy */
752 dsname = &tmplt_copy[i + 1];
762 * Parse an update string, updates the primary data points (PDPs)
763 * and consolidated data points (CDPs), and writes changes to the RRAs.
765 * Returns 0 on success, -1 on error.
767 static int process_arg(
770 rrd_file_t *rrd_file,
771 unsigned long rra_begin,
772 time_t *current_time,
773 unsigned long *current_time_usec,
774 rrd_value_t *pdp_temp,
775 rrd_value_t *pdp_new,
776 unsigned long *rra_step_cnt,
779 unsigned long tmpl_cnt,
780 rrd_info_t ** pcdp_summary,
782 unsigned long *skip_update,
783 int *schedule_smooth)
785 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
787 /* a vector of future Holt-Winters seasonal coefs */
788 unsigned long elapsed_pdp_st;
790 double interval, pre_int, post_int; /* interval between this and
792 unsigned long proc_pdp_cnt;
794 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
795 current_time, current_time_usec, version) == -1) {
799 interval = (double) (*current_time - rrd->live_head->last_up)
800 + (double) ((long) *current_time_usec -
801 (long) rrd->live_head->last_up_usec) / 1e6f;
803 /* process the data sources and update the pdp_prep
804 * area accordingly */
805 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
809 elapsed_pdp_st = calculate_elapsed_steps(rrd,
811 *current_time_usec, interval,
815 /* has a pdp_st moment occurred since the last run ? */
816 if (elapsed_pdp_st == 0) {
817 /* no we have not passed a pdp_st moment. therefore update is simple */
818 simple_update(rrd, interval, pdp_new);
820 /* an pdp_st has occurred. */
821 if (process_all_pdp_st(rrd, interval,
823 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
826 if (update_all_cdp_prep(rrd, rra_step_cnt,
833 skip_update, schedule_smooth) == -1) {
834 goto err_free_coefficients;
836 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
837 elapsed_pdp_st, pdp_temp,
838 &seasonal_coef) == -1) {
839 goto err_free_coefficients;
841 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
842 *current_time, skip_update,
843 pcdp_summary) == -1) {
844 goto err_free_coefficients;
846 } /* endif a pdp_st has occurred */
847 rrd->live_head->last_up = *current_time;
848 rrd->live_head->last_up_usec = *current_time_usec;
851 *rrd->legacy_last_up = rrd->live_head->last_up;
854 free(last_seasonal_coef);
857 err_free_coefficients:
859 free(last_seasonal_coef);
864 * Parse a DS string (time + colon-separated values), storing the
865 * results in current_time, current_time_usec, and updvals.
867 * Returns 0 on success, -1 on error.
874 unsigned long tmpl_cnt,
875 time_t *current_time,
876 unsigned long *current_time_usec,
884 /* initialize all ds input to unknown except the first one
885 which has always got to be set */
886 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
889 /* separate all ds elements; first must be examined separately
890 due to alternate time syntax */
891 if ((p = strchr(input, '@')) != NULL) {
893 } else if ((p = strchr(input, ':')) != NULL) {
896 rrd_set_error("expected timestamp not found in data source from %s",
902 updvals[tmpl_idx[i++]] = p + 1;
907 updvals[tmpl_idx[i++]] = p + 1;
913 rrd_set_error("expected %lu data source readings (got %lu) from %s",
914 tmpl_cnt - 1, i, input);
918 if (get_time_from_reading(rrd, timesyntax, updvals,
919 current_time, current_time_usec,
927 * Parse the time in a DS string, store it in current_time and
928 * current_time_usec and verify that it's later than the last
929 * update for this DS.
931 * Returns 0 on success, -1 on error.
933 static int get_time_from_reading(
937 time_t *current_time,
938 unsigned long *current_time_usec,
942 char *parsetime_error = NULL;
944 rrd_time_value_t ds_tv;
945 struct timeval tmp_time; /* used for time conversion */
947 /* get the time from the reading ... handle N */
948 if (timesyntax == '@') { /* at-style */
949 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
950 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
953 if (ds_tv.type == RELATIVE_TO_END_TIME ||
954 ds_tv.type == RELATIVE_TO_START_TIME) {
955 rrd_set_error("specifying time relative to the 'start' "
956 "or 'end' makes no sense here: %s", updvals[0]);
959 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
960 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
961 } else if (strcmp(updvals[0], "N") == 0) {
962 gettimeofday(&tmp_time, 0);
963 normalize_time(&tmp_time);
964 *current_time = tmp_time.tv_sec;
965 *current_time_usec = tmp_time.tv_usec;
967 old_locale = setlocale(LC_NUMERIC, "C");
968 tmp = strtod(updvals[0], 0);
969 setlocale(LC_NUMERIC, old_locale);
970 *current_time = floor(tmp);
971 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
973 /* dont do any correction for old version RRDs */
975 *current_time_usec = 0;
977 if (*current_time < rrd->live_head->last_up ||
978 (*current_time == rrd->live_head->last_up &&
979 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
980 rrd_set_error("illegal attempt to update using time %ld when "
981 "last update time is %ld (minimum one second step)",
982 *current_time, rrd->live_head->last_up);
989 * Update pdp_new by interpreting the updvals according to the DS type
990 * (COUNTER, GAUGE, etc.).
992 * Returns 0 on success, -1 on error.
994 static int update_pdp_prep(
997 rrd_value_t *pdp_new,
1000 unsigned long ds_idx;
1002 char *endptr; /* used in the conversion */
1005 enum dst_en dst_idx;
1007 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1008 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1010 /* make sure we do not build diffs with old last_ds values */
1011 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1012 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1013 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1016 /* NOTE: DST_CDEF should never enter this if block, because
1017 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1018 * accidently specified a value for the DST_CDEF. To handle this case,
1019 * an extra check is required. */
1021 if ((updvals[ds_idx + 1][0] != 'U') &&
1022 (dst_idx != DST_CDEF) &&
1023 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1026 /* pdp_new contains rate * time ... eg the bytes transferred during
1027 * the interval. Doing it this way saves a lot of math operations
1032 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1033 if ((updvals[ds_idx + 1][ii] < '0'
1034 || updvals[ds_idx + 1][ii] > '9')
1035 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1036 rrd_set_error("not a simple integer: '%s'",
1037 updvals[ds_idx + 1]);
1041 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1043 rrd_diff(updvals[ds_idx + 1],
1044 rrd->pdp_prep[ds_idx].last_ds);
1045 if (dst_idx == DST_COUNTER) {
1046 /* simple overflow catcher. This will fail
1047 * terribly for non 32 or 64 bit counters
1048 * ... are there any others in SNMP land?
1050 if (pdp_new[ds_idx] < (double) 0.0)
1051 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1052 if (pdp_new[ds_idx] < (double) 0.0)
1053 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1055 rate = pdp_new[ds_idx] / interval;
1057 pdp_new[ds_idx] = DNAN;
1061 old_locale = setlocale(LC_NUMERIC, "C");
1063 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1064 setlocale(LC_NUMERIC, old_locale);
1066 rrd_set_error("converting '%s' to float: %s",
1067 updvals[ds_idx + 1], rrd_strerror(errno));
1070 if (endptr[0] != '\0') {
1072 ("conversion of '%s' to float not complete: tail '%s'",
1073 updvals[ds_idx + 1], endptr);
1076 rate = pdp_new[ds_idx] / interval;
1080 old_locale = setlocale(LC_NUMERIC, "C");
1082 strtod(updvals[ds_idx + 1], &endptr) * interval;
1083 setlocale(LC_NUMERIC, old_locale);
1085 rrd_set_error("converting '%s' to float: %s",
1086 updvals[ds_idx + 1], rrd_strerror(errno));
1089 if (endptr[0] != '\0') {
1091 ("conversion of '%s' to float not complete: tail '%s'",
1092 updvals[ds_idx + 1], endptr);
1095 rate = pdp_new[ds_idx] / interval;
1098 rrd_set_error("rrd contains unknown DS type : '%s'",
1099 rrd->ds_def[ds_idx].dst);
1102 /* break out of this for loop if the error string is set */
1103 if (rrd_test_error()) {
1106 /* make sure pdp_temp is neither too large or too small
1107 * if any of these occur it becomes unknown ...
1108 * sorry folks ... */
1110 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1111 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1112 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1113 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1114 pdp_new[ds_idx] = DNAN;
1117 /* no news is news all the same */
1118 pdp_new[ds_idx] = DNAN;
1122 /* make a copy of the command line argument for the next run */
1124 fprintf(stderr, "prep ds[%lu]\t"
1128 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1131 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1133 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1139 * How many PDP steps have elapsed since the last update? Returns the answer,
1140 * and stores the time between the last update and the last PDP in pre_time,
1141 * and the time between the last PDP and the current time in post_int.
1143 static int calculate_elapsed_steps(
1145 unsigned long current_time,
1146 unsigned long current_time_usec,
1150 unsigned long *proc_pdp_cnt)
1152 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1153 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1155 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1156 * when it was last updated */
1157 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1159 /* when was the current pdp started */
1160 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1161 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1163 /* when did the last pdp_st occur */
1164 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1165 occu_pdp_st = current_time - occu_pdp_age;
1167 if (occu_pdp_st > proc_pdp_st) {
1168 /* OK we passed the pdp_st moment */
1169 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1170 * occurred before the latest
1172 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1173 *post_int = occu_pdp_age; /* how much after it */
1174 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1176 *pre_int = interval;
1180 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1183 printf("proc_pdp_age %lu\t"
1185 "occu_pfp_age %lu\t"
1189 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1190 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1193 /* compute the number of elapsed pdp_st moments */
1194 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1198 * Increment the PDP values by the values in pdp_new, or else initialize them.
1200 static void simple_update(
1203 rrd_value_t *pdp_new)
1207 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1208 if (isnan(pdp_new[i])) {
1209 /* this is not really accurate if we use subsecond data arrival time
1210 should have thought of it when going subsecond resolution ...
1211 sorry next format change we will have it! */
1212 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1215 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1216 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1218 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1227 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1228 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1234 * Call process_pdp_st for each DS.
1236 * Returns 0 on success, -1 on error.
1238 static int process_all_pdp_st(
1243 unsigned long elapsed_pdp_st,
1244 rrd_value_t *pdp_new,
1245 rrd_value_t *pdp_temp)
1247 unsigned long ds_idx;
1249 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1250 rate*seconds which occurred up to the last run.
1251 pdp_new[] contains rate*seconds from the latest run.
1252 pdp_temp[] will contain the rate for cdp */
1254 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1255 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1256 elapsed_pdp_st * rrd->stat_head->pdp_step,
1257 pdp_new, pdp_temp) == -1) {
1261 fprintf(stderr, "PDP UPD ds[%lu]\t"
1262 "elapsed_pdp_st %lu\t"
1265 "new_unkn_sec %5lu\n",
1269 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1270 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1277 * Process an update that occurs after one of the PDP moments.
1278 * Increments the PDP value, sets NAN if time greater than the
1279 * heartbeats have elapsed, processes CDEFs.
1281 * Returns 0 on success, -1 on error.
1283 static int process_pdp_st(
1285 unsigned long ds_idx,
1289 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1290 rrd_value_t *pdp_new,
1291 rrd_value_t *pdp_temp)
1295 /* update pdp_prep to the current pdp_st. */
1296 double pre_unknown = 0.0;
1297 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1298 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1300 rpnstack_t rpnstack; /* used for COMPUTE DS */
1302 rpnstack_init(&rpnstack);
1305 if (isnan(pdp_new[ds_idx])) {
1306 /* a final bit of unknown to be added before calculation
1307 we use a temporary variable for this so that we
1308 don't have to turn integer lines before using the value */
1309 pre_unknown = pre_int;
1311 if (isnan(scratch[PDP_val].u_val)) {
1312 scratch[PDP_val].u_val = 0;
1314 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1317 /* if too much of the pdp_prep is unknown we dump it */
1318 /* if the interval is larger thatn mrhb we get NAN */
1319 if ((interval > mrhb) ||
1320 (rrd->stat_head->pdp_step / 2.0 <
1321 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1322 pdp_temp[ds_idx] = DNAN;
1324 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1325 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1329 /* process CDEF data sources; remember each CDEF DS can
1330 * only reference other DS with a lower index number */
1331 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1335 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1336 /* substitute data values for OP_VARIABLE nodes */
1337 for (i = 0; rpnp[i].op != OP_END; i++) {
1338 if (rpnp[i].op == OP_VARIABLE) {
1339 rpnp[i].op = OP_NUMBER;
1340 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1343 /* run the rpn calculator */
1344 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1346 rpnstack_free(&rpnstack);
1351 /* make pdp_prep ready for the next run */
1352 if (isnan(pdp_new[ds_idx])) {
1353 /* this is not realy accurate if we use subsecond data arival time
1354 should have thought of it when going subsecond resolution ...
1355 sorry next format change we will have it! */
1356 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1357 scratch[PDP_val].u_val = DNAN;
1359 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1360 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1362 rpnstack_free(&rpnstack);
1367 * Iterate over all the RRAs for a given DS and:
1368 * 1. Decide whether to schedule a smooth later
1369 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1372 * Returns 0 on success, -1 on error
1374 static int update_all_cdp_prep(
1376 unsigned long *rra_step_cnt,
1377 unsigned long rra_begin,
1378 rrd_file_t *rrd_file,
1379 unsigned long elapsed_pdp_st,
1380 unsigned long proc_pdp_cnt,
1381 rrd_value_t **last_seasonal_coef,
1382 rrd_value_t **seasonal_coef,
1383 rrd_value_t *pdp_temp,
1384 unsigned long *skip_update,
1385 int *schedule_smooth)
1387 unsigned long rra_idx;
1389 /* index into the CDP scratch array */
1390 enum cf_en current_cf;
1391 unsigned long rra_start;
1393 /* number of rows to be updated in an RRA for a data value. */
1394 unsigned long start_pdp_offset;
1396 rra_start = rra_begin;
1397 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1398 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1400 rrd->rra_def[rra_idx].pdp_cnt -
1401 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1402 skip_update[rra_idx] = 0;
1403 if (start_pdp_offset <= elapsed_pdp_st) {
1404 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1405 rrd->rra_def[rra_idx].pdp_cnt + 1;
1407 rra_step_cnt[rra_idx] = 0;
1410 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1411 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1412 * so that they will be correct for the next observed value; note that for
1413 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1414 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1415 if (rra_step_cnt[rra_idx] > 1) {
1416 skip_update[rra_idx] = 1;
1417 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1418 elapsed_pdp_st, last_seasonal_coef);
1419 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1420 elapsed_pdp_st + 1, seasonal_coef);
1422 /* periodically run a smoother for seasonal effects */
1423 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1426 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1427 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1428 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1431 *schedule_smooth = 1;
1434 if (rrd_test_error())
1438 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1439 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1440 current_cf) == -1) {
1444 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1445 sizeof(rrd_value_t);
1451 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1453 static int do_schedule_smooth(
1455 unsigned long rra_idx,
1456 unsigned long elapsed_pdp_st)
1458 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1459 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1460 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1461 unsigned long seasonal_smooth_idx =
1462 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1463 unsigned long *init_seasonal =
1464 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1466 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1467 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1468 * really an RRA level, not a data source within RRA level parameter, but
1469 * the rra_def is read only for rrd_update (not flushed to disk). */
1470 if (*init_seasonal > BURNIN_CYCLES) {
1471 /* someone has no doubt invented a trick to deal with this wrap around,
1472 * but at least this code is clear. */
1473 if (seasonal_smooth_idx > cur_row) {
1474 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1475 * between PDP and CDP */
1476 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1478 /* can't rely on negative numbers because we are working with
1479 * unsigned values */
1480 return (cur_row + elapsed_pdp_st >= row_cnt
1481 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1483 /* mark off one of the burn-in cycles */
1484 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1488 * For a given RRA, iterate over the data sources and call the appropriate
1489 * consolidation function.
1491 * Returns 0 on success, -1 on error.
1493 static int update_cdp_prep(
1495 unsigned long elapsed_pdp_st,
1496 unsigned long start_pdp_offset,
1497 unsigned long *rra_step_cnt,
1499 rrd_value_t *pdp_temp,
1500 rrd_value_t *last_seasonal_coef,
1501 rrd_value_t *seasonal_coef,
1504 unsigned long ds_idx, cdp_idx;
1506 /* update CDP_PREP areas */
1507 /* loop over data soures within each RRA */
1508 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1510 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1512 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1513 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1514 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1515 elapsed_pdp_st, start_pdp_offset,
1516 rrd->rra_def[rra_idx].pdp_cnt,
1517 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1520 /* Nothing to consolidate if there's one PDP per CDP. However, if
1521 * we've missed some PDPs, let's update null counters etc. */
1522 if (elapsed_pdp_st > 2) {
1523 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1524 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1529 if (rrd_test_error())
1531 } /* endif data sources loop */
1536 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1537 * primary value, secondary value, and # of unknowns.
1539 static void update_cdp(
1542 rrd_value_t pdp_temp_val,
1543 unsigned long rra_step_cnt,
1544 unsigned long elapsed_pdp_st,
1545 unsigned long start_pdp_offset,
1546 unsigned long pdp_cnt,
1551 /* shorthand variables */
1552 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1553 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1554 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1555 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1558 /* If we are in this block, as least 1 CDP value will be written to
1559 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1560 * to be written, then the "fill in" value is the CDP_secondary_val
1562 if (isnan(pdp_temp_val)) {
1563 *cdp_unkn_pdp_cnt += start_pdp_offset;
1564 *cdp_secondary_val = DNAN;
1566 /* CDP_secondary value is the RRA "fill in" value for intermediary
1567 * CDP data entries. No matter the CF, the value is the same because
1568 * the average, max, min, and last of a list of identical values is
1569 * the same, namely, the value itself. */
1570 *cdp_secondary_val = pdp_temp_val;
1573 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1574 *cdp_primary_val = DNAN;
1575 if (current_cf == CF_AVERAGE) {
1577 initialize_average_carry_over(pdp_temp_val,
1579 start_pdp_offset, pdp_cnt);
1581 *cdp_val = pdp_temp_val;
1584 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1585 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1586 } /* endif meets xff value requirement for a valid value */
1587 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1588 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1589 if (isnan(pdp_temp_val))
1590 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1592 *cdp_unkn_pdp_cnt = 0;
1593 } else { /* rra_step_cnt[i] == 0 */
1596 if (isnan(*cdp_val)) {
1597 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1600 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1604 if (isnan(pdp_temp_val)) {
1605 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1608 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1615 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1616 * on the type of consolidation function.
1618 static void initialize_cdp_val(
1621 rrd_value_t pdp_temp_val,
1622 unsigned long elapsed_pdp_st,
1623 unsigned long start_pdp_offset,
1624 unsigned long pdp_cnt)
1626 rrd_value_t cum_val, cur_val;
1628 switch (current_cf) {
1630 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1631 cur_val = IFDNAN(pdp_temp_val, 0.0);
1632 scratch[CDP_primary_val].u_val =
1633 (cum_val + cur_val * start_pdp_offset) /
1634 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1635 scratch[CDP_val].u_val =
1636 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1637 start_pdp_offset, pdp_cnt);
1640 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1641 cur_val = IFDNAN(pdp_temp_val, -DINF);
1644 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1646 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1652 if (cur_val > cum_val)
1653 scratch[CDP_primary_val].u_val = cur_val;
1655 scratch[CDP_primary_val].u_val = cum_val;
1656 /* initialize carry over value */
1657 scratch[CDP_val].u_val = pdp_temp_val;
1660 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1661 cur_val = IFDNAN(pdp_temp_val, DINF);
1664 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1666 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1672 if (cur_val < cum_val)
1673 scratch[CDP_primary_val].u_val = cur_val;
1675 scratch[CDP_primary_val].u_val = cum_val;
1676 /* initialize carry over value */
1677 scratch[CDP_val].u_val = pdp_temp_val;
1681 scratch[CDP_primary_val].u_val = pdp_temp_val;
1682 /* initialize carry over value */
1683 scratch[CDP_val].u_val = pdp_temp_val;
1689 * Update the consolidation function for Holt-Winters functions as
1690 * well as other functions that don't actually consolidate multiple
1693 static void reset_cdp(
1695 unsigned long elapsed_pdp_st,
1696 rrd_value_t *pdp_temp,
1697 rrd_value_t *last_seasonal_coef,
1698 rrd_value_t *seasonal_coef,
1702 enum cf_en current_cf)
1704 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1706 switch (current_cf) {
1709 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1710 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1713 case CF_DEVSEASONAL:
1714 /* need to update cached seasonal values, so they are consistent
1715 * with the bulk update */
1716 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1717 * CDP_last_deviation are the same. */
1718 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1719 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1723 /* need to update the null_count and last_null_count.
1724 * even do this for non-DNAN pdp_temp because the
1725 * algorithm is not learning from batch updates. */
1726 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1727 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1730 scratch[CDP_primary_val].u_val = DNAN;
1731 scratch[CDP_secondary_val].u_val = DNAN;
1734 /* do not count missed bulk values as failures */
1735 scratch[CDP_primary_val].u_val = 0;
1736 scratch[CDP_secondary_val].u_val = 0;
1737 /* need to reset violations buffer.
1738 * could do this more carefully, but for now, just
1739 * assume a bulk update wipes away all violations. */
1740 erase_violations(rrd, cdp_idx, rra_idx);
1745 static rrd_value_t initialize_average_carry_over(
1746 rrd_value_t pdp_temp_val,
1747 unsigned long elapsed_pdp_st,
1748 unsigned long start_pdp_offset,
1749 unsigned long pdp_cnt)
1751 /* initialize carry over value */
1752 if (isnan(pdp_temp_val)) {
1755 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1759 * Update or initialize a CDP value based on the consolidation
1762 * Returns the new value.
1764 static rrd_value_t calculate_cdp_val(
1765 rrd_value_t cdp_val,
1766 rrd_value_t pdp_temp_val,
1767 unsigned long elapsed_pdp_st,
1778 if (isnan(cdp_val)) {
1779 if (current_cf == CF_AVERAGE) {
1780 pdp_temp_val *= elapsed_pdp_st;
1783 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1784 i, ii, pdp_temp_val);
1786 return pdp_temp_val;
1788 if (current_cf == CF_AVERAGE)
1789 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1790 if (current_cf == CF_MINIMUM)
1791 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1792 if (current_cf == CF_MAXIMUM)
1793 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1795 return pdp_temp_val;
1799 * For each RRA, update the seasonal values and then call update_aberrant_CF
1800 * for each data source.
1802 * Return 0 on success, -1 on error.
1804 static int update_aberrant_cdps(
1806 rrd_file_t *rrd_file,
1807 unsigned long rra_begin,
1808 unsigned long elapsed_pdp_st,
1809 rrd_value_t *pdp_temp,
1810 rrd_value_t **seasonal_coef)
1812 unsigned long rra_idx, ds_idx, j;
1814 /* number of PDP steps since the last update that
1815 * are assigned to the first CDP to be generated
1816 * since the last update. */
1817 unsigned short scratch_idx;
1818 unsigned long rra_start;
1819 enum cf_en current_cf;
1821 /* this loop is only entered if elapsed_pdp_st < 3 */
1822 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1823 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1824 rra_start = rra_begin;
1825 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1826 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1827 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1828 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1829 if (scratch_idx == CDP_primary_val) {
1830 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1831 elapsed_pdp_st + 1, seasonal_coef);
1833 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1834 elapsed_pdp_st + 2, seasonal_coef);
1837 if (rrd_test_error())
1839 /* loop over data soures within each RRA */
1840 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1841 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1842 rra_idx * (rrd->stat_head->ds_cnt) +
1843 ds_idx, rra_idx, ds_idx, scratch_idx,
1847 rra_start += rrd->rra_def[rra_idx].row_cnt
1848 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1855 * Move sequentially through the file, writing one RRA at a time. Note this
1856 * architecture divorces the computation of CDP with flushing updated RRA
1859 * Return 0 on success, -1 on error.
1861 static int write_to_rras(
1863 rrd_file_t *rrd_file,
1864 unsigned long *rra_step_cnt,
1865 unsigned long rra_begin,
1866 time_t current_time,
1867 unsigned long *skip_update,
1868 rrd_info_t ** pcdp_summary)
1870 unsigned long rra_idx;
1871 unsigned long rra_start;
1872 time_t rra_time = 0; /* time of update for a RRA */
1874 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1876 /* Ready to write to disk */
1877 rra_start = rra_begin;
1879 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1880 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1881 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1884 unsigned short scratch_idx;
1885 unsigned long step_subtract;
1887 for (scratch_idx = CDP_primary_val,
1889 rra_step_cnt[rra_idx] > 0;
1890 rra_step_cnt[rra_idx]--,
1891 scratch_idx = CDP_secondary_val,
1892 step_subtract = 2) {
1896 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1898 /* increment, with wrap-around */
1899 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1900 rra_ptr->cur_row = 0;
1902 /* we know what our position should be */
1903 rra_pos_new = rra_start
1904 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1906 /* re-seek if the position is wrong or we wrapped around */
1907 if (rra_pos_new != rrd_file->pos) {
1908 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1909 rrd_set_error("seek error in rrd");
1914 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1917 if (skip_update[rra_idx])
1920 if (*pcdp_summary != NULL) {
1921 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1923 rra_time = (current_time - current_time % step_time)
1924 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1928 (rrd_file, rrd, rra_idx, scratch_idx,
1929 pcdp_summary, rra_time) == -1)
1932 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1935 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1942 * Write out one row of values (one value per DS) to the archive.
1944 * Returns 0 on success, -1 on error.
1946 static int write_RRA_row(
1947 rrd_file_t *rrd_file,
1949 unsigned long rra_idx,
1950 unsigned short CDP_scratch_idx,
1951 rrd_info_t ** pcdp_summary,
1954 unsigned long ds_idx, cdp_idx;
1957 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1958 /* compute the cdp index */
1959 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1961 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1962 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1963 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1965 if (*pcdp_summary != NULL) {
1966 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1967 /* append info to the return hash */
1968 *pcdp_summary = rrd_info_push(*pcdp_summary,
1970 ("[%lli]RRA[%s][%lu]DS[%s]", rra_time,
1971 rrd->rra_def[rra_idx].cf_nam,
1972 rrd->rra_def[rra_idx].pdp_cnt,
1973 rrd->ds_def[ds_idx].ds_nam),
1976 if (rrd_write(rrd_file,
1977 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1978 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1979 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1987 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1989 * Returns 0 on success, -1 otherwise
1991 static int smooth_all_rras(
1993 rrd_file_t *rrd_file,
1994 unsigned long rra_begin)
1996 unsigned long rra_start = rra_begin;
1997 unsigned long rra_idx;
1999 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2000 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2001 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2003 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2005 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2006 if (rrd_test_error())
2009 rra_start += rrd->rra_def[rra_idx].row_cnt
2010 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2017 * Flush changes to disk (unless we're using mmap)
2019 * Returns 0 on success, -1 otherwise
2021 static int write_changes_to_disk(
2023 rrd_file_t *rrd_file,
2026 /* we just need to write back the live header portion now */
2027 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2028 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2029 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2031 rrd_set_error("seek rrd for live header writeback");
2035 if (rrd_write(rrd_file, rrd->live_head,
2036 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2037 rrd_set_error("rrd_write live_head to rrd");
2041 if (rrd_write(rrd_file, rrd->legacy_last_up,
2042 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2043 rrd_set_error("rrd_write live_head to rrd");
2049 if (rrd_write(rrd_file, rrd->pdp_prep,
2050 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2051 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2052 rrd_set_error("rrd_write pdp_prep to rrd");
2056 if (rrd_write(rrd_file, rrd->cdp_prep,
2057 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2058 rrd->stat_head->ds_cnt)
2059 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2060 rrd->stat_head->ds_cnt)) {
2062 rrd_set_error("rrd_write cdp_prep to rrd");
2066 if (rrd_write(rrd_file, rrd->rra_ptr,
2067 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2068 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2069 rrd_set_error("rrd_write rra_ptr to rrd");