2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
111 unsigned long *skip_update,
112 int *schedule_smooth);
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
124 static int get_time_from_reading(
128 time_t *current_time,
129 unsigned long *current_time_usec,
132 static int update_pdp_prep(
135 rrd_value_t *pdp_new,
138 static int calculate_elapsed_steps(
140 unsigned long current_time,
141 unsigned long current_time_usec,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
163 unsigned long ds_idx,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
200 static void update_cdp(
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
212 static void initialize_cdp_val(
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
245 static int update_aberrant_cdps(
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
270 static int smooth_all_rras(
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
276 static int write_changes_to_disk(
278 rrd_file_t *rrd_file,
283 * normalize time as returned by gettimeofday. usec part must
286 static inline void normalize_time(
289 if (t->tv_usec < 0) {
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
299 static inline void initialize_time(
300 time_t *current_time,
301 unsigned long *current_time_usec,
304 struct timeval tmp_time; /* used for time conversion */
306 gettimeofday(&tmp_time, 0);
307 normalize_time(&tmp_time);
308 *current_time = tmp_time.tv_sec;
310 *current_time_usec = tmp_time.tv_usec;
312 *current_time_usec = 0;
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
318 rrd_info_t *rrd_update_v(
323 rrd_info_t *result = NULL;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
333 opterr = 0; /* initialize getopt */
336 int option_index = 0;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
388 int option_index = 0;
392 char *opt_daemon = NULL;
395 opterr = 0; /* initialize getopt */
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
405 tmplt = strdup(optarg);
409 if (opt_daemon != NULL)
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
414 rrd_set_error("strdup failed.");
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
438 rrd_set_error("The caching daemon cannot be used together with "
443 if (! rrdc_is_connected(opt_daemon))
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
448 else /* we are connected */
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (void *) (argv + optind + 1)); /* values */
454 rrd_set_error("Failed sending the values to rrdcached: %s",
464 if (opt_daemon != NULL)
473 const char *filename,
478 return _rrd_update(filename, tmplt, argc, argv, NULL);
482 const char *filename,
486 rrd_info_t * pcdp_summary)
491 unsigned long rra_begin; /* byte pointer to the rra
492 * area in the rrd file. this
493 * pointer never changes value */
494 rrd_value_t *pdp_new; /* prepare the incoming data to be added
495 * to the existing entry */
496 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
497 * to the cdp values */
499 long *tmpl_idx; /* index representing the settings
500 * transported by the tmplt index */
501 unsigned long tmpl_cnt = 2; /* time and data */
503 time_t current_time = 0;
504 unsigned long current_time_usec = 0; /* microseconds part of current time */
506 int schedule_smooth = 0;
508 /* number of elapsed PDP steps since last update */
509 unsigned long *rra_step_cnt = NULL;
511 int version; /* rrd version */
512 rrd_file_t *rrd_file;
513 char *arg_copy; /* for processing the argv */
514 unsigned long *skip_update; /* RRAs to advance but not write */
516 /* need at least 1 arguments: data. */
518 rrd_set_error("Not enough arguments");
522 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
525 /* We are now at the beginning of the rra's */
526 rra_begin = rrd_file->header_len;
528 version = atoi(rrd.stat_head->version);
530 initialize_time(&current_time, &current_time_usec, version);
532 /* get exclusive lock to whole file.
533 * lock gets removed when we close the file.
535 if (rrd_lock(rrd_file) != 0) {
536 rrd_set_error("could not lock RRD");
540 if (allocate_data_structures(&rrd, &updvals,
541 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542 &rra_step_cnt, &skip_update,
547 /* loop through the arguments. */
548 for (arg_i = 0; arg_i < argc; arg_i++) {
549 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550 rrd_set_error("failed duplication argv entry");
553 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554 &current_time, &current_time_usec, pdp_temp, pdp_new,
555 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556 &pcdp_summary, version, skip_update,
557 &schedule_smooth) == -1) {
558 if (rrd_test_error()) { /* Should have error string always here */
561 /* Prepend file name to error message */
562 if ((save_error = strdup(rrd_get_error())) != NULL) {
563 rrd_set_error("%s: %s", filename, save_error);
575 /* if we got here and if there is an error and if the file has not been
576 * written to, then close things up and return. */
577 if (rrd_test_error()) {
578 goto err_free_structures;
581 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582 goto err_free_structures;
586 /* calling the smoothing code here guarantees at most one smoothing
587 * operation per rrd_update call. Unfortunately, it is possible with bulk
588 * updates, or a long-delayed update for smoothing to occur off-schedule.
589 * This really isn't critical except during the burn-in cycles. */
590 if (schedule_smooth) {
591 smooth_all_rras(&rrd, rrd_file, rra_begin);
594 /* rrd_dontneed(rrd_file,&rrd); */
620 * get exclusive lock to whole file.
621 * lock gets removed when we close the file
623 * returns 0 on success
631 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
634 if (_fstat(file->fd, &st) == 0) {
635 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
642 lock.l_type = F_WRLCK; /* exclusive write lock */
643 lock.l_len = 0; /* whole file */
644 lock.l_start = 0; /* start of file */
645 lock.l_whence = SEEK_SET; /* end of file */
647 rcstat = fcntl(file->fd, F_SETLK, &lock);
655 * Allocate some important arrays used, and initialize the template.
657 * When it returns, either all of the structures are allocated
658 * or none of them are.
660 * Returns 0 on success, -1 on error.
662 static int allocate_data_structures(
665 rrd_value_t **pdp_temp,
668 unsigned long *tmpl_cnt,
669 unsigned long **rra_step_cnt,
670 unsigned long **skip_update,
671 rrd_value_t **pdp_new)
674 if ((*updvals = (char **) malloc(sizeof(char *)
675 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
676 rrd_set_error("allocating updvals pointer array.");
679 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
680 * rrd->stat_head->ds_cnt)) ==
682 rrd_set_error("allocating pdp_temp.");
683 goto err_free_updvals;
685 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
687 rrd->stat_head->rra_cnt)) ==
689 rrd_set_error("allocating skip_update.");
690 goto err_free_pdp_temp;
692 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
693 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
694 rrd_set_error("allocating tmpl_idx.");
695 goto err_free_skip_update;
697 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
700 rra_cnt))) == NULL) {
701 rrd_set_error("allocating rra_step_cnt.");
702 goto err_free_tmpl_idx;
705 /* initialize tmplt redirector */
706 /* default config example (assume DS 1 is a CDEF DS)
707 tmpl_idx[0] -> 0; (time)
708 tmpl_idx[1] -> 1; (DS 0)
709 tmpl_idx[2] -> 3; (DS 2)
710 tmpl_idx[3] -> 4; (DS 3) */
711 (*tmpl_idx)[0] = 0; /* time */
712 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
713 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
714 (*tmpl_idx)[ii++] = i;
719 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
720 goto err_free_rra_step_cnt;
724 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
725 * rrd->stat_head->ds_cnt)) == NULL) {
726 rrd_set_error("allocating pdp_new.");
727 goto err_free_rra_step_cnt;
732 err_free_rra_step_cnt:
736 err_free_skip_update:
746 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
748 * Returns 0 on success.
750 static int parse_template(
753 unsigned long *tmpl_cnt,
756 char *dsname, *tmplt_copy;
757 unsigned int tmpl_len, i;
760 *tmpl_cnt = 1; /* the first entry is the time */
762 /* we should work on a writeable copy here */
763 if ((tmplt_copy = strdup(tmplt)) == NULL) {
764 rrd_set_error("error copying tmplt '%s'", tmplt);
770 tmpl_len = strlen(tmplt_copy);
771 for (i = 0; i <= tmpl_len; i++) {
772 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
773 tmplt_copy[i] = '\0';
774 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
775 rrd_set_error("tmplt contains more DS definitions than RRD");
777 goto out_free_tmpl_copy;
779 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
780 rrd_set_error("unknown DS name '%s'", dsname);
782 goto out_free_tmpl_copy;
784 /* go to the next entry on the tmplt_copy */
786 dsname = &tmplt_copy[i + 1];
796 * Parse an update string, updates the primary data points (PDPs)
797 * and consolidated data points (CDPs), and writes changes to the RRAs.
799 * Returns 0 on success, -1 on error.
801 static int process_arg(
804 rrd_file_t *rrd_file,
805 unsigned long rra_begin,
806 time_t *current_time,
807 unsigned long *current_time_usec,
808 rrd_value_t *pdp_temp,
809 rrd_value_t *pdp_new,
810 unsigned long *rra_step_cnt,
813 unsigned long tmpl_cnt,
814 rrd_info_t ** pcdp_summary,
816 unsigned long *skip_update,
817 int *schedule_smooth)
819 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
821 /* a vector of future Holt-Winters seasonal coefs */
822 unsigned long elapsed_pdp_st;
824 double interval, pre_int, post_int; /* interval between this and
826 unsigned long proc_pdp_cnt;
828 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
829 current_time, current_time_usec, version) == -1) {
833 interval = (double) (*current_time - rrd->live_head->last_up)
834 + (double) ((long) *current_time_usec -
835 (long) rrd->live_head->last_up_usec) / 1e6f;
837 /* process the data sources and update the pdp_prep
838 * area accordingly */
839 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
843 elapsed_pdp_st = calculate_elapsed_steps(rrd,
845 *current_time_usec, interval,
849 /* has a pdp_st moment occurred since the last run ? */
850 if (elapsed_pdp_st == 0) {
851 /* no we have not passed a pdp_st moment. therefore update is simple */
852 simple_update(rrd, interval, pdp_new);
854 /* an pdp_st has occurred. */
855 if (process_all_pdp_st(rrd, interval,
857 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
860 if (update_all_cdp_prep(rrd, rra_step_cnt,
867 skip_update, schedule_smooth) == -1) {
868 goto err_free_coefficients;
870 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
871 elapsed_pdp_st, pdp_temp,
872 &seasonal_coef) == -1) {
873 goto err_free_coefficients;
875 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
876 *current_time, skip_update,
877 pcdp_summary) == -1) {
878 goto err_free_coefficients;
880 } /* endif a pdp_st has occurred */
881 rrd->live_head->last_up = *current_time;
882 rrd->live_head->last_up_usec = *current_time_usec;
885 *rrd->legacy_last_up = rrd->live_head->last_up;
888 free(last_seasonal_coef);
891 err_free_coefficients:
893 free(last_seasonal_coef);
898 * Parse a DS string (time + colon-separated values), storing the
899 * results in current_time, current_time_usec, and updvals.
901 * Returns 0 on success, -1 on error.
908 unsigned long tmpl_cnt,
909 time_t *current_time,
910 unsigned long *current_time_usec,
918 /* initialize all ds input to unknown except the first one
919 which has always got to be set */
920 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
923 /* separate all ds elements; first must be examined separately
924 due to alternate time syntax */
925 if ((p = strchr(input, '@')) != NULL) {
927 } else if ((p = strchr(input, ':')) != NULL) {
930 rrd_set_error("expected timestamp not found in data source from %s",
936 updvals[tmpl_idx[i++]] = p + 1;
941 updvals[tmpl_idx[i++]] = p + 1;
947 rrd_set_error("expected %lu data source readings (got %lu) from %s",
948 tmpl_cnt - 1, i, input);
952 if (get_time_from_reading(rrd, timesyntax, updvals,
953 current_time, current_time_usec,
961 * Parse the time in a DS string, store it in current_time and
962 * current_time_usec and verify that it's later than the last
963 * update for this DS.
965 * Returns 0 on success, -1 on error.
967 static int get_time_from_reading(
971 time_t *current_time,
972 unsigned long *current_time_usec,
976 char *parsetime_error = NULL;
978 rrd_time_value_t ds_tv;
979 struct timeval tmp_time; /* used for time conversion */
981 /* get the time from the reading ... handle N */
982 if (timesyntax == '@') { /* at-style */
983 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
984 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
987 if (ds_tv.type == RELATIVE_TO_END_TIME ||
988 ds_tv.type == RELATIVE_TO_START_TIME) {
989 rrd_set_error("specifying time relative to the 'start' "
990 "or 'end' makes no sense here: %s", updvals[0]);
993 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
994 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
995 } else if (strcmp(updvals[0], "N") == 0) {
996 gettimeofday(&tmp_time, 0);
997 normalize_time(&tmp_time);
998 *current_time = tmp_time.tv_sec;
999 *current_time_usec = tmp_time.tv_usec;
1001 old_locale = setlocale(LC_NUMERIC, "C");
1002 tmp = strtod(updvals[0], 0);
1003 setlocale(LC_NUMERIC, old_locale);
1004 *current_time = floor(tmp);
1005 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1007 /* dont do any correction for old version RRDs */
1009 *current_time_usec = 0;
1011 if (*current_time < rrd->live_head->last_up ||
1012 (*current_time == rrd->live_head->last_up &&
1013 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1014 rrd_set_error("illegal attempt to update using time %ld when "
1015 "last update time is %ld (minimum one second step)",
1016 *current_time, rrd->live_head->last_up);
1023 * Update pdp_new by interpreting the updvals according to the DS type
1024 * (COUNTER, GAUGE, etc.).
1026 * Returns 0 on success, -1 on error.
1028 static int update_pdp_prep(
1031 rrd_value_t *pdp_new,
1034 unsigned long ds_idx;
1036 char *endptr; /* used in the conversion */
1039 enum dst_en dst_idx;
1041 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1042 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1044 /* make sure we do not build diffs with old last_ds values */
1045 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1046 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1047 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1050 /* NOTE: DST_CDEF should never enter this if block, because
1051 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1052 * accidently specified a value for the DST_CDEF. To handle this case,
1053 * an extra check is required. */
1055 if ((updvals[ds_idx + 1][0] != 'U') &&
1056 (dst_idx != DST_CDEF) &&
1057 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1060 /* pdp_new contains rate * time ... eg the bytes transferred during
1061 * the interval. Doing it this way saves a lot of math operations
1066 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1067 if ((updvals[ds_idx + 1][ii] < '0'
1068 || updvals[ds_idx + 1][ii] > '9')
1069 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1070 rrd_set_error("not a simple integer: '%s'",
1071 updvals[ds_idx + 1]);
1075 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1077 rrd_diff(updvals[ds_idx + 1],
1078 rrd->pdp_prep[ds_idx].last_ds);
1079 if (dst_idx == DST_COUNTER) {
1080 /* simple overflow catcher. This will fail
1081 * terribly for non 32 or 64 bit counters
1082 * ... are there any others in SNMP land?
1084 if (pdp_new[ds_idx] < (double) 0.0)
1085 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1086 if (pdp_new[ds_idx] < (double) 0.0)
1087 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1089 rate = pdp_new[ds_idx] / interval;
1091 pdp_new[ds_idx] = DNAN;
1095 old_locale = setlocale(LC_NUMERIC, "C");
1097 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1098 setlocale(LC_NUMERIC, old_locale);
1100 rrd_set_error("converting '%s' to float: %s",
1101 updvals[ds_idx + 1], rrd_strerror(errno));
1104 if (endptr[0] != '\0') {
1106 ("conversion of '%s' to float not complete: tail '%s'",
1107 updvals[ds_idx + 1], endptr);
1110 rate = pdp_new[ds_idx] / interval;
1114 old_locale = setlocale(LC_NUMERIC, "C");
1116 strtod(updvals[ds_idx + 1], &endptr) * interval;
1117 setlocale(LC_NUMERIC, old_locale);
1119 rrd_set_error("converting '%s' to float: %s",
1120 updvals[ds_idx + 1], rrd_strerror(errno));
1123 if (endptr[0] != '\0') {
1125 ("conversion of '%s' to float not complete: tail '%s'",
1126 updvals[ds_idx + 1], endptr);
1129 rate = pdp_new[ds_idx] / interval;
1132 rrd_set_error("rrd contains unknown DS type : '%s'",
1133 rrd->ds_def[ds_idx].dst);
1136 /* break out of this for loop if the error string is set */
1137 if (rrd_test_error()) {
1140 /* make sure pdp_temp is neither too large or too small
1141 * if any of these occur it becomes unknown ...
1142 * sorry folks ... */
1144 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148 pdp_new[ds_idx] = DNAN;
1151 /* no news is news all the same */
1152 pdp_new[ds_idx] = DNAN;
1156 /* make a copy of the command line argument for the next run */
1158 fprintf(stderr, "prep ds[%lu]\t"
1162 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1165 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1167 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1173 * How many PDP steps have elapsed since the last update? Returns the answer,
1174 * and stores the time between the last update and the last PDP in pre_time,
1175 * and the time between the last PDP and the current time in post_int.
1177 static int calculate_elapsed_steps(
1179 unsigned long current_time,
1180 unsigned long current_time_usec,
1184 unsigned long *proc_pdp_cnt)
1186 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1187 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1189 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1190 * when it was last updated */
1191 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1193 /* when was the current pdp started */
1194 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1197 /* when did the last pdp_st occur */
1198 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199 occu_pdp_st = current_time - occu_pdp_age;
1201 if (occu_pdp_st > proc_pdp_st) {
1202 /* OK we passed the pdp_st moment */
1203 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1204 * occurred before the latest
1206 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207 *post_int = occu_pdp_age; /* how much after it */
1208 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1210 *pre_int = interval;
1214 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1217 printf("proc_pdp_age %lu\t"
1219 "occu_pfp_age %lu\t"
1223 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1227 /* compute the number of elapsed pdp_st moments */
1228 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1232 * Increment the PDP values by the values in pdp_new, or else initialize them.
1234 static void simple_update(
1237 rrd_value_t *pdp_new)
1241 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242 if (isnan(pdp_new[i])) {
1243 /* this is not really accurate if we use subsecond data arrival time
1244 should have thought of it when going subsecond resolution ...
1245 sorry next format change we will have it! */
1246 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1249 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1252 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1261 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1268 * Call process_pdp_st for each DS.
1270 * Returns 0 on success, -1 on error.
1272 static int process_all_pdp_st(
1277 unsigned long elapsed_pdp_st,
1278 rrd_value_t *pdp_new,
1279 rrd_value_t *pdp_temp)
1281 unsigned long ds_idx;
1283 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284 rate*seconds which occurred up to the last run.
1285 pdp_new[] contains rate*seconds from the latest run.
1286 pdp_temp[] will contain the rate for cdp */
1288 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290 elapsed_pdp_st * rrd->stat_head->pdp_step,
1291 pdp_new, pdp_temp) == -1) {
1295 fprintf(stderr, "PDP UPD ds[%lu]\t"
1296 "elapsed_pdp_st %lu\t"
1299 "new_unkn_sec %5lu\n",
1303 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1311 * Process an update that occurs after one of the PDP moments.
1312 * Increments the PDP value, sets NAN if time greater than the
1313 * heartbeats have elapsed, processes CDEFs.
1315 * Returns 0 on success, -1 on error.
1317 static int process_pdp_st(
1319 unsigned long ds_idx,
1323 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1324 rrd_value_t *pdp_new,
1325 rrd_value_t *pdp_temp)
1329 /* update pdp_prep to the current pdp_st. */
1330 double pre_unknown = 0.0;
1331 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1332 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1334 rpnstack_t rpnstack; /* used for COMPUTE DS */
1336 rpnstack_init(&rpnstack);
1339 if (isnan(pdp_new[ds_idx])) {
1340 /* a final bit of unknown to be added before calculation
1341 we use a temporary variable for this so that we
1342 don't have to turn integer lines before using the value */
1343 pre_unknown = pre_int;
1345 if (isnan(scratch[PDP_val].u_val)) {
1346 scratch[PDP_val].u_val = 0;
1348 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1351 /* if too much of the pdp_prep is unknown we dump it */
1352 /* if the interval is larger thatn mrhb we get NAN */
1353 if ((interval > mrhb) ||
1354 (rrd->stat_head->pdp_step / 2.0 <
1355 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356 pdp_temp[ds_idx] = DNAN;
1358 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1363 /* process CDEF data sources; remember each CDEF DS can
1364 * only reference other DS with a lower index number */
1365 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1369 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1370 /* substitute data values for OP_VARIABLE nodes */
1371 for (i = 0; rpnp[i].op != OP_END; i++) {
1372 if (rpnp[i].op == OP_VARIABLE) {
1373 rpnp[i].op = OP_NUMBER;
1374 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1377 /* run the rpn calculator */
1378 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1380 rpnstack_free(&rpnstack);
1385 /* make pdp_prep ready for the next run */
1386 if (isnan(pdp_new[ds_idx])) {
1387 /* this is not realy accurate if we use subsecond data arival time
1388 should have thought of it when going subsecond resolution ...
1389 sorry next format change we will have it! */
1390 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1391 scratch[PDP_val].u_val = DNAN;
1393 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1394 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1396 rpnstack_free(&rpnstack);
1401 * Iterate over all the RRAs for a given DS and:
1402 * 1. Decide whether to schedule a smooth later
1403 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1406 * Returns 0 on success, -1 on error
1408 static int update_all_cdp_prep(
1410 unsigned long *rra_step_cnt,
1411 unsigned long rra_begin,
1412 rrd_file_t *rrd_file,
1413 unsigned long elapsed_pdp_st,
1414 unsigned long proc_pdp_cnt,
1415 rrd_value_t **last_seasonal_coef,
1416 rrd_value_t **seasonal_coef,
1417 rrd_value_t *pdp_temp,
1418 unsigned long *skip_update,
1419 int *schedule_smooth)
1421 unsigned long rra_idx;
1423 /* index into the CDP scratch array */
1424 enum cf_en current_cf;
1425 unsigned long rra_start;
1427 /* number of rows to be updated in an RRA for a data value. */
1428 unsigned long start_pdp_offset;
1430 rra_start = rra_begin;
1431 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1432 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1434 rrd->rra_def[rra_idx].pdp_cnt -
1435 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1436 skip_update[rra_idx] = 0;
1437 if (start_pdp_offset <= elapsed_pdp_st) {
1438 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1439 rrd->rra_def[rra_idx].pdp_cnt + 1;
1441 rra_step_cnt[rra_idx] = 0;
1444 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1445 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1446 * so that they will be correct for the next observed value; note that for
1447 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1448 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1449 if (rra_step_cnt[rra_idx] > 1) {
1450 skip_update[rra_idx] = 1;
1451 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1452 elapsed_pdp_st, last_seasonal_coef);
1453 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454 elapsed_pdp_st + 1, seasonal_coef);
1456 /* periodically run a smoother for seasonal effects */
1457 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1460 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1461 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1462 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1465 *schedule_smooth = 1;
1468 if (rrd_test_error())
1472 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1473 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1474 current_cf) == -1) {
1478 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1479 sizeof(rrd_value_t);
1485 * Are we due for a smooth? Also increments our position in the burn-in cycle.
/*
 * do_schedule_smooth -- decide whether a smoothing pass over the seasonal
 * coefficients is due for RRA rra_idx after elapsed_pdp_st new PDPs.
 * Returns non-zero when the smoother should be scheduled.  During burn-in
 * it also advances the per-RRA burn-in counter as a side effect.
 *
 * NOTE(review): this listing is incomplete -- the leading "rrd_t *rrd"
 * parameter, braces and several closing lines of the original are missing;
 * the surviving lines are preserved byte-identical.
 */
1487 static int do_schedule_smooth(
1489 unsigned long rra_idx,
1490 unsigned long elapsed_pdp_st)
/* cdp_idx addresses the first data source's CDP scratch area of this RRA;
 * its CDP_init_seasonal slot doubles as the burn-in cycle counter. */
1492 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1493 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1494 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1495 unsigned long seasonal_smooth_idx =
1496 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1497 unsigned long *init_seasonal =
1498 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1500 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1501 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1502 * really an RRA level, not a data source within RRA level parameter, but
1503 * the rra_def is read only for rrd_update (not flushed to disk). */
1504 if (*init_seasonal > BURNIN_CYCLES) {
1505 /* someone has no doubt invented a trick to deal with this wrap around,
1506 * but at least this code is clear. */
1507 if (seasonal_smooth_idx > cur_row) {
1508 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1509 * between PDP and CDP */
1510 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
/* Otherwise the smooth index is at or behind cur_row: smooth only after the
 * cursor wraps past it again.  Compare against row_cnt + seasonal_smooth_idx
 * instead of subtracting, to stay safe with unsigned arithmetic. */
1512 /* can't rely on negative numbers because we are working with
1513 * unsigned values */
1514 return (cur_row + elapsed_pdp_st >= row_cnt
1515 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
/* Still in burn-in: smooth once per full wrap of the RRA; the counter is
 * only incremented when the wrap condition holds (short-circuit &&). */
1517 /* mark off one of the burn-in cycles */
1518 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1522 * For a given RRA, iterate over the data sources and call the appropriate
1523 * consolidation function.
1525 * Returns 0 on success, -1 on error.
/*
 * update_cdp_prep -- for one RRA, iterate over every data source and either
 * run real consolidation (update_cdp) when the RRA aggregates several PDPs
 * per CDP, or, for 1-PDP-per-CDP RRAs after a bulk update (more than two
 * elapsed PDPs), reset the per-DS bookkeeping via reset_cdp.
 * Returns 0 on success, -1 on error (checked through rrd_test_error()).
 *
 * NOTE(review): this listing is incomplete -- the rrd_t/rra_idx/current_cf
 * parameters, braces and else lines are missing; surviving lines are
 * preserved byte-identical.
 */
1527 static int update_cdp_prep(
1529 unsigned long elapsed_pdp_st,
1530 unsigned long start_pdp_offset,
1531 unsigned long *rra_step_cnt,
1533 rrd_value_t *pdp_temp,
1534 rrd_value_t *last_seasonal_coef,
1535 rrd_value_t *seasonal_coef,
1538 unsigned long ds_idx, cdp_idx;
1540 /* update CDP_PREP areas */
1541 /* loop over data sources within each RRA */
1542 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* cdp_prep is laid out RRA-major: one scratch area per (RRA, DS) pair */
1544 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1546 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1547 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1548 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1549 elapsed_pdp_st, start_pdp_offset,
1550 rrd->rra_def[rra_idx].pdp_cnt,
1551 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1554 /* Nothing to consolidate if there's one PDP per CDP. However, if
1555 * we've missed some PDPs, let's update null counters etc. */
1556 if (elapsed_pdp_st > 2) {
1557 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1558 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1563 if (rrd_test_error())
1565 } /* endif data sources loop */
1570 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1571 * primary value, secondary value, and # of unknowns.
/*
 * update_cdp -- given the new reading pdp_temp_val, update or initialize the
 * CDP value, the primary value (first row to be written), the secondary
 * value (the "fill-in" used for intermediate rows of a bulk update), and the
 * count of unknown PDPs, all inside this (RRA, DS) scratch area.
 *
 * NOTE(review): this listing is incomplete -- the scratch/current_cf/xff
 * parameters, several braces, else branches and the #ifdef DEBUG guards
 * around the fprintf calls are missing; surviving lines are preserved
 * byte-identical.
 */
1573 static void update_cdp(
1576 rrd_value_t pdp_temp_val,
1577 unsigned long rra_step_cnt,
1578 unsigned long elapsed_pdp_st,
1579 unsigned long start_pdp_offset,
1580 unsigned long pdp_cnt,
1585 /* shorthand variables */
1586 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1587 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1588 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1589 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
/* Branch below covers rra_step_cnt > 0: at least one CDP row is due. */
1592 /* If we are in this block, at least 1 CDP value will be written to
1593 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1594 * to be written, then the "fill in" value is the CDP_secondary_val
1596 if (isnan(pdp_temp_val)) {
1597 *cdp_unkn_pdp_cnt += start_pdp_offset;
1598 *cdp_secondary_val = DNAN;
1600 /* CDP_secondary value is the RRA "fill in" value for intermediary
1601 * CDP data entries. No matter the CF, the value is the same because
1602 * the average, max, min, and last of a list of identical values is
1603 * the same, namely, the value itself. */
1604 *cdp_secondary_val = pdp_temp_val;
/* xff check: too many unknown PDPs in this CDP interval -> primary is DNAN */
1607 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1608 *cdp_primary_val = DNAN;
1609 if (current_cf == CF_AVERAGE) {
1611 initialize_average_carry_over(pdp_temp_val,
1613 start_pdp_offset, pdp_cnt);
1615 *cdp_val = pdp_temp_val;
1618 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1619 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1620 } /* endif meets xff value requirement for a valid value */
1621 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after
1622 * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to
1623 * compute that value. */
1623 if (isnan(pdp_temp_val))
1624 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1626 *cdp_unkn_pdp_cnt = 0;
1627 } else { /* rra_step_cnt[i] == 0 */
/* No CDP row due yet: just fold the new reading into the running CDP_val.
 * NOTE(review): the fprintf calls below appear to be debug traces; in the
 * full file they are presumably guarded by #ifdef DEBUG (the guard lines
 * are missing from this listing) -- confirm against upstream. */
1630 if (isnan(*cdp_val)) {
1631 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1634 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1638 if (isnan(pdp_temp_val)) {
1639 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1642 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1649 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1650 * on the type of consolidation function.
/*
 * initialize_cdp_val -- set CDP_primary_val and the carry-over CDP_val to
 * the appropriate initial values for the given consolidation function,
 * combining the accumulated CDP_val with the new reading pdp_temp_val.
 * IFDNAN substitutes the CF's neutral element (0.0 for AVERAGE, -DINF for
 * MAXIMUM, DINF for MINIMUM) when a value is unknown.
 *
 * NOTE(review): this listing is incomplete -- the scratch/current_cf
 * parameters, case labels, braces and else lines are missing; surviving
 * lines are preserved byte-identical.
 */
1652 static void initialize_cdp_val(
1655 rrd_value_t pdp_temp_val,
1656 unsigned long elapsed_pdp_st,
1657 unsigned long start_pdp_offset,
1658 unsigned long pdp_cnt)
1660 rrd_value_t cum_val, cur_val;
1662 switch (current_cf) {
/* CF_AVERAGE: weighted sum divided by the number of known PDPs */
1664 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1665 cur_val = IFDNAN(pdp_temp_val, 0.0);
1666 scratch[CDP_primary_val].u_val =
1667 (cum_val + cur_val * start_pdp_offset) /
1668 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1669 scratch[CDP_val].u_val =
1670 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1671 start_pdp_offset, pdp_cnt);
/* CF_MAXIMUM: unknowns act as -infinity so any real value wins */
1674 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1675 cur_val = IFDNAN(pdp_temp_val, -DINF);
/* NOTE(review): the fprintf block below references pdp_temp and i, which
 * are not parameters of this function; in the full file these lines look
 * like debug code that is compiled out (the #if/#ifdef guard lines are
 * missing from this listing) -- confirm against upstream before enabling. */
1678 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1680 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1686 if (cur_val > cum_val)
1687 scratch[CDP_primary_val].u_val = cur_val;
1689 scratch[CDP_primary_val].u_val = cum_val;
1690 /* initialize carry over value */
1691 scratch[CDP_val].u_val = pdp_temp_val;
/* CF_MINIMUM: unknowns act as +infinity so any real value wins */
1694 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1695 cur_val = IFDNAN(pdp_temp_val, DINF);
1698 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1700 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1706 if (cur_val < cum_val)
1707 scratch[CDP_primary_val].u_val = cur_val;
1709 scratch[CDP_primary_val].u_val = cum_val;
1710 /* initialize carry over value */
1711 scratch[CDP_val].u_val = pdp_temp_val;
/* default (e.g. CF_LAST): the newest reading is both primary and carry-over */
1715 scratch[CDP_primary_val].u_val = pdp_temp_val;
1716 /* initialize carry over value */
1717 scratch[CDP_val].u_val = pdp_temp_val;
1723 * Update the consolidation function for Holt-Winters functions as
1724 * well as other functions that don't actually consolidate multiple
/*
 * reset_cdp -- after a bulk update, refresh the CDP scratch state of a
 * 1-PDP-per-CDP RRA (Holt-Winters family and other non-consolidating CFs)
 * for one (RRA, DS) pair.  Dispatches on the consolidation function:
 * plain CFs copy the new reading, SEASONAL/DEVSEASONAL refresh cached
 * coefficients, null counters are bumped, predictions are set to DNAN, and
 * FAILURES-style RRAs get their violation history wiped.
 *
 * NOTE(review): this listing is incomplete -- case labels, braces and break
 * statements are missing; surviving lines are preserved byte-identical.
 */
1727 static void reset_cdp(
1729 unsigned long elapsed_pdp_st,
1730 rrd_value_t *pdp_temp,
1731 rrd_value_t *last_seasonal_coef,
1732 rrd_value_t *seasonal_coef,
1736 enum cf_en current_cf)
1738 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1740 switch (current_cf) {
/* simple CFs: both the primary and the fill-in row carry the new reading */
1743 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1744 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1747 case CF_DEVSEASONAL:
1748 /* need to update cached seasonal values, so they are consistent
1749 * with the bulk update */
1750 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1751 * CDP_last_deviation are the same. */
1752 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1753 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1757 /* need to update the null_count and last_null_count.
1758 * even do this for non-DNAN pdp_temp because the
1759 * algorithm is not learning from batch updates. */
1760 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1761 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
/* prediction RRAs (HWPREDICT/DEVPREDICT): bulk gaps yield unknown rows */
1764 scratch[CDP_primary_val].u_val = DNAN;
1765 scratch[CDP_secondary_val].u_val = DNAN;
1768 /* do not count missed bulk values as failures */
1769 scratch[CDP_primary_val].u_val = 0;
1770 scratch[CDP_secondary_val].u_val = 0;
1771 /* need to reset violations buffer.
1772 * could do this more carefully, but for now, just
1773 * assume a bulk update wipes away all violations. */
1774 erase_violations(rrd, cdp_idx, rra_idx);
/*
 * initialize_average_carry_over -- compute the CF_AVERAGE carry-over value:
 * the part of the new reading that belongs to the next, not-yet-complete
 * CDP interval (reading weighted by the PDP steps left over after the
 * offset, modulo the RRA's PDPs-per-CDP).
 *
 * NOTE(review): this listing is incomplete -- the braces and the DNAN
 * return for an unknown reading inside the isnan branch are missing;
 * surviving lines are preserved byte-identical.
 */
1779 static rrd_value_t initialize_average_carry_over(
1780 rrd_value_t pdp_temp_val,
1781 unsigned long elapsed_pdp_st,
1782 unsigned long start_pdp_offset,
1783 unsigned long pdp_cnt)
1785 /* initialize carry over value */
1786 if (isnan(pdp_temp_val)) {
1789 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1793 * Update or initialize a CDP value based on the consolidation
1796 * Returns the new value.
/*
 * calculate_cdp_val -- fold the new reading pdp_temp_val into the running
 * CDP accumulator cdp_val according to the consolidation function and
 * return the new accumulator.  For CF_AVERAGE the reading is weighted by
 * the number of elapsed PDP steps; MIN/MAX keep the extremum; any other CF
 * (e.g. LAST) simply takes the newest reading.
 *
 * NOTE(review): this listing is incomplete -- the current_cf parameter, the
 * debug-only i/ii parameters and the #ifdef DEBUG guard around the fprintf
 * are missing; surviving lines are preserved byte-identical.
 */
1798 static rrd_value_t calculate_cdp_val(
1799 rrd_value_t cdp_val,
1800 rrd_value_t pdp_temp_val,
1801 unsigned long elapsed_pdp_st,
/* accumulator still unknown: (re-)initialize it from the new reading */
1812 if (isnan(cdp_val)) {
1813 if (current_cf == CF_AVERAGE) {
1814 pdp_temp_val *= elapsed_pdp_st;
1817 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1818 i, ii, pdp_temp_val);
1820 return pdp_temp_val;
1822 if (current_cf == CF_AVERAGE)
1823 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1824 if (current_cf == CF_MINIMUM)
1825 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1826 if (current_cf == CF_MAXIMUM)
1827 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
/* CF_LAST and anything else: newest reading wins */
1829 return pdp_temp_val;
1833 * For each RRA, update the seasonal values and then call update_aberrant_CF
1834 * for each data source.
1836 * Return 0 on success, -1 on error.
/*
 * update_aberrant_cdps -- for each 1-PDP-per-CDP RRA, refresh the seasonal
 * coefficients (for SEASONAL/DEVSEASONAL) and call update_aberrant_CF for
 * every data source.  Runs at most twice: once for the primary CDP scratch
 * slot and once for the secondary (only when exactly 2 PDPs elapsed).
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): this listing is incomplete -- the rrd_t parameter, braces,
 * else lines and the final return are missing; surviving lines are
 * preserved byte-identical.
 */
1838 static int update_aberrant_cdps(
1840 rrd_file_t *rrd_file,
1841 unsigned long rra_begin,
1842 unsigned long elapsed_pdp_st,
1843 rrd_value_t *pdp_temp,
1844 rrd_value_t **seasonal_coef)
1846 unsigned long rra_idx, ds_idx, j;
1848 /* number of PDP steps since the last update that
1849 * are assigned to the first CDP to be generated
1850 * since the last update. */
1851 unsigned short scratch_idx;
1852 unsigned long rra_start;
1853 enum cf_en current_cf;
1855 /* this loop is only entered if elapsed_pdp_st < 3 */
1856 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1857 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1858 rra_start = rra_begin;
1859 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
/* only RRAs with a 1-1 PDP/CDP mapping participate in aberrant detection */
1860 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1861 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
/* primary pass looks one step ahead; secondary pass two steps ahead */
1863 if (scratch_idx == CDP_primary_val) {
1864 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1865 elapsed_pdp_st + 1, seasonal_coef);
1867 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868 elapsed_pdp_st + 2, seasonal_coef);
1871 if (rrd_test_error())
1873 /* loop over data sources within each RRA */
1874 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1875 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1876 rra_idx * (rrd->stat_head->ds_cnt) +
1877 ds_idx, rra_idx, ds_idx, scratch_idx,
/* advance to the next RRA's region within the file */
1881 rra_start += rrd->rra_def[rra_idx].row_cnt
1882 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1889 * Move sequentially through the file, writing one RRA at a time. Note this
1890 * architecture divorces the computation of CDP with flushing updated RRA
1893 * Return 0 on success, -1 on error.
/*
 * write_to_rras -- move sequentially through the file, writing the pending
 * CDP rows of every RRA.  The first row written for an RRA comes from the
 * CDP_primary_val scratch slot; all further rows of a bulk update reuse the
 * CDP_secondary_val "fill-in" slot.  Seeks only when the file position does
 * not already match the target row (or after a wrap-around), and optionally
 * appends per-row details to *pcdp_summary.  Returns 0 on success, -1 on
 * error.
 *
 * NOTE(review): this listing is incomplete -- the rrd_t parameter, the
 * rra_pos_new declaration, braces, continue/return lines and the #ifdef
 * DEBUG guards around the fprintf calls are missing; surviving lines are
 * preserved byte-identical.
 */
1895 static int write_to_rras(
1897 rrd_file_t *rrd_file,
1898 unsigned long *rra_step_cnt,
1899 unsigned long rra_begin,
1900 time_t current_time,
1901 unsigned long *skip_update,
1902 rrd_info_t ** pcdp_summary)
1904 unsigned long rra_idx;
1905 unsigned long rra_start;
1906 time_t rra_time = 0; /* time of update for a RRA */
1908 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1910 /* Ready to write to disk */
1911 rra_start = rra_begin;
1913 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1914 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1915 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1918 unsigned short scratch_idx;
1919 unsigned long step_subtract;
/* first iteration writes the primary value; subsequent iterations of a
 * bulk update write the secondary (fill-in) value */
1921 for (scratch_idx = CDP_primary_val,
1923 rra_step_cnt[rra_idx] > 0;
1924 rra_step_cnt[rra_idx]--,
1925 scratch_idx = CDP_secondary_val,
1926 step_subtract = 2) {
1930 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1932 /* increment, with wrap-around */
1933 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1934 rra_ptr->cur_row = 0;
1936 /* we know what our position should be */
1937 rra_pos_new = rra_start
1938 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1940 /* re-seek if the position is wrong or we wrapped around */
1941 if (rra_pos_new != rrd_file->pos) {
1942 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1943 rrd_set_error("seek error in rrd");
1948 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
/* SEASONAL/DEVSEASONAL bulk updates were flagged earlier: skip the write
 * but keep the cursor bookkeeping above */
1951 if (skip_update[rra_idx])
1954 if (*pcdp_summary != NULL) {
1955 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
/* timestamp of this row: current time aligned down to the RRA step,
 * minus the rows still pending after this one */
1957 rra_time = (current_time - current_time % step_time)
1958 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1962 (rrd_file, rrd, rra_idx, scratch_idx,
1963 pcdp_summary, rra_time) == -1)
1967 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1974 * Write out one row of values (one value per DS) to the archive.
1976 * Returns 0 on success, -1 on error.
/*
 * write_RRA_row -- write one row of consolidated values (one rrd_value_t
 * per data source, taken from each DS's CDP scratch slot CDP_scratch_idx)
 * to the archive at the current file position, optionally appending each
 * value to the *pcdp_summary info chain.  Returns 0 on success, -1 on
 * write error.
 *
 * NOTE(review): this listing is incomplete -- the rrd_t/rra_time
 * parameters, the unival iv declaration, the sprintf_alloc wrapper around
 * the format call, braces, the return lines and the #ifdef DEBUG guard are
 * missing; surviving lines are preserved byte-identical.
 */
1978 static int write_RRA_row(
1979 rrd_file_t *rrd_file,
1981 unsigned long rra_idx,
1982 unsigned short CDP_scratch_idx,
1983 rrd_info_t ** pcdp_summary,
1986 unsigned long ds_idx, cdp_idx;
1989 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1990 /* compute the cdp index */
1991 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1993 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1994 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1995 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1997 if (*pcdp_summary != NULL) {
1998 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1999 /* append info to the return hash */
2000 *pcdp_summary = rrd_info_push(*pcdp_summary,
2002 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2003 rrd->rra_def[rra_idx].cf_nam,
2004 rrd->rra_def[rra_idx].pdp_cnt,
2005 rrd->ds_def[ds_idx].ds_nam),
/* write this DS's value; a short write is treated as a hard error */
2008 if (rrd_write(rrd_file,
2009 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2010 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2011 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2019 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2021 * Returns 0 on success, -1 otherwise
/*
 * smooth_all_rras -- run apply_smoother over every SEASONAL and DEVSEASONAL
 * RRA in the file, tracking each RRA's byte offset as it goes.  Returns 0
 * on success, -1 if apply_smoother raised an error (via rrd_test_error).
 *
 * NOTE(review): this listing is incomplete -- the rrd_t parameter, braces,
 * the error return and the #ifdef DEBUG guard around the fprintf are
 * missing; surviving lines are preserved byte-identical.
 */
2023 static int smooth_all_rras(
2025 rrd_file_t *rrd_file,
2026 unsigned long rra_begin)
2028 unsigned long rra_start = rra_begin;
2029 unsigned long rra_idx;
2031 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2032 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2033 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2035 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2037 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2038 if (rrd_test_error())
/* advance offset past this RRA's data region regardless of its CF */
2041 rra_start += rrd->rra_def[rra_idx].row_cnt
2042 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2049 * Flush changes to disk (unless we're using mmap)
2051 * Returns 0 on success, -1 otherwise
2053 static int write_changes_to_disk(
2055 rrd_file_t *rrd_file,
2058 /* we just need to write back the live header portion now */
2059 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2060 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2061 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2063 rrd_set_error("seek rrd for live header writeback");
2067 if (rrd_write(rrd_file, rrd->live_head,
2068 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2069 rrd_set_error("rrd_write live_head to rrd");
2073 if (rrd_write(rrd_file, rrd->legacy_last_up,
2074 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2075 rrd_set_error("rrd_write live_head to rrd");
2081 if (rrd_write(rrd_file, rrd->pdp_prep,
2082 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2083 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2084 rrd_set_error("rrd_write pdp_prep to rrd");
2088 if (rrd_write(rrd_file, rrd->cdp_prep,
2089 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2090 rrd->stat_head->ds_cnt)
2091 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2092 rrd->stat_head->ds_cnt)) {
2094 rrd_set_error("rrd_write cdp_prep to rrd");
2098 if (rrd_write(rrd_file, rrd->rra_ptr,
2099 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2100 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2101 rrd_set_error("rrd_write rra_ptr to rrd");