2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(¤t_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
111 unsigned long *skip_update,
112 int *schedule_smooth);
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
124 static int get_time_from_reading(
128 time_t *current_time,
129 unsigned long *current_time_usec,
132 static int update_pdp_prep(
135 rrd_value_t *pdp_new,
138 static int calculate_elapsed_steps(
140 unsigned long current_time,
141 unsigned long current_time_usec,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
163 unsigned long ds_idx,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
200 static void update_cdp(
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
212 static void initialize_cdp_val(
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
245 static int update_aberrant_cdps(
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
270 static int smooth_all_rras(
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
276 static int write_changes_to_disk(
278 rrd_file_t *rrd_file,
283 * normalize time as returned by gettimeofday. usec part must
/*
 * Normalize a struct timeval as returned by gettimeofday(): the usec
 * part must always be non-negative, so when it has gone below zero we
 * borrow one second from tv_sec.
 * (Reconstructed: the body lines were missing from this capture; the
 * contract is stated by the comment block preceding the function.)
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
/*
 * Set *current_time / *current_time_usec from the wall clock.
 * Per the original header comment, current_time_usec is forced to 0
 * for RRD format versions 1 and 2 (no sub-second resolution there).
 *
 * NOTE(review): the version test and closing braces were missing from
 * this capture; "version >= 3" is reconstructed from the documented
 * "set to 0 if the version number is 1 or 2" contract — confirm
 * against upstream rrd_update.c.
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval tmp_time;    /* used for time conversion */

    gettimeofday(&tmp_time, 0);
    normalize_time(&tmp_time);
    *current_time = tmp_time.tv_sec;
    if (version >= 3) {
        *current_time_usec = tmp_time.tv_usec;
    } else {
        *current_time_usec = 0;
    }
}
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
/*
 * rrd_update_v: "verbose" update entry point. Parses -t/--template via
 * getopt_long, refuses to run when ENV_RRDCACHED_ADDRESS is set (this
 * variant cannot work through rrdcached), then delegates to
 * _rrd_update() and records the return code in an rrd_info_t chain
 * under the key "return_value".
 *
 * NOTE(review): this capture is a sampled fragment — braces, variable
 * declarations and the getopt switch body are missing between the
 * visible lines. Code left byte-identical; comments only added.
 */
318 rrd_info_t *rrd_update_v(
323 rrd_info_t *result = NULL;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
333 opterr = 0; /* initialize getopt */
336 int option_index = 0;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
/* refuse to run under a configured rrdcached: this entry point
 * returns detailed info the daemon protocol cannot provide */
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
/*
 * rrd_update command-line entry point (function header itself is
 * missing from this capture). Accepts -t/--template and -d/--daemon;
 * when connected to rrdcached the values are forwarded with
 * rrdc_update(), otherwise rrd_update_r() writes them locally.
 * A template cannot be combined with the caching daemon.
 *
 * NOTE(review): sampled fragment — switch labels, braces and cleanup
 * lines are missing. Code left byte-identical; comments only added.
 */
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
388 int option_index = 0;
392 char *opt_daemon = NULL;
395 opterr = 0; /* initialize getopt */
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
405 tmplt = strdup(optarg);
/* -d given more than once: presumably the earlier copy is freed on
 * the missing line — confirm against upstream source */
409 if (opt_daemon != NULL)
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
414 rrd_set_error("strdup failed.");
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
438 rrd_set_error("The caching daemon cannot be used together with "
443 if (! rrdc_is_connected(opt_daemon))
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
448 else /* we are connected */
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (void *) (argv + optind + 1)); /* values */
455 rrd_set_error("Failed sending the values to rrdcached: %s",
458 : rrd_strerror (rc));
468 if (opt_daemon != NULL)
/* rrd_update_r: thread-safe update. Thin wrapper that forwards to
 * _rrd_update() with no info collection (pcdp_summary == NULL).
 * NOTE(review): the signature's other lines are missing from this
 * capture. */
477 const char *filename,
482 return _rrd_update(filename, tmplt, argc, argv, NULL);
/*
 * _rrd_update: the core update worker. Opens and locks the RRD,
 * allocates the working arrays, processes each argv data string via
 * process_arg() (prefixing the filename to any error), flushes the
 * changes with write_changes_to_disk(), and runs at most one
 * seasonal-smoothing pass if one was scheduled.
 *
 * NOTE(review): sampled fragment — declarations, braces and the
 * cleanup/unlock tail are missing between the visible lines. Code is
 * left byte-identical; comments only added.
 */
486 const char *filename,
490 rrd_info_t * pcdp_summary)
495 unsigned long rra_begin; /* byte pointer to the rra
496 * area in the rrd file. this
497 * pointer never changes value */
498 rrd_value_t *pdp_new; /* prepare the incoming data to be added
499 * to the existing entry */
500 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
501 * to the cdp values */
503 long *tmpl_idx; /* index representing the settings
504 * transported by the tmplt index */
505 unsigned long tmpl_cnt = 2; /* time and data */
507 time_t current_time = 0;
508 unsigned long current_time_usec = 0; /* microseconds part of current time */
510 int schedule_smooth = 0;
512 /* number of elapsed PDP steps since last update */
513 unsigned long *rra_step_cnt = NULL;
515 int version; /* rrd version */
516 rrd_file_t *rrd_file;
517 char *arg_copy; /* for processing the argv */
518 unsigned long *skip_update; /* RRAs to advance but not write */
520 /* need at least 1 arguments: data. */
522 rrd_set_error("Not enough arguments");
526 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
529 /* We are now at the beginning of the rra's */
530 rra_begin = rrd_file->header_len;
532 version = atoi(rrd.stat_head->version);
/* NOTE(review): "¤t_time" below (here and in the process_arg call)
 * is mojibake for "&current_time" / "&current_time_usec" — the HTML
 * entity &curr; corrupted the '&' prefix during capture. */
534 initialize_time(¤t_time, ¤t_time_usec, version);
536 /* get exclusive lock to whole file.
537 * lock gets removed when we close the file.
539 if (rrd_lock(rrd_file) != 0) {
540 rrd_set_error("could not lock RRD");
544 if (allocate_data_structures(&rrd, &updvals,
545 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
546 &rra_step_cnt, &skip_update,
551 /* loop through the arguments. */
552 for (arg_i = 0; arg_i < argc; arg_i++) {
553 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
554 rrd_set_error("failed duplication argv entry");
557 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
558 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
559 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
560 &pcdp_summary, version, skip_update,
561 &schedule_smooth) == -1) {
562 if (rrd_test_error()) { /* Should have error string always here */
565 /* Prepend file name to error message */
566 if ((save_error = strdup(rrd_get_error())) != NULL) {
567 rrd_set_error("%s: %s", filename, save_error);
579 /* if we got here and if there is an error and if the file has not been
580 * written to, then close things up and return. */
581 if (rrd_test_error()) {
582 goto err_free_structures;
585 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
586 goto err_free_structures;
590 /* calling the smoothing code here guarantees at most one smoothing
591 * operation per rrd_update call. Unfortunately, it is possible with bulk
592 * updates, or a long-delayed update for smoothing to occur off-schedule.
593 * This really isn't critical except during the burn-in cycles. */
594 if (schedule_smooth) {
595 smooth_all_rras(&rrd, rrd_file, rra_begin);
598 /* rrd_dontneed(rrd_file,&rrd); */
624 * get exclusive lock to whole file.
625 * lock gets removed when we close the file
627 * returns 0 on success
/* rrd_lock (function header line missing from this capture): takes a
 * non-blocking exclusive lock on the whole RRD file — _locking() on
 * Win32, fcntl(F_SETLK) with a write lock elsewhere. */
635 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
638 if (_fstat(file->fd, &st) == 0) {
639 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
646 lock.l_type = F_WRLCK; /* exclusive write lock */
647 lock.l_len = 0; /* whole file */
648 lock.l_start = 0; /* start of file */
649 lock.l_whence = SEEK_SET; /* l_start is relative to the start of the file */
651 rcstat = fcntl(file->fd, F_SETLK, &lock);
659 * Allocate some important arrays used, and initialize the template.
661 * When it returns, either all of the structures are allocated
662 * or none of them are.
664 * Returns 0 on success, -1 on error.
/*
 * Allocate the working arrays for an update run (updvals, pdp_temp,
 * skip_update, tmpl_idx, rra_step_cnt, pdp_new), build the default
 * template index (all non-CDEF data sources in order), and apply the
 * user template via parse_template(). Uses a goto-cleanup ladder so
 * that on failure everything allocated so far is freed: either all
 * structures are allocated or none are. Returns 0 on success, -1 on
 * error.
 *
 * NOTE(review): sampled fragment — braces, some malloc argument lines
 * and the free() bodies of the error labels are missing. Code left
 * byte-identical; comments only added.
 */
666 static int allocate_data_structures(
669 rrd_value_t **pdp_temp,
672 unsigned long *tmpl_cnt,
673 unsigned long **rra_step_cnt,
674 unsigned long **skip_update,
675 rrd_value_t **pdp_new)
678 if ((*updvals = (char **) malloc(sizeof(char *)
679 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
680 rrd_set_error("allocating updvals pointer array.");
683 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
684 * rrd->stat_head->ds_cnt)) ==
686 rrd_set_error("allocating pdp_temp.");
687 goto err_free_updvals;
689 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
691 rrd->stat_head->rra_cnt)) ==
693 rrd_set_error("allocating skip_update.");
694 goto err_free_pdp_temp;
/* NOTE(review): tmpl_idx is long* but sized with sizeof(unsigned
 * long) — same size in practice, but sizeof **tmpl_idx would be the
 * safer idiom */
696 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
697 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
698 rrd_set_error("allocating tmpl_idx.");
699 goto err_free_skip_update;
701 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
704 rra_cnt))) == NULL) {
705 rrd_set_error("allocating rra_step_cnt.");
706 goto err_free_tmpl_idx;
709 /* initialize tmplt redirector */
710 /* default config example (assume DS 1 is a CDEF DS)
711 tmpl_idx[0] -> 0; (time)
712 tmpl_idx[1] -> 1; (DS 0)
713 tmpl_idx[2] -> 3; (DS 2)
714 tmpl_idx[3] -> 4; (DS 3) */
715 (*tmpl_idx)[0] = 0; /* time */
716 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
717 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
718 (*tmpl_idx)[ii++] = i;
723 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
724 goto err_free_rra_step_cnt;
728 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
729 * rrd->stat_head->ds_cnt)) == NULL) {
730 rrd_set_error("allocating pdp_new.");
731 goto err_free_rra_step_cnt;
736 err_free_rra_step_cnt:
740 err_free_skip_update:
750 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
752 * Returns 0 on success.
/*
 * Parse a colon-separated template string into tmpl_idx: entry 0 is
 * reserved for the time, each subsequent entry is 1 + the matched DS
 * index. Works on a strdup'ed copy so the ':' separators can be
 * NUL-terminated in place. Returns 0 on success.
 *
 * NOTE(review): sampled fragment — the initial dsname assignment, the
 * closing braces and the free(tmplt_copy)/return tail are missing
 * from this capture. Code left byte-identical; comments only added.
 */
754 static int parse_template(
757 unsigned long *tmpl_cnt,
760 char *dsname, *tmplt_copy;
761 unsigned int tmpl_len, i;
764 *tmpl_cnt = 1; /* the first entry is the time */
766 /* we should work on a writeable copy here */
767 if ((tmplt_copy = strdup(tmplt)) == NULL) {
768 rrd_set_error("error copying tmplt '%s'", tmplt);
774 tmpl_len = strlen(tmplt_copy);
775 for (i = 0; i <= tmpl_len; i++) {
776 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
777 tmplt_copy[i] = '\0';
778 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
779 rrd_set_error("tmplt contains more DS definitions than RRD");
781 goto out_free_tmpl_copy;
/* ds_match() presumably returns -1 for an unknown name, making the
 * stored value 0 — TODO confirm against rrd ds_match() */
783 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
784 rrd_set_error("unknown DS name '%s'", dsname);
786 goto out_free_tmpl_copy;
788 /* go to the next entry on the tmplt_copy */
790 dsname = &tmplt_copy[i + 1];
800 * Parse an update string, updates the primary data points (PDPs)
801 * and consolidated data points (CDPs), and writes changes to the RRAs.
803 * Returns 0 on success, -1 on error.
/*
 * Process one update argument string: parse it (parse_ds), update the
 * pdp_prep area (update_pdp_prep), count elapsed PDP steps, then
 * either do a simple in-step accumulation or run the full PDP/CDP
 * consolidation pipeline and write to the RRAs. Finally advances
 * live_head->last_up(_usec). Seasonal coefficient vectors are freed on
 * both the success and the err_free_coefficients paths.
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled fragment — some parameter lines, braces and
 * return statements are missing. Code left byte-identical; comments
 * only added.
 */
805 static int process_arg(
808 rrd_file_t *rrd_file,
809 unsigned long rra_begin,
810 time_t *current_time,
811 unsigned long *current_time_usec,
812 rrd_value_t *pdp_temp,
813 rrd_value_t *pdp_new,
814 unsigned long *rra_step_cnt,
817 unsigned long tmpl_cnt,
818 rrd_info_t ** pcdp_summary,
820 unsigned long *skip_update,
821 int *schedule_smooth)
823 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
825 /* a vector of future Holt-Winters seasonal coefs */
826 unsigned long elapsed_pdp_st;
828 double interval, pre_int, post_int; /* interval between this and
830 unsigned long proc_pdp_cnt;
832 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
833 current_time, current_time_usec, version) == -1) {
/* seconds (with fractional usec part) since the previous update */
837 interval = (double) (*current_time - rrd->live_head->last_up)
838 + (double) ((long) *current_time_usec -
839 (long) rrd->live_head->last_up_usec) / 1e6f;
841 /* process the data sources and update the pdp_prep
842 * area accordingly */
843 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
847 elapsed_pdp_st = calculate_elapsed_steps(rrd,
849 *current_time_usec, interval,
853 /* has a pdp_st moment occurred since the last run ? */
854 if (elapsed_pdp_st == 0) {
855 /* no we have not passed a pdp_st moment. therefore update is simple */
856 simple_update(rrd, interval, pdp_new);
858 /* an pdp_st has occurred. */
859 if (process_all_pdp_st(rrd, interval,
861 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
864 if (update_all_cdp_prep(rrd, rra_step_cnt,
871 skip_update, schedule_smooth) == -1) {
872 goto err_free_coefficients;
874 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
875 elapsed_pdp_st, pdp_temp,
876 &seasonal_coef) == -1) {
877 goto err_free_coefficients;
879 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
880 *current_time, skip_update,
881 pcdp_summary) == -1) {
882 goto err_free_coefficients;
884 } /* endif a pdp_st has occurred */
885 rrd->live_head->last_up = *current_time;
886 rrd->live_head->last_up_usec = *current_time_usec;
889 *rrd->legacy_last_up = rrd->live_head->last_up;
892 free(last_seasonal_coef);
895 err_free_coefficients:
897 free(last_seasonal_coef);
902 * Parse a DS string (time + colon-separated values), storing the
903 * results in current_time, current_time_usec, and updvals.
905 * Returns 0 on success, -1 on error.
/* parse_ds (function header line missing from this capture, name
 * grounded by the call in process_arg): splits the input on ':' (or
 * '@' for at-style time), routes each value through tmpl_idx, checks
 * the reading count against the template, then resolves the timestamp
 * via get_time_from_reading(). */
912 unsigned long tmpl_cnt,
913 time_t *current_time,
914 unsigned long *current_time_usec,
922 /* initialize all ds input to unknown except the first one
923 which has always got to be set */
924 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
927 /* separate all ds elements; first must be examined separately
928 due to alternate time syntax */
929 if ((p = strchr(input, '@')) != NULL) {
931 } else if ((p = strchr(input, ':')) != NULL) {
934 rrd_set_error("expected timestamp not found in data source from %s",
940 updvals[tmpl_idx[i++]] = p + 1;
945 updvals[tmpl_idx[i++]] = p + 1;
951 rrd_set_error("expected %lu data source readings (got %lu) from %s",
952 tmpl_cnt - 1, i, input);
956 if (get_time_from_reading(rrd, timesyntax, updvals,
957 current_time, current_time_usec,
965 * Parse the time in a DS string, store it in current_time and
966 * current_time_usec and verify that it's later than the last
967 * update for this DS.
969 * Returns 0 on success, -1 on error.
/*
 * Resolve the timestamp of one reading into current_time /
 * current_time_usec. Handles three syntaxes: at-style ('@', via
 * rrd_parsetime, rejecting start/end-relative specs), the literal "N"
 * (now, via gettimeofday), and a plain numeric value (locale-safe
 * strtod, split into seconds and microseconds). Rejects times at or
 * before live_head->last_up. Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled fragment — some declarations, braces and
 * returns are missing. Code left byte-identical; comments only added.
 */
971 static int get_time_from_reading(
975 time_t *current_time,
976 unsigned long *current_time_usec,
980 char *parsetime_error = NULL;
982 rrd_time_value_t ds_tv;
983 struct timeval tmp_time; /* used for time conversion */
985 /* get the time from the reading ... handle N */
986 if (timesyntax == '@') { /* at-style */
987 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
988 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
991 if (ds_tv.type == RELATIVE_TO_END_TIME ||
992 ds_tv.type == RELATIVE_TO_START_TIME) {
993 rrd_set_error("specifying time relative to the 'start' "
994 "or 'end' makes no sense here: %s", updvals[0]);
997 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
998 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
999 } else if (strcmp(updvals[0], "N") == 0) {
1000 gettimeofday(&tmp_time, 0);
1001 normalize_time(&tmp_time);
1002 *current_time = tmp_time.tv_sec;
1003 *current_time_usec = tmp_time.tv_usec;
/* plain numeric timestamp: parse under the "C" locale so the decimal
 * separator is always '.' */
1005 old_locale = setlocale(LC_NUMERIC, "C");
1006 tmp = strtod(updvals[0], 0);
1007 setlocale(LC_NUMERIC, old_locale);
1008 *current_time = floor(tmp);
1009 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1011 /* dont do any correction for old version RRDs */
1013 *current_time_usec = 0;
1015 if (*current_time < rrd->live_head->last_up ||
1016 (*current_time == rrd->live_head->last_up &&
1017 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1018 rrd_set_error("illegal attempt to update using time %ld when "
1019 "last update time is %ld (minimum one second step)",
1020 *current_time, rrd->live_head->last_up);
1027 * Update pdp_new by interpreting the updvals according to the DS type
1028 * (COUNTER, GAUGE, etc.).
1030 * Returns 0 on success, -1 on error.
/*
 * Interpret each reading according to its DS type and store the
 * resulting rate*seconds in pdp_new[]: COUNTER/DERIVE difference
 * against last_ds (with 32/64-bit wrap correction for COUNTER),
 * ABSOLUTE/GAUGE via locale-safe strtod. Values outside the DS
 * min/max bounds, unknown inputs ('U'), and heartbeat violations
 * become DNAN. Finally last_ds is refreshed with the current reading.
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled fragment — the switch labels, several braces
 * and the debug-print format lines are missing. Code left
 * byte-identical; comments only added.
 */
1032 static int update_pdp_prep(
1035 rrd_value_t *pdp_new,
1038 unsigned long ds_idx;
1040 char *endptr; /* used in the conversion */
1043 enum dst_en dst_idx;
1045 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1046 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1048 /* make sure we do not build diffs with old last_ds values */
1049 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1050 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1051 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1054 /* NOTE: DST_CDEF should never enter this if block, because
1055 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1056 * accidently specified a value for the DST_CDEF. To handle this case,
1057 * an extra check is required. */
1059 if ((updvals[ds_idx + 1][0] != 'U') &&
1060 (dst_idx != DST_CDEF) &&
1061 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1064 /* pdp_new contains rate * time ... eg the bytes transferred during
1065 * the interval. Doing it this way saves a lot of math operations
/* COUNTER/DERIVE path (switch labels missing from capture): input
 * must be a plain integer, optionally with a leading '-' */
1070 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1071 if ((updvals[ds_idx + 1][ii] < '0'
1072 || updvals[ds_idx + 1][ii] > '9')
1073 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1074 rrd_set_error("not a simple integer: '%s'",
1075 updvals[ds_idx + 1]);
1079 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1081 rrd_diff(updvals[ds_idx + 1],
1082 rrd->pdp_prep[ds_idx].last_ds);
1083 if (dst_idx == DST_COUNTER) {
1084 /* simple overflow catcher. This will fail
1085 * terribly for non 32 or 64 bit counters
1086 * ... are there any others in SNMP land?
1088 if (pdp_new[ds_idx] < (double) 0.0)
1089 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1090 if (pdp_new[ds_idx] < (double) 0.0)
1091 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1093 rate = pdp_new[ds_idx] / interval;
1095 pdp_new[ds_idx] = DNAN;
/* GAUGE path: parse the rate under the "C" locale */
1099 old_locale = setlocale(LC_NUMERIC, "C");
1101 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1102 setlocale(LC_NUMERIC, old_locale);
1104 rrd_set_error("converting '%s' to float: %s",
1105 updvals[ds_idx + 1], rrd_strerror(errno));
1108 if (endptr[0] != '\0') {
1110 ("conversion of '%s' to float not complete: tail '%s'",
1111 updvals[ds_idx + 1], endptr);
1114 rate = pdp_new[ds_idx] / interval;
/* ABSOLUTE path: reading is already a per-interval amount */
1118 old_locale = setlocale(LC_NUMERIC, "C");
1120 strtod(updvals[ds_idx + 1], &endptr) * interval;
1121 setlocale(LC_NUMERIC, old_locale);
1123 rrd_set_error("converting '%s' to float: %s",
1124 updvals[ds_idx + 1], rrd_strerror(errno));
1127 if (endptr[0] != '\0') {
1129 ("conversion of '%s' to float not complete: tail '%s'",
1130 updvals[ds_idx + 1], endptr);
1133 rate = pdp_new[ds_idx] / interval;
1136 rrd_set_error("rrd contains unknown DS type : '%s'",
1137 rrd->ds_def[ds_idx].dst);
1140 /* break out of this for loop if the error string is set */
1141 if (rrd_test_error()) {
1144 /* make sure pdp_temp is neither too large nor too small
1145 * if any of these occur it becomes unknown ...
1146 * sorry folks ... */
1148 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1149 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1150 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1151 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1152 pdp_new[ds_idx] = DNAN;
1155 /* no news is news all the same */
1156 pdp_new[ds_idx] = DNAN;
1160 /* make a copy of the command line argument for the next run */
1162 fprintf(stderr, "prep ds[%lu]\t"
1166 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1169 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1171 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1177 * How many PDP steps have elapsed since the last update? Returns the answer,
1178 * and stores the time between the last update and the last PDP in pre_time,
1179 * and the time between the last PDP and the current time in post_int.
/*
 * Count how many whole pdp_step boundaries were crossed between the
 * last update and current_time; also emits the pre-boundary and
 * post-boundary fractions of the interval (*pre_int / *post_int, with
 * usec adjustment) and the index of the last processed step
 * (*proc_pdp_cnt). The return value is the step count itself, not an
 * error code, despite the int return type.
 *
 * NOTE(review): sampled fragment — some parameter lines, the else
 * branch's *post_int assignment and parts of the debug printf are
 * missing. Code left byte-identical; comments only added.
 */
1181 static int calculate_elapsed_steps(
1183 unsigned long current_time,
1184 unsigned long current_time_usec,
1188 unsigned long *proc_pdp_cnt)
1190 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1191 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1193 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1194 * when it was last updated */
1195 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1197 /* when was the current pdp started */
1198 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1199 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1201 /* when did the last pdp_st occur */
1202 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1203 occu_pdp_st = current_time - occu_pdp_age;
1205 if (occu_pdp_st > proc_pdp_st) {
1206 /* OK we passed the pdp_st moment */
1207 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1208 * occurred before the latest
1210 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1211 *post_int = occu_pdp_age; /* how much after it */
1212 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1214 *pre_int = interval;
1218 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1221 printf("proc_pdp_age %lu\t"
1223 "occu_pfp_age %lu\t"
1227 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1228 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1231 /* compute the number of elapsed pdp_st moments */
1232 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1236 * Increment the PDP values by the values in pdp_new, or else initialize them.
/*
 * In-step update (no pdp_st boundary crossed): for each DS either add
 * the interval to the unknown-seconds counter (NaN input) or
 * accumulate pdp_new into the running PDP_val scratch value,
 * initializing it when it is still NaN.
 *
 * NOTE(review): sampled fragment — braces and the debug-print format
 * lines are missing. Code left byte-identical; comments only added.
 */
1238 static void simple_update(
1241 rrd_value_t *pdp_new)
1245 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1246 if (isnan(pdp_new[i])) {
1247 /* this is not really accurate if we use subsecond data arrival time
1248 should have thought of it when going subsecond resolution ...
1249 sorry next format change we will have it! */
1250 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1253 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1254 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1256 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1265 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1266 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1272 * Call process_pdp_st for each DS.
1274 * Returns 0 on success, -1 on error.
/*
 * Run process_pdp_st() for every data source, converting the elapsed
 * step count into seconds (elapsed_pdp_st * pdp_step) for the call.
 * Returns 0 on success, -1 on the first per-DS error.
 *
 * NOTE(review): sampled fragment — braces, the error return and parts
 * of the debug fprintf are missing. Code left byte-identical;
 * comments only added.
 */
1276 static int process_all_pdp_st(
1281 unsigned long elapsed_pdp_st,
1282 rrd_value_t *pdp_new,
1283 rrd_value_t *pdp_temp)
1285 unsigned long ds_idx;
1287 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1288 rate*seconds which occurred up to the last run.
1289 pdp_new[] contains rate*seconds from the latest run.
1290 pdp_temp[] will contain the rate for cdp */
1292 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1293 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1294 elapsed_pdp_st * rrd->stat_head->pdp_step,
1295 pdp_new, pdp_temp) == -1) {
1299 fprintf(stderr, "PDP UPD ds[%lu]\t"
1300 "elapsed_pdp_st %lu\t"
1303 "new_unkn_sec %5lu\n",
1307 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1308 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1315 * Process an update that occurs after one of the PDP moments.
1316 * Increments the PDP value, sets NAN if time greater than the
1317 * heartbeats have elapsed, processes CDEFs.
1319 * Returns 0 on success, -1 on error.
/*
 * Finalize one data source's PDP at a step boundary: fold the
 * pre-boundary part of the new reading into the accumulated scratch
 * value, compute the step's rate into pdp_temp[ds_idx] (or DNAN when
 * too much of the step was unknown or the heartbeat was exceeded),
 * evaluate COMPUTE (CDEF) data sources with the RPN calculator, and
 * seed the scratch area with the post-boundary remainder for the next
 * step. The rpnstack is freed on both the error and normal paths.
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled fragment — braces, else keywords and some
 * expression continuations are missing. Code left byte-identical;
 * comments only added (typos in existing comments corrected).
 */
1321 static int process_pdp_st(
1323 unsigned long ds_idx,
1327 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1328 rrd_value_t *pdp_new,
1329 rrd_value_t *pdp_temp)
1333 /* update pdp_prep to the current pdp_st. */
1334 double pre_unknown = 0.0;
1335 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1336 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1338 rpnstack_t rpnstack; /* used for COMPUTE DS */
1340 rpnstack_init(&rpnstack);
1343 if (isnan(pdp_new[ds_idx])) {
1344 /* a final bit of unknown to be added before calculation
1345 we use a temporary variable for this so that we
1346 don't have to turn integer lines before using the value */
1347 pre_unknown = pre_int;
1349 if (isnan(scratch[PDP_val].u_val)) {
1350 scratch[PDP_val].u_val = 0;
1352 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1355 /* if too much of the pdp_prep is unknown we dump it */
1356 /* if the interval is larger than mrhb we get NAN */
1357 if ((interval > mrhb) ||
1358 (rrd->stat_head->pdp_step / 2.0 <
1359 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1360 pdp_temp[ds_idx] = DNAN;
1362 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1363 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1367 /* process CDEF data sources; remember each CDEF DS can
1368 * only reference other DS with a lower index number */
1369 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1373 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1374 /* substitute data values for OP_VARIABLE nodes */
1375 for (i = 0; rpnp[i].op != OP_END; i++) {
1376 if (rpnp[i].op == OP_VARIABLE) {
1377 rpnp[i].op = OP_NUMBER;
1378 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1381 /* run the rpn calculator */
1382 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1384 rpnstack_free(&rpnstack);
1389 /* make pdp_prep ready for the next run */
1390 if (isnan(pdp_new[ds_idx])) {
1391 /* this is not really accurate if we use subsecond data arrival time
1392 should have thought of it when going subsecond resolution ...
1393 sorry next format change we will have it! */
1394 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1395 scratch[PDP_val].u_val = DNAN;
1397 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1398 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1400 rpnstack_free(&rpnstack);
1405 * Iterate over all the RRAs for a given DS and:
1406 * 1. Decide whether to schedule a smooth later
1407 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1410 * Returns 0 on success, -1 on error
/*
 * Walk every RRA: compute how many rows it must advance for this
 * update (rra_step_cnt), flag bulk SEASONAL/DEVSEASONAL updates in
 * skip_update[] (pre-fetching the seasonal coefficient vectors so the
 * next observed value is handled correctly), decide whether to
 * schedule a smoothing pass, and delegate per-DS consolidation to
 * update_cdp_prep(). rra_start is advanced past each RRA's rows.
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled fragment — braces, the update_cdp_prep call
 * line and debug-print formats are missing between visible lines.
 * Code left byte-identical; comments only added.
 */
1412 static int update_all_cdp_prep(
1414 unsigned long *rra_step_cnt,
1415 unsigned long rra_begin,
1416 rrd_file_t *rrd_file,
1417 unsigned long elapsed_pdp_st,
1418 unsigned long proc_pdp_cnt,
1419 rrd_value_t **last_seasonal_coef,
1420 rrd_value_t **seasonal_coef,
1421 rrd_value_t *pdp_temp,
1422 unsigned long *skip_update,
1423 int *schedule_smooth)
1425 unsigned long rra_idx;
1427 /* index into the CDP scratch array */
1428 enum cf_en current_cf;
1429 unsigned long rra_start;
1431 /* number of rows to be updated in an RRA for a data value. */
1432 unsigned long start_pdp_offset;
1434 rra_start = rra_begin;
1435 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1436 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
/* PDPs remaining until this RRA's next consolidation boundary */
1438 rrd->rra_def[rra_idx].pdp_cnt -
1439 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1440 skip_update[rra_idx] = 0;
1441 if (start_pdp_offset <= elapsed_pdp_st) {
1442 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1443 rrd->rra_def[rra_idx].pdp_cnt + 1;
1445 rra_step_cnt[rra_idx] = 0;
1448 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1449 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1450 * so that they will be correct for the next observed value; note that for
1451 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1452 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1453 if (rra_step_cnt[rra_idx] > 1) {
1454 skip_update[rra_idx] = 1;
1455 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1456 elapsed_pdp_st, last_seasonal_coef);
1457 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1458 elapsed_pdp_st + 1, seasonal_coef);
1460 /* periodically run a smoother for seasonal effects */
1461 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1464 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1465 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1466 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1469 *schedule_smooth = 1;
1472 if (rrd_test_error())
1476 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1477 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1478 current_cf) == -1) {
/* advance rra_start past this RRA's data area */
1482 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1483 sizeof(rrd_value_t);
1489 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1491 static int do_schedule_smooth(
1493 unsigned long rra_idx,
1494 unsigned long elapsed_pdp_st)
1496 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1497 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1498 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1499 unsigned long seasonal_smooth_idx =
1500 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1501 unsigned long *init_seasonal =
1502 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1504 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1505 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1506 * really an RRA level, not a data source within RRA level parameter, but
1507 * the rra_def is read only for rrd_update (not flushed to disk). */
1508 if (*init_seasonal > BURNIN_CYCLES) {
1509 /* someone has no doubt invented a trick to deal with this wrap around,
1510 * but at least this code is clear. */
1511 if (seasonal_smooth_idx > cur_row) {
1512 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1513 * between PDP and CDP */
1514 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1516 /* can't rely on negative numbers because we are working with
1517 * unsigned values */
1518 return (cur_row + elapsed_pdp_st >= row_cnt
1519 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1521 /* mark off one of the burn-in cycles */
1522 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1526 * For a given RRA, iterate over the data sources and call the appropriate
1527 * consolidation function.
1529 * Returns 0 on success, -1 on error.
1531 static int update_cdp_prep(
1533 unsigned long elapsed_pdp_st,
1534 unsigned long start_pdp_offset,
1535 unsigned long *rra_step_cnt,
1537 rrd_value_t *pdp_temp,
1538 rrd_value_t *last_seasonal_coef,
1539 rrd_value_t *seasonal_coef,
1542 unsigned long ds_idx, cdp_idx;
1544 /* update CDP_PREP areas */
1545 /* loop over data soures within each RRA */
1546 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1548 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1550 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1551 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1552 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1553 elapsed_pdp_st, start_pdp_offset,
1554 rrd->rra_def[rra_idx].pdp_cnt,
1555 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1558 /* Nothing to consolidate if there's one PDP per CDP. However, if
1559 * we've missed some PDPs, let's update null counters etc. */
1560 if (elapsed_pdp_st > 2) {
1561 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1562 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1567 if (rrd_test_error())
1569 } /* endif data sources loop */
1574 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1575 * primary value, secondary value, and # of unknowns.
1577 static void update_cdp(
1580 rrd_value_t pdp_temp_val,
1581 unsigned long rra_step_cnt,
1582 unsigned long elapsed_pdp_st,
1583 unsigned long start_pdp_offset,
1584 unsigned long pdp_cnt,
1589 /* shorthand variables */
1590 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1591 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1592 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1593 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1596 /* If we are in this block, as least 1 CDP value will be written to
1597 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1598 * to be written, then the "fill in" value is the CDP_secondary_val
1600 if (isnan(pdp_temp_val)) {
1601 *cdp_unkn_pdp_cnt += start_pdp_offset;
1602 *cdp_secondary_val = DNAN;
1604 /* CDP_secondary value is the RRA "fill in" value for intermediary
1605 * CDP data entries. No matter the CF, the value is the same because
1606 * the average, max, min, and last of a list of identical values is
1607 * the same, namely, the value itself. */
1608 *cdp_secondary_val = pdp_temp_val;
1611 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1612 *cdp_primary_val = DNAN;
1613 if (current_cf == CF_AVERAGE) {
1615 initialize_average_carry_over(pdp_temp_val,
1617 start_pdp_offset, pdp_cnt);
1619 *cdp_val = pdp_temp_val;
1622 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1623 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1624 } /* endif meets xff value requirement for a valid value */
1625 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1626 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1627 if (isnan(pdp_temp_val))
1628 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1630 *cdp_unkn_pdp_cnt = 0;
1631 } else { /* rra_step_cnt[i] == 0 */
1634 if (isnan(*cdp_val)) {
1635 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1638 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1642 if (isnan(pdp_temp_val)) {
1643 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1646 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1653 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1654 * on the type of consolidation function.
1656 static void initialize_cdp_val(
1659 rrd_value_t pdp_temp_val,
1660 unsigned long elapsed_pdp_st,
1661 unsigned long start_pdp_offset,
1662 unsigned long pdp_cnt)
1664 rrd_value_t cum_val, cur_val;
1666 switch (current_cf) {
1668 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1669 cur_val = IFDNAN(pdp_temp_val, 0.0);
1670 scratch[CDP_primary_val].u_val =
1671 (cum_val + cur_val * start_pdp_offset) /
1672 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1673 scratch[CDP_val].u_val =
1674 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1675 start_pdp_offset, pdp_cnt);
1678 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1679 cur_val = IFDNAN(pdp_temp_val, -DINF);
1682 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1684 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1690 if (cur_val > cum_val)
1691 scratch[CDP_primary_val].u_val = cur_val;
1693 scratch[CDP_primary_val].u_val = cum_val;
1694 /* initialize carry over value */
1695 scratch[CDP_val].u_val = pdp_temp_val;
1698 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1699 cur_val = IFDNAN(pdp_temp_val, DINF);
1702 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1704 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1710 if (cur_val < cum_val)
1711 scratch[CDP_primary_val].u_val = cur_val;
1713 scratch[CDP_primary_val].u_val = cum_val;
1714 /* initialize carry over value */
1715 scratch[CDP_val].u_val = pdp_temp_val;
1719 scratch[CDP_primary_val].u_val = pdp_temp_val;
1720 /* initialize carry over value */
1721 scratch[CDP_val].u_val = pdp_temp_val;
1727 * Update the consolidation function for Holt-Winters functions as
1728 * well as other functions that don't actually consolidate multiple
1731 static void reset_cdp(
1733 unsigned long elapsed_pdp_st,
1734 rrd_value_t *pdp_temp,
1735 rrd_value_t *last_seasonal_coef,
1736 rrd_value_t *seasonal_coef,
1740 enum cf_en current_cf)
1742 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1744 switch (current_cf) {
1747 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1748 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1751 case CF_DEVSEASONAL:
1752 /* need to update cached seasonal values, so they are consistent
1753 * with the bulk update */
1754 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1755 * CDP_last_deviation are the same. */
1756 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1757 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1761 /* need to update the null_count and last_null_count.
1762 * even do this for non-DNAN pdp_temp because the
1763 * algorithm is not learning from batch updates. */
1764 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1765 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1768 scratch[CDP_primary_val].u_val = DNAN;
1769 scratch[CDP_secondary_val].u_val = DNAN;
1772 /* do not count missed bulk values as failures */
1773 scratch[CDP_primary_val].u_val = 0;
1774 scratch[CDP_secondary_val].u_val = 0;
1775 /* need to reset violations buffer.
1776 * could do this more carefully, but for now, just
1777 * assume a bulk update wipes away all violations. */
1778 erase_violations(rrd, cdp_idx, rra_idx);
1783 static rrd_value_t initialize_average_carry_over(
1784 rrd_value_t pdp_temp_val,
1785 unsigned long elapsed_pdp_st,
1786 unsigned long start_pdp_offset,
1787 unsigned long pdp_cnt)
1789 /* initialize carry over value */
1790 if (isnan(pdp_temp_val)) {
1793 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1797 * Update or initialize a CDP value based on the consolidation
1800 * Returns the new value.
1802 static rrd_value_t calculate_cdp_val(
1803 rrd_value_t cdp_val,
1804 rrd_value_t pdp_temp_val,
1805 unsigned long elapsed_pdp_st,
1816 if (isnan(cdp_val)) {
1817 if (current_cf == CF_AVERAGE) {
1818 pdp_temp_val *= elapsed_pdp_st;
1821 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1822 i, ii, pdp_temp_val);
1824 return pdp_temp_val;
1826 if (current_cf == CF_AVERAGE)
1827 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1828 if (current_cf == CF_MINIMUM)
1829 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1830 if (current_cf == CF_MAXIMUM)
1831 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1833 return pdp_temp_val;
1837 * For each RRA, update the seasonal values and then call update_aberrant_CF
1838 * for each data source.
1840 * Return 0 on success, -1 on error.
1842 static int update_aberrant_cdps(
1844 rrd_file_t *rrd_file,
1845 unsigned long rra_begin,
1846 unsigned long elapsed_pdp_st,
1847 rrd_value_t *pdp_temp,
1848 rrd_value_t **seasonal_coef)
1850 unsigned long rra_idx, ds_idx, j;
1852 /* number of PDP steps since the last update that
1853 * are assigned to the first CDP to be generated
1854 * since the last update. */
1855 unsigned short scratch_idx;
1856 unsigned long rra_start;
1857 enum cf_en current_cf;
1859 /* this loop is only entered if elapsed_pdp_st < 3 */
1860 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1861 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1862 rra_start = rra_begin;
1863 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1864 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1865 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1866 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1867 if (scratch_idx == CDP_primary_val) {
1868 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1869 elapsed_pdp_st + 1, seasonal_coef);
1871 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1872 elapsed_pdp_st + 2, seasonal_coef);
1875 if (rrd_test_error())
1877 /* loop over data soures within each RRA */
1878 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1879 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1880 rra_idx * (rrd->stat_head->ds_cnt) +
1881 ds_idx, rra_idx, ds_idx, scratch_idx,
1885 rra_start += rrd->rra_def[rra_idx].row_cnt
1886 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1893 * Move sequentially through the file, writing one RRA at a time. Note this
1894 * architecture divorces the computation of CDP with flushing updated RRA
1897 * Return 0 on success, -1 on error.
1899 static int write_to_rras(
1901 rrd_file_t *rrd_file,
1902 unsigned long *rra_step_cnt,
1903 unsigned long rra_begin,
1904 time_t current_time,
1905 unsigned long *skip_update,
1906 rrd_info_t ** pcdp_summary)
1908 unsigned long rra_idx;
1909 unsigned long rra_start;
1910 time_t rra_time = 0; /* time of update for a RRA */
1912 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1914 /* Ready to write to disk */
1915 rra_start = rra_begin;
1917 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1918 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1919 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1922 unsigned short scratch_idx;
1923 unsigned long step_subtract;
1925 for (scratch_idx = CDP_primary_val,
1927 rra_step_cnt[rra_idx] > 0;
1928 rra_step_cnt[rra_idx]--,
1929 scratch_idx = CDP_secondary_val,
1930 step_subtract = 2) {
1934 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1936 /* increment, with wrap-around */
1937 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1938 rra_ptr->cur_row = 0;
1940 /* we know what our position should be */
1941 rra_pos_new = rra_start
1942 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1944 /* re-seek if the position is wrong or we wrapped around */
1945 if (rra_pos_new != rrd_file->pos) {
1946 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1947 rrd_set_error("seek error in rrd");
1952 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1955 if (skip_update[rra_idx])
1958 if (*pcdp_summary != NULL) {
1959 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1961 rra_time = (current_time - current_time % step_time)
1962 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1966 (rrd_file, rrd, rra_idx, scratch_idx,
1967 pcdp_summary, rra_time) == -1)
1971 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1978 * Write out one row of values (one value per DS) to the archive.
1980 * Returns 0 on success, -1 on error.
1982 static int write_RRA_row(
1983 rrd_file_t *rrd_file,
1985 unsigned long rra_idx,
1986 unsigned short CDP_scratch_idx,
1987 rrd_info_t ** pcdp_summary,
1990 unsigned long ds_idx, cdp_idx;
1993 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1994 /* compute the cdp index */
1995 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1997 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1998 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1999 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2001 if (*pcdp_summary != NULL) {
2002 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2003 /* append info to the return hash */
2004 *pcdp_summary = rrd_info_push(*pcdp_summary,
2006 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2007 rrd->rra_def[rra_idx].cf_nam,
2008 rrd->rra_def[rra_idx].pdp_cnt,
2009 rrd->ds_def[ds_idx].ds_nam),
2012 if (rrd_write(rrd_file,
2013 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2014 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2015 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2023 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2025 * Returns 0 on success, -1 otherwise
2027 static int smooth_all_rras(
2029 rrd_file_t *rrd_file,
2030 unsigned long rra_begin)
2032 unsigned long rra_start = rra_begin;
2033 unsigned long rra_idx;
2035 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2036 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2037 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2039 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2041 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2042 if (rrd_test_error())
2045 rra_start += rrd->rra_def[rra_idx].row_cnt
2046 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2053 * Flush changes to disk (unless we're using mmap)
2055 * Returns 0 on success, -1 otherwise
2057 static int write_changes_to_disk(
2059 rrd_file_t *rrd_file,
2062 /* we just need to write back the live header portion now */
2063 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2064 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2065 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2067 rrd_set_error("seek rrd for live header writeback");
2071 if (rrd_write(rrd_file, rrd->live_head,
2072 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2073 rrd_set_error("rrd_write live_head to rrd");
2077 if (rrd_write(rrd_file, rrd->legacy_last_up,
2078 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2079 rrd_set_error("rrd_write live_head to rrd");
2085 if (rrd_write(rrd_file, rrd->pdp_prep,
2086 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2087 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2088 rrd_set_error("rrd_write pdp_prep to rrd");
2092 if (rrd_write(rrd_file, rrd->cdp_prep,
2093 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2094 rrd->stat_head->ds_cnt)
2095 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2096 rrd->stat_head->ds_cnt)) {
2098 rrd_set_error("rrd_write cdp_prep to rrd");
2102 if (rrd_write(rrd_file, rrd->rra_ptr,
2103 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2104 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2105 rrd_set_error("rrd_write rra_ptr to rrd");