1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 #include <sys/timeb.h>
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
42 static int gettimeofday(
44 struct __timezone *tz)
47 struct _timeb current_time;
49 _ftime(&current_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
59 * normalize time as returned by gettimeofday. usec part must
62 static inline void normalize_time(
67 t->tv_usec += 1000000L;
71 static inline info_t *write_RRA_row(
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
80 unsigned long ds_idx, cdp_idx;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
97 rrd->rra_def[rra_idx].
99 rrd->rra_def[rra_idx].
102 ds_nam), RD_I_VAL, iv);
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111 *rra_current += sizeof(rrd_value_t);
113 return (pcdp_summary);
117 const char *filename,
122 const char *filename,
/* IFDNAN(X, Y): evaluates to Y when X is NaN, otherwise to X.
 * X appears twice in the expansion, so pass an expression without side
 * effects.  The expansion deliberately carries no trailing semicolon so
 * the macro can be used inside a larger expression as well as on the
 * right-hand side of a plain assignment. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
131 info_t *rrd_update_v(
136 info_t *result = NULL;
138 struct option long_options[] = {
139 {"template", required_argument, 0, 't'},
145 opterr = 0; /* initialize getopt */
148 int option_index = 0;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
186 struct option long_options[] = {
187 {"template", required_argument, 0, 't'},
190 int option_index = 0;
196 opterr = 0; /* initialize getopt */
199 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
206 tmplt = strdup(optarg);
210 rrd_set_error("unknown option '%s'", argv[optind - 1]);
215 /* need at least 2 arguments: filename, data. */
216 if (argc - optind < 2) {
217 rrd_set_error("Not enough arguments");
222 rc = rrd_update_r(argv[optind], tmplt,
223 argc - optind - 1, (const char **) (argv + optind + 1));
229 const char *filename,
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
238 const char *filename,
242 info_t *pcdp_summary)
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
261 unsigned long proc_pdp_st; /* which pdp_st was the last
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added to the
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added to the
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 argument: data. */
320 rrd_set_error("Not enough arguments");
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
328 /* We are now at the beginning of the rra's */
329 rra_current = rra_start = rra_begin = rrd_file->header_len;
331 /* initialize time */
332 version = atoi(rrd.stat_head->version);
333 gettimeofday(&tmp_time, 0);
334 normalize_time(&tmp_time);
335 current_time = tmp_time.tv_sec;
337 current_time_usec = tmp_time.tv_usec;
339 current_time_usec = 0;
342 /* get exclusive lock to whole file.
343 * lock gets removed when we close the file.
345 if (LockRRD(rrd_file->fd) != 0) {
346 rrd_set_error("could not lock RRD");
351 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352 rrd_set_error("allocating updvals pointer array");
356 if ((pdp_temp = malloc(sizeof(rrd_value_t)
357 * rrd.stat_head->ds_cnt)) == NULL) {
358 rrd_set_error("allocating pdp_temp ...");
359 goto err_free_updvals;
362 if ((tmpl_idx = malloc(sizeof(unsigned long)
363 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364 rrd_set_error("allocating tmpl_idx ...");
365 goto err_free_pdp_temp;
367 /* initialize tmplt redirector */
368 /* default config example (assume DS 1 is a CDEF DS)
369 tmpl_idx[0] -> 0; (time)
370 tmpl_idx[1] -> 1; (DS 0)
371 tmpl_idx[2] -> 3; (DS 2)
372 tmpl_idx[3] -> 4; (DS 3) */
373 tmpl_idx[0] = 0; /* time */
374 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
381 /* we should work on a writeable copy here */
383 unsigned int tmpl_len;
384 char *tmplt_copy = strdup(tmplt);
387 tmpl_cnt = 1; /* the first entry is the time */
388 tmpl_len = strlen(tmplt_copy);
389 for (i = 0; i <= tmpl_len; i++) {
390 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391 tmplt_copy[i] = '\0';
392 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
394 ("tmplt contains more DS definitions than RRD");
395 goto err_free_tmpl_idx;
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398 rrd_set_error("unknown DS name '%s'", dsname);
399 goto err_free_tmpl_idx;
401 /* the first element is always the time */
402 tmpl_idx[tmpl_cnt - 1]++;
403 /* go to the next entry on the tmplt_copy */
404 dsname = &tmplt_copy[i + 1];
405 /* fix the damage we did before */
415 if ((pdp_new = malloc(sizeof(rrd_value_t)
416 * rrd.stat_head->ds_cnt)) == NULL) {
417 rrd_set_error("allocating pdp_new ...");
418 goto err_free_tmpl_idx;
420 /* loop through the arguments. */
421 for (arg_i = 0; arg_i < argc; arg_i++) {
422 char *stepper = strdup(argv[arg_i]);
423 char *step_start = stepper;
425 char *parsetime_error = NULL;
426 enum { atstyle, normal } timesyntax;
427 struct rrd_time_value ds_tv;
429 if (stepper == NULL) {
430 rrd_set_error("failed duplication argv entry");
432 goto err_free_pdp_new;
434 /* initialize all ds input to unknown except the first one
435 which has always got to be set */
436 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
438 updvals[0] = stepper;
439 /* separate all ds elements; first must be examined separately
440 due to alternate time syntax */
441 if ((p = strchr(stepper, '@')) != NULL) {
442 timesyntax = atstyle;
445 } else if ((p = strchr(stepper, ':')) != NULL) {
451 ("expected timestamp not found in data source from %s",
457 updvals[tmpl_idx[ii]] = stepper;
459 if (*stepper == ':') {
463 updvals[tmpl_idx[ii]] = stepper + 1;
469 if (ii != tmpl_cnt - 1) {
471 ("expected %lu data source readings (got %lu) from %s",
472 tmpl_cnt - 1, ii, argv[arg_i]);
477 /* get the time from the reading ... handle N */
478 if (timesyntax == atstyle) {
479 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
480 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
484 if (ds_tv.type == RELATIVE_TO_END_TIME ||
485 ds_tv.type == RELATIVE_TO_START_TIME) {
486 rrd_set_error("specifying time relative to the 'start' "
487 "or 'end' makes no sense here: %s", updvals[0]);
492 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
494 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
496 } else if (strcmp(updvals[0], "N") == 0) {
497 gettimeofday(&tmp_time, 0);
498 normalize_time(&tmp_time);
499 current_time = tmp_time.tv_sec;
500 current_time_usec = tmp_time.tv_usec;
504 tmp = strtod(updvals[0], 0);
505 current_time = floor(tmp);
507 (long) ((tmp - (double) current_time) * 1000000.0);
509 /* dont do any correction for old version RRDs */
511 current_time_usec = 0;
513 if (current_time < rrd.live_head->last_up ||
514 (current_time == rrd.live_head->last_up &&
515 (long) current_time_usec <=
516 (long) rrd.live_head->last_up_usec)) {
517 rrd_set_error("illegal attempt to update using time %ld when "
518 "last update time is %ld (minimum one second step)",
519 current_time, rrd.live_head->last_up);
524 /* seek to the beginning of the rra's */
525 if (rra_current != rra_begin) {
527 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
528 rrd_set_error("seek error in rrd");
533 rra_current = rra_begin;
535 rra_start = rra_begin;
537 /* when was the current pdp started */
538 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
539 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
541 /* when did the last pdp_st occur */
542 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
543 occu_pdp_st = current_time - occu_pdp_age;
545 /* interval = current_time - rrd.live_head->last_up; */
546 interval = (double) (current_time - rrd.live_head->last_up)
547 + (double) ((long) current_time_usec -
548 (long) rrd.live_head->last_up_usec) / 1000000.0;
550 if (occu_pdp_st > proc_pdp_st) {
551 /* OK we passed the pdp_st moment */
552 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
553 * occurred before the latest
555 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
556 post_int = occu_pdp_age; /* how much after it */
557 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
564 printf("proc_pdp_age %lu\t"
570 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
571 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
574 /* process the data sources and update the pdp_prep
575 * area accordingly */
576 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
579 dst_idx = dst_conv(rrd.ds_def[i].dst);
581 /* make sure we do not build diffs with old last_ds values */
582 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
583 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
584 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
587 /* NOTE: DST_CDEF should never enter this if block, because
588 * updvals[i+1][0] is initialized to 'U'; unless the caller
589 * accidentally specified a value for the DST_CDEF. To handle
590 * this case, an extra check is required. */
592 if ((updvals[i + 1][0] != 'U') &&
593 (dst_idx != DST_CDEF) &&
594 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
597 /* the data source type defines how to process the data */
598 /* pdp_new contains rate * time ... eg the bytes
599 * transferred during the interval. Doing it this way saves
600 * a lot of math operations */
604 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
605 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
606 if ((updvals[i + 1][ii] < '0'
607 || updvals[i + 1][ii] > '9') && (ii != 0
613 rrd_set_error("not a simple integer: '%s'",
618 if (rrd_test_error()) {
622 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
623 if (dst_idx == DST_COUNTER) {
624 /* simple overflow catcher suggested by Andres Kroonmaa */
625 /* this will fail terribly for non 32 or 64 bit counters ... */
626 /* are there any others in SNMP land ? */
627 if (pdp_new[i] < (double) 0.0)
628 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
629 if (pdp_new[i] < (double) 0.0)
630 pdp_new[i] += (double) 18446744069414584320.0;
633 rate = pdp_new[i] / interval;
640 pdp_new[i] = strtod(updvals[i + 1], &endptr);
642 rrd_set_error("converting '%s' to float: %s",
643 updvals[i + 1], rrd_strerror(errno));
646 if (endptr[0] != '\0') {
648 ("conversion of '%s' to float not complete: tail '%s'",
649 updvals[i + 1], endptr);
652 rate = pdp_new[i] / interval;
656 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
658 rrd_set_error("converting '%s' to float: %s",
659 updvals[i + 1], rrd_strerror(errno));
662 if (endptr[0] != '\0') {
664 ("conversion of '%s' to float not complete: tail '%s'",
665 updvals[i + 1], endptr);
668 rate = pdp_new[i] / interval;
671 rrd_set_error("rrd contains unknown DS type : '%s'",
675 /* break out of this for loop if the error string is set */
676 if (rrd_test_error()) {
679 /* make sure pdp_temp is neither too large nor too small
680 * if any of these occur it becomes unknown ...
683 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
685 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
690 /* no news is news all the same */
695 /* make a copy of the command line argument for the next run */
702 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
704 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
707 /* break out of the argument parsing loop if the error_string is set */
708 if (rrd_test_error()) {
712 /* has a pdp_st moment occurred since the last run ? */
714 if (proc_pdp_st == occu_pdp_st) {
715 /* no we have not passed a pdp_st moment. therefore update is simple */
717 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
718 if (isnan(pdp_new[i])) {
719 /* this is not really accurate if we use subsecond data arrival time
720 should have thought of it when going subsecond resolution ...
721 sorry next format change we will have it! */
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
725 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
726 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
728 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
742 /* an pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
745 rate*seconds which occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
750 /* update pdp_prep to the current pdp_st. */
751 double pre_unknown = 0.0;
753 if (isnan(pdp_new[i])) {
754 /* a final bit of unknown to be added before calculation
755 we use a temporary variable for this so that we
756 don't have to turn integer lines before using the value */
757 pre_unknown = pre_int;
759 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
760 rrd.pdp_prep[i].scratch[PDP_val].u_val =
761 pdp_new[i] / interval * pre_int;
763 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764 pdp_new[i] / interval * pre_int;
769 /* if too much of the pdp_prep is unknown we dump it */
771 /* removed because this does not agree with the
772 definition that a heartbeat can be unknown */
773 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775 /* if the interval is larger than mrhb we get NAN */
776 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777 (occu_pdp_st - proc_pdp_st <=
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
781 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782 / ((double) (occu_pdp_st - proc_pdp_st
785 scratch[PDP_unkn_sec_cnt].u_cnt)
789 /* process CDEF data sources; remember each CDEF DS can
790 * only reference other DS with a lower index number */
791 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795 rpn_expand((rpn_cdefds_t *) &
796 (rrd.ds_def[i].par[DS_cdef]));
797 /* substitute data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
799 if (rpnp[ii].op == OP_VARIABLE) {
800 rpnp[ii].op = OP_NUMBER;
801 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
804 /* run the rpn calculator */
805 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
807 break; /* exits the data sources pdp_temp loop */
811 /* make pdp_prep ready for the next run */
812 if (isnan(pdp_new[i])) {
813 /* this is not really accurate if we use subsecond data arrival time
814 should have thought of it when going subsecond resolution ...
815 sorry next format change we will have it! */
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
818 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821 rrd.pdp_prep[i].scratch[PDP_val].u_val =
822 pdp_new[i] / interval * post_int;
830 "new_unkn_sec %5lu\n",
832 rrd.pdp_prep[i].scratch[PDP_val].u_val,
833 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
837 /* if there were errors during the last loop, bail out here */
838 if (rrd_test_error()) {
843 /* compute the number of elapsed pdp_st moments */
845 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
847 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
849 if (rra_step_cnt == NULL) {
850 rra_step_cnt = (unsigned long *)
851 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
854 for (i = 0, rra_start = rra_begin;
855 i < rrd.stat_head->rra_cnt;
857 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
858 sizeof(rrd_value_t), i++) {
859 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861 (proc_pdp_st / rrd.stat_head->pdp_step) %
862 rrd.rra_def[i].pdp_cnt;
863 if (start_pdp_offset <= elapsed_pdp_st) {
864 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
865 rrd.rra_def[i].pdp_cnt + 1;
870 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
871 /* If this is a bulk update, we need to skip ahead in
872 the seasonal arrays so that they will be correct for
873 the next observed value;
874 note that for the bulk update itself, no update will
875 occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
876 and DEVPREDICT will be set to DNAN. */
877 if (rra_step_cnt[i] > 2) {
878 /* skip update by resetting rra_step_cnt[i],
879 note that this is not data source specific; this is
880 due to the bulk update, not a DNAN value for the
881 specific data source. */
883 lookup_seasonal(&rrd, i, rra_start, rrd_file,
884 elapsed_pdp_st, &last_seasonal_coef);
885 lookup_seasonal(&rrd, i, rra_start, rrd_file,
886 elapsed_pdp_st + 1, &seasonal_coef);
889 /* periodically run a smoother for seasonal effects */
890 /* Need to use first cdp parameter buffer to track
891 * burnin (burnin requires a specific smoothing schedule).
892 * The CDP_init_seasonal parameter is really an RRA level,
893 * not a data source within RRA level parameter, but the rra_def
894 * is read only for rrd_update (not flushed to disk). */
895 iii = i * (rrd.stat_head->ds_cnt);
896 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
898 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
899 > rrd.rra_def[i].row_cnt - 1) {
900 /* mark off one of the burnin cycles */
901 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
906 /* someone has no doubt invented a trick to deal with this
907 * wrap around, but at least this code is clear. */
908 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
909 u_cnt > rrd.rra_ptr[i].cur_row) {
910 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
911 * mapping between PDP and CDP */
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
914 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
918 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row,
922 par[RRA_seasonal_smooth_idx].u_cnt);
927 /* can't rely on negative numbers because we are working with
929 /* Don't need modulus here. If we've wrapped more than once, only
930 * one smooth is executed at the end. */
931 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
932 rrd.rra_def[i].row_cnt
933 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
934 rrd.rra_def[i].row_cnt >=
935 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
939 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
940 rrd.rra_ptr[i].cur_row,
943 par[RRA_seasonal_smooth_idx].u_cnt);
950 rra_current = rrd_tell(rrd_file);
952 /* if cf is DEVSEASONAL or SEASONAL */
953 if (rrd_test_error())
956 /* update CDP_PREP areas */
957 /* loop over data sources within each RRA */
958 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
960 /* iii indexes the CDP prep area for this data source within the RRA */
961 iii = i * rrd.stat_head->ds_cnt + ii;
963 if (rrd.rra_def[i].pdp_cnt > 1) {
965 if (rra_step_cnt[i] > 0) {
966 /* If we are in this block, at least 1 CDP value will be written to
967 * disk, this is the CDP_primary_val entry. If more than 1 value needs
968 * to be written, then the "fill in" value is the CDP_secondary_val
970 if (isnan(pdp_temp[ii])) {
971 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
972 u_cnt += start_pdp_offset;
973 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
976 /* CDP_secondary value is the RRA "fill in" value for intermediary
977 * CDP data entries. No matter the CF, the value is the same because
978 * the average, max, min, and last of a list of identical values is
979 * the same, namely, the value itself. */
980 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
981 u_val = pdp_temp[ii];
984 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
986 rrd.rra_def[i].pdp_cnt *
987 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].
990 /* initialize carry over */
991 if (current_cf == CF_AVERAGE) {
992 if (isnan(pdp_temp[ii])) {
993 rrd.cdp_prep[iii].scratch[CDP_val].
996 rrd.cdp_prep[iii].scratch[CDP_val].
1001 rrd.rra_def[i].pdp_cnt);
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1008 rrd_value_t cum_val, cur_val;
1010 switch (current_cf) {
1013 IFDNAN(rrd.cdp_prep[iii].
1014 scratch[CDP_val].u_val, 0.0);
1015 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1017 scratch[CDP_primary_val].u_val =
1019 cur_val * start_pdp_offset) /
1020 (rrd.rra_def[i].pdp_cnt -
1022 scratch[CDP_unkn_pdp_cnt].u_cnt);
1023 /* initialize carry over value */
1024 if (isnan(pdp_temp[ii])) {
1025 rrd.cdp_prep[iii].scratch[CDP_val].
1028 rrd.cdp_prep[iii].scratch[CDP_val].
1033 rrd.rra_def[i].pdp_cnt);
1038 IFDNAN(rrd.cdp_prep[iii].
1039 scratch[CDP_val].u_val, -DINF);
1040 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1043 (rrd.cdp_prep[iii].scratch[CDP_val].
1044 u_val) && isnan(pdp_temp[ii])) {
1046 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1051 if (cur_val > cum_val)
1053 scratch[CDP_primary_val].u_val =
1057 scratch[CDP_primary_val].u_val =
1059 /* initialize carry over value */
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1065 IFDNAN(rrd.cdp_prep[iii].
1066 scratch[CDP_val].u_val, DINF);
1067 cur_val = IFDNAN(pdp_temp[ii], DINF);
1070 (rrd.cdp_prep[iii].scratch[CDP_val].
1071 u_val) && isnan(pdp_temp[ii])) {
1073 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1078 if (cur_val < cum_val)
1080 scratch[CDP_primary_val].u_val =
1084 scratch[CDP_primary_val].u_val =
1086 /* initialize carry over value */
1087 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1093 scratch[CDP_primary_val].u_val =
1095 /* initialize carry over value */
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1100 } /* endif meets xff value requirement for a valid value */
1101 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1102 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1103 if (isnan(pdp_temp[ii]))
1104 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1108 rrd.rra_def[i].pdp_cnt;
1110 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1112 } else { /* rra_step_cnt[i] == 0 */
1116 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1118 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1122 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1124 rrd.cdp_prep[iii].scratch[CDP_val].
1128 if (isnan(pdp_temp[ii])) {
1129 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1130 u_cnt += elapsed_pdp_st;
1133 (rrd.cdp_prep[iii].scratch[CDP_val].
1135 if (current_cf == CF_AVERAGE) {
1136 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1137 pdp_temp[ii] * elapsed_pdp_st;
1139 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1144 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1146 rrd.cdp_prep[iii].scratch[CDP_val].
1150 switch (current_cf) {
1152 rrd.cdp_prep[iii].scratch[CDP_val].
1154 pdp_temp[ii] * elapsed_pdp_st;
1158 rrd.cdp_prep[iii].scratch[CDP_val].
1160 rrd.cdp_prep[iii].scratch[CDP_val].
1161 u_val = pdp_temp[ii];
1165 rrd.cdp_prep[iii].scratch[CDP_val].
1167 rrd.cdp_prep[iii].scratch[CDP_val].
1168 u_val = pdp_temp[ii];
1172 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1178 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1179 if (elapsed_pdp_st > 2) {
1180 switch (current_cf) {
1183 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1184 u_val = pdp_temp[ii];
1185 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1186 u_val = pdp_temp[ii];
1189 case CF_DEVSEASONAL:
1190 /* need to update cached seasonal values, so they are consistent
1191 * with the bulk update */
1192 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1193 * CDP_last_deviation are the same. */
1195 scratch[CDP_hw_last_seasonal].u_val =
1196 last_seasonal_coef[ii];
1197 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1198 u_val = seasonal_coef[ii];
1202 /* need to update the null_count and last_null_count.
1203 * even do this for non-DNAN pdp_temp because the
1204 * algorithm is not learning from batch updates. */
1205 rrd.cdp_prep[iii].scratch[CDP_null_count].
1206 u_cnt += elapsed_pdp_st;
1208 scratch[CDP_last_null_count].u_cnt +=
1212 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1214 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1218 /* do not count missed bulk values as failures */
1219 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1221 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1223 /* need to reset violations buffer.
1224 * could do this more carefully, but for now, just
1225 * assume a bulk update wipes away all violations. */
1226 erase_violations(&rrd, iii, i);
1230 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1232 if (rrd_test_error())
1235 } /* endif data sources loop */
1236 } /* end RRA Loop */
1238 /* this loop is only entered if elapsed_pdp_st < 3 */
1239 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1240 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1241 for (i = 0, rra_start = rra_begin;
1242 i < rrd.stat_head->rra_cnt;
1244 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1245 sizeof(rrd_value_t), i++) {
1246 if (rrd.rra_def[i].pdp_cnt > 1)
1249 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1250 if (current_cf == CF_SEASONAL
1251 || current_cf == CF_DEVSEASONAL) {
1252 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1253 elapsed_pdp_st + (scratch_idx ==
1257 rra_current = rrd_tell(rrd_file);
1259 if (rrd_test_error())
1261 /* loop over data sources within each RRA */
1262 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1263 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1264 i * (rrd.stat_head->ds_cnt) + ii,
1265 i, ii, scratch_idx, seasonal_coef);
1267 } /* end RRA Loop */
1268 if (rrd_test_error())
1270 } /* end elapsed_pdp_st loop */
1272 if (rrd_test_error())
1275 /* Ready to write to disk */
1276 /* Move sequentially through the file, writing one RRA at a time.
1277 * Note this architecture divorces the computation of CDP with
1278 * flushing updated RRA entries to disk. */
1279 for (i = 0, rra_start = rra_begin;
1280 i < rrd.stat_head->rra_cnt;
1282 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1283 sizeof(rrd_value_t), i++) {
1284 /* is there anything to write for this RRA? If not, continue. */
1285 if (rra_step_cnt[i] == 0)
1288 /* write the first row */
1290 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1292 rrd.rra_ptr[i].cur_row++;
1293 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1294 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1295 /* position on the first row */
1296 rra_pos_tmp = rra_start +
1297 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1298 sizeof(rrd_value_t);
1299 if (rra_pos_tmp != rra_current) {
1300 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1301 rrd_set_error("seek error in rrd");
1304 rra_current = rra_pos_tmp;
1307 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1309 scratch_idx = CDP_primary_val;
1310 if (pcdp_summary != NULL) {
1311 rra_time = (current_time - current_time
1312 % (rrd.rra_def[i].pdp_cnt *
1313 rrd.stat_head->pdp_step))
1316 1) * rrd.rra_def[i].pdp_cnt *
1317 rrd.stat_head->pdp_step);
1320 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1321 scratch_idx, pcdp_summary, &rra_time);
1322 if (rrd_test_error())
1325 /* write other rows of the bulk update, if any */
1326 scratch_idx = CDP_secondary_val;
1327 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1328 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1331 "Wraparound for RRA %s, %lu updates left\n",
1332 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1335 rrd.rra_ptr[i].cur_row = 0;
1336 /* seek back to beginning of current rra */
1337 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1338 rrd_set_error("seek error in rrd");
1342 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1345 rra_current = rra_start;
1347 if (pcdp_summary != NULL) {
1348 rra_time = (current_time - current_time
1349 % (rrd.rra_def[i].pdp_cnt *
1350 rrd.stat_head->pdp_step))
1353 2) * rrd.rra_def[i].pdp_cnt *
1354 rrd.stat_head->pdp_step);
1357 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1358 scratch_idx, pcdp_summary, &rra_time);
1361 if (rrd_test_error())
1365 /* break out of the argument parsing loop if error_string is set */
1366 if (rrd_test_error()) {
1371 } /* endif a pdp_st has occurred */
1372 rrd.live_head->last_up = current_time;
1373 rrd.live_head->last_up_usec = current_time_usec;
1375 } /* function argument loop */
1377 if (seasonal_coef != NULL)
1378 free(seasonal_coef);
1379 if (last_seasonal_coef != NULL)
1380 free(last_seasonal_coef);
1381 if (rra_step_cnt != NULL)
1383 rpnstack_free(&rpnstack);
1386 //rrd_flush(rrd_file); //XXX: really needed?
1388 /* if we got here and if there is an error and if the file has not been
1389 * written to, then close things up and return. */
1390 if (rrd_test_error()) {
1391 goto err_free_pdp_new;
1394 /* aargh ... that was tough ... so many loops ... anyway, its done.
1395 * we just need to write back the live header portion now*/
1397 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1398 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1399 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1401 rrd_set_error("seek rrd for live header writeback");
1402 goto err_free_pdp_new;
1404 /* for mmap, we did already write to the underlying mapping, so we do
1405 not need to write again. */
1408 if (rrd_write(rrd_file, rrd.live_head,
1409 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1410 rrd_set_error("rrd_write live_head to rrd");
1411 goto err_free_pdp_new;
1414 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1415 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1416 rrd_set_error("rrd_write live_head to rrd");
1417 goto err_free_pdp_new;
1422 if (rrd_write(rrd_file, rrd.pdp_prep,
1423 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1424 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1425 rrd_set_error("rrd_write pdp_prep to rrd");
1426 goto err_free_pdp_new;
1429 if (rrd_write(rrd_file, rrd.cdp_prep,
1430 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1431 rrd.stat_head->ds_cnt)
1432 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1433 rrd.stat_head->ds_cnt)) {
1435 rrd_set_error("rrd_write cdp_prep to rrd");
1436 goto err_free_pdp_new;
1439 if (rrd_write(rrd_file, rrd.rra_ptr,
1440 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1441 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1442 rrd_set_error("rrd_write rra_ptr to rrd");
1443 goto err_free_pdp_new;
1447 /* rrd_flush(rrd_file); */
1449 /* calling the smoothing code here guarantees at most
1450 * one smoothing operation per rrd_update call. Unfortunately,
1451 * it is possible with bulk updates, or a long-delayed update
1452 * for smoothing to occur off-schedule. This really isn't
1453 * critical except during the burning cycles. */
1454 if (schedule_smooth) {
1456 rra_start = rra_begin;
1457 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1458 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1459 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1461 fprintf(stderr, "Running smoother for rra %ld\n", i);
1463 apply_smoother(&rrd, i, rra_start, rrd_file);
1464 if (rrd_test_error())
1467 rra_start += rrd.rra_def[i].row_cnt
1468 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1472 rrd_dontneed(rrd_file,&rrd);
1474 rrd_close(rrd_file);
1491 rrd_close(rrd_file);
1499 * get exclusive lock to whole file.
1500 * lock gets removed when we close the file
1502 * returns 0 on success
1510 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1513 if (_fstat(in_file, &st) == 0) {
1514 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1521 lock.l_type = F_WRLCK; /* exclusive write lock */
1522 lock.l_len = 0; /* whole file */
1523 lock.l_start = 0; /* start of file */
1524 lock.l_whence = SEEK_SET; /* l_start is relative to the start of file */
1526 rcstat = fcntl(in_file, F_SETLK, &lock);