1 /*****************************************************************************
2 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
30 #include <sys/timeb.h>
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
44 static int gettimeofday(
46 struct __timezone *tz)
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 /* FUNCTION PROTOTYPES */
75 static int allocate_data_structures(
76 rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp,
77 const char *tmplt, long **tmpl_idx, unsigned long *tmpl_cnt,
78 unsigned long **rra_step_cnt, rrd_value_t **pdp_new);
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81 unsigned long *tmpl_cnt, long *tmpl_idx);
83 static int process_arg(
87 unsigned long rra_begin,
88 unsigned long *rra_current,
90 unsigned long *current_time_usec,
91 rrd_value_t *pdp_temp,
93 unsigned long *rra_step_cnt,
96 unsigned long tmpl_cnt,
97 info_t **pcdp_summary,
99 int *schedule_smooth);
101 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
102 unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec,
105 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals,
106 time_t *current_time, unsigned long *current_time_usec, int version);
108 static int update_pdp_prep(rrd_t *rrd, char **updvals,
109 rrd_value_t *pdp_new, double interval);
111 static int calculate_elapsed_steps(rrd_t *rrd,
112 unsigned long current_time, unsigned long current_time_usec,
113 double interval, double *pre_int, double *post_int,
114 unsigned long *proc_pdp_cnt);
116 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
118 static int process_all_pdp_st(rrd_t *rrd, double interval,
119 double pre_int, double post_int, unsigned long elapsed_pdp_st,
120 rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
122 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
123 double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new,
124 rrd_value_t *pdp_temp);
126 static int update_all_cdp_prep(
127 rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
128 rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
129 rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
130 rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth);
132 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx,
133 unsigned long elapsed_pdp_st);
135 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st,
136 unsigned long start_pdp_offset, unsigned long *rra_step_cnt,
137 int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
138 rrd_value_t *seasonal_coef, int current_cf);
140 static void update_cdp(unival *scratch, int current_cf,
141 rrd_value_t pdp_temp_val, unsigned long rra_step_cnt,
142 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
143 unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
145 static void initialize_cdp_val(unival *scratch, int current_cf,
146 rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st,
147 unsigned long start_pdp_offset, unsigned long pdp_cnt);
149 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st,
150 rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
151 rrd_value_t *seasonal_coef,
152 int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
154 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
155 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
156 unsigned long pdp_cnt);
158 static rrd_value_t calculate_cdp_val(
159 rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
160 unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
162 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file,
163 unsigned long rra_begin, unsigned long *rra_current,
164 unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
166 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file,
167 unsigned long *rra_step_cnt, unsigned long rra_begin,
168 unsigned long *rra_current, time_t current_time, info_t **pcdp_summary);
170 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
171 unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
174 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file,
175 unsigned long rra_begin);
178 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file,
183 * normalize time as returned by gettimeofday. usec part must
/*
 * normalize_time: fold a negative tv_usec into tv_sec so the
 * microsecond part ends up in [0, 1e6).  Only a fragment of the body is
 * visible in this listing; the borrow arithmetic itself is elided —
 * TODO confirm against the full source.
 */
