1 /*****************************************************************************
2 * RRDtool 1.4.3 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
8 *****************************************************************************/
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(
49 struct __timezone *tz)
52 struct _timeb current_time;
54 _ftime(&current_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
64 /* FUNCTION PROTOTYPES */
78 static int allocate_data_structures(
81 rrd_value_t **pdp_temp,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
92 unsigned long *tmpl_cnt,
95 static int process_arg(
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
110 unsigned long *skip_update,
111 int *schedule_smooth);
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
123 static int get_time_from_reading(
127 time_t *current_time,
128 unsigned long *current_time_usec,
131 static int update_pdp_prep(
134 rrd_value_t *pdp_new,
137 static int calculate_elapsed_steps(
139 unsigned long current_time,
140 unsigned long current_time_usec,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
162 unsigned long ds_idx,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
199 static void update_cdp(
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
211 static void initialize_cdp_val(
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
244 static int update_aberrant_cdps(
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
269 static int smooth_all_rras(
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
275 static int write_changes_to_disk(
277 rrd_file_t *rrd_file,
282 * normalize time as returned by gettimeofday. usec part must
285 static void normalize_time(
288 if (t->tv_usec < 0) {
295 * Sets current_time and current_time_usec based on the current time.
296 * current_time_usec is set to 0 if the version number is 1 or 2.
298 static void initialize_time(
299 time_t *current_time,
300 unsigned long *current_time_usec,
303 struct timeval tmp_time; /* used for time conversion */
305 gettimeofday(&tmp_time, 0);
306 normalize_time(&tmp_time);
307 *current_time = tmp_time.tv_sec;
309 *current_time_usec = tmp_time.tv_usec;
311 *current_time_usec = 0;
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
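/* Illustrative note (not from the original source): IFDNAN substitutes a
 * fallback for NaN inputs, so e.g. IFDNAN(DNAN, 0.0) evaluates to 0.0 while
 * IFDNAN(2.5, 0.0) evaluates to 2.5. It is used further down to seed the
 * CF_AVERAGE, CF_MAXIMUM and CF_MINIMUM accumulators with a neutral value
 * (0.0, -DINF, DINF respectively) when the carried-over CDP value is still
 * unknown. */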
317 rrd_info_t *rrd_update_v(
322 rrd_info_t *result = NULL;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
332 opterr = 0; /* initialize getopt */
335 int option_index = 0;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
387 int option_index = 0;
391 char *opt_daemon = NULL;
394 opterr = 0; /* initialize getopt */
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
404 tmplt = strdup(optarg);
408 if (opt_daemon != NULL)
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
413 rrd_set_error("strdup failed.");
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437 rrd_set_error("The caching daemon cannot be used together with "
442 if (! rrdc_is_connected(opt_daemon))
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
447 else /* we are connected */
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
453 rrd_set_error("Failed sending the values to rrdcached: %s",
463 if (opt_daemon != NULL)
472 const char *filename,
477 return _rrd_update(filename, tmplt, argc, argv, NULL);
481 const char *filename,
485 rrd_info_t * pcdp_summary)
487 return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
491 const char *filename,
495 rrd_info_t * pcdp_summary)
500 unsigned long rra_begin; /* byte pointer to the rra
501 * area in the rrd file. this
502 * pointer never changes value */
503 rrd_value_t *pdp_new; /* prepare the incoming data to be added
504 * to the existing entry */
505 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
506 * to the cdp values */
508 long *tmpl_idx; /* index representing the settings
509 * transported by the tmplt index */
510 unsigned long tmpl_cnt = 2; /* time and data */
512 time_t current_time = 0;
513 unsigned long current_time_usec = 0; /* microseconds part of current time */
515 int schedule_smooth = 0;
517 /* number of elapsed PDP steps since last update */
518 unsigned long *rra_step_cnt = NULL;
520 int version; /* rrd version */
521 rrd_file_t *rrd_file;
522 char *arg_copy; /* for processing the argv */
523 unsigned long *skip_update; /* RRAs to advance but not write */
525 /* need at least 1 argument: data. */
527 rrd_set_error("Not enough arguments");
532 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
535 /* We are now at the beginning of the rra's */
536 rra_begin = rrd_file->header_len;
538 version = atoi(rrd.stat_head->version);
540 initialize_time(&current_time, &current_time_usec, version);
542 /* get exclusive lock to whole file.
543 * lock gets removed when we close the file.
545 if (rrd_lock(rrd_file) != 0) {
546 rrd_set_error("could not lock RRD");
550 if (allocate_data_structures(&rrd, &updvals,
551 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552 &rra_step_cnt, &skip_update,
557 /* loop through the arguments. */
558 for (arg_i = 0; arg_i < argc; arg_i++) {
559 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560 rrd_set_error("failed duplicating argv entry");
563 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
564 &current_time, &current_time_usec, pdp_temp, pdp_new,
565 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566 &pcdp_summary, version, skip_update,
567 &schedule_smooth) == -1) {
568 if (rrd_test_error()) { /* Should have error string always here */
571 /* Prepend file name to error message */
572 if ((save_error = strdup(rrd_get_error())) != NULL) {
573 rrd_set_error("%s: %s", filename, save_error);
585 /* if we got here and if there is an error and if the file has not been
586 * written to, then close things up and return. */
587 if (rrd_test_error()) {
588 goto err_free_structures;
591 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592 goto err_free_structures;
596 /* calling the smoothing code here guarantees at most one smoothing
597 * operation per rrd_update call. Unfortunately, with bulk updates or a
598 * long-delayed update, it is possible for smoothing to occur off-schedule.
599 * This really isn't critical except during the burn-in cycles. */
600 if (schedule_smooth) {
601 smooth_all_rras(&rrd, rrd_file, rra_begin);
604 /* rrd_dontneed(rrd_file,&rrd); */
630 * Allocate some important arrays used, and initialize the template.
632 * When it returns, either all of the structures are allocated
633 * or none of them are.
635 * Returns 0 on success, -1 on error.
637 static int allocate_data_structures(
640 rrd_value_t **pdp_temp,
643 unsigned long *tmpl_cnt,
644 unsigned long **rra_step_cnt,
645 unsigned long **skip_update,
646 rrd_value_t **pdp_new)
649 if ((*updvals = (char **) malloc(sizeof(char *)
650 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651 rrd_set_error("allocating updvals pointer array.");
654 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655 * rrd->stat_head->ds_cnt)) ==
657 rrd_set_error("allocating pdp_temp.");
658 goto err_free_updvals;
660 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
662 rrd->stat_head->rra_cnt)) ==
664 rrd_set_error("allocating skip_update.");
665 goto err_free_pdp_temp;
667 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669 rrd_set_error("allocating tmpl_idx.");
670 goto err_free_skip_update;
672 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
675 rra_cnt))) == NULL) {
676 rrd_set_error("allocating rra_step_cnt.");
677 goto err_free_tmpl_idx;
680 /* initialize tmplt redirector */
681 /* default config example (assume DS 1 is a CDEF DS)
682 tmpl_idx[0] -> 0; (time)
683 tmpl_idx[1] -> 1; (DS 0)
684 tmpl_idx[2] -> 3; (DS 2)
685 tmpl_idx[3] -> 4; (DS 3) */
686 (*tmpl_idx)[0] = 0; /* time */
687 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689 (*tmpl_idx)[ii++] = i;
694 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695 goto err_free_rra_step_cnt;
699 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700 * rrd->stat_head->ds_cnt)) == NULL) {
701 rrd_set_error("allocating pdp_new.");
702 goto err_free_rra_step_cnt;
707 err_free_rra_step_cnt:
711 err_free_skip_update:
721 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
723 * Returns 0 on success.
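/* Illustrative sketch (assumed DS names): for an RRD with data sources
 * "watts" and "amps", a template of "amps:watts" produces
 *   tmpl_idx[0] = 0                      (time)
 *   tmpl_idx[1] = ds_match(rrd, "amps")  + 1
 *   tmpl_idx[2] = ds_match(rrd, "watts") + 1
 * so that an update string like "N:7:5" assigns 7 to "amps" and 5 to "watts"
 * regardless of their order in the RRD. The "+ 1" accounts for the time
 * entry occupying index 0 of updvals/tmpl_idx. */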
725 static int parse_template(
728 unsigned long *tmpl_cnt,
731 char *dsname, *tmplt_copy;
732 unsigned int tmpl_len, i;
735 *tmpl_cnt = 1; /* the first entry is the time */
737 /* we should work on a writeable copy here */
738 if ((tmplt_copy = strdup(tmplt)) == NULL) {
739 rrd_set_error("error copying tmplt '%s'", tmplt);
745 tmpl_len = strlen(tmplt_copy);
746 for (i = 0; i <= tmpl_len; i++) {
747 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748 tmplt_copy[i] = '\0';
749 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750 rrd_set_error("tmplt contains more DS definitions than RRD");
752 goto out_free_tmpl_copy;
754 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755 rrd_set_error("unknown DS name '%s'", dsname);
757 goto out_free_tmpl_copy;
759 /* go to the next entry on the tmplt_copy */
761 dsname = &tmplt_copy[i + 1];
771 * Parses an update string, updates the primary data points (PDPs)
772 * and consolidated data points (CDPs), and writes changes to the RRAs.
774 * Returns 0 on success, -1 on error.
776 static int process_arg(
779 rrd_file_t *rrd_file,
780 unsigned long rra_begin,
781 time_t *current_time,
782 unsigned long *current_time_usec,
783 rrd_value_t *pdp_temp,
784 rrd_value_t *pdp_new,
785 unsigned long *rra_step_cnt,
788 unsigned long tmpl_cnt,
789 rrd_info_t ** pcdp_summary,
791 unsigned long *skip_update,
792 int *schedule_smooth)
794 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
796 /* a vector of future Holt-Winters seasonal coefs */
797 unsigned long elapsed_pdp_st;
799 double interval, pre_int, post_int; /* interval between this and
801 unsigned long proc_pdp_cnt;
803 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804 current_time, current_time_usec, version) == -1) {
808 interval = (double) (*current_time - rrd->live_head->last_up)
809 + (double) ((long) *current_time_usec -
810 (long) rrd->live_head->last_up_usec) / 1e6f;
812 /* process the data sources and update the pdp_prep
813 * area accordingly */
814 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
818 elapsed_pdp_st = calculate_elapsed_steps(rrd,
820 *current_time_usec, interval,
824 /* has a pdp_st moment occurred since the last run ? */
825 if (elapsed_pdp_st == 0) {
826 /* no we have not passed a pdp_st moment. therefore update is simple */
827 simple_update(rrd, interval, pdp_new);
829 /* a pdp_st has occurred. */
830 if (process_all_pdp_st(rrd, interval,
832 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
835 if (update_all_cdp_prep(rrd, rra_step_cnt,
842 skip_update, schedule_smooth) == -1) {
843 goto err_free_coefficients;
845 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846 elapsed_pdp_st, pdp_temp,
847 &seasonal_coef) == -1) {
848 goto err_free_coefficients;
850 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851 *current_time, skip_update,
852 pcdp_summary) == -1) {
853 goto err_free_coefficients;
855 } /* endif a pdp_st has occurred */
856 rrd->live_head->last_up = *current_time;
857 rrd->live_head->last_up_usec = *current_time_usec;
860 *rrd->legacy_last_up = rrd->live_head->last_up;
863 free(last_seasonal_coef);
866 err_free_coefficients:
868 free(last_seasonal_coef);
873 * Parse a DS string (time + colon-separated values), storing the
874 * results in current_time, current_time_usec, and updvals.
876 * Returns 0 on success, -1 on error.
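/* Illustrative examples (hypothetical values) of accepted update strings:
 *   "1337000000:5:U"  -- epoch timestamp, one value per DS, 'U' = unknown
 *   "N:5:7"           -- 'N' means "now" (resolved via gettimeofday below)
 *   "noon@5:7"        -- at-style time, separated from the values by '@'
 *                        instead of the first ':' (see the '@' check below)
 * Values after the first separator are split on ':' and mapped to data
 * sources through tmpl_idx. */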
883 unsigned long tmpl_cnt,
884 time_t *current_time,
885 unsigned long *current_time_usec,
893 /* initialize all ds input to unknown except the first one
894 which has always got to be set */
895 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
898 /* separate all ds elements; first must be examined separately
899 due to alternate time syntax */
900 if ((p = strchr(input, '@')) != NULL) {
902 } else if ((p = strchr(input, ':')) != NULL) {
905 rrd_set_error("expected timestamp not found in data source from %s",
911 updvals[tmpl_idx[i++]] = p + 1;
916 updvals[tmpl_idx[i++]] = p + 1;
919 rrd_set_error("found extra data on update argument: %s",p+1);
926 rrd_set_error("expected %lu data source readings (got %lu) from %s",
927 tmpl_cnt - 1, i - 1, input);
931 if (get_time_from_reading(rrd, timesyntax, updvals,
932 current_time, current_time_usec,
940 * Parse the time in a DS string, store it in current_time and
941 * current_time_usec and verify that it's later than the last
942 * update for this DS.
944 * Returns 0 on success, -1 on error.
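/* Worked example (illustrative): a reading of "1424540279.25:..." yields
 *   tmp                = 1424540279.25
 *   *current_time      = floor(tmp)                     = 1424540279
 *   *current_time_usec = (tmp - *current_time) * 1e6    = 250000
 * For "N" the same split is applied to the gettimeofday() result; for
 * old-format RRDs the usec part is forced to 0 (see below). */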
946 static int get_time_from_reading(
950 time_t *current_time,
951 unsigned long *current_time_usec,
955 char *parsetime_error = NULL;
957 rrd_time_value_t ds_tv;
958 struct timeval tmp_time; /* used for time conversion */
960 /* get the time from the reading ... handle N */
961 if (timesyntax == '@') { /* at-style */
962 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
966 if (ds_tv.type == RELATIVE_TO_END_TIME ||
967 ds_tv.type == RELATIVE_TO_START_TIME) {
968 rrd_set_error("specifying time relative to the 'start' "
969 "or 'end' makes no sense here: %s", updvals[0]);
972 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974 } else if (strcmp(updvals[0], "N") == 0) {
975 gettimeofday(&tmp_time, 0);
976 normalize_time(&tmp_time);
977 *current_time = tmp_time.tv_sec;
978 *current_time_usec = tmp_time.tv_usec;
980 old_locale = setlocale(LC_NUMERIC, "C");
982 tmp = strtod(updvals[0], 0);
984 rrd_set_error("converting '%s' to float: %s",
985 updvals[0], rrd_strerror(errno));
988 setlocale(LC_NUMERIC, old_locale);
990 gettimeofday(&tmp_time, 0);
991 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
994 *current_time = floor(tmp);
995 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
997 /* don't do any correction for old-version RRDs */
999 *current_time_usec = 0;
1001 if (*current_time < rrd->live_head->last_up ||
1002 (*current_time == rrd->live_head->last_up &&
1003 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1004 rrd_set_error("illegal attempt to update using time %ld when "
1005 "last update time is %ld (minimum one second step)",
1006 *current_time, rrd->live_head->last_up);
1013 * Update pdp_new by interpreting the updvals according to the DS type
1014 * (COUNTER, GAUGE, etc.).
1016 * Returns 0 on success, -1 on error.
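/* Worked example (illustrative numbers): with interval = 300 seconds,
 *   COUNTER / DERIVE: last_ds = "100", new value "400"
 *       pdp_new[ds] = rrd_diff("400", "100") = 300   (rate * time)
 *       rate        = 300 / 300              = 1.0 per second
 *   GAUGE input "2.5"
 *       pdp_new[ds] = 2.5 * 300 = 750  (rate 2.5 stored as rate * time)
 * Keeping rate * time in pdp_new rather than the rate itself is what the
 * comment below means by saving math operations. */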
1018 static int update_pdp_prep(
1021 rrd_value_t *pdp_new,
1024 unsigned long ds_idx;
1026 char *endptr; /* used in the conversion */
1029 enum dst_en dst_idx;
1031 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1032 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1034 /* make sure we do not build diffs with old last_ds values */
1035 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1036 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1037 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1040 /* NOTE: DST_CDEF should never enter this if block, because
1041 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1042 * accidentally specified a value for the DST_CDEF. To handle this case,
1043 * an extra check is required. */
1045 if ((updvals[ds_idx + 1][0] != 'U') &&
1046 (dst_idx != DST_CDEF) &&
1047 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1050 /* pdp_new contains rate * time ... eg the bytes transferred during
1051 * the interval. Doing it this way saves a lot of math operations
1056 /* Check if this is a valid integer. `U' is already handled in
1057 * another branch. */
1058 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1059 if ((ii == 0) && (dst_idx == DST_DERIVE)
1060 && (updvals[ds_idx + 1][ii] == '-'))
1063 if ((updvals[ds_idx + 1][ii] < '0')
1064 || (updvals[ds_idx + 1][ii] > '9')) {
1065 rrd_set_error("not a simple %s integer: '%s'",
1066 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1067 updvals[ds_idx + 1]);
1070 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1072 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1074 rrd_diff(updvals[ds_idx + 1],
1075 rrd->pdp_prep[ds_idx].last_ds);
1076 if (dst_idx == DST_COUNTER) {
1077 /* simple overflow catcher. This will fail
1078 * terribly for non 32 or 64 bit counters
1079 * ... are there any others in SNMP land?
1081 if (pdp_new[ds_idx] < (double) 0.0)
1082 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1083 if (pdp_new[ds_idx] < (double) 0.0)
1084 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
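/* Worked example (illustrative): a 32-bit SNMP counter wrapping from
 * 4294967290 to 5 gives rrd_diff() = 5 - 4294967290 = -4294967285;
 * adding 2^32 (4294967296) recovers the true delta of 11. If the result
 * is still negative, the second correction assumes a 64-bit counter and
 * adds 2^64 - 2^32, for a total correction of 2^64. */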
1086 rate = pdp_new[ds_idx] / interval;
1088 pdp_new[ds_idx] = DNAN;
1092 old_locale = setlocale(LC_NUMERIC, "C");
1094 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1096 rrd_set_error("converting '%s' to float: %s",
1097 updvals[ds_idx + 1], rrd_strerror(errno));
1100 setlocale(LC_NUMERIC, old_locale);
1101 if (endptr[0] != '\0') {
1103 ("conversion of '%s' to float not complete: tail '%s'",
1104 updvals[ds_idx + 1], endptr);
1107 rate = pdp_new[ds_idx] / interval;
1110 old_locale = setlocale(LC_NUMERIC, "C");
1113 strtod(updvals[ds_idx + 1], &endptr) * interval;
1115 rrd_set_error("converting '%s' to float: %s",
1116 updvals[ds_idx + 1], rrd_strerror(errno));
1119 setlocale(LC_NUMERIC, old_locale);
1120 if (endptr[0] != '\0') {
1122 ("conversion of '%s' to float not complete: tail '%s'",
1123 updvals[ds_idx + 1], endptr);
1126 rate = pdp_new[ds_idx] / interval;
1129 rrd_set_error("rrd contains unknown DS type: '%s'",
1130 rrd->ds_def[ds_idx].dst);
1133 /* break out of this for loop if the error string is set */
1134 if (rrd_test_error()) {
1137 /* make sure pdp_temp is neither too large nor too small
1138 * if any of these occur it becomes unknown ...
1139 * sorry folks ... */
1141 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1142 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1143 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1144 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1145 pdp_new[ds_idx] = DNAN;
1148 /* no news is news all the same */
1149 pdp_new[ds_idx] = DNAN;
1153 /* make a copy of the command line argument for the next run */
1155 fprintf(stderr, "prep ds[%lu]\t"
1159 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1162 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1164 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1170 * How many PDP steps have elapsed since the last update? Returns the answer,
1171 * and stores the time between the last update and the last PDP in pre_int,
1172 * and the time between the last PDP and the current time in post_int.
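/* Worked example (illustrative, pdp_step = 300):
 *   last_up = 920, current_time = 1510
 *   proc_pdp_age = 920  % 300 = 20   -> proc_pdp_st = 900
 *   occu_pdp_age = 1510 % 300 = 10   -> occu_pdp_st = 1500
 *   elapsed steps = (1500 - 900) / 300 = 2
 *   pre_int  = 1500 - 920 = 580  (input time credited to completed steps)
 *   post_int = 10                (input time after the last step boundary)
 * and pre_int + post_int equals the interval of 590 seconds. */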
1174 static int calculate_elapsed_steps(
1176 unsigned long current_time,
1177 unsigned long current_time_usec,
1181 unsigned long *proc_pdp_cnt)
1183 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1184 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1186 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1187 * when it was last updated */
1188 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1190 /* when was the current pdp started */
1191 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1192 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1194 /* when did the last pdp_st occur */
1195 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1196 occu_pdp_st = current_time - occu_pdp_age;
1198 if (occu_pdp_st > proc_pdp_st) {
1199 /* OK we passed the pdp_st moment */
1200 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1201 * occurred before the latest
1203 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1204 *post_int = occu_pdp_age; /* how much after it */
1205 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1207 *pre_int = interval;
1211 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1214 printf("proc_pdp_age %lu\t"
1216 "occu_pfp_age %lu\t"
1220 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1221 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1224 /* compute the number of elapsed pdp_st moments */
1225 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1229 * Increment the PDP values by the values in pdp_new, or else initialize them.
1231 static void simple_update(
1234 rrd_value_t *pdp_new)
1238 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1239 if (isnan(pdp_new[i])) {
1240 /* this is not really accurate if we use subsecond data arrival time
1241 should have thought of it when going subsecond resolution ...
1242 sorry next format change we will have it! */
1243 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1246 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1247 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1249 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1258 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1259 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1265 * Call process_pdp_st for each DS.
1267 * Returns 0 on success, -1 on error.
1269 static int process_all_pdp_st(
1274 unsigned long elapsed_pdp_st,
1275 rrd_value_t *pdp_new,
1276 rrd_value_t *pdp_temp)
1278 unsigned long ds_idx;
1280 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1281 rate*seconds which occurred up to the last run.
1282 pdp_new[] contains rate*seconds from the latest run.
1283 pdp_temp[] will contain the rate for cdp */
1285 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1286 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1287 elapsed_pdp_st * rrd->stat_head->pdp_step,
1288 pdp_new, pdp_temp) == -1) {
1292 fprintf(stderr, "PDP UPD ds[%lu]\t"
1293 "elapsed_pdp_st %lu\t"
1296 "new_unkn_sec %5lu\n",
1300 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1301 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1308 * Process an update that occurs after one of the PDP moments.
1309 * Increments the PDP value, sets it to NAN if more time than the
1310 * heartbeat allows has elapsed, and processes CDEFs.
1312 * Returns 0 on success, -1 on error.
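/* Worked example (illustrative): two 300-second steps have been completed
 * (diff_pdp_st = 600), the accumulated scratch[PDP_val] is 1200 rate*seconds
 * and no seconds were unknown, so
 *   pdp_temp[ds] = 1200 / (600 - 0 - 0) = 2.0 per second.
 * Had the interval exceeded the heartbeat, or more than half a step been
 * unknown, pdp_temp[ds] would be DNAN instead. */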
1314 static int process_pdp_st(
1316 unsigned long ds_idx,
1320 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1321 rrd_value_t *pdp_new,
1322 rrd_value_t *pdp_temp)
1326 /* update pdp_prep to the current pdp_st. */
1327 double pre_unknown = 0.0;
1328 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1329 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1331 rpnstack_t rpnstack; /* used for COMPUTE DS */
1333 rpnstack_init(&rpnstack);
1336 if (isnan(pdp_new[ds_idx])) {
1337 /* a final bit of unknown to be added before calculation
1338 we use a temporary variable for this so that we
1339 don't have to turn integer lines before using the value */
1340 pre_unknown = pre_int;
1342 if (isnan(scratch[PDP_val].u_val)) {
1343 scratch[PDP_val].u_val = 0;
1345 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1348 /* if too much of the pdp_prep is unknown we dump it */
1349 /* if the interval is larger than mrhb we get NAN */
1350 if ((interval > mrhb) ||
1351 (rrd->stat_head->pdp_step / 2.0 <
1352 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1353 pdp_temp[ds_idx] = DNAN;
1355 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1356 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1360 /* process CDEF data sources; remember each CDEF DS can
1361 * only reference other DS with a lower index number */
1362 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1366 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1368 rpnstack_free(&rpnstack);
1371 /* substitute data values for OP_VARIABLE nodes */
1372 for (i = 0; rpnp[i].op != OP_END; i++) {
1373 if (rpnp[i].op == OP_VARIABLE) {
1374 rpnp[i].op = OP_NUMBER;
1375 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1378 /* run the rpn calculator */
1379 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1381 rpnstack_free(&rpnstack);
1387 /* make pdp_prep ready for the next run */
1388 if (isnan(pdp_new[ds_idx])) {
1389 /* this is not really accurate if we use subsecond data arrival time
1390 should have thought of it when going subsecond resolution ...
1391 sorry next format change we will have it! */
1392 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1393 scratch[PDP_val].u_val = DNAN;
1395 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1396 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1398 rpnstack_free(&rpnstack);
1403 * Iterate over all the RRAs for a given DS and:
1404 * 1. Decide whether to schedule a smooth later
1405 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1408 * Returns 0 on success, -1 on error
1410 static int update_all_cdp_prep(
1412 unsigned long *rra_step_cnt,
1413 unsigned long rra_begin,
1414 rrd_file_t *rrd_file,
1415 unsigned long elapsed_pdp_st,
1416 unsigned long proc_pdp_cnt,
1417 rrd_value_t **last_seasonal_coef,
1418 rrd_value_t **seasonal_coef,
1419 rrd_value_t *pdp_temp,
1420 unsigned long *skip_update,
1421 int *schedule_smooth)
1423 unsigned long rra_idx;
1425 /* index into the CDP scratch array */
1426 enum cf_en current_cf;
1427 unsigned long rra_start;
1429 /* number of rows to be updated in an RRA for a data value. */
1430 unsigned long start_pdp_offset;
1432 rra_start = rra_begin;
1433 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1434 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1436 rrd->rra_def[rra_idx].pdp_cnt -
1437 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1438 skip_update[rra_idx] = 0;
1439 if (start_pdp_offset <= elapsed_pdp_st) {
1440 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1441 rrd->rra_def[rra_idx].pdp_cnt + 1;
1443 rra_step_cnt[rra_idx] = 0;
1446 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1447 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1448 * so that they will be correct for the next observed value; note that for
1449 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1450 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1451 if (rra_step_cnt[rra_idx] > 1) {
1452 skip_update[rra_idx] = 1;
1453 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454 elapsed_pdp_st, last_seasonal_coef);
1455 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1456 elapsed_pdp_st + 1, seasonal_coef);
1458 /* periodically run a smoother for seasonal effects */
1459 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1462 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1463 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1464 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1467 *schedule_smooth = 1;
1470 if (rrd_test_error())
1474 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1475 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1476 current_cf) == -1) {
1480 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1481 sizeof(rrd_value_t);
1487 * Are we due for a smooth? Also increments our position in the burn-in cycle.
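/* Worked example (illustrative, burn-in already completed): with
 * row_cnt = 288, seasonal_smooth_idx = 5 and cur_row = 280, the smoothing
 * point lies at row 5 of the *next* cycle, i.e. 288 + 5 = 293 rows away:
 *   elapsed_pdp_st = 10 -> 280 + 10 = 290 < 293 -> no smooth yet
 *   elapsed_pdp_st = 13 -> 280 + 13 = 293       -> smooth is scheduled */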
1489 static int do_schedule_smooth(
1491 unsigned long rra_idx,
1492 unsigned long elapsed_pdp_st)
1494 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1495 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1496 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1497 unsigned long seasonal_smooth_idx =
1498 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1499 unsigned long *init_seasonal =
1500 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1502 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1503 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1504 * really an RRA level, not a data source within RRA level parameter, but
1505 * the rra_def is read only for rrd_update (not flushed to disk). */
1506 if (*init_seasonal > BURNIN_CYCLES) {
1507 /* someone has no doubt invented a trick to deal with this wrap around,
1508 * but at least this code is clear. */
1509 if (seasonal_smooth_idx > cur_row) {
1510 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1511 * between PDP and CDP */
1512 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1514 /* can't rely on negative numbers because we are working with
1515 * unsigned values */
1516 return (cur_row + elapsed_pdp_st >= row_cnt
1517 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1519 /* mark off one of the burn-in cycles */
1520 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1524 * For a given RRA, iterate over the data sources and call the appropriate
1525 * consolidation function.
1527 * Returns 0 on success, -1 on error.
1529 static int update_cdp_prep(
1531 unsigned long elapsed_pdp_st,
1532 unsigned long start_pdp_offset,
1533 unsigned long *rra_step_cnt,
1535 rrd_value_t *pdp_temp,
1536 rrd_value_t *last_seasonal_coef,
1537 rrd_value_t *seasonal_coef,
1540 unsigned long ds_idx, cdp_idx;
1542 /* update CDP_PREP areas */
1543 /* loop over data sources within each RRA */
1544 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1546 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1548 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1549 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1550 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1551 elapsed_pdp_st, start_pdp_offset,
1552 rrd->rra_def[rra_idx].pdp_cnt,
1553 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1556 /* Nothing to consolidate if there's one PDP per CDP. However, if
1557 * we've missed some PDPs, let's update null counters etc. */
1558 if (elapsed_pdp_st > 2) {
1559 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1560 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1565 if (rrd_test_error())
1567 } /* endif data sources loop */
1572 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1573 * primary value, secondary value, and # of unknowns.
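/* Worked example (illustrative) of the xff check below: with pdp_cnt = 12
 * PDPs per CDP and xff = 0.5, a CDP is declared unknown once more than
 * 12 * 0.5 = 6 of its PDPs are unknown; cdp_unkn_pdp_cnt = 7 therefore
 * yields CDP_primary_val = DNAN, while 6 or fewer unknown PDPs still
 * produce a consolidated value. */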
1575 static void update_cdp(
1578 rrd_value_t pdp_temp_val,
1579 unsigned long rra_step_cnt,
1580 unsigned long elapsed_pdp_st,
1581 unsigned long start_pdp_offset,
1582 unsigned long pdp_cnt,
1587 /* shorthand variables */
1588 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1589 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1590 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1591 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1594 /* If we are in this block, at least 1 CDP value will be written to
1595 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1596 * to be written, then the "fill in" value is the CDP_secondary_val
1598 if (isnan(pdp_temp_val)) {
1599 *cdp_unkn_pdp_cnt += start_pdp_offset;
1600 *cdp_secondary_val = DNAN;
1602 /* CDP_secondary value is the RRA "fill in" value for intermediary
1603 * CDP data entries. No matter the CF, the value is the same because
1604 * the average, max, min, and last of a list of identical values is
1605 * the same, namely, the value itself. */
1606 *cdp_secondary_val = pdp_temp_val;
1609 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1610 *cdp_primary_val = DNAN;
1612 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1613 start_pdp_offset, pdp_cnt);
1616 initialize_carry_over(pdp_temp_val,current_cf,
1618 start_pdp_offset, pdp_cnt);
1619 /* endif meets xff value requirement for a valid value */
1620 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1621 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1622 if (isnan(pdp_temp_val))
1623 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1625 *cdp_unkn_pdp_cnt = 0;
1626 } else { /* rra_step_cnt[i] == 0 */
1629 if (isnan(*cdp_val)) {
1630 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1633 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1637 if (isnan(pdp_temp_val)) {
1638 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1641 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1648 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1649 * on the type of consolidation function.
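/* Worked example (illustrative) for CF_AVERAGE: suppose pdp_cnt = 12,
 * no unknown PDPs, the carried-over CDP_val sums to 18 (the rates of the
 * 9 PDPs already folded in) and the current rate 4.0 covers the remaining
 * start_pdp_offset = 3 PDPs. Then
 *   CDP_primary_val = (18 + 4.0 * 3) / (12 - 0) = 2.5
 * For CF_MAXIMUM / CF_MINIMUM the unknown side is seeded with -DINF / DINF
 * via IFDNAN so a single known operand always wins the comparison. */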
1651 static void initialize_cdp_val(
1654 rrd_value_t pdp_temp_val,
1655 unsigned long start_pdp_offset,
1656 unsigned long pdp_cnt)
1658 rrd_value_t cum_val, cur_val;
1660 switch (current_cf) {
1662 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1663 cur_val = IFDNAN(pdp_temp_val, 0.0);
1664 scratch[CDP_primary_val].u_val =
1665 (cum_val + cur_val * start_pdp_offset) /
1666 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1669 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1670 cur_val = IFDNAN(pdp_temp_val, -DINF);
1674 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp_val)) {
1676 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1682 if (cur_val > cum_val)
1683 scratch[CDP_primary_val].u_val = cur_val;
1685 scratch[CDP_primary_val].u_val = cum_val;
1688 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1689 cur_val = IFDNAN(pdp_temp_val, DINF);
1692 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp_val)) {
1694 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1700 if (cur_val < cum_val)
1701 scratch[CDP_primary_val].u_val = cur_val;
1703 scratch[CDP_primary_val].u_val = cum_val;
1707 scratch[CDP_primary_val].u_val = pdp_temp_val;
1713 * Update the consolidation function for Holt-Winters functions as
1714 * well as other functions that don't actually consolidate multiple
1717 static void reset_cdp(
1719 unsigned long elapsed_pdp_st,
1720 rrd_value_t *pdp_temp,
1721 rrd_value_t *last_seasonal_coef,
1722 rrd_value_t *seasonal_coef,
1726 enum cf_en current_cf)
1728 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1730 switch (current_cf) {
1733 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1734 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1737 case CF_DEVSEASONAL:
1738 /* need to update cached seasonal values, so they are consistent
1739 * with the bulk update */
1740 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1741 * CDP_last_deviation are the same. */
1742 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1743 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1747 /* need to update the null_count and last_null_count.
1748 * even do this for non-DNAN pdp_temp because the
1749 * algorithm is not learning from batch updates. */
1750 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1751 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1754 scratch[CDP_primary_val].u_val = DNAN;
1755 scratch[CDP_secondary_val].u_val = DNAN;
1758 /* do not count missed bulk values as failures */
1759 scratch[CDP_primary_val].u_val = 0;
1760 scratch[CDP_secondary_val].u_val = 0;
1761 /* need to reset violations buffer.
1762 * could do this more carefully, but for now, just
1763 * assume a bulk update wipes away all violations. */
1764 erase_violations(rrd, cdp_idx, rra_idx);
1769 static rrd_value_t initialize_carry_over(
1770 rrd_value_t pdp_temp_val,
1772 unsigned long elapsed_pdp_st,
1773 unsigned long start_pdp_offset,
1774 unsigned long pdp_cnt)
1776 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1777 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1778 switch (current_cf) {
1790 switch (current_cf) {
1792 return pdp_temp_val * pdp_into_cdp_cnt ;
1794 return pdp_temp_val;
1800 * Update or initialize a CDP value based on the consolidation
1803 * Returns the new value.
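/* Worked example (illustrative): with an existing cdp_val of 10.0 for a
 * CF_AVERAGE RRA, a new rate of 2.0 covering elapsed_pdp_st = 3 PDPs gives
 *   10.0 + 2.0 * 3 = 16.0
 * as the running rate-sum; CF_MINIMUM / CF_MAXIMUM instead keep the smaller
 * or larger of the two values, and the fallback path simply returns the
 * new rate. */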
1805 static rrd_value_t calculate_cdp_val(
1806 rrd_value_t cdp_val,
1807 rrd_value_t pdp_temp_val,
1808 unsigned long elapsed_pdp_st,
1819 if (isnan(cdp_val)) {
1820 if (current_cf == CF_AVERAGE) {
1821 pdp_temp_val *= elapsed_pdp_st;
1824 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1825 i, ii, pdp_temp_val);
1827 return pdp_temp_val;
1829 if (current_cf == CF_AVERAGE)
1830 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1831 if (current_cf == CF_MINIMUM)
1832 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1833 if (current_cf == CF_MAXIMUM)
1834 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1836 return pdp_temp_val;
1840 * For each RRA, update the seasonal values and then call update_aberrant_CF
1841 * for each data source.
1843 * Return 0 on success, -1 on error.
1845 static int update_aberrant_cdps(
1847 rrd_file_t *rrd_file,
1848 unsigned long rra_begin,
1849 unsigned long elapsed_pdp_st,
1850 rrd_value_t *pdp_temp,
1851 rrd_value_t **seasonal_coef)
1853 unsigned long rra_idx, ds_idx, j;
1855 /* number of PDP steps since the last update that
1856 * are assigned to the first CDP to be generated
1857 * since the last update. */
1858 unsigned short scratch_idx;
1859 unsigned long rra_start;
1860 enum cf_en current_cf;
1862 /* this loop is only entered if elapsed_pdp_st < 3 */
1863 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1864 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1865 rra_start = rra_begin;
1866 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1867 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1868 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1869 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1870 if (scratch_idx == CDP_primary_val) {
1871 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1872 elapsed_pdp_st + 1, seasonal_coef);
1874 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875 elapsed_pdp_st + 2, seasonal_coef);
1878 if (rrd_test_error())
1880 /* loop over data sources within each RRA */
1881 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1882 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1883 rra_idx * (rrd->stat_head->ds_cnt) +
1884 ds_idx, rra_idx, ds_idx, scratch_idx,
1888 rra_start += rrd->rra_def[rra_idx].row_cnt
1889 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1896 * Move sequentially through the file, writing one RRA at a time. Note this
1897 * architecture divorces the computation of CDPs from flushing updated RRA
1900 * Return 0 on success, -1 on error.
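/* Illustrative note on the seek arithmetic below: each RRA row holds one
 * rrd_value_t per data source, so with ds_cnt = 2 data sources, a current
 * row of 10 and sizeof(rrd_value_t) = 8 (a double on common platforms) the
 * row starts at
 *   rra_pos_new = rra_start + 2 * 10 * 8 = rra_start + 160 bytes
 * and rra_start itself advances by row_cnt * ds_cnt * 8 per RRA. */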
1902 static int write_to_rras(
1904 rrd_file_t *rrd_file,
1905 unsigned long *rra_step_cnt,
1906 unsigned long rra_begin,
1907 time_t current_time,
1908 unsigned long *skip_update,
1909 rrd_info_t ** pcdp_summary)
1911 unsigned long rra_idx;
1912 unsigned long rra_start;
1913 time_t rra_time = 0; /* time of update for a RRA */
1915 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1917 /* Ready to write to disk */
1918 rra_start = rra_begin;
1920 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1921 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1922 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1925 unsigned short scratch_idx;
1926 unsigned long step_subtract;
1928 for (scratch_idx = CDP_primary_val,
1930 rra_step_cnt[rra_idx] > 0;
1931 rra_step_cnt[rra_idx]--,
1932 scratch_idx = CDP_secondary_val,
1933 step_subtract = 2) {
1937 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1939 /* increment, with wrap-around */
1940 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1941 rra_ptr->cur_row = 0;
1943 /* we know what our position should be */
1944 rra_pos_new = rra_start
1945 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1947 /* re-seek if the position is wrong or we wrapped around */
1948 if ((size_t)rra_pos_new != rrd_file->pos) {
1949 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1950 rrd_set_error("seek error in rrd");
1955 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1958 if (skip_update[rra_idx])
1961 if (*pcdp_summary != NULL) {
1962 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1964 rra_time = (current_time - current_time % step_time)
1965 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1969 (rrd_file, rrd, rra_idx, scratch_idx,
1970 pcdp_summary, rra_time) == -1)
1973 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1976 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1983 * Write out one row of values (one value per DS) to the archive.
1985 * Returns 0 on success, -1 on error.
1987 static int write_RRA_row(
1988 rrd_file_t *rrd_file,
1990 unsigned long rra_idx,
1991 unsigned short CDP_scratch_idx,
1992 rrd_info_t ** pcdp_summary,
1995 unsigned long ds_idx, cdp_idx;
1998 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1999 /* compute the cdp index */
2000 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2002 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2003 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2004 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2006 if (*pcdp_summary != NULL) {
2007 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2008 /* append info to the return hash */
2009 *pcdp_summary = rrd_info_push(*pcdp_summary,
2011 ("[%lli]RRA[%s][%lu]DS[%s]",
2012 (long long)rra_time,
2013 rrd->rra_def[rra_idx].cf_nam,
2014 rrd->rra_def[rra_idx].pdp_cnt,
2015 rrd->ds_def[ds_idx].ds_nam),
2019 if (rrd_write(rrd_file,
2020 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2021 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2022 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2030 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2032 * Returns 0 on success, -1 otherwise
2034 static int smooth_all_rras(
2036 rrd_file_t *rrd_file,
2037 unsigned long rra_begin)
2039 unsigned long rra_start = rra_begin;
2040 unsigned long rra_idx;
2042 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2043 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2044 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2046 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2048 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2049 if (rrd_test_error())
2052 rra_start += rrd->rra_def[rra_idx].row_cnt
2053 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2060 * Flush changes to disk (unless we're using mmap)
2062 * Returns 0 on success, -1 otherwise
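/* Illustrative note (assumed layout, matching the seek below): the live
 * header sits directly behind the static header and the definition arrays,
 * so for an RRD with 2 data sources and 3 RRAs the writeback starts at
 *   sizeof(stat_head_t) + 2 * sizeof(ds_def_t) + 3 * sizeof(rra_def_t)
 * and is followed by pdp_prep, cdp_prep and rra_ptr in that order. */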
2064 static int write_changes_to_disk(
2066 rrd_file_t *rrd_file,
2069 /* we just need to write back the live header portion now */
2070 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2071 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2072 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2074 rrd_set_error("seek rrd for live header writeback");
2078 if (rrd_write(rrd_file, rrd->live_head,
2079 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2080 rrd_set_error("rrd_write live_head to rrd");
2084 if (rrd_write(rrd_file, rrd->legacy_last_up,
2085 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2086 rrd_set_error("rrd_write legacy_last_up to rrd");
2092 if (rrd_write(rrd_file, rrd->pdp_prep,
2093 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2094 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2095 rrd_set_error("rrd_write pdp_prep to rrd");
2099 if (rrd_write(rrd_file, rrd->cdp_prep,
2100 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2101 rrd->stat_head->ds_cnt)
2102 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2103 rrd->stat_head->ds_cnt)) {
2105 rrd_set_error("rrd_write cdp_prep to rrd");
2109 if (rrd_write(rrd_file, rrd->rra_ptr,
2110 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2111 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2112 rrd_set_error("rrd_write rra_ptr to rrd");