1 /*****************************************************************************
2 * RRDtool 1.4.2 Copyright by Tobi Oetiker, 1997-2009
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
8 *****************************************************************************/
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(
49 struct __timezone *tz)
52 struct _timeb current_time;
54 _ftime(&current_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
64 /* FUNCTION PROTOTYPES */
78 static int allocate_data_structures(
81 rrd_value_t **pdp_temp,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
92 unsigned long *tmpl_cnt,
95 static int process_arg(
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
110 unsigned long *skip_update,
111 int *schedule_smooth);
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
123 static int get_time_from_reading(
127 time_t *current_time,
128 unsigned long *current_time_usec,
131 static int update_pdp_prep(
134 rrd_value_t *pdp_new,
137 static int calculate_elapsed_steps(
139 unsigned long current_time,
140 unsigned long current_time_usec,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
162 unsigned long ds_idx,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
199 static void update_cdp(
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
211 static void initialize_cdp_val(
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
244 static int update_aberrant_cdps(
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
269 static int smooth_all_rras(
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
275 static int write_changes_to_disk(
277 rrd_file_t *rrd_file,
282 * normalize time as returned by gettimeofday. usec part must
285 static void normalize_time(
288 if (t->tv_usec < 0) {
295 * Sets current_time and current_time_usec based on the current time.
296 * current_time_usec is set to 0 if the version number is 1 or 2.
298 static void initialize_time(
299 time_t *current_time,
300 unsigned long *current_time_usec,
303 struct timeval tmp_time; /* used for time conversion */
305 gettimeofday(&tmp_time, 0);
306 normalize_time(&tmp_time);
307 *current_time = tmp_time.tv_sec;
309 *current_time_usec = tmp_time.tv_usec;
311 *current_time_usec = 0;
/*
 * IFDNAN(X, Y): evaluates to Y when X is NaN, otherwise to X.
 *
 * X appears twice in the expansion, so it must not have side effects.
 * The expansion deliberately carries no trailing semicolon: it is a plain
 * parenthesized expression and can therefore be used inside larger
 * expressions, as a function argument, or in an if/else without braces.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
317 rrd_info_t *rrd_update_v(
322 rrd_info_t *result = NULL;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
332 opterr = 0; /* initialize getopt */
335 int option_index = 0;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
387 int option_index = 0;
391 char *opt_daemon = NULL;
394 opterr = 0; /* initialize getopt */
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
404 tmplt = strdup(optarg);
408 if (opt_daemon != NULL)
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
413 rrd_set_error("strdup failed.");
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437 rrd_set_error("The caching daemon cannot be used together with "
442 if (! rrdc_is_connected(opt_daemon))
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
447 else /* we are connected */
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
453 rrd_set_error("Failed sending the values to rrdcached: %s",
463 if (opt_daemon != NULL)
472 const char *filename,
477 return _rrd_update(filename, tmplt, argc, argv, NULL);
481 const char *filename,
485 rrd_info_t * pcdp_summary)
490 unsigned long rra_begin; /* byte pointer to the rra
491 * area in the rrd file. this
492 * pointer never changes value */
493 rrd_value_t *pdp_new; /* prepare the incoming data to be added
494 * to the existing entry */
495 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
496 * to the cdp values */
498 long *tmpl_idx; /* index representing the settings
499 * transported by the tmplt index */
500 unsigned long tmpl_cnt = 2; /* time and data */
502 time_t current_time = 0;
503 unsigned long current_time_usec = 0; /* microseconds part of current time */
505 int schedule_smooth = 0;
507 /* number of elapsed PDP steps since last update */
508 unsigned long *rra_step_cnt = NULL;
510 int version; /* rrd version */
511 rrd_file_t *rrd_file;
512 char *arg_copy; /* for processing the argv */
513 unsigned long *skip_update; /* RRAs to advance but not write */
515 /* need at least 1 argument: data. */
517 rrd_set_error("Not enough arguments");
522 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
525 /* We are now at the beginning of the rra's */
526 rra_begin = rrd_file->header_len;
528 version = atoi(rrd.stat_head->version);
530 initialize_time(&current_time, &current_time_usec, version);
532 /* get exclusive lock to whole file.
533 * lock gets removed when we close the file.
535 if (rrd_lock(rrd_file) != 0) {
536 rrd_set_error("could not lock RRD");
540 if (allocate_data_structures(&rrd, &updvals,
541 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542 &rra_step_cnt, &skip_update,
547 /* loop through the arguments. */
548 for (arg_i = 0; arg_i < argc; arg_i++) {
549 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550 rrd_set_error("failed duplication argv entry");
553 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554 &current_time, &current_time_usec, pdp_temp, pdp_new,
555 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556 &pcdp_summary, version, skip_update,
557 &schedule_smooth) == -1) {
558 if (rrd_test_error()) { /* Should have error string always here */
561 /* Prepend file name to error message */
562 if ((save_error = strdup(rrd_get_error())) != NULL) {
563 rrd_set_error("%s: %s", filename, save_error);
575 /* if we got here and if there is an error and if the file has not been
576 * written to, then close things up and return. */
577 if (rrd_test_error()) {
578 goto err_free_structures;
581 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582 goto err_free_structures;
586 /* calling the smoothing code here guarantees at most one smoothing
587 * operation per rrd_update call. Unfortunately, it is possible with bulk
588 * updates, or a long-delayed update for smoothing to occur off-schedule.
589 * This really isn't critical except during the burn-in cycles. */
590 if (schedule_smooth) {
591 smooth_all_rras(&rrd, rrd_file, rra_begin);
594 /* rrd_dontneed(rrd_file,&rrd); */
620 * Allocate some important arrays used, and initialize the template.
622 * When it returns, either all of the structures are allocated
623 * or none of them are.
625 * Returns 0 on success, -1 on error.
627 static int allocate_data_structures(
630 rrd_value_t **pdp_temp,
633 unsigned long *tmpl_cnt,
634 unsigned long **rra_step_cnt,
635 unsigned long **skip_update,
636 rrd_value_t **pdp_new)
639 if ((*updvals = (char **) malloc(sizeof(char *)
640 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641 rrd_set_error("allocating updvals pointer array.");
644 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
645 * rrd->stat_head->ds_cnt)) ==
647 rrd_set_error("allocating pdp_temp.");
648 goto err_free_updvals;
650 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
652 rrd->stat_head->rra_cnt)) ==
654 rrd_set_error("allocating skip_update.");
655 goto err_free_pdp_temp;
657 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
658 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
659 rrd_set_error("allocating tmpl_idx.");
660 goto err_free_skip_update;
662 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
665 rra_cnt))) == NULL) {
666 rrd_set_error("allocating rra_step_cnt.");
667 goto err_free_tmpl_idx;
670 /* initialize tmplt redirector */
671 /* default config example (assume DS 1 is a CDEF DS)
672 tmpl_idx[0] -> 0; (time)
673 tmpl_idx[1] -> 1; (DS 0)
674 tmpl_idx[2] -> 3; (DS 2)
675 tmpl_idx[3] -> 4; (DS 3) */
676 (*tmpl_idx)[0] = 0; /* time */
677 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
678 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
679 (*tmpl_idx)[ii++] = i;
684 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
685 goto err_free_rra_step_cnt;
689 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
690 * rrd->stat_head->ds_cnt)) == NULL) {
691 rrd_set_error("allocating pdp_new.");
692 goto err_free_rra_step_cnt;
697 err_free_rra_step_cnt:
701 err_free_skip_update:
711 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
713 * Returns 0 on success.
715 static int parse_template(
718 unsigned long *tmpl_cnt,
721 char *dsname, *tmplt_copy;
722 unsigned int tmpl_len, i;
725 *tmpl_cnt = 1; /* the first entry is the time */
727 /* we should work on a writeable copy here */
728 if ((tmplt_copy = strdup(tmplt)) == NULL) {
729 rrd_set_error("error copying tmplt '%s'", tmplt);
735 tmpl_len = strlen(tmplt_copy);
736 for (i = 0; i <= tmpl_len; i++) {
737 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
738 tmplt_copy[i] = '\0';
739 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
740 rrd_set_error("tmplt contains more DS definitions than RRD");
742 goto out_free_tmpl_copy;
744 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
745 rrd_set_error("unknown DS name '%s'", dsname);
747 goto out_free_tmpl_copy;
749 /* go to the next entry on the tmplt_copy */
751 dsname = &tmplt_copy[i + 1];
761 * Parse an update string, updates the primary data points (PDPs)
762 * and consolidated data points (CDPs), and writes changes to the RRAs.
764 * Returns 0 on success, -1 on error.
766 static int process_arg(
769 rrd_file_t *rrd_file,
770 unsigned long rra_begin,
771 time_t *current_time,
772 unsigned long *current_time_usec,
773 rrd_value_t *pdp_temp,
774 rrd_value_t *pdp_new,
775 unsigned long *rra_step_cnt,
778 unsigned long tmpl_cnt,
779 rrd_info_t ** pcdp_summary,
781 unsigned long *skip_update,
782 int *schedule_smooth)
784 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
786 /* a vector of future Holt-Winters seasonal coefs */
787 unsigned long elapsed_pdp_st;
789 double interval, pre_int, post_int; /* interval between this and
791 unsigned long proc_pdp_cnt;
793 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
794 current_time, current_time_usec, version) == -1) {
798 interval = (double) (*current_time - rrd->live_head->last_up)
799 + (double) ((long) *current_time_usec -
800 (long) rrd->live_head->last_up_usec) / 1e6f;
802 /* process the data sources and update the pdp_prep
803 * area accordingly */
804 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
808 elapsed_pdp_st = calculate_elapsed_steps(rrd,
810 *current_time_usec, interval,
814 /* has a pdp_st moment occurred since the last run ? */
815 if (elapsed_pdp_st == 0) {
816 /* no we have not passed a pdp_st moment. therefore update is simple */
817 simple_update(rrd, interval, pdp_new);
819 /* a pdp_st has occurred. */
820 if (process_all_pdp_st(rrd, interval,
822 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
825 if (update_all_cdp_prep(rrd, rra_step_cnt,
832 skip_update, schedule_smooth) == -1) {
833 goto err_free_coefficients;
835 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
836 elapsed_pdp_st, pdp_temp,
837 &seasonal_coef) == -1) {
838 goto err_free_coefficients;
840 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
841 *current_time, skip_update,
842 pcdp_summary) == -1) {
843 goto err_free_coefficients;
845 } /* endif a pdp_st has occurred */
846 rrd->live_head->last_up = *current_time;
847 rrd->live_head->last_up_usec = *current_time_usec;
850 *rrd->legacy_last_up = rrd->live_head->last_up;
853 free(last_seasonal_coef);
856 err_free_coefficients:
858 free(last_seasonal_coef);
863 * Parse a DS string (time + colon-separated values), storing the
864 * results in current_time, current_time_usec, and updvals.
866 * Returns 0 on success, -1 on error.
873 unsigned long tmpl_cnt,
874 time_t *current_time,
875 unsigned long *current_time_usec,
883 /* initialize all ds input to unknown except the first one
884 which has always got to be set */
885 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
888 /* separate all ds elements; first must be examined separately
889 due to alternate time syntax */
890 if ((p = strchr(input, '@')) != NULL) {
892 } else if ((p = strchr(input, ':')) != NULL) {
895 rrd_set_error("expected timestamp not found in data source from %s",
901 updvals[tmpl_idx[i++]] = p + 1;
906 updvals[tmpl_idx[i++]] = p + 1;
909 rrd_set_error("found extra data on update argument: %s",p+1);
916 rrd_set_error("expected %lu data source readings (got %lu) from %s",
917 tmpl_cnt - 1, i - 1, input);
921 if (get_time_from_reading(rrd, timesyntax, updvals,
922 current_time, current_time_usec,
930 * Parse the time in a DS string, store it in current_time and
931 * current_time_usec and verify that it's later than the last
932 * update for this DS.
934 * Returns 0 on success, -1 on error.
936 static int get_time_from_reading(
940 time_t *current_time,
941 unsigned long *current_time_usec,
945 char *parsetime_error = NULL;
947 rrd_time_value_t ds_tv;
948 struct timeval tmp_time; /* used for time conversion */
950 /* get the time from the reading ... handle N */
951 if (timesyntax == '@') { /* at-style */
952 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
953 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
956 if (ds_tv.type == RELATIVE_TO_END_TIME ||
957 ds_tv.type == RELATIVE_TO_START_TIME) {
958 rrd_set_error("specifying time relative to the 'start' "
959 "or 'end' makes no sense here: %s", updvals[0]);
962 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
963 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
964 } else if (strcmp(updvals[0], "N") == 0) {
965 gettimeofday(&tmp_time, 0);
966 normalize_time(&tmp_time);
967 *current_time = tmp_time.tv_sec;
968 *current_time_usec = tmp_time.tv_usec;
970 old_locale = setlocale(LC_NUMERIC, "C");
972 tmp = strtod(updvals[0], 0);
974 rrd_set_error("converting '%s' to float: %s",
975 updvals[0], rrd_strerror(errno));
978 setlocale(LC_NUMERIC, old_locale);
980 gettimeofday(&tmp_time, 0);
981 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
984 *current_time = floor(tmp);
985 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
987 /* don't do any correction for old version RRDs */
989 *current_time_usec = 0;
991 if (*current_time < rrd->live_head->last_up ||
992 (*current_time == rrd->live_head->last_up &&
993 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
994 rrd_set_error("illegal attempt to update using time %ld when "
995 "last update time is %ld (minimum one second step)",
996 *current_time, rrd->live_head->last_up);
1003 * Update pdp_new by interpreting the updvals according to the DS type
1004 * (COUNTER, GAUGE, etc.).
1006 * Returns 0 on success, -1 on error.
1008 static int update_pdp_prep(
1011 rrd_value_t *pdp_new,
1014 unsigned long ds_idx;
1016 char *endptr; /* used in the conversion */
1019 enum dst_en dst_idx;
1021 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1022 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1024 /* make sure we do not build diffs with old last_ds values */
1025 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1026 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1027 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1030 /* NOTE: DST_CDEF should never enter this if block, because
1031 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1032 * accidentally specified a value for the DST_CDEF. To handle this case,
1033 * an extra check is required. */
1035 if ((updvals[ds_idx + 1][0] != 'U') &&
1036 (dst_idx != DST_CDEF) &&
1037 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1040 /* pdp_new contains rate * time ... eg the bytes transferred during
1041 * the interval. Doing it this way saves a lot of math operations
1046 if ( ( updvals[ds_idx + 1][0] < '0'
1047 || updvals[ds_idx + 1][0] > '9' )
1048 && updvals[ds_idx + 1][0] != '-'
1049 && updvals[ds_idx + 1][0] != 'U'
1051 rrd_set_error("not a simple integer: '%s'",
1052 updvals[ds_idx + 1]);
1055 for (ii = 1; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1056 if ( updvals[ds_idx + 1][ii] < '0'
1057 || updvals[ds_idx + 1][ii] > '9'
1059 rrd_set_error("not a simple integer: '%s'",
1060 updvals[ds_idx + 1]);
1064 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1066 rrd_diff(updvals[ds_idx + 1],
1067 rrd->pdp_prep[ds_idx].last_ds);
1068 if (dst_idx == DST_COUNTER) {
1069 /* simple overflow catcher. This will fail
1070 * terribly for non 32 or 64 bit counters
1071 * ... are there any others in SNMP land?
1073 if (pdp_new[ds_idx] < (double) 0.0)
1074 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1075 if (pdp_new[ds_idx] < (double) 0.0)
1076 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1078 rate = pdp_new[ds_idx] / interval;
1080 pdp_new[ds_idx] = DNAN;
1084 old_locale = setlocale(LC_NUMERIC, "C");
1086 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1088 rrd_set_error("converting '%s' to float: %s",
1089 updvals[ds_idx + 1], rrd_strerror(errno));
1092 setlocale(LC_NUMERIC, old_locale);
1093 if (endptr[0] != '\0') {
1095 ("conversion of '%s' to float not complete: tail '%s'",
1096 updvals[ds_idx + 1], endptr);
1099 rate = pdp_new[ds_idx] / interval;
1102 old_locale = setlocale(LC_NUMERIC, "C");
1105 strtod(updvals[ds_idx + 1], &endptr) * interval;
1107 rrd_set_error("converting '%s' to float: %s",
1108 updvals[ds_idx + 1], rrd_strerror(errno));
1111 setlocale(LC_NUMERIC, old_locale);
1112 if (endptr[0] != '\0') {
1114 ("conversion of '%s' to float not complete: tail '%s'",
1115 updvals[ds_idx + 1], endptr);
1118 rate = pdp_new[ds_idx] / interval;
1121 rrd_set_error("rrd contains unknown DS type : '%s'",
1122 rrd->ds_def[ds_idx].dst);
1125 /* break out of this for loop if the error string is set */
1126 if (rrd_test_error()) {
1129 /* make sure pdp_temp is neither too large nor too small
1130 * if any of these occur it becomes unknown ...
1131 * sorry folks ... */
1133 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1134 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1135 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1136 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1137 pdp_new[ds_idx] = DNAN;
1140 /* no news is news all the same */
1141 pdp_new[ds_idx] = DNAN;
1145 /* make a copy of the command line argument for the next run */
1147 fprintf(stderr, "prep ds[%lu]\t"
1151 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1154 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1156 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1162 * How many PDP steps have elapsed since the last update? Returns the answer,
1163 * and stores the time between the last update and the last PDP in pre_int,
1164 * and the time between the last PDP and the current time in post_int.
1166 static int calculate_elapsed_steps(
1168 unsigned long current_time,
1169 unsigned long current_time_usec,
1173 unsigned long *proc_pdp_cnt)
1175 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1176 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1178 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1179 * when it was last updated */
1180 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1182 /* when was the current pdp started */
1183 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1184 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1186 /* when did the last pdp_st occur */
1187 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1188 occu_pdp_st = current_time - occu_pdp_age;
1190 if (occu_pdp_st > proc_pdp_st) {
1191 /* OK we passed the pdp_st moment */
1192 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1193 * occurred before the latest
1195 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1196 *post_int = occu_pdp_age; /* how much after it */
1197 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1199 *pre_int = interval;
1203 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1206 printf("proc_pdp_age %lu\t"
1208 "occu_pfp_age %lu\t"
1212 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1213 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1216 /* compute the number of elapsed pdp_st moments */
1217 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1221 * Increment the PDP values by the values in pdp_new, or else initialize them.
1223 static void simple_update(
1226 rrd_value_t *pdp_new)
1230 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1231 if (isnan(pdp_new[i])) {
1232 /* this is not really accurate if we use subsecond data arrival time
1233 should have thought of it when going subsecond resolution ...
1234 sorry next format change we will have it! */
1235 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1238 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1239 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1241 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1250 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1251 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1257 * Call process_pdp_st for each DS.
1259 * Returns 0 on success, -1 on error.
1261 static int process_all_pdp_st(
1266 unsigned long elapsed_pdp_st,
1267 rrd_value_t *pdp_new,
1268 rrd_value_t *pdp_temp)
1270 unsigned long ds_idx;
1272 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1273 rate*seconds which occurred up to the last run.
1274 pdp_new[] contains rate*seconds from the latest run.
1275 pdp_temp[] will contain the rate for cdp */
1277 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1278 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1279 elapsed_pdp_st * rrd->stat_head->pdp_step,
1280 pdp_new, pdp_temp) == -1) {
1284 fprintf(stderr, "PDP UPD ds[%lu]\t"
1285 "elapsed_pdp_st %lu\t"
1288 "new_unkn_sec %5lu\n",
1292 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1293 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1300 * Process an update that occurs after one of the PDP moments.
1301 * Increments the PDP value, sets NAN if time greater than the
1302 * heartbeats have elapsed, processes CDEFs.
1304 * Returns 0 on success, -1 on error.
1306 static int process_pdp_st(
1308 unsigned long ds_idx,
1312 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1313 rrd_value_t *pdp_new,
1314 rrd_value_t *pdp_temp)
1318 /* update pdp_prep to the current pdp_st. */
1319 double pre_unknown = 0.0;
1320 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1321 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1323 rpnstack_t rpnstack; /* used for COMPUTE DS */
1325 rpnstack_init(&rpnstack);
1328 if (isnan(pdp_new[ds_idx])) {
1329 /* a final bit of unknown to be added before calculation
1330 we use a temporary variable for this so that we
1331 don't have to turn integer lines before using the value */
1332 pre_unknown = pre_int;
1334 if (isnan(scratch[PDP_val].u_val)) {
1335 scratch[PDP_val].u_val = 0;
1337 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1340 /* if too much of the pdp_prep is unknown we dump it */
1341 /* if the interval is larger than mrhb we get NAN */
1342 if ((interval > mrhb) ||
1343 (rrd->stat_head->pdp_step / 2.0 <
1344 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1345 pdp_temp[ds_idx] = DNAN;
1347 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1348 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1352 /* process CDEF data sources; remember each CDEF DS can
1353 * only reference other DS with a lower index number */
1354 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1358 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1359 /* substitute data values for OP_VARIABLE nodes */
1360 for (i = 0; rpnp[i].op != OP_END; i++) {
1361 if (rpnp[i].op == OP_VARIABLE) {
1362 rpnp[i].op = OP_NUMBER;
1363 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1366 /* run the rpn calculator */
1367 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1369 rpnstack_free(&rpnstack);
1374 /* make pdp_prep ready for the next run */
1375 if (isnan(pdp_new[ds_idx])) {
1376 /* this is not really accurate if we use subsecond data arrival time
1377 should have thought of it when going subsecond resolution ...
1378 sorry next format change we will have it! */
1379 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1380 scratch[PDP_val].u_val = DNAN;
1382 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1383 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1385 rpnstack_free(&rpnstack);
1390 * Iterate over all the RRAs for a given DS and:
1391 * 1. Decide whether to schedule a smooth later
1392 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1395 * Returns 0 on success, -1 on error
1397 static int update_all_cdp_prep(
1399 unsigned long *rra_step_cnt,
1400 unsigned long rra_begin,
1401 rrd_file_t *rrd_file,
1402 unsigned long elapsed_pdp_st,
1403 unsigned long proc_pdp_cnt,
1404 rrd_value_t **last_seasonal_coef,
1405 rrd_value_t **seasonal_coef,
1406 rrd_value_t *pdp_temp,
1407 unsigned long *skip_update,
1408 int *schedule_smooth)
1410 unsigned long rra_idx;
1412 /* index into the CDP scratch array */
1413 enum cf_en current_cf;
1414 unsigned long rra_start;
1416 /* number of rows to be updated in an RRA for a data value. */
1417 unsigned long start_pdp_offset;
1419 rra_start = rra_begin;
1420 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1421 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1423 rrd->rra_def[rra_idx].pdp_cnt -
1424 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1425 skip_update[rra_idx] = 0;
1426 if (start_pdp_offset <= elapsed_pdp_st) {
1427 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1428 rrd->rra_def[rra_idx].pdp_cnt + 1;
1430 rra_step_cnt[rra_idx] = 0;
1433 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1434 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1435 * so that they will be correct for the next observed value; note that for
1436 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1437 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1438 if (rra_step_cnt[rra_idx] > 1) {
1439 skip_update[rra_idx] = 1;
1440 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1441 elapsed_pdp_st, last_seasonal_coef);
1442 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1443 elapsed_pdp_st + 1, seasonal_coef);
1445 /* periodically run a smoother for seasonal effects */
1446 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1449 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1450 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1451 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1454 *schedule_smooth = 1;
1457 if (rrd_test_error())
1461 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1462 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1463 current_cf) == -1) {
1467 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1468 sizeof(rrd_value_t);
1474 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1476 static int do_schedule_smooth(
1478 unsigned long rra_idx,
1479 unsigned long elapsed_pdp_st)
1481 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1482 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1483 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1484 unsigned long seasonal_smooth_idx =
1485 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1486 unsigned long *init_seasonal =
1487 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1489 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1490 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1491 * really an RRA level, not a data source within RRA level parameter, but
1492 * the rra_def is read only for rrd_update (not flushed to disk). */
1493 if (*init_seasonal > BURNIN_CYCLES) {
1494 /* someone has no doubt invented a trick to deal with this wrap around,
1495 * but at least this code is clear. */
1496 if (seasonal_smooth_idx > cur_row) {
1497 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1498 * between PDP and CDP */
1499 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1501 /* can't rely on negative numbers because we are working with
1502 * unsigned values */
1503 return (cur_row + elapsed_pdp_st >= row_cnt
1504 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1506 /* mark off one of the burn-in cycles */
1507 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1511 * For a given RRA, iterate over the data sources and call the appropriate
1512 * consolidation function.
1514 * Returns 0 on success, -1 on error.
1516 static int update_cdp_prep(
1518 unsigned long elapsed_pdp_st,
1519 unsigned long start_pdp_offset,
1520 unsigned long *rra_step_cnt,
1522 rrd_value_t *pdp_temp,
1523 rrd_value_t *last_seasonal_coef,
1524 rrd_value_t *seasonal_coef,
1527 unsigned long ds_idx, cdp_idx;
1529 /* update CDP_PREP areas */
1530 /* loop over data soures within each RRA */
1531 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1533 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1535 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1536 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1537 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1538 elapsed_pdp_st, start_pdp_offset,
1539 rrd->rra_def[rra_idx].pdp_cnt,
1540 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1543 /* Nothing to consolidate if there's one PDP per CDP. However, if
1544 * we've missed some PDPs, let's update null counters etc. */
1545 if (elapsed_pdp_st > 2) {
1546 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1547 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1552 if (rrd_test_error())
1554 } /* endif data sources loop */
1559 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1560 * primary value, secondary value, and # of unknowns.
1562 static void update_cdp(
1565 rrd_value_t pdp_temp_val,
1566 unsigned long rra_step_cnt,
1567 unsigned long elapsed_pdp_st,
1568 unsigned long start_pdp_offset,
1569 unsigned long pdp_cnt,
1574 /* shorthand variables */
1575 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1576 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1577 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1578 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1581 /* If we are in this block, as least 1 CDP value will be written to
1582 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1583 * to be written, then the "fill in" value is the CDP_secondary_val
1585 if (isnan(pdp_temp_val)) {
1586 *cdp_unkn_pdp_cnt += start_pdp_offset;
1587 *cdp_secondary_val = DNAN;
1589 /* CDP_secondary value is the RRA "fill in" value for intermediary
1590 * CDP data entries. No matter the CF, the value is the same because
1591 * the average, max, min, and last of a list of identical values is
1592 * the same, namely, the value itself. */
1593 *cdp_secondary_val = pdp_temp_val;
1596 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1597 *cdp_primary_val = DNAN;
1598 if (current_cf == CF_AVERAGE) {
1600 initialize_average_carry_over(pdp_temp_val,
1602 start_pdp_offset, pdp_cnt);
1604 *cdp_val = pdp_temp_val;
1607 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1608 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1609 } /* endif meets xff value requirement for a valid value */
1610 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1611 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1612 if (isnan(pdp_temp_val))
1613 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1615 *cdp_unkn_pdp_cnt = 0;
1616 } else { /* rra_step_cnt[i] == 0 */
1619 if (isnan(*cdp_val)) {
1620 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1623 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1627 if (isnan(pdp_temp_val)) {
1628 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1631 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1638 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1639 * on the type of consolidation function.
1641 static void initialize_cdp_val(
1644 rrd_value_t pdp_temp_val,
1645 unsigned long elapsed_pdp_st,
1646 unsigned long start_pdp_offset,
1647 unsigned long pdp_cnt)
1649 rrd_value_t cum_val, cur_val;
1651 switch (current_cf) {
1653 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1654 cur_val = IFDNAN(pdp_temp_val, 0.0);
1655 scratch[CDP_primary_val].u_val =
1656 (cum_val + cur_val * start_pdp_offset) /
1657 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1658 scratch[CDP_val].u_val =
1659 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1660 start_pdp_offset, pdp_cnt);
1663 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1664 cur_val = IFDNAN(pdp_temp_val, -DINF);
1667 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1669 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1675 if (cur_val > cum_val)
1676 scratch[CDP_primary_val].u_val = cur_val;
1678 scratch[CDP_primary_val].u_val = cum_val;
1679 /* initialize carry over value */
1680 scratch[CDP_val].u_val = pdp_temp_val;
1683 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1684 cur_val = IFDNAN(pdp_temp_val, DINF);
1687 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1689 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1695 if (cur_val < cum_val)
1696 scratch[CDP_primary_val].u_val = cur_val;
1698 scratch[CDP_primary_val].u_val = cum_val;
1699 /* initialize carry over value */
1700 scratch[CDP_val].u_val = pdp_temp_val;
1704 scratch[CDP_primary_val].u_val = pdp_temp_val;
1705 /* initialize carry over value */
1706 scratch[CDP_val].u_val = pdp_temp_val;
1712 * Update the consolidation function for Holt-Winters functions as
1713 * well as other functions that don't actually consolidate multiple
1716 static void reset_cdp(
1718 unsigned long elapsed_pdp_st,
1719 rrd_value_t *pdp_temp,
1720 rrd_value_t *last_seasonal_coef,
1721 rrd_value_t *seasonal_coef,
1725 enum cf_en current_cf)
1727 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1729 switch (current_cf) {
1732 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1733 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1736 case CF_DEVSEASONAL:
1737 /* need to update cached seasonal values, so they are consistent
1738 * with the bulk update */
1739 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1740 * CDP_last_deviation are the same. */
1741 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1742 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1746 /* need to update the null_count and last_null_count.
1747 * even do this for non-DNAN pdp_temp because the
1748 * algorithm is not learning from batch updates. */
1749 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1750 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1753 scratch[CDP_primary_val].u_val = DNAN;
1754 scratch[CDP_secondary_val].u_val = DNAN;
1757 /* do not count missed bulk values as failures */
1758 scratch[CDP_primary_val].u_val = 0;
1759 scratch[CDP_secondary_val].u_val = 0;
1760 /* need to reset violations buffer.
1761 * could do this more carefully, but for now, just
1762 * assume a bulk update wipes away all violations. */
1763 erase_violations(rrd, cdp_idx, rra_idx);
1768 static rrd_value_t initialize_average_carry_over(
1769 rrd_value_t pdp_temp_val,
1770 unsigned long elapsed_pdp_st,
1771 unsigned long start_pdp_offset,
1772 unsigned long pdp_cnt)
1774 /* initialize carry over value */
1775 if (isnan(pdp_temp_val)) {
1778 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1782 * Update or initialize a CDP value based on the consolidation
1785 * Returns the new value.
1787 static rrd_value_t calculate_cdp_val(
1788 rrd_value_t cdp_val,
1789 rrd_value_t pdp_temp_val,
1790 unsigned long elapsed_pdp_st,
1801 if (isnan(cdp_val)) {
1802 if (current_cf == CF_AVERAGE) {
1803 pdp_temp_val *= elapsed_pdp_st;
1806 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1807 i, ii, pdp_temp_val);
1809 return pdp_temp_val;
1811 if (current_cf == CF_AVERAGE)
1812 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1813 if (current_cf == CF_MINIMUM)
1814 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1815 if (current_cf == CF_MAXIMUM)
1816 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1818 return pdp_temp_val;
1822 * For each RRA, update the seasonal values and then call update_aberrant_CF
1823 * for each data source.
1825 * Return 0 on success, -1 on error.
1827 static int update_aberrant_cdps(
1829 rrd_file_t *rrd_file,
1830 unsigned long rra_begin,
1831 unsigned long elapsed_pdp_st,
1832 rrd_value_t *pdp_temp,
1833 rrd_value_t **seasonal_coef)
1835 unsigned long rra_idx, ds_idx, j;
1837 /* number of PDP steps since the last update that
1838 * are assigned to the first CDP to be generated
1839 * since the last update. */
1840 unsigned short scratch_idx;
1841 unsigned long rra_start;
1842 enum cf_en current_cf;
1844 /* this loop is only entered if elapsed_pdp_st < 3 */
1845 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1846 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1847 rra_start = rra_begin;
1848 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1849 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1850 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1851 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1852 if (scratch_idx == CDP_primary_val) {
1853 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1854 elapsed_pdp_st + 1, seasonal_coef);
1856 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1857 elapsed_pdp_st + 2, seasonal_coef);
1860 if (rrd_test_error())
1862 /* loop over data soures within each RRA */
1863 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1864 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1865 rra_idx * (rrd->stat_head->ds_cnt) +
1866 ds_idx, rra_idx, ds_idx, scratch_idx,
1870 rra_start += rrd->rra_def[rra_idx].row_cnt
1871 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1878 * Move sequentially through the file, writing one RRA at a time. Note this
1879 * architecture divorces the computation of CDP with flushing updated RRA
1882 * Return 0 on success, -1 on error.
1884 static int write_to_rras(
1886 rrd_file_t *rrd_file,
1887 unsigned long *rra_step_cnt,
1888 unsigned long rra_begin,
1889 time_t current_time,
1890 unsigned long *skip_update,
1891 rrd_info_t ** pcdp_summary)
1893 unsigned long rra_idx;
1894 unsigned long rra_start;
1895 time_t rra_time = 0; /* time of update for a RRA */
1897 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1899 /* Ready to write to disk */
1900 rra_start = rra_begin;
1902 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1903 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1904 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1907 unsigned short scratch_idx;
1908 unsigned long step_subtract;
1910 for (scratch_idx = CDP_primary_val,
1912 rra_step_cnt[rra_idx] > 0;
1913 rra_step_cnt[rra_idx]--,
1914 scratch_idx = CDP_secondary_val,
1915 step_subtract = 2) {
1919 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1921 /* increment, with wrap-around */
1922 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1923 rra_ptr->cur_row = 0;
1925 /* we know what our position should be */
1926 rra_pos_new = rra_start
1927 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1929 /* re-seek if the position is wrong or we wrapped around */
1930 if ((size_t)rra_pos_new != rrd_file->pos) {
1931 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1932 rrd_set_error("seek error in rrd");
1937 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1940 if (skip_update[rra_idx])
1943 if (*pcdp_summary != NULL) {
1944 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1946 rra_time = (current_time - current_time % step_time)
1947 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1951 (rrd_file, rrd, rra_idx, scratch_idx,
1952 pcdp_summary, rra_time) == -1)
1955 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1958 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1965 * Write out one row of values (one value per DS) to the archive.
1967 * Returns 0 on success, -1 on error.
1969 static int write_RRA_row(
1970 rrd_file_t *rrd_file,
1972 unsigned long rra_idx,
1973 unsigned short CDP_scratch_idx,
1974 rrd_info_t ** pcdp_summary,
1977 unsigned long ds_idx, cdp_idx;
1980 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1981 /* compute the cdp index */
1982 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1984 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1985 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1986 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1988 if (*pcdp_summary != NULL) {
1989 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1990 /* append info to the return hash */
1991 *pcdp_summary = rrd_info_push(*pcdp_summary,
1993 ("[%lli]RRA[%s][%lu]DS[%s]",
1994 (long long)rra_time,
1995 rrd->rra_def[rra_idx].cf_nam,
1996 rrd->rra_def[rra_idx].pdp_cnt,
1997 rrd->ds_def[ds_idx].ds_nam),
2001 if (rrd_write(rrd_file,
2002 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2003 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2004 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2012 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2014 * Returns 0 on success, -1 otherwise
2016 static int smooth_all_rras(
2018 rrd_file_t *rrd_file,
2019 unsigned long rra_begin)
2021 unsigned long rra_start = rra_begin;
2022 unsigned long rra_idx;
2024 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2025 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2026 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2028 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2030 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2031 if (rrd_test_error())
2034 rra_start += rrd->rra_def[rra_idx].row_cnt
2035 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2042 * Flush changes to disk (unless we're using mmap)
2044 * Returns 0 on success, -1 otherwise
2046 static int write_changes_to_disk(
2048 rrd_file_t *rrd_file,
2051 /* we just need to write back the live header portion now */
2052 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2053 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2054 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2056 rrd_set_error("seek rrd for live header writeback");
2060 if (rrd_write(rrd_file, rrd->live_head,
2061 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2062 rrd_set_error("rrd_write live_head to rrd");
2066 if (rrd_write(rrd_file, rrd->legacy_last_up,
2067 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2068 rrd_set_error("rrd_write live_head to rrd");
2074 if (rrd_write(rrd_file, rrd->pdp_prep,
2075 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2076 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2077 rrd_set_error("rrd_write pdp_prep to rrd");
2081 if (rrd_write(rrd_file, rrd->cdp_prep,
2082 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2083 rrd->stat_head->ds_cnt)
2084 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2085 rrd->stat_head->ds_cnt)) {
2087 rrd_set_error("rrd_write cdp_prep to rrd");
2091 if (rrd_write(rrd_file, rrd->rra_ptr,
2092 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2093 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2094 rrd_set_error("rrd_write rra_ptr to rrd");