2 /*****************************************************************************
3 * RRDtool 1.3rc2 Copyright by Tobi Oetiker, 1997-2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
8 *****************************************************************************/
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 #include <sys/timeb.h>
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
45 static int gettimeofday(
47 struct __timezone *tz)
50 struct _timeb current_time;
52 _ftime(¤t_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
62 /* FUNCTION PROTOTYPES */
76 static int allocate_data_structures(
79 rrd_value_t **pdp_temp,
82 unsigned long *tmpl_cnt,
83 unsigned long **rra_step_cnt,
84 unsigned long **skip_update,
85 rrd_value_t **pdp_new);
87 static int parse_template(
90 unsigned long *tmpl_cnt,
93 static int process_arg(
97 unsigned long rra_begin,
98 unsigned long *rra_current,
100 unsigned long *current_time_usec,
101 rrd_value_t *pdp_temp,
102 rrd_value_t *pdp_new,
103 unsigned long *rra_step_cnt,
106 unsigned long tmpl_cnt,
107 info_t **pcdp_summary,
109 unsigned long *skip_update,
110 int *schedule_smooth);
117 unsigned long tmpl_cnt,
118 time_t *current_time,
119 unsigned long *current_time_usec,
122 static int get_time_from_reading(
126 time_t *current_time,
127 unsigned long *current_time_usec,
130 static int update_pdp_prep(
133 rrd_value_t *pdp_new,
136 static int calculate_elapsed_steps(
138 unsigned long current_time,
139 unsigned long current_time_usec,
143 unsigned long *proc_pdp_cnt);
145 static void simple_update(
148 rrd_value_t *pdp_new);
150 static int process_all_pdp_st(
155 unsigned long elapsed_pdp_st,
156 rrd_value_t *pdp_new,
157 rrd_value_t *pdp_temp);
159 static int process_pdp_st(
161 unsigned long ds_idx,
166 rrd_value_t *pdp_new,
167 rrd_value_t *pdp_temp);
169 static int update_all_cdp_prep(
171 unsigned long *rra_step_cnt,
172 unsigned long rra_begin,
173 rrd_file_t *rrd_file,
174 unsigned long elapsed_pdp_st,
175 unsigned long proc_pdp_cnt,
176 rrd_value_t **last_seasonal_coef,
177 rrd_value_t **seasonal_coef,
178 rrd_value_t *pdp_temp,
179 unsigned long *rra_current,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
199 static void update_cdp(
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
211 static void initialize_cdp_val(
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
244 static int update_aberrant_cdps(
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long *rra_current,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 unsigned long *rra_current,
260 unsigned long *skip_update,
261 info_t **pcdp_summary);
263 static int write_RRA_row(
264 rrd_file_t *rrd_file,
266 unsigned long rra_idx,
267 unsigned long *rra_current,
268 unsigned short CDP_scratch_idx,
269 info_t **pcdp_summary,
272 static int smooth_all_rras(
274 rrd_file_t *rrd_file,
275 unsigned long rra_begin);
278 static int write_changes_to_disk(
280 rrd_file_t *rrd_file,
285 * normalize time as returned by gettimeofday. usec part must
/*
 * Normalize a struct timeval so that tv_usec lies in [0, 1000000):
 * if the microsecond part is negative, borrow one second from tv_sec.
 * Needed because arithmetic on timevals elsewhere can leave tv_usec < 0.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        /* move 1 second from the seconds part into the usec part */
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
298 * Sets current_time and current_time_usec based on the current time.
299 * current_time_usec is set to 0 if the version number is 1 or 2.
/*
 * Set *current_time / *current_time_usec from the wall clock.
 * RRD format versions 1 and 2 have no sub-second resolution, so for
 * version < 3 the microsecond part is forced to 0.
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval tmp_time;    /* used for time conversion */

    gettimeofday(&tmp_time, 0);
    normalize_time(&tmp_time);
    *current_time = tmp_time.tv_sec;
    if (version >= 3) {
        *current_time_usec = tmp_time.tv_usec;
    } else {
        /* old-format files cannot store usecs; drop them */
        *current_time_usec = 0;
    }
}
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
320 info_t *rrd_update_v(
325 info_t *result = NULL;
327 struct option long_options[] = {
328 {"template", required_argument, 0, 't'},
334 opterr = 0; /* initialize getopt */
337 int option_index = 0;
340 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
351 rrd_set_error("unknown option '%s'", argv[optind - 1]);
356 /* need at least 2 arguments: filename, data. */
357 if (argc - optind < 2) {
358 rrd_set_error("Not enough arguments");
362 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363 rc.u_int = _rrd_update(argv[optind], tmplt,
365 (const char **) (argv + optind + 1), result);
366 result->value.u_int = rc.u_int;
375 struct option long_options[] = {
376 {"template", required_argument, 0, 't'},
379 int option_index = 0;
385 opterr = 0; /* initialize getopt */
388 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
395 tmplt = strdup(optarg);
399 rrd_set_error("unknown option '%s'", argv[optind - 1]);
404 /* need at least 2 arguments: filename, data. */
405 if (argc - optind < 2) {
406 rrd_set_error("Not enough arguments");
410 rc = rrd_update_r(argv[optind], tmplt,
411 argc - optind - 1, (const char **) (argv + optind + 1));
418 const char *filename,
423 return _rrd_update(filename, tmplt, argc, argv, NULL);
427 const char *filename,
431 info_t *pcdp_summary)
436 unsigned long rra_begin; /* byte pointer to the rra
437 * area in the rrd file. this
438 * pointer never changes value */
439 unsigned long rra_current; /* byte pointer to the current write
440 * spot in the rrd file. */
441 rrd_value_t *pdp_new; /* prepare the incoming data to be added
442 * to the existing entry */
443 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
444 * to the cdp values */
446 long *tmpl_idx; /* index representing the settings
447 * transported by the tmplt index */
448 unsigned long tmpl_cnt = 2; /* time and data */
450 time_t current_time = 0;
451 unsigned long current_time_usec = 0; /* microseconds part of current time */
453 int schedule_smooth = 0;
455 /* number of elapsed PDP steps since last update */
456 unsigned long *rra_step_cnt = NULL;
458 int version; /* rrd version */
459 rrd_file_t *rrd_file;
460 char *arg_copy; /* for processing the argv */
461 unsigned long *skip_update; /* RRAs to advance but not write */
463 /* need at least 1 arguments: data. */
465 rrd_set_error("Not enough arguments");
469 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
472 /* We are now at the beginning of the rra's */
473 rra_current = rra_begin = rrd_file->header_len;
475 version = atoi(rrd.stat_head->version);
    initialize_time(&current_time, &current_time_usec, version);
479 /* get exclusive lock to whole file.
480 * lock gets removed when we close the file.
482 if (LockRRD(rrd_file->fd) != 0) {
483 rrd_set_error("could not lock RRD");
487 if (allocate_data_structures(&rrd, &updvals,
488 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489 &rra_step_cnt, &skip_update,
494 /* loop through the arguments. */
495 for (arg_i = 0; arg_i < argc; arg_i++) {
496 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497 rrd_set_error("failed duplication argv entry");
500 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
                        &current_time, &current_time_usec, pdp_temp, pdp_new,
502 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503 &pcdp_summary, version, skip_update,
504 &schedule_smooth) == -1) {
513 /* if we got here and if there is an error and if the file has not been
514 * written to, then close things up and return. */
515 if (rrd_test_error()) {
516 goto err_free_structures;
519 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
520 goto err_free_structures;
524 /* calling the smoothing code here guarantees at most one smoothing
525 * operation per rrd_update call. Unfortunately, it is possible with bulk
526 * updates, or a long-delayed update for smoothing to occur off-schedule.
527 * This really isn't critical except during the burn-in cycles. */
528 if (schedule_smooth) {
529 smooth_all_rras(&rrd, rrd_file, rra_begin);
532 /* rrd_dontneed(rrd_file,&rrd); */
558 * get exclusive lock to whole file.
559 * lock gets removed when we close the file
561 * returns 0 on success
569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
572 if (_fstat(in_file, &st) == 0) {
573 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
580 lock.l_type = F_WRLCK; /* exclusive write lock */
581 lock.l_len = 0; /* whole file */
582 lock.l_start = 0; /* start of file */
    lock.l_whence = SEEK_SET;   /* offsets relative to start of file */
585 rcstat = fcntl(in_file, F_SETLK, &lock);
593 * Allocate some important arrays used, and initialize the template.
595 * When it returns, either all of the structures are allocated
596 * or none of them are.
598 * Returns 0 on success, -1 on error.
600 static int allocate_data_structures(
603 rrd_value_t **pdp_temp,
606 unsigned long *tmpl_cnt,
607 unsigned long **rra_step_cnt,
608 unsigned long **skip_update,
609 rrd_value_t **pdp_new)
612 if ((*updvals = (char **) malloc(sizeof(char *)
613 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
614 rrd_set_error("allocating updvals pointer array.");
617 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
618 * rrd->stat_head->ds_cnt)) ==
620 rrd_set_error("allocating pdp_temp.");
621 goto err_free_updvals;
623 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
625 rrd->stat_head->rra_cnt)) ==
627 rrd_set_error("allocating skip_update.");
628 goto err_free_pdp_temp;
630 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
631 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
632 rrd_set_error("allocating tmpl_idx.");
633 goto err_free_skip_update;
635 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
638 rra_cnt))) == NULL) {
639 rrd_set_error("allocating rra_step_cnt.");
640 goto err_free_tmpl_idx;
643 /* initialize tmplt redirector */
644 /* default config example (assume DS 1 is a CDEF DS)
645 tmpl_idx[0] -> 0; (time)
646 tmpl_idx[1] -> 1; (DS 0)
647 tmpl_idx[2] -> 3; (DS 2)
648 tmpl_idx[3] -> 4; (DS 3) */
649 (*tmpl_idx)[0] = 0; /* time */
650 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
651 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
652 (*tmpl_idx)[ii++] = i;
657 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
658 goto err_free_rra_step_cnt;
662 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
663 * rrd->stat_head->ds_cnt)) == NULL) {
664 rrd_set_error("allocating pdp_new.");
665 goto err_free_rra_step_cnt;
670 err_free_rra_step_cnt:
674 err_free_skip_update:
684 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
686 * Returns 0 on success.
688 static int parse_template(
691 unsigned long *tmpl_cnt,
694 char *dsname, *tmplt_copy;
695 unsigned int tmpl_len, i;
698 *tmpl_cnt = 1; /* the first entry is the time */
700 /* we should work on a writeable copy here */
701 if ((tmplt_copy = strdup(tmplt)) == NULL) {
702 rrd_set_error("error copying tmplt '%s'", tmplt);
708 tmpl_len = strlen(tmplt_copy);
709 for (i = 0; i <= tmpl_len; i++) {
710 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
711 tmplt_copy[i] = '\0';
712 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
713 rrd_set_error("tmplt contains more DS definitions than RRD");
715 goto out_free_tmpl_copy;
717 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
718 rrd_set_error("unknown DS name '%s'", dsname);
720 goto out_free_tmpl_copy;
722 /* go to the next entry on the tmplt_copy */
724 dsname = &tmplt_copy[i + 1];
734 * Parse an update string, updates the primary data points (PDPs)
735 * and consolidated data points (CDPs), and writes changes to the RRAs.
737 * Returns 0 on success, -1 on error.
739 static int process_arg(
742 rrd_file_t *rrd_file,
743 unsigned long rra_begin,
744 unsigned long *rra_current,
745 time_t *current_time,
746 unsigned long *current_time_usec,
747 rrd_value_t *pdp_temp,
748 rrd_value_t *pdp_new,
749 unsigned long *rra_step_cnt,
752 unsigned long tmpl_cnt,
753 info_t **pcdp_summary,
755 unsigned long *skip_update,
756 int *schedule_smooth)
758 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
760 /* a vector of future Holt-Winters seasonal coefs */
761 unsigned long elapsed_pdp_st;
763 double interval, pre_int, post_int; /* interval between this and
765 unsigned long proc_pdp_cnt;
766 unsigned long rra_start;
768 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769 current_time, current_time_usec, version) == -1) {
772 /* seek to the beginning of the rra's */
773 if (*rra_current != rra_begin) {
775 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
776 rrd_set_error("seek error in rrd");
780 *rra_current = rra_begin;
782 rra_start = rra_begin;
784 interval = (double) (*current_time - rrd->live_head->last_up)
785 + (double) ((long) *current_time_usec -
786 (long) rrd->live_head->last_up_usec) / 1e6f;
788 /* process the data sources and update the pdp_prep
789 * area accordingly */
790 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
794 elapsed_pdp_st = calculate_elapsed_steps(rrd,
796 *current_time_usec, interval,
800 /* has a pdp_st moment occurred since the last run ? */
801 if (elapsed_pdp_st == 0) {
802 /* no we have not passed a pdp_st moment. therefore update is simple */
803 simple_update(rrd, interval, pdp_new);
805 /* an pdp_st has occurred. */
806 if (process_all_pdp_st(rrd, interval,
808 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
811 if (update_all_cdp_prep(rrd, rra_step_cnt,
817 pdp_temp, rra_current,
818 skip_update, schedule_smooth) == -1) {
819 goto err_free_coefficients;
821 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
822 elapsed_pdp_st, pdp_temp,
823 &seasonal_coef) == -1) {
824 goto err_free_coefficients;
826 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
827 rra_current, *current_time, skip_update,
828 pcdp_summary) == -1) {
829 goto err_free_coefficients;
831 } /* endif a pdp_st has occurred */
832 rrd->live_head->last_up = *current_time;
833 rrd->live_head->last_up_usec = *current_time_usec;
836 free(last_seasonal_coef);
839 err_free_coefficients:
841 free(last_seasonal_coef);
846 * Parse a DS string (time + colon-separated values), storing the
847 * results in current_time, current_time_usec, and updvals.
849 * Returns 0 on success, -1 on error.
856 unsigned long tmpl_cnt,
857 time_t *current_time,
858 unsigned long *current_time_usec,
866 /* initialize all ds input to unknown except the first one
867 which has always got to be set */
868 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
871 /* separate all ds elements; first must be examined separately
872 due to alternate time syntax */
873 if ((p = strchr(input, '@')) != NULL) {
875 } else if ((p = strchr(input, ':')) != NULL) {
878 rrd_set_error("expected timestamp not found in data source from %s",
884 updvals[tmpl_idx[i++]] = p + 1;
889 updvals[tmpl_idx[i++]] = p + 1;
895 rrd_set_error("expected %lu data source readings (got %lu) from %s",
896 tmpl_cnt - 1, i, input);
900 if (get_time_from_reading(rrd, timesyntax, updvals,
901 current_time, current_time_usec,
909 * Parse the time in a DS string, store it in current_time and
910 * current_time_usec and verify that it's later than the last
911 * update for this DS.
913 * Returns 0 on success, -1 on error.
915 static int get_time_from_reading(
919 time_t *current_time,
920 unsigned long *current_time_usec,
924 char *parsetime_error = NULL;
926 struct rrd_time_value ds_tv;
927 struct timeval tmp_time; /* used for time conversion */
929 /* get the time from the reading ... handle N */
930 if (timesyntax == '@') { /* at-style */
931 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
932 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
935 if (ds_tv.type == RELATIVE_TO_END_TIME ||
936 ds_tv.type == RELATIVE_TO_START_TIME) {
937 rrd_set_error("specifying time relative to the 'start' "
938 "or 'end' makes no sense here: %s", updvals[0]);
941 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
942 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
943 } else if (strcmp(updvals[0], "N") == 0) {
944 gettimeofday(&tmp_time, 0);
945 normalize_time(&tmp_time);
946 *current_time = tmp_time.tv_sec;
947 *current_time_usec = tmp_time.tv_usec;
949 old_locale = setlocale(LC_NUMERIC, "C");
950 tmp = strtod(updvals[0], 0);
951 setlocale(LC_NUMERIC, old_locale);
952 *current_time = floor(tmp);
953 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
955 /* dont do any correction for old version RRDs */
957 *current_time_usec = 0;
959 if (*current_time < rrd->live_head->last_up ||
960 (*current_time == rrd->live_head->last_up &&
961 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
962 rrd_set_error("illegal attempt to update using time %ld when "
963 "last update time is %ld (minimum one second step)",
964 *current_time, rrd->live_head->last_up);
971 * Update pdp_new by interpreting the updvals according to the DS type
972 * (COUNTER, GAUGE, etc.).
974 * Returns 0 on success, -1 on error.
976 static int update_pdp_prep(
979 rrd_value_t *pdp_new,
982 unsigned long ds_idx;
984 char *endptr; /* used in the conversion */
989 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
990 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
992 /* make sure we do not build diffs with old last_ds values */
993 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
994 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
995 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
998 /* NOTE: DST_CDEF should never enter this if block, because
999 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1000 * accidently specified a value for the DST_CDEF. To handle this case,
1001 * an extra check is required. */
1003 if ((updvals[ds_idx + 1][0] != 'U') &&
1004 (dst_idx != DST_CDEF) &&
1005 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1008 /* pdp_new contains rate * time ... eg the bytes transferred during
1009 * the interval. Doing it this way saves a lot of math operations
1014 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1015 if ((updvals[ds_idx + 1][ii] < '0'
1016 || updvals[ds_idx + 1][ii] > '9')
1017 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1018 rrd_set_error("not a simple integer: '%s'",
1019 updvals[ds_idx + 1]);
1023 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1025 rrd_diff(updvals[ds_idx + 1],
1026 rrd->pdp_prep[ds_idx].last_ds);
1027 if (dst_idx == DST_COUNTER) {
1028 /* simple overflow catcher. This will fail
1029 * terribly for non 32 or 64 bit counters
1030 * ... are there any others in SNMP land?
1032 if (pdp_new[ds_idx] < (double) 0.0)
1033 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1034 if (pdp_new[ds_idx] < (double) 0.0)
1035 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1037 rate = pdp_new[ds_idx] / interval;
1039 pdp_new[ds_idx] = DNAN;
1043 old_locale = setlocale(LC_NUMERIC, "C");
1045 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1046 setlocale(LC_NUMERIC, old_locale);
1048 rrd_set_error("converting '%s' to float: %s",
1049 updvals[ds_idx + 1], rrd_strerror(errno));
1052 if (endptr[0] != '\0') {
1054 ("conversion of '%s' to float not complete: tail '%s'",
1055 updvals[ds_idx + 1], endptr);
1058 rate = pdp_new[ds_idx] / interval;
1062 old_locale = setlocale(LC_NUMERIC, "C");
1064 strtod(updvals[ds_idx + 1], &endptr) * interval;
1065 setlocale(LC_NUMERIC, old_locale);
1067 rrd_set_error("converting '%s' to float: %s",
1068 updvals[ds_idx + 1], rrd_strerror(errno));
1071 if (endptr[0] != '\0') {
1073 ("conversion of '%s' to float not complete: tail '%s'",
1074 updvals[ds_idx + 1], endptr);
1077 rate = pdp_new[ds_idx] / interval;
1080 rrd_set_error("rrd contains unknown DS type : '%s'",
1081 rrd->ds_def[ds_idx].dst);
1084 /* break out of this for loop if the error string is set */
1085 if (rrd_test_error()) {
1088 /* make sure pdp_temp is neither too large or too small
1089 * if any of these occur it becomes unknown ...
1090 * sorry folks ... */
1092 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1093 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1094 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1095 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1096 pdp_new[ds_idx] = DNAN;
1099 /* no news is news all the same */
1100 pdp_new[ds_idx] = DNAN;
1104 /* make a copy of the command line argument for the next run */
1106 fprintf(stderr, "prep ds[%lu]\t"
1110 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1115 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1121 * How many PDP steps have elapsed since the last update? Returns the answer,
1122 * and stores the time between the last update and the last PDP in pre_time,
1123 * and the time between the last PDP and the current time in post_int.
1125 static int calculate_elapsed_steps(
1127 unsigned long current_time,
1128 unsigned long current_time_usec,
1132 unsigned long *proc_pdp_cnt)
1134 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1135 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1137 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1138 * when it was last updated */
1139 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1141 /* when was the current pdp started */
1142 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1143 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1145 /* when did the last pdp_st occur */
1146 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1147 occu_pdp_st = current_time - occu_pdp_age;
1149 if (occu_pdp_st > proc_pdp_st) {
1150 /* OK we passed the pdp_st moment */
1151 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1152 * occurred before the latest
1154 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1155 *post_int = occu_pdp_age; /* how much after it */
1156 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1158 *pre_int = interval;
1162 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1165 printf("proc_pdp_age %lu\t"
1167 "occu_pfp_age %lu\t"
1171 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1172 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1175 /* compute the number of elapsed pdp_st moments */
1176 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1180 * Increment the PDP values by the values in pdp_new, or else initialize them.
1182 static void simple_update(
1185 rrd_value_t *pdp_new)
1189 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1190 if (isnan(pdp_new[i])) {
1191 /* this is not really accurate if we use subsecond data arrival time
1192 should have thought of it when going subsecond resolution ...
1193 sorry next format change we will have it! */
1194 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1197 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1198 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1200 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1209 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1210 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1216 * Call process_pdp_st for each DS.
1218 * Returns 0 on success, -1 on error.
1220 static int process_all_pdp_st(
1225 unsigned long elapsed_pdp_st,
1226 rrd_value_t *pdp_new,
1227 rrd_value_t *pdp_temp)
1229 unsigned long ds_idx;
1231 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1232 rate*seconds which occurred up to the last run.
1233 pdp_new[] contains rate*seconds from the latest run.
1234 pdp_temp[] will contain the rate for cdp */
1236 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1237 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1238 elapsed_pdp_st * rrd->stat_head->pdp_step,
1239 pdp_new, pdp_temp) == -1) {
1243 fprintf(stderr, "PDP UPD ds[%lu]\t"
1244 "elapsed_pdp_st %lu\t"
1247 "new_unkn_sec %5lu\n",
1251 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1252 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1259 * Process an update that occurs after one of the PDP moments.
1260 * Increments the PDP value, sets NAN if time greater than the
1261 * heartbeats have elapsed, processes CDEFs.
1263 * Returns 0 on success, -1 on error.
1265 static int process_pdp_st(
1267 unsigned long ds_idx,
1271 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1272 rrd_value_t *pdp_new,
1273 rrd_value_t *pdp_temp)
1277 /* update pdp_prep to the current pdp_st. */
1278 double pre_unknown = 0.0;
1279 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1280 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1282 rpnstack_t rpnstack; /* used for COMPUTE DS */
1284 rpnstack_init(&rpnstack);
1287 if (isnan(pdp_new[ds_idx])) {
1288 /* a final bit of unknown to be added before calculation
1289 we use a temporary variable for this so that we
1290 don't have to turn integer lines before using the value */
1291 pre_unknown = pre_int;
1293 if (isnan(scratch[PDP_val].u_val)) {
1294 scratch[PDP_val].u_val = 0;
1296 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1299 /* if too much of the pdp_prep is unknown we dump it */
1300 /* if the interval is larger thatn mrhb we get NAN */
1301 if ((interval > mrhb) ||
1302 (rrd->stat_head->pdp_step / 2.0 <
1303 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1304 pdp_temp[ds_idx] = DNAN;
1306 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1307 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1311 /* process CDEF data sources; remember each CDEF DS can
1312 * only reference other DS with a lower index number */
1313 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1317 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1318 /* substitute data values for OP_VARIABLE nodes */
1319 for (i = 0; rpnp[i].op != OP_END; i++) {
1320 if (rpnp[i].op == OP_VARIABLE) {
1321 rpnp[i].op = OP_NUMBER;
1322 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1325 /* run the rpn calculator */
1326 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1328 rpnstack_free(&rpnstack);
1333 /* make pdp_prep ready for the next run */
1334 if (isnan(pdp_new[ds_idx])) {
1335 /* this is not realy accurate if we use subsecond data arival time
1336 should have thought of it when going subsecond resolution ...
1337 sorry next format change we will have it! */
1338 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1339 scratch[PDP_val].u_val = DNAN;
1341 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1342 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1344 rpnstack_free(&rpnstack);
1349 * Iterate over all the RRAs for a given DS and:
1350 * 1. Decide whether to schedule a smooth later
1351 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1354 * Returns 0 on success, -1 on error
1356 static int update_all_cdp_prep(
1358 unsigned long *rra_step_cnt,
1359 unsigned long rra_begin,
1360 rrd_file_t *rrd_file,
1361 unsigned long elapsed_pdp_st,
1362 unsigned long proc_pdp_cnt,
1363 rrd_value_t **last_seasonal_coef,
1364 rrd_value_t **seasonal_coef,
1365 rrd_value_t *pdp_temp,
1366 unsigned long *rra_current,
1367 unsigned long *skip_update,
1368 int *schedule_smooth)
1370 unsigned long rra_idx;
1372 /* index into the CDP scratch array */
1373 enum cf_en current_cf;
1374 unsigned long rra_start;
1376 /* number of rows to be updated in an RRA for a data value. */
1377 unsigned long start_pdp_offset;
1379 rra_start = rra_begin;
1380 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1381 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1383 rrd->rra_def[rra_idx].pdp_cnt -
1384 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1385 skip_update[rra_idx] = 0;
1386 if (start_pdp_offset <= elapsed_pdp_st) {
1387 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1388 rrd->rra_def[rra_idx].pdp_cnt + 1;
1390 rra_step_cnt[rra_idx] = 0;
1393 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1394 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1395 * so that they will be correct for the next observed value; note that for
1396 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1397 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1398 if (rra_step_cnt[rra_idx] > 1) {
1399 skip_update[rra_idx] = 1;
1400 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1401 elapsed_pdp_st, last_seasonal_coef);
1402 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1403 elapsed_pdp_st + 1, seasonal_coef);
1405 /* periodically run a smoother for seasonal effects */
1406 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1409 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1410 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1411 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1414 *schedule_smooth = 1;
1416 *rra_current = rrd_tell(rrd_file);
1418 if (rrd_test_error())
1422 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1423 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1424 current_cf) == -1) {
1428 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1429 sizeof(rrd_value_t);
1435 * Are we due for a smooth? Also increments our position in the burn-in cycle.
/*
 * Decide whether a seasonal smoothing pass is due for RRA rra_idx after
 * advancing elapsed_pdp_st PDP steps; while the RRA is still in its
 * burn-in phase, a completed cycle increments the burn-in counter instead.
 * Returns non-zero when the smoother should be scheduled.
 * NOTE(review): the body reads `rrd`; its rrd_t * parameter line is not
 * visible in this revision of the chunk — confirm against the full source.
 */
static int do_schedule_smooth(
    unsigned long rra_idx,
    unsigned long elapsed_pdp_st)
    /* cdp_prep slot of this RRA's first data source (used for burn-in state) */
    unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
    unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
    unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
    /* row index at which the smoother is scheduled to run */
    unsigned long seasonal_smooth_idx =
        rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
    /* burn-in cycle counter, persisted in the first DS's cdp_prep scratch */
    unsigned long *init_seasonal =
        &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
    /* Need to use first cdp parameter buffer to track burnin (burnin requires
     * a specific smoothing schedule).  The CDP_init_seasonal parameter is
     * really an RRA level, not a data source within RRA level parameter, but
     * the rra_def is read only for rrd_update (not flushed to disk). */
    if (*init_seasonal > BURNIN_CYCLES) {
        /* someone has no doubt invented a trick to deal with this wrap around,
         * but at least this code is clear. */
        if (seasonal_smooth_idx > cur_row) {
            /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
             * between PDP and CDP */
            return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
        /* can't rely on negative numbers because we are working with
         * unsigned values */
        /* smooth index already passed: due only after wrapping past it again */
        return (cur_row + elapsed_pdp_st >= row_cnt
                && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
    /* mark off one of the burn-in cycles */
    return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1472 * For a given RRA, iterate over the data sources and call the appropriate
1473 * consolidation function.
1475 * Returns 0 on success, -1 on error.
/*
 * For one RRA, iterate over all data sources and feed the new PDP reading
 * into each cdp_prep scratch area: when several PDPs make up one CDP, run
 * the real consolidation via update_cdp(); when there is one PDP per CDP
 * and several steps were missed, refresh bookkeeping via reset_cdp().
 * Returns 0 on success, -1 on error (per the prototype's contract).
 * NOTE(review): the body also reads `rrd`, `rra_idx` and `current_cf`;
 * those parameter lines are not visible in this revision of the chunk.
 */
static int update_cdp_prep(
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long *rra_step_cnt,
    rrd_value_t *pdp_temp,
    rrd_value_t *last_seasonal_coef,
    rrd_value_t *seasonal_coef,
    unsigned long ds_idx, cdp_idx;
    /* update CDP_PREP areas */
    /* loop over data sources within each RRA */
    for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
        /* cdp_prep entries are laid out RRA-major, DS-minor */
        cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
        if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
            /* several PDPs per CDP: run the real consolidation */
            update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
                       pdp_temp[ds_idx], rra_step_cnt[rra_idx],
                       elapsed_pdp_st, start_pdp_offset,
                       rrd->rra_def[rra_idx].pdp_cnt,
                       rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
            /* Nothing to consolidate if there's one PDP per CDP. However, if
             * we've missed some PDPs, let's update null counters etc. */
            if (elapsed_pdp_st > 2) {
                reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
                          seasonal_coef, rra_idx, ds_idx, cdp_idx,
        if (rrd_test_error())
    }                   /* endif data sources loop */
1520 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1521 * primary value, secondary value, and # of unknowns.
/*
 * Consolidate one new PDP reading (pdp_temp_val) into a single data
 * source's cdp_prep scratch area.  Maintains:
 *   - CDP_val:           the running (carry-over) consolidated value,
 *   - CDP_primary_val:   the value written for the first completed CDP,
 *   - CDP_secondary_val: the "fill in" value for further rows of a bulk
 *                        update,
 *   - CDP_unkn_pdp_cnt:  how many PDPs of the current CDP were unknown.
 * NOTE(review): the `scratch`, `current_cf` and `xff` parameter lines are
 * not visible in this revision of the chunk, though the body uses them.
 */
static void update_cdp(
    rrd_value_t pdp_temp_val,   /* the new PDP reading; may be DNAN */
    unsigned long rra_step_cnt, /* CDP rows this RRA emits in this update */
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt,      /* PDPs per CDP for this RRA */
    /* shorthand variables */
    rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
    rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
    rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
    unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
    /* If we are in this block, at least 1 CDP value will be written to
     * disk, this is the CDP_primary_val entry. If more than 1 value needs
     * to be written, then the "fill in" value is the CDP_secondary_val
        if (isnan(pdp_temp_val)) {
            /* unknown reading: the PDPs it covers count as unknown */
            *cdp_unkn_pdp_cnt += start_pdp_offset;
            *cdp_secondary_val = DNAN;
            /* CDP_secondary value is the RRA "fill in" value for intermediary
             * CDP data entries. No matter the CF, the value is the same because
             * the average, max, min, and last of a list of identical values is
             * the same, namely, the value itself. */
            *cdp_secondary_val = pdp_temp_val;
        /* too large a fraction of this CDP's PDPs is unknown -> CDP is DNAN */
        if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
            *cdp_primary_val = DNAN;
            if (current_cf == CF_AVERAGE) {
                    initialize_average_carry_over(pdp_temp_val,
                                                  start_pdp_offset, pdp_cnt);
                *cdp_val = pdp_temp_val;
                /* xff requirement met: derive the primary value and seed the
                 * carry-over for the next CDP */
                initialize_cdp_val(scratch, current_cf, pdp_temp_val,
                                   elapsed_pdp_st, start_pdp_offset, pdp_cnt);
        }               /* endif meets xff value requirement for a valid value */
        /* initialize carry over CDP_unkn_pdp_cnt; this must happen after
         * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to
         * compute that value. */
        if (isnan(pdp_temp_val))
            *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
            *cdp_unkn_pdp_cnt = 0;
    } else {            /* rra_step_cnt[i] == 0 */
        /* no CDP completes during this update: only accumulate into CDP_val */
        if (isnan(*cdp_val)) {
            fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
            fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
        if (isnan(pdp_temp_val)) {
            *cdp_unkn_pdp_cnt += elapsed_pdp_st;
            calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1599 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1600 * on the type of consolidation function.
/*
 * Set CDP_primary_val (the value flushed to disk for the completed CDP)
 * and CDP_val (the carry-over for the next, partially filled CDP)
 * according to the consolidation function.
 * NOTE(review): the switch's `case` labels and several statements are not
 * visible in this revision; the -DINF/`>` branch is the maximum-style
 * path and the DINF/`<` branch the minimum-style path, judging by the
 * visible comparisons.  The diagnostic lines reference `pdp_temp`, `i`
 * and `ii`, which are not declared in the visible scope — presumably
 * debug-only code behind a dropped #ifdef; confirm against full source.
 */
static void initialize_cdp_val(
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt)
    rrd_value_t cum_val, cur_val;
    switch (current_cf) {
        /* average: weight the running sum and the new reading by the
         * number of known PDPs in the CDP */
        cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
        cur_val = IFDNAN(pdp_temp_val, 0.0);
        scratch[CDP_primary_val].u_val =
            (cum_val + cur_val * start_pdp_offset) /
            (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
        scratch[CDP_val].u_val =
            initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
                                          start_pdp_offset, pdp_cnt);
        /* maximum-style branch: DNAN operands fall back to -infinity */
        cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
        cur_val = IFDNAN(pdp_temp_val, -DINF);
        if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
            "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
        if (cur_val > cum_val)
            scratch[CDP_primary_val].u_val = cur_val;
            scratch[CDP_primary_val].u_val = cum_val;
        /* initialize carry over value */
        scratch[CDP_val].u_val = pdp_temp_val;
        /* minimum-style branch: DNAN operands fall back to +infinity */
        cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
        cur_val = IFDNAN(pdp_temp_val, DINF);
        if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
            "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
        if (cur_val < cum_val)
            scratch[CDP_primary_val].u_val = cur_val;
            scratch[CDP_primary_val].u_val = cum_val;
        /* initialize carry over value */
        scratch[CDP_val].u_val = pdp_temp_val;
        /* fallback (e.g. LAST): the newest reading is the CDP value */
        scratch[CDP_primary_val].u_val = pdp_temp_val;
        /* initialize carry over value */
        scratch[CDP_val].u_val = pdp_temp_val;
1673 * Update the consolidation function for Holt-Winters functions as
1674 * well as other functions that don't actually consolidate multiple
/*
 * Refresh cdp_prep state for consolidation functions that do not actually
 * consolidate multiple PDPs (Holt-Winters family and friends) after a
 * bulk (multi-step) update.
 * NOTE(review): several `case` labels, the rrd_t * / rra_idx / ds_idx /
 * cdp_idx parameter lines and the closing brace are not visible in this
 * revision of the chunk; only CF_DEVSEASONAL is visible among the labels.
 */
static void reset_cdp(
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp,
    rrd_value_t *last_seasonal_coef,
    rrd_value_t *seasonal_coef,
    enum cf_en current_cf)
    unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
    switch (current_cf) {
        /* both the first and the fill-in row get the current reading */
        scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
        scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
    case CF_DEVSEASONAL:
        /* need to update cached seasonal values, so they are consistent
         * with the bulk update */
        /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
         * CDP_last_deviation are the same. */
        scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
        scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
        /* need to update the null_count and last_null_count.
         * even do this for non-DNAN pdp_temp because the
         * algorithm is not learning from batch updates. */
        scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
        scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
        /* skipped rows are unknown for prediction-style RRAs */
        scratch[CDP_primary_val].u_val = DNAN;
        scratch[CDP_secondary_val].u_val = DNAN;
        /* do not count missed bulk values as failures */
        scratch[CDP_primary_val].u_val = 0;
        scratch[CDP_secondary_val].u_val = 0;
        /* need to reset violations buffer.
         * could do this more carefully, but for now, just
         * assume a bulk update wipes away all violations. */
        erase_violations(rrd, cdp_idx, rra_idx);
/*
 * Seed CDP_val for AVERAGE consolidation after a bulk update: the new
 * reading weighted by the number of leftover PDPs that spill into the
 * next, not-yet-complete CDP.
 * NOTE(review): the body of the isnan() branch is not visible in this
 * revision of the chunk (presumably it returns an unknown/zero carry
 * over); confirm against the full source.
 */
static rrd_value_t initialize_average_carry_over(
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt)
    /* initialize carry over value */
    if (isnan(pdp_temp_val)) {
    /* leftover PDPs of the current (incomplete) CDP, times the reading */
    return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1743 * Update or initialize a CDP value based on the consolidation
1746 * Returns the new value.
/*
 * Fold one PDP reading into the running CDP value for the given
 * consolidation function and return the new running value:
 *   AVERAGE          -> accumulate reading * elapsed steps,
 *   MINIMUM/MAXIMUM  -> keep the smaller/larger of the two,
 *   anything else    -> the newest reading (LAST semantics).
 * A DNAN cdp_val means the accumulator is uninitialized and is seeded
 * from the reading.
 * NOTE(review): the `current_cf` parameter line and a debug guard are not
 * visible here; the fprintf references `i`/`ii`, which are not declared
 * in the visible scope — presumably debug-only code behind a dropped
 * #ifdef.
 */
static rrd_value_t calculate_cdp_val(
    rrd_value_t cdp_val,
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    if (isnan(cdp_val)) {
        /* first known reading for this CDP: seed the accumulator */
        if (current_cf == CF_AVERAGE) {
            pdp_temp_val *= elapsed_pdp_st;
        fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
                i, ii, pdp_temp_val);
        return pdp_temp_val;
    if (current_cf == CF_AVERAGE)
        return cdp_val + pdp_temp_val * elapsed_pdp_st;
    if (current_cf == CF_MINIMUM)
        return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
    if (current_cf == CF_MAXIMUM)
        return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
    /* CF_LAST and anything else: the newest reading wins */
    return pdp_temp_val;
1783 * For each RRA, update the seasonal values and then call update_aberrant_CF
1784 * for each data source.
1786 * Return 0 on success, -1 on error.
/*
 * For each SEASONAL/DEVSEASONAL RRA (pdp_cnt == 1), refresh the cached
 * seasonal coefficients and run update_aberrant_CF() for every data
 * source, writing first into CDP_primary_val and then, for a second
 * missed step, into CDP_secondary_val.
 * Returns 0 on success, -1 on error (per the prototype's contract).
 * NOTE(review): the rrd_t * parameter line and several closing braces /
 * error returns are not visible in this revision of the chunk.
 */
static int update_aberrant_cdps(
    rrd_file_t *rrd_file,
    unsigned long rra_begin,    /* byte offset of the first RRA */
    unsigned long *rra_current, /* in/out: current file position */
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp,
    rrd_value_t **seasonal_coef)
    unsigned long rra_idx, ds_idx, j;
    /* number of PDP steps since the last update that
     * are assigned to the first CDP to be generated
     * since the last update. */
    unsigned short scratch_idx;
    unsigned long rra_start;
    enum cf_en current_cf;
    /* this loop is only entered if elapsed_pdp_st < 3 */
    for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
         j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
        rra_start = rra_begin;
        for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
            if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
                current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
                if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
                    if (scratch_idx == CDP_primary_val) {
                        lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                        elapsed_pdp_st + 1, seasonal_coef);
                        /* secondary pass looks one step further ahead */
                        lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                        elapsed_pdp_st + 2, seasonal_coef);
                    *rra_current = rrd_tell(rrd_file);
                if (rrd_test_error())
                /* loop over data sources within each RRA */
                for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
                    update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
                                       rra_idx * (rrd->stat_head->ds_cnt) +
                                       ds_idx, rra_idx, ds_idx, scratch_idx,
            /* advance to the byte offset of the next RRA */
            rra_start += rrd->rra_def[rra_idx].row_cnt
                * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1841 * Move sequentially through the file, writing one RRA at a time. Note this
1842 * architecture divorces the computation of CDP with flushing updated RRA
1845 * Return 0 on success, -1 on error.
/*
 * Walk the file sequentially and flush the freshly consolidated rows of
 * every RRA: the first completed row gets CDP_primary_val, any further
 * rows of a bulk update get CDP_secondary_val.  cur_row pointers are
 * advanced (with wrap-around) and per-row timestamps are computed for the
 * --template/info summary when pcdp_summary is in use.
 * Returns 0 on success, -1 on error (per the prototype's contract).
 * NOTE(review): the rrd_t * parameter line, debug #ifdef guards and some
 * error-return statements are not visible in this revision of the chunk.
 */
static int write_to_rras(
    rrd_file_t *rrd_file,
    unsigned long *rra_step_cnt,        /* per-RRA count of rows to write */
    unsigned long rra_begin,    /* byte offset of the first RRA */
    unsigned long *rra_current, /* in/out: current file position */
    time_t current_time,
    unsigned long *skip_update, /* per-RRA flag: suppress the write */
    info_t **pcdp_summary)      /* optional info list for written values */
    unsigned long rra_idx;
    unsigned long rra_start;
    unsigned long rra_pos_tmp;  /* temporary byte pointer. */
    time_t rra_time = 0;    /* time of update for a RRA */
    /* Ready to write to disk */
    rra_start = rra_begin;
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* skip unless there's something to write */
        if (rra_step_cnt[rra_idx]) {
            /* write the first row */
            fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
            rrd->rra_ptr[rra_idx].cur_row++;
            if (rrd->rra_ptr[rra_idx].cur_row >=
                rrd->rra_def[rra_idx].row_cnt)
                rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
            /* position on the first row */
            rra_pos_tmp = rra_start +
                (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
                sizeof(rrd_value_t);
            if (rra_pos_tmp != *rra_current) {
                if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
                    rrd_set_error("seek error in rrd");
                *rra_current = rra_pos_tmp;
            fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
            if (!skip_update[rra_idx]) {
                if (*pcdp_summary != NULL) {
                    /* timestamp of the first row: current time rounded down
                     * to the RRA step, minus the steps still to be written */
                    rra_time = (current_time - current_time
                                % (rrd->rra_def[rra_idx].pdp_cnt *
                                   rrd->stat_head->pdp_step))
                        ((rra_step_cnt[rra_idx] -
                          1) * rrd->rra_def[rra_idx].pdp_cnt *
                         rrd->stat_head->pdp_step);
                    (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
                     pcdp_summary, rra_time) == -1)
            /* write other rows of the bulk update, if any */
            for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
                if (++rrd->rra_ptr[rra_idx].cur_row ==
                    rrd->rra_def[rra_idx].row_cnt) {
                    "Wraparound for RRA %s, %lu updates left\n",
                    rrd->rra_def[rra_idx].cf_nam,
                    rra_step_cnt[rra_idx] - 1);
                    rrd->rra_ptr[rra_idx].cur_row = 0;
                    /* seek back to beginning of current rra */
                    if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
                        rrd_set_error("seek error in rrd");
                    fprintf(stderr, " -- Wraparound Postseek %ld\n",
                    *rra_current = rra_start;
                if (!skip_update[rra_idx]) {
                    if (*pcdp_summary != NULL) {
                        /* same rounding as above, one step later (-2) */
                        rra_time = (current_time - current_time
                                    % (rrd->rra_def[rra_idx].pdp_cnt *
                                       rrd->stat_head->pdp_step))
                            ((rra_step_cnt[rra_idx] -
                              2) * rrd->rra_def[rra_idx].pdp_cnt *
                             rrd->stat_head->pdp_step);
                    if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
                                      CDP_secondary_val, pcdp_summary,
        /* advance to the byte offset of the next RRA */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
1953 * Write out one row of values (one value per DS) to the archive.
1955 * Returns 0 on success, -1 on error.
/*
 * Write one row (one rrd_value_t per data source) of the RRA rra_idx at
 * the current file position, taking each value from the given
 * CDP_scratch_idx slot of the DS's cdp_prep area; optionally append the
 * written values to the pcdp_summary info list.  Advances *rra_current
 * by the bytes written.  Returns 0 on success, -1 on write error (per
 * the prototype's contract).
 * NOTE(review): the rrd_t * parameter line, the rra_time parameter, the
 * `unival iv;` declaration and a debug #ifdef guard are not visible in
 * this revision of the chunk.
 */
static int write_RRA_row(
    rrd_file_t *rrd_file,
    unsigned long rra_idx,
    unsigned long *rra_current, /* in/out: current file position */
    unsigned short CDP_scratch_idx,     /* CDP_primary_val or CDP_secondary_val */
    info_t **pcdp_summary,
    unsigned long ds_idx, cdp_idx;
    for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
        /* compute the cdp index */
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
                rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
                rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
        if (*pcdp_summary != NULL) {
            iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
            /* append info to the return hash */
            *pcdp_summary = info_push(*pcdp_summary,
                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
                                                    rrd->rra_def[rra_idx].
                                                    rrd->rra_def[rra_idx].
                                                    rrd->ds_def[ds_idx].
                                                    ds_nam), RD_I_VAL, iv);
        if (rrd_write(rrd_file,
                      &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
                        u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
            rrd_set_error("writing rrd: %s", rrd_strerror(errno));
        /* track the file position ourselves to avoid redundant seeks */
        *rra_current += sizeof(rrd_value_t);
2002 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2004 * Returns 0 on success, -1 otherwise
/*
 * Run apply_smoother() over every SEASONAL and DEVSEASONAL RRA, tracking
 * each RRA's byte offset as we go.  Returns 0 on success, -1 on error
 * (per the prototype's contract).
 * NOTE(review): the rrd_t * parameter line is not visible in this
 * revision of the chunk.  cf_conv() is evaluated twice per RRA in the
 * condition — harmless, but a candidate for hoisting.
 */
static int smooth_all_rras(
    rrd_file_t *rrd_file,
    unsigned long rra_begin)    /* byte offset of the first RRA */
    unsigned long rra_start = rra_begin;
    unsigned long rra_idx;
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
        if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
            cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
            fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
            apply_smoother(rrd, rra_idx, rra_start, rrd_file);
            if (rrd_test_error())
            /* advance to the byte offset of the next RRA */
            rra_start += rrd->rra_def[rra_idx].row_cnt
                * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2032 * Flush changes to disk (unless we're using mmap)
2034 * Returns 0 on success, -1 otherwise
2036 static int write_changes_to_disk(
2038 rrd_file_t *rrd_file,
2041 /* we just need to write back the live header portion now */
2042 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2043 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2044 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2046 rrd_set_error("seek rrd for live header writeback");
2050 if (rrd_write(rrd_file, rrd->live_head,
2051 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2052 rrd_set_error("rrd_write live_head to rrd");
2056 if (rrd_write(rrd_file, &rrd->live_head->last_up,
2057 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2058 rrd_set_error("rrd_write live_head to rrd");
2064 if (rrd_write(rrd_file, rrd->pdp_prep,
2065 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2066 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2067 rrd_set_error("rrd_write pdp_prep to rrd");
2071 if (rrd_write(rrd_file, rrd->cdp_prep,
2072 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2073 rrd->stat_head->ds_cnt)
2074 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2075 rrd->stat_head->ds_cnt)) {
2077 rrd_set_error("rrd_write cdp_prep to rrd");
2081 if (rrd_write(rrd_file, rrd->rra_ptr,
2082 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2083 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2084 rrd_set_error("rrd_write rra_ptr to rrd");