186 static inline void normalize_time(
189 if (t->tv_usec < 0) {
196 * Sets current_time and current_time_usec based on the current time.
197 * current_time_usec is set to 0 if the version number is 1 or 2.
/*
 * initialize_time: read the wall clock via gettimeofday(), normalize
 * it, and split it into whole seconds and microseconds.  For old RRD
 * format versions the sub-second part is discarded (set to 0), since
 * those formats cannot store it.
 */
199 static inline void initialize_time(
200 time_t *current_time, unsigned long *current_time_usec,
203 struct timeval tmp_time; /* used for time conversion */
205 gettimeofday(&tmp_time, 0);
206 normalize_time(&tmp_time);
207 *current_time = tmp_time.tv_sec;
/* newer formats keep microsecond resolution ... */
209 *current_time_usec = tmp_time.tv_usec;
/* ... version 1/2 formats do not */
211 *current_time_usec = 0;
/* Return X unless it is NaN, in which case return Y.
 * Fix: the original definition carried a trailing ';' inside the macro
 * expansion, which made IFDNAN unusable inside larger expressions
 * (e.g. "a = IFDNAN(x,y) + 1" failed to compile).  Removing it is
 * backward compatible: statement uses simply gain nothing. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/*
 * rrd_update_v: info-returning variant of rrd_update.  Parses the
 * command line with getopt_long (only -t/--template is recognized),
 * requires at least a filename plus one update argument, then delegates
 * to _rrd_update.  The returned info_t list starts with a
 * "return_value" entry holding the overall status.
 */
217 info_t *rrd_update_v(
222 info_t *result = NULL;
224 struct option long_options[] = {
225 {"template", required_argument, 0, 't'},
231 opterr = 0; /* initialize getopt */
234 int option_index = 0;
237 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
248 rrd_set_error("unknown option '%s'", argv[optind - 1]);
253 /* need at least 2 arguments: filename, data. */
254 if (argc - optind < 2) {
255 rrd_set_error("Not enough arguments");
/* head of the info list carries the update's return code */
259 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
260 rc.u_int = _rrd_update(argv[optind], tmplt,
262 (const char **) (argv + optind + 1), result);
263 result->value.u_int = rc.u_int;
/*
 * Interior of rrd_update() — its signature is elided from this listing.
 * Same -t/--template option handling as rrd_update_v, but it delegates
 * to the reentrant rrd_update_r and returns a plain int status instead
 * of an info_t list.
 */
272 struct option long_options[] = {
273 {"template", required_argument, 0, 't'},
276 int option_index = 0;
282 opterr = 0; /* initialize getopt */
285 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
/* -t/--template: DS name template later consumed by parse_template() */
292 tmplt = strdup(optarg);
296 rrd_set_error("unknown option '%s'", argv[optind - 1]);
301 /* need at least 2 arguments: filename, data. */
302 if (argc - optind < 2) {
303 rrd_set_error("Not enough arguments");
308 rc = rrd_update_r(argv[optind], tmplt,
309 argc - optind - 1, (const char **) (argv + optind + 1));
/*
 * rrd_update_r: thread-safe entry point.  A thin wrapper that forwards
 * to _rrd_update with no info list (pcdp_summary == NULL).
 */
315 const char *filename,
320 return _rrd_update(filename, tmplt, argc, argv, NULL);
/*
 * _rrd_update: the shared implementation behind rrd_update_r and
 * rrd_update_v.  Opens and exclusively locks the RRD file, allocates
 * the working arrays, processes each update argument via process_arg,
 * flushes changes to disk, and runs the (at most one) scheduled
 * seasonal smoothing pass.
 *
 * Fix in this revision: "&current_time" / "&current_time_usec" had been
 * corrupted to "¤t_time" / "¤t_time_usec" by a character-encoding
 * error in the two call sites below; restored.  All other visible lines
 * are unchanged (interior lines are elided from this listing).
 */
324 const char *filename,
328 info_t *pcdp_summary)
333 unsigned long rra_begin; /* byte pointer to the rra
334 * area in the rrd file. this
335 * pointer never changes value */
336 unsigned long rra_current; /* byte pointer to the current write
337 * spot in the rrd file. */
338 rrd_value_t *pdp_new; /* prepare the incoming data to be added
339 * to the existing entry */
340 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
341 * to the cdp values */
343 long *tmpl_idx; /* index representing the settings
344 * transported by the tmplt index */
345 unsigned long tmpl_cnt = 2; /* time and data */
347 time_t current_time = 0;
348 unsigned long current_time_usec = 0; /* microseconds part of current time */
350 int schedule_smooth = 0;
352 /* number of elapsed PDP steps since last update */
353 unsigned long *rra_step_cnt = NULL;
355 int version; /* rrd version */
356 rrd_file_t *rrd_file;
357 char *arg_copy; /* for processing the argv */
359 /* need at least 1 arguments: data. */
361 rrd_set_error("Not enough arguments");
365 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
368 /* We are now at the beginning of the rra's */
369 rra_current = rra_begin = rrd_file->header_len;
371 version = atoi(rrd.stat_head->version);
373 initialize_time(&current_time, &current_time_usec, version);
375 /* get exclusive lock to whole file.
376 * lock gets removed when we close the file.
378 if (LockRRD(rrd_file->fd) != 0) {
379 rrd_set_error("could not lock RRD");
383 if (allocate_data_structures(&rrd, &updvals,
384 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
385 &rra_step_cnt, &pdp_new) == -1) {
389 /* loop through the arguments. */
390 for (arg_i = 0; arg_i < argc; arg_i++) {
391 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
392 rrd_set_error("failed duplication argv entry");
395 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
396 &current_time, &current_time_usec, pdp_temp, pdp_new,
397 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary,
398 version, &schedule_smooth) == -1) {
407 /* if we got here and if there is an error and if the file has not been
408 * written to, then close things up and return. */
409 if (rrd_test_error()) {
410 goto err_free_structures;
414 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
415 goto err_free_structures;
419 /* calling the smoothing code here guarantees at most one smoothing
420 * operation per rrd_update call. Unfortunately, it is possible with bulk
421 * updates, or a long-delayed update for smoothing to occur off-schedule.
422 * This really isn't critical except during the burn-in cycles. */
423 if (schedule_smooth) {
424 smooth_all_rras(&rrd, rrd_file, rra_begin);
427 /* rrd_dontneed(rrd_file,&rrd); */
451 * get exclusive lock to whole file.
452 * lock gets removed when we close the file
454 * returns 0 on success
/*
 * LockRRD: take an exclusive, non-blocking lock on the already-open RRD
 * file descriptor.  Native WIN32 uses _locking() over the file size
 * reported by _fstat(); POSIX uses fcntl(F_SETLK) with a write lock
 * covering the whole file (l_len == 0 means "to end of file").
 */
462 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
465 if (_fstat(in_file, &st) == 0) {
466 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
473 lock.l_type = F_WRLCK; /* exclusive write lock */
474 lock.l_len = 0; /* whole file */
475 lock.l_start = 0; /* start of file */
476 lock.l_whence = SEEK_SET; /* offset measured from start of file */
478 rcstat = fcntl(in_file, F_SETLK, &lock);
486 * Allocate some important arrays used, and initialize the template.
488 * When it returns, either all of the structures are allocated
489 * or none of them are.
491 * Returns 0 on success, -1 on error.
493 static int allocate_data_structures(
494 rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt,
495 long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt,
496 rrd_value_t **pdp_new)
/* updvals[0] is the timestamp; one slot per DS follows */
499 if ((*updvals = (char **)malloc(sizeof(char *)
500 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
501 rrd_set_error("allocating updvals pointer array");
505 if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
506 * rrd->stat_head->ds_cnt)) == NULL) {
507 rrd_set_error("allocating pdp_temp ...");
508 goto err_free_updvals;
/* NOTE(review): allocates with sizeof(unsigned long) but *tmpl_idx is
 * long* — same size on common platforms, yet sizeof **tmpl_idx would be
 * the robust spelling. */
511 if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
512 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513 rrd_set_error("allocating tmpl_idx ...");
514 goto err_free_pdp_temp;
516 if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long)
517 * (rrd->stat_head->rra_cnt))) == NULL) {
518 rrd_set_error("allocating rra_step_cnt...");
519 goto err_free_tmpl_idx;
522 /* initialize tmplt redirector */
523 /* default config example (assume DS 1 is a CDEF DS)
524 tmpl_idx[0] -> 0; (time)
525 tmpl_idx[1] -> 1; (DS 0)
526 tmpl_idx[2] -> 3; (DS 2)
527 tmpl_idx[3] -> 4; (DS 3) */
528 (*tmpl_idx)[0] = 0; /* time */
/* by default skip CDEF data sources: they take no input values */
529 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
530 if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
531 (*tmpl_idx)[ii++] = i;
/* NOTE(review): these two failure paths jump to err_free_tmpl_idx even
 * though *rra_step_cnt was already allocated above — rra_step_cnt looks
 * leaked on error.  The cleanup labels are elided from this listing;
 * confirm against the full source before changing. */
536 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
537 goto err_free_tmpl_idx;
541 if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
542 * rrd->stat_head->ds_cnt)) == NULL) {
543 rrd_set_error("allocating pdp_new ...");
544 goto err_free_tmpl_idx;
559 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
561 * Returns 0 on success.
/*
 * The template is a ':'-separated list of DS names ("ds0:ds2:...").
 * Entry 0 of tmpl_idx is reserved for the timestamp; each template
 * entry is resolved to its DS index (+1) via ds_match().
 */
563 static int parse_template(
564 rrd_t *rrd, const char *tmplt,
565 unsigned long *tmpl_cnt, long *tmpl_idx)
567 char *dsname, *tmplt_copy;
568 unsigned int tmpl_len, i;
570 *tmpl_cnt = 1; /* the first entry is the time */
572 /* we should work on a writeable copy here */
573 if ((tmplt_copy = strdup(tmplt)) == NULL) {
574 rrd_set_error("error copying tmplt '%s'", tmplt);
/* scan including the final '\0' so the last name is processed too */
579 tmpl_len = strlen(tmplt_copy);
580 for (i = 0; i <= tmpl_len; i++) {
581 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
582 tmplt_copy[i] = '\0';
583 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
584 rrd_set_error("tmplt contains more DS definitions than RRD");
/* ds_match() returns -1 for unknown names, hence the +1 == 0 test */
588 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
589 rrd_set_error("unknown DS name '%s'", dsname);
593 /* go to the next entry on the tmplt_copy */
595 dsname = &tmplt_copy[i+1];
603 * Parse an update string, updates the primary data points (PDPs)
604 * and consolidated data points (CDPs), and writes changes to the RRAs.
606 * Returns 0 on success, -1 on error.
/*
 * This is the per-argument driver: one "time:val:val..." string in,
 * one pass of PDP accumulation, CDP consolidation and RRA writes out.
 * last_up / last_up_usec are advanced only after everything succeeded.
 */
608 static int process_arg(
611 rrd_file_t *rrd_file,
612 unsigned long rra_begin,
613 unsigned long *rra_current,
614 time_t *current_time,
615 unsigned long *current_time_usec,
616 rrd_value_t *pdp_temp,
617 rrd_value_t *pdp_new,
618 unsigned long *rra_step_cnt,
621 unsigned long tmpl_cnt,
622 info_t **pcdp_summary,
624 int *schedule_smooth)
626 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
628 /* a vector of future Holt-Winters seasonal coefs */
629 unsigned long elapsed_pdp_st;
631 double interval, pre_int, post_int; /* interval between this and
633 unsigned long proc_pdp_cnt;
634 unsigned long rra_start;
636 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
637 current_time, current_time_usec, version) == -1) {
640 /* seek to the beginning of the rra's */
641 if (*rra_current != rra_begin) {
643 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
644 rrd_set_error("seek error in rrd");
648 *rra_current = rra_begin;
650 rra_start = rra_begin;
/* seconds (with usec fraction) since the previous update */
652 interval = (double) (*current_time - rrd->live_head->last_up)
653 + (double) ((long) *current_time_usec -
654 (long) rrd->live_head->last_up_usec) / 1e6f;
656 /* process the data sources and update the pdp_prep
657 * area accordingly */
658 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
662 elapsed_pdp_st = calculate_elapsed_steps(rrd,
663 *current_time, *current_time_usec,
664 interval, &pre_int, &post_int,
667 /* has a pdp_st moment occurred since the last run ? */
668 if (elapsed_pdp_st == 0) {
669 /* no we have not passed a pdp_st moment. therefore update is simple */
670 simple_update(rrd, interval, pdp_new);
672 /* an pdp_st has occurred. */
673 if (process_all_pdp_st(rrd, interval,
676 pdp_new, pdp_temp) == -1)
680 if (update_all_cdp_prep(rrd, rra_step_cnt,
686 pdp_temp, rra_current,
687 schedule_smooth) == -1)
689 goto err_free_coefficients;
691 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
692 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
694 goto err_free_coefficients;
696 if (write_to_rras(rrd, rrd_file,
697 rra_step_cnt, rra_begin, rra_current,
698 *current_time, pcdp_summary) == -1)
700 goto err_free_coefficients;
702 } /* endif a pdp_st has occurred */
703 rrd->live_head->last_up = *current_time;
704 rrd->live_head->last_up_usec = *current_time_usec;
/* both the success and the error path free the coefficient vectors
 * (free(NULL) is a no-op when no Holt-Winters RRA was touched) */
707 free(last_seasonal_coef);
710 err_free_coefficients:
712 free(last_seasonal_coef);
717 * Parse a DS string (time + colon-separated values), storing the
718 * results in current_time, current_time_usec, and updvals.
720 * Returns 0 on success, -1 on error.
/*
 * Interior of parse_ds() — the "static int parse_ds(" line itself is
 * elided from this listing.  Splits "time@..." or "time:v1:v2:..." in
 * place, routes each value through the template redirector tmpl_idx,
 * then hands the timestamp token to get_time_from_reading().
 */
723 rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
724 unsigned long tmpl_cnt, time_t *current_time,
725 unsigned long *current_time_usec, int version)
732 /* initialize all ds input to unknown except the first one
733 which has always got to be set */
734 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
737 /* separate all ds elements; first must be examined separately
738 due to alternate time syntax */
739 if ((p = strchr(input, '@')) != NULL) {
741 } else if ((p = strchr(input, ':')) != NULL) {
744 rrd_set_error("expected timestamp not found in data source from %s",
/* each token lands in the slot the template redirects it to */
750 updvals[tmpl_idx[i++]] = p+1;
755 updvals[tmpl_idx[i++]] = p+1;
761 rrd_set_error("expected %lu data source readings (got %lu) from %s",
762 tmpl_cnt - 1, i, input);
766 if (get_time_from_reading(rrd, timesyntax, updvals,
767 current_time, current_time_usec,
775 * Parse the time in a DS string, store it in current_time and
776 * current_time_usec and verify that it's later than the last
777 * update for this DS.
779 * Returns 0 on success, -1 on error.
/*
 * Three timestamp syntaxes are accepted: at-style ("time@..."), the
 * literal "N" (now, from gettimeofday), and a plain numeric epoch with
 * optional fractional seconds.  Updates must be strictly newer than
 * live_head->last_up (at usec granularity for version >= 3 files).
 */
781 static int get_time_from_reading(
782 rrd_t *rrd, char timesyntax, char **updvals,
783 time_t *current_time, unsigned long *current_time_usec,
787 char *parsetime_error = NULL;
789 struct rrd_time_value ds_tv;
790 struct timeval tmp_time; /* used for time conversion */
792 /* get the time from the reading ... handle N */
793 if (timesyntax == '@') { /* at-style */
794 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
795 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
798 if (ds_tv.type == RELATIVE_TO_END_TIME ||
799 ds_tv.type == RELATIVE_TO_START_TIME) {
800 rrd_set_error("specifying time relative to the 'start' "
801 "or 'end' makes no sense here: %s", updvals[0]);
804 *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
805 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
806 } else if (strcmp(updvals[0], "N") == 0) {
807 gettimeofday(&tmp_time, 0);
808 normalize_time(&tmp_time);
809 *current_time = tmp_time.tv_sec;
810 *current_time_usec = tmp_time.tv_usec;
/* numeric epoch: parse under the "C" locale so '.' is the decimal
 * separator regardless of the user's locale */
812 old_locale = setlocale(LC_NUMERIC, "C");
813 tmp = strtod(updvals[0], 0);
814 setlocale(LC_NUMERIC, old_locale);
815 *current_time = floor(tmp);
816 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
818 /* dont do any correction for old version RRDs */
820 *current_time_usec = 0;
822 if (*current_time < rrd->live_head->last_up ||
823 (*current_time == rrd->live_head->last_up &&
824 (long) *current_time_usec <=
825 (long) rrd->live_head->last_up_usec)) {
826 rrd_set_error("illegal attempt to update using time %ld when "
827 "last update time is %ld (minimum one second step)",
828 *current_time, rrd->live_head->last_up);
835 * Update pdp_new by interpreting the updvals according to the DS type
836 * (COUNTER, GAUGE, etc.).
838 * Returns 0 on success, -1 on error.
/*
 * For each DS: COUNTER/DERIVE take the difference to the stored
 * last_ds string; GAUGE parses the value directly; ABSOLUTE multiplies
 * by the interval.  pdp_new[] always holds rate * seconds.  Values
 * outside [DS_min_val, DS_max_val] and heartbeat violations become DNAN.
 */
840 static int update_pdp_prep(
841 rrd_t *rrd, char **updvals,
842 rrd_value_t *pdp_new, double interval)
844 unsigned long ds_idx;
846 char *endptr; /* used in the conversion */
851 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
852 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
854 /* make sure we do not build diffs with old last_ds values */
855 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
856 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
857 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
860 /* NOTE: DST_CDEF should never enter this if block, because
861 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
862 * accidently specified a value for the DST_CDEF. To handle this case,
863 * an extra check is required. */
865 if ((updvals[ds_idx+1][0] != 'U') &&
866 (dst_idx != DST_CDEF) &&
867 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
870 /* pdp_new contains rate * time ... eg the bytes transferred during
871 * the interval. Doing it this way saves a lot of math operations
/* counters must be plain integers (an optional leading '-' only) */
876 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
877 if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9')
878 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
879 rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
883 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
884 pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
885 if (dst_idx == DST_COUNTER) {
886 /* simple overflow catcher. This will fail
887 * terribly for non 32 or 64 bit counters
888 * ... are there any others in SNMP land?
890 if (pdp_new[ds_idx] < (double) 0.0)
891 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
892 if (pdp_new[ds_idx] < (double) 0.0)
893 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
895 rate = pdp_new[ds_idx] / interval;
/* no previous reading -> cannot build a diff yet */
897 pdp_new[ds_idx] = DNAN;
/* GAUGE: parse the value as-is under the "C" locale */
901 old_locale = setlocale(LC_NUMERIC, "C");
903 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
904 setlocale(LC_NUMERIC, old_locale);
906 rrd_set_error("converting '%s' to float: %s",
907 updvals[ds_idx + 1], rrd_strerror(errno));
910 if (endptr[0] != '\0') {
911 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
912 updvals[ds_idx + 1], endptr);
915 rate = pdp_new[ds_idx] / interval;
/* ABSOLUTE: the reading is already a per-interval total */
919 old_locale = setlocale(LC_NUMERIC, "C");
920 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
921 setlocale(LC_NUMERIC, old_locale);
923 rrd_set_error("converting '%s' to float: %s",
924 updvals[ds_idx + 1], rrd_strerror(errno));
927 if (endptr[0] != '\0') {
928 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
929 updvals[ds_idx + 1], endptr);
932 rate = pdp_new[ds_idx] / interval;
935 rrd_set_error("rrd contains unknown DS type : '%s'",
936 rrd->ds_def[ds_idx].dst);
939 /* break out of this for loop if the error string is set */
940 if (rrd_test_error()) {
943 /* make sure pdp_temp is neither too large or too small
944 * if any of these occur it becomes unknown ...
947 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
948 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
949 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
950 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
951 pdp_new[ds_idx] = DNAN;
954 /* no news is news all the same */
955 pdp_new[ds_idx] = DNAN;
958 /* make a copy of the command line argument for the next run */
961 fprintf(stderr, "prep ds[%lu]\t"
965 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
967 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
968 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
974 * How many PDP steps have elapsed since the last update? Returns the answer,
975 * and stores the time between the last update and the last PDP in pre_int,
976 * and the time between the last PDP and the current time in post_int.
/*
 * All times are truncated to pdp_step boundaries; proc_pdp_st is the
 * step boundary of the previous update, occu_pdp_st the most recent
 * boundary at or before current_time.
 */
978 static int calculate_elapsed_steps(
980 unsigned long current_time,
981 unsigned long current_time_usec,
985 unsigned long *proc_pdp_cnt)
988 unsigned long proc_pdp_st; /* which pdp_st was the last
990 unsigned long occu_pdp_st; /* when was the pdp_st
991 * before the last update
993 unsigned long proc_pdp_age; /* how old was the data in
994 * the pdp prep area when it
995 * was last updated */
996 unsigned long occu_pdp_age; /* how long ago was the last
999 /* when was the current pdp started */
1000 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1001 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1003 /* when did the last pdp_st occur */
1004 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1005 occu_pdp_st = current_time - occu_pdp_age;
1007 if (occu_pdp_st > proc_pdp_st) {
1008 /* OK we passed the pdp_st moment */
1009 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1010 * occurred before the latest
1012 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1013 *post_int = occu_pdp_age; /* how much after it */
1014 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
/* no step boundary crossed: the whole interval is "pre" */
1016 *pre_int = interval;
1020 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1023 printf("proc_pdp_age %lu\t"
1025 "occu_pfp_age %lu\t"
1029 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1030 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1033 /* compute the number of elapsed pdp_st moments */
1034 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1038 * Increment the PDP values by the values in pdp_new, or else initialize them.
/*
 * Fast path used when no pdp_step boundary was crossed: just fold the
 * new rate*seconds (or unknown seconds) into the per-DS scratch area.
 */
1040 static void simple_update(
1041 rrd_t *rrd, double interval, rrd_value_t *pdp_new)
1044 for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1045 if (isnan(pdp_new[i])) {
1046 /* this is not really accurate if we use subsecond data arrival time
1047 should have thought of it when going subsecond resolution ...
1048 sorry next format change we will have it! */
1049 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1051 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1052 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1054 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1063 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1064 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1070 * Call process_pdp_st for each DS.
1072 * Returns 0 on success, -1 on error.
1074 static int process_all_pdp_st(
1075 rrd_t *rrd, double interval, double pre_int, double post_int,
1076 unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1078 unsigned long ds_idx;
1079 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1080 rate*seconds which occurred up to the last run.
1081 pdp_new[] contains rate*seconds from the latest run.
1082 pdp_temp[] will contain the rate for cdp */
1084 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* the span covered, in seconds: elapsed steps times the step size */
1085 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1086 elapsed_pdp_st * rrd->stat_head->pdp_step,
1087 pdp_new, pdp_temp) == -1) {
1091 fprintf(stderr, "PDP UPD ds[%lu]\t"
1094 "new_unkn_sec %5lu\n",
1095 ds_idx, pdp_temp[ds_idx],
1096 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1097 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1104 * Process an update that occurs after one of the PDP moments.
1105 * Increments the PDP value, sets NAN if time greater than the
1106 * heartbeats have elapsed, processes CDEFs.
1108 * Returns 0 on success, -1 on error.
1110 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
1111 double pre_int, double post_int, long diff_pdp_st,
1112 rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1115 /* update pdp_prep to the current pdp_st. */
1116 double pre_unknown = 0.0;
1117 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1118 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1120 rpnstack_t rpnstack; /* used for COMPUTE DS */
1121 rpnstack_init(&rpnstack);
1124 if (isnan(pdp_new[ds_idx])) {
1125 /* a final bit of unknown to be added bevore calculation
1126 we use a temporary variable for this so that we
1127 don't have to turn integer lines before using the value */
1128 pre_unknown = pre_int;
1130 if (isnan(scratch[PDP_val].u_val)) {
1131 scratch[PDP_val].u_val = 0;
/* add the rate that applies to the part before the step boundary */
1133 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1136 /* if too much of the pdp_prep is unknown we dump it */
1137 /* if the interval is larger thatn mrhb we get NAN */
1138 if ((interval > mrhb) ||
1139 (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1140 pdp_temp[ds_idx] = DNAN;
/* average rate over the known part of the step span */
1142 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1143 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1146 /* process CDEF data sources; remember each CDEF DS can
1147 * only reference other DS with a lower index number */
1148 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1151 rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1152 /* substitute data values for OP_VARIABLE nodes */
1153 for (i = 0; rpnp[i].op != OP_END; i++) {
1154 if (rpnp[i].op == OP_VARIABLE) {
1155 rpnp[i].op = OP_NUMBER;
1156 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1159 /* run the rpn calculator */
1160 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
/* NOTE(review): rpnp does not appear to be freed on this error path in
 * the visible lines — the free may be in the elided lines; confirm. */
1162 rpnstack_free(&rpnstack);
1167 /* make pdp_prep ready for the next run */
1168 if (isnan(pdp_new[ds_idx])) {
1169 /* this is not realy accurate if we use subsecond data arival time
1170 should have thought of it when going subsecond resolution ...
1171 sorry next format change we will have it! */
1172 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1173 scratch[PDP_val].u_val = DNAN;
1175 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
/* carry the after-boundary fraction of the new reading forward */
1176 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1178 rpnstack_free(&rpnstack);
1183 * Iterate over all the RRAs for a given DS and:
1184 * 1. Decide whether to schedule a smooth later
1185 * 2. Shift the seasonal array if it's a bulk update
1188 * Returns 0 on success, -1 on error
1190 static int update_all_cdp_prep(
1191 rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
1192 rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
1193 rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
1194 rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth)
1196 unsigned long rra_idx;
1197 /* index into the CDP scratch array */
1198 enum cf_en current_cf;
1199 unsigned long rra_start;
1200 /* number of rows to be updated in an RRA for a data value. */
1201 unsigned long start_pdp_offset;
1203 rra_start = rra_begin;
1204 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1205 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
/* PDPs still needed before this RRA's next CDP completes */
1206 start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1207 if (start_pdp_offset <= elapsed_pdp_st) {
1208 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1209 rrd->rra_def[rra_idx].pdp_cnt + 1;
1211 rra_step_cnt[rra_idx] = 0;
1214 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1215 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1216 * so that they will be correct for the next observed value; note that for
1217 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1218 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1219 if (rra_step_cnt[rra_idx] > 2) {
1220 /* skip update by resetting rra_step_cnt[rra_idx], note that this is not data
1221 * source specific; this is due to the bulk update, not a DNAN value
1222 * for the specific data source. */
1223 rra_step_cnt[rra_idx] = 0;
1224 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1225 elapsed_pdp_st, last_seasonal_coef);
1226 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1227 elapsed_pdp_st + 1, seasonal_coef);
1229 /* periodically run a smoother for seasonal effects */
1230 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1232 fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1233 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1234 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
1236 *schedule_smooth = 1;
/* lookup_seasonal moved the file position; resync rra_current */
1238 *rra_current = rrd_tell(rrd_file);
1240 /* if cf is DEVSEASONAL or SEASONAL */
1241 if (rrd_test_error())
1244 if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt,
1245 rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef,
1246 current_cf) == -1) {
/* advance to the byte offset of the next RRA */
1249 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1255 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1257 static int do_schedule_smooth(
1258 rrd_t *rrd, unsigned long rra_idx,
1259 unsigned long elapsed_pdp_st)
1261 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1262 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1263 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1264 unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1265 unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1267 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1268 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1269 * really an RRA level, not a data source within RRA level parameter, but
1270 * the rra_def is read only for rrd_update (not flushed to disk). */
1271 if (*init_seasonal > BURNIN_CYCLES) {
1272 /* someone has no doubt invented a trick to deal with this wrap around,
1273 * but at least this code is clear. */
1274 if (seasonal_smooth_idx > cur_row) {
1275 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1276 * between PDP and CDP */
1277 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1279 /* can't rely on negative numbers because we are working with
1280 * unsigned values */
1281 return (cur_row + elapsed_pdp_st >= row_cnt
1282 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1284 /* mark off one of the burn-in cycles */
/* the ++(*init_seasonal) side effect only fires when the row wraps */
1285 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1289 * For a given RRA, iterate over the data sources and call the appropriate
1290 * consolidation function.
1292 * Returns 0 on success, -1 on error.
1294 static int update_cdp_prep(
1296 unsigned long elapsed_pdp_st,
1297 unsigned long start_pdp_offset,
1298 unsigned long *rra_step_cnt,
1300 rrd_value_t *pdp_temp,
1301 rrd_value_t *last_seasonal_coef,
1302 rrd_value_t *seasonal_coef,
1305 unsigned long ds_idx, cdp_idx;
1306 /* update CDP_PREP areas */
1307 /* loop over data soures within each RRA */
1308 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1310 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1312 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1313 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1314 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1315 elapsed_pdp_st, start_pdp_offset,
1316 rrd->rra_def[rra_idx].pdp_cnt,
1317 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1319 /* Nothing to consolidate if there's one PDP per CDP. However, if
1320 * we've missed some PDPs, let's update null counters etc. */
1321 if (elapsed_pdp_st > 2) {
1322 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef,
1323 rra_idx, ds_idx, cdp_idx, current_cf);
1327 if (rrd_test_error())
1329 } /* endif data sources loop */
1334 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1335 * primary value, secondary value, and # of unknowns.
1337 static void update_cdp(
1340 rrd_value_t pdp_temp_val,
1341 unsigned long rra_step_cnt,
1342 unsigned long elapsed_pdp_st,
1343 unsigned long start_pdp_offset,
1344 unsigned long pdp_cnt,
1348 /* shorthand variables */
1349 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1350 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1351 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1352 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1355 /* If we are in this block, as least 1 CDP value will be written to
1356 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1357 * to be written, then the "fill in" value is the CDP_secondary_val
1359 if (isnan(pdp_temp_val)) {
1360 *cdp_unkn_pdp_cnt += start_pdp_offset;
1361 *cdp_secondary_val = DNAN;
1363 /* CDP_secondary value is the RRA "fill in" value for intermediary
1364 * CDP data entries. No matter the CF, the value is the same because
1365 * the average, max, min, and last of a list of identical values is
1366 * the same, namely, the value itself. */
1367 *cdp_secondary_val = pdp_temp_val;
1370 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1371 *cdp_primary_val = DNAN;
1372 if (current_cf == CF_AVERAGE) {
1373 *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1374 start_pdp_offset, pdp_cnt);
1376 *cdp_val = pdp_temp_val;
1379 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1380 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1381 } /* endif meets xff value requirement for a valid value */
1382 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1383 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1384 if (isnan(pdp_temp_val))
1385 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1387 *cdp_unkn_pdp_cnt = 0;
1388 } else { /* rra_step_cnt[i] == 0 */
1391 if (isnan(*cdp_val)) {
1392 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1395 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1399 if (isnan(pdp_temp_val)) {
1400 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1402 *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
1408 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1409 * on the type of consolidation function.
1411 static void initialize_cdp_val(
1414 rrd_value_t pdp_temp_val,
1415 unsigned long elapsed_pdp_st,
1416 unsigned long start_pdp_offset,
1417 unsigned long pdp_cnt)
1419 rrd_value_t cum_val, cur_val;
1421 switch (current_cf) {
1423 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1424 cur_val = IFDNAN(pdp_temp_val, 0.0);
1425 scratch[CDP_primary_val].u_val =
1426 (cum_val + cur_val * start_pdp_offset) /
1427 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1428 scratch[CDP_val].u_val = initialize_average_carry_over(
1429 pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1432 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1433 cur_val = IFDNAN(pdp_temp_val, -DINF);
1436 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1438 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1444 if (cur_val > cum_val)
1445 scratch[CDP_primary_val].u_val = cur_val;
1447 scratch[CDP_primary_val].u_val = cum_val;
1448 /* initialize carry over value */
1449 scratch[CDP_val].u_val = pdp_temp_val;
1452 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1453 cur_val = IFDNAN(pdp_temp_val, DINF);
1456 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1457 fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1463 if (cur_val < cum_val)
1464 scratch[CDP_primary_val].u_val = cur_val;
1466 scratch[CDP_primary_val].u_val = cum_val;
1467 /* initialize carry over value */
1468 scratch[CDP_val].u_val = pdp_temp_val;
1472 scratch[CDP_primary_val].u_val = pdp_temp_val;
1473 /* initialize carry over value */
1474 scratch[CDP_val].u_val = pdp_temp_val;
1480 * Update the consolidation function for Holt-Winters functions as
1481 * well as other functions that don't actually consolidate multiple
1484 static void reset_cdp(
1486 unsigned long elapsed_pdp_st,
1487 rrd_value_t *pdp_temp,
1488 rrd_value_t *last_seasonal_coef,
1489 rrd_value_t *seasonal_coef,
1490 int rra_idx, int ds_idx, int cdp_idx,
1491 enum cf_en current_cf)
1493 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1495 switch (current_cf) {
1498 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1499 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1502 case CF_DEVSEASONAL:
1503 /* need to update cached seasonal values, so they are consistent
1504 * with the bulk update */
1505 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1506 * CDP_last_deviation are the same. */
1507 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1508 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1512 /* need to update the null_count and last_null_count.
1513 * even do this for non-DNAN pdp_temp because the
1514 * algorithm is not learning from batch updates. */
1515 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1516 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1519 scratch[CDP_primary_val].u_val = DNAN;
1520 scratch[CDP_secondary_val].u_val = DNAN;
1523 /* do not count missed bulk values as failures */
1524 scratch[CDP_primary_val].u_val = 0;
1525 scratch[CDP_secondary_val].u_val = 0;
1526 /* need to reset violations buffer.
1527 * could do this more carefully, but for now, just
1528 * assume a bulk update wipes away all violations. */
1529 erase_violations(rrd, cdp_idx, rra_idx);
1534 static rrd_value_t initialize_average_carry_over(
1535 rrd_value_t pdp_temp_val,
1536 unsigned long elapsed_pdp_st,
1537 unsigned long start_pdp_offset,
1538 unsigned long pdp_cnt)
1540 /* initialize carry over value */
1541 if (isnan(pdp_temp_val)) {
1544 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1548 * Update or initialize a CDP value based on the consolidation
1551 * Returns the new value.
1553 static rrd_value_t calculate_cdp_val(
1554 rrd_value_t cdp_val,
1555 rrd_value_t pdp_temp_val,
1556 unsigned long elapsed_pdp_st,
1557 int current_cf, int i, int ii)
1559 if (isnan(cdp_val)) {
1560 if (current_cf == CF_AVERAGE) {
1561 pdp_temp_val *= elapsed_pdp_st;
1564 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1565 i, ii, pdp_temp_val);
1567 return pdp_temp_val;
1569 if (current_cf == CF_AVERAGE)
1570 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1571 if (current_cf == CF_MINIMUM)
1572 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1573 if (current_cf == CF_MAXIMUM)
1574 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1576 return pdp_temp_val;
1580 * For each RRA, update the seasonal values and then call update_aberrant_CF
1581 * for each data source.
1583 * Return 0 on success, -1 on error.
1585 static int update_aberrant_cdps(
1586 rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
1587 unsigned long *rra_current, unsigned long elapsed_pdp_st,
1588 rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef)
1590 unsigned long rra_idx, ds_idx, j;
1592 /* number of PDP steps since the last update that
1593 * are assigned to the first CDP to be generated
1594 * since the last update. */
1595 unsigned short scratch_idx;
1596 unsigned long rra_start;
1597 enum cf_en current_cf;
1599 /* this loop is only entered if elapsed_pdp_st < 3 */
1600 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1601 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1602 rra_start = rra_begin;
1603 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1604 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1605 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1606 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1607 if (scratch_idx == CDP_primary_val) {
1608 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1609 elapsed_pdp_st + 1, seasonal_coef);
1611 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1612 elapsed_pdp_st + 2, seasonal_coef);
1614 *rra_current = rrd_tell(rrd_file);
1616 if (rrd_test_error())
1618 /* loop over data soures within each RRA */
1619 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1620 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1621 rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
1622 rra_idx, ds_idx, scratch_idx, *seasonal_coef);
1625 rra_start += rrd->rra_def[rra_idx].row_cnt
1626 * rrd->stat_head->ds_cnt
1627 * sizeof(rrd_value_t);
1634 * Move sequentially through the file, writing one RRA at a time. Note this
1635 * architecture divorces the computation of CDP with flushing updated RRA
1638 * Return 0 on success, -1 on error.
1640 static int write_to_rras(
1642 rrd_file_t *rrd_file,
1643 unsigned long *rra_step_cnt,
1644 unsigned long rra_begin,
1645 unsigned long *rra_current,
1646 time_t current_time,
1647 info_t **pcdp_summary)
1649 unsigned long rra_idx;
1650 unsigned long rra_start;
1651 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1652 /* number of PDP steps since the last update that
1653 * are assigned to the first CDP to be generated
1654 * since the last update. */
1655 unsigned short scratch_idx;
1656 time_t rra_time = 0; /* time of update for a RRA */
1658 /* Ready to write to disk */
1659 rra_start = rra_begin;
1660 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1661 /* skip unless there's something to write */
1662 if (rra_step_cnt[rra_idx]) {
1663 /* write the first row */
1665 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1667 rrd->rra_ptr[rra_idx].cur_row++;
1668 if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
1669 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1670 /* positition on the first row */
1671 rra_pos_tmp = rra_start +
1672 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1673 sizeof(rrd_value_t);
1674 if (rra_pos_tmp != *rra_current) {
1675 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1676 rrd_set_error("seek error in rrd");
1679 *rra_current = rra_pos_tmp;
1682 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1684 scratch_idx = CDP_primary_val;
1685 if (*pcdp_summary != NULL) {
1686 rra_time = (current_time - current_time
1687 % (rrd->rra_def[rra_idx].pdp_cnt *
1688 rrd->stat_head->pdp_step))
1689 - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
1690 rrd->stat_head->pdp_step);
1692 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, scratch_idx,
1693 pcdp_summary, &rra_time) == -1)
1695 if (rrd_test_error())
1698 /* write other rows of the bulk update, if any */
1699 scratch_idx = CDP_secondary_val;
1700 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1701 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
1704 "Wraparound for RRA %s, %lu updates left\n",
1705 rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
1708 rrd->rra_ptr[rra_idx].cur_row = 0;
1709 /* seek back to beginning of current rra */
1710 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1711 rrd_set_error("seek error in rrd");
1715 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1718 *rra_current = rra_start;
1720 if (*pcdp_summary != NULL) {
1721 rra_time = (current_time - current_time
1722 % (rrd->rra_def[rra_idx].pdp_cnt *
1723 rrd->stat_head->pdp_step))
1725 ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
1726 rrd->stat_head->pdp_step);
1728 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1729 scratch_idx, pcdp_summary, &rra_time) == -1)
1733 if (rrd_test_error())
1736 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1737 sizeof(rrd_value_t);
1744 * Write out one row of values (one value per DS) to the archive.
1746 * Returns 0 on success, -1 on error.
1748 static int write_RRA_row(
1749 rrd_file_t *rrd_file,
1751 unsigned long rra_idx,
1752 unsigned long *rra_current,
1753 unsigned short CDP_scratch_idx,
1754 info_t **pcdp_summary,
1757 unsigned long ds_idx, cdp_idx;
1760 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1761 /* compute the cdp index */
1762 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1764 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1765 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1766 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1768 if (pcdp_summary != NULL) {
1769 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1770 /* append info to the return hash */
1771 *pcdp_summary = info_push(*pcdp_summary,
1772 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", *rra_time,
1773 rrd->rra_def[rra_idx].cf_nam,
1774 rrd->rra_def[rra_idx].pdp_cnt,
1775 rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1777 if (rrd_write(rrd_file,
1778 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1779 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1780 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1783 *rra_current += sizeof(rrd_value_t);
1789 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1791 * Returns 0 on success, -1 otherwise
1793 static int smooth_all_rras(
1795 rrd_file_t *rrd_file,
1796 unsigned long rra_begin)
1798 unsigned long rra_start = rra_begin;
1799 unsigned long rra_idx;
1800 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1801 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1802 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1804 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1806 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1807 if (rrd_test_error())
1810 rra_start += rrd->rra_def[rra_idx].row_cnt
1811 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1818 * Flush changes to disk (unless we're using mmap)
1820 * Returns 0 on success, -1 otherwise
1822 static int write_changes_to_disk(
1823 rrd_t *rrd, rrd_file_t *rrd_file, int version)
1825 /* we just need to write back the live header portion now*/
1826 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1827 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
1828 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
1830 rrd_set_error("seek rrd for live header writeback");
1834 if (rrd_write(rrd_file, rrd->live_head,
1835 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1836 rrd_set_error("rrd_write live_head to rrd");
1840 if (rrd_write(rrd_file, &rrd->live_head->last_up,
1841 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1842 rrd_set_error("rrd_write live_head to rrd");
1848 if (rrd_write(rrd_file, rrd->pdp_prep,
1849 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
1850 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
1851 rrd_set_error("rrd_write pdp_prep to rrd");
1855 if (rrd_write(rrd_file, rrd->cdp_prep,
1856 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1857 rrd->stat_head->ds_cnt)
1858 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1859 rrd->stat_head->ds_cnt)) {
1861 rrd_set_error("rrd_write cdp_prep to rrd");
1865 if (rrd_write(rrd_file, rrd->rra_ptr,
1866 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
1867 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
1868 rrd_set_error("rrd_write rra_ptr to rrd");