1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 #include <sys/timeb.h>
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
42 static int gettimeofday(
44 struct __timezone *tz)
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
59 * normalize time as returned by gettimeofday. usec part must
62 static inline void normalize_time(
/* NOTE(review): lines are missing from this extraction -- only the
 * usec wrap-around adjustment is visible; presumably the (missing)
 * condition decrements tv_sec when tv_usec is negative -- TODO confirm
 * against the full source. */
67 t->tv_usec += 1000000L;
/*
 * write_RRA_row -- write the CDP value selected by CDP_scratch_idx for
 * every data source into the RRA row at the file's current position,
 * advancing *rra_current by one rrd_value_t per DS.  When pcdp_summary
 * is non-NULL, one info entry per DS is appended and the (possibly
 * extended) list head is returned.
 *
 * NOTE(review): this extraction is missing lines (e.g. the remaining
 * parameters, the declaration of `iv`, the rrd_write() call header and
 * closing braces); code below is byte-identical to the visible lines.
 */
71 static inline info_t *write_RRA_row(
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
80 unsigned long ds_idx, cdp_idx;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
97 rrd->rra_def[rra_idx].
99 rrd->rra_def[rra_idx].
102 ds_nam), RD_I_VAL, iv);
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111 *rra_current += sizeof(rrd_value_t);
113 return (pcdp_summary);
117 const char *filename,
122 const char *filename,
/* IFDNAN(X,Y): evaluate to X, unless X is NaN, in which case Y.
 * The original definition carried a stray trailing semicolon, which
 * broke the macro in expression context (e.g. right before an `else`);
 * it is removed here -- statement-context call sites already supply
 * their own semicolon.  Note X is evaluated twice: avoid side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/*
 * rrd_update_v -- info-returning variant of rrd_update: parses the
 * -t/--template option, then runs _rrd_update and reports its return
 * code in a "return_value" info entry (filled in after the call).
 *
 * NOTE(review): extraction gaps -- the full signature, getopt loop
 * body and cleanup are not all visible.  Also `rc` is pushed into the
 * info list before _rrd_update assigns it; presumably rc is
 * pre-initialized on a line not shown here -- verify against the full
 * source.
 */
131 info_t *rrd_update_v(
136 info_t *result = NULL;
141 opterr = 0; /* initialize getopt */
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
148 int option_index = 0;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
/*
 * rrd_update (body fragment) -- command-line entry point: parses the
 * -t/--template option, then delegates to the re-entrant rrd_update_r.
 *
 * NOTE(review): the function signature and error-handling lines are
 * missing from this extraction; code below is byte-identical to the
 * visible lines.
 */
190 opterr = 0; /* initialize getopt */
193 static struct option long_options[] = {
194 {"template", required_argument, 0, 't'},
197 int option_index = 0;
200 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
211 rrd_set_error("unknown option '%s'", argv[optind - 1]);
216 /* need at least 2 arguments: filename, data. */
217 if (argc - optind < 2) {
218 rrd_set_error("Not enough arguments");
223 rc = rrd_update_r(argv[optind], tmplt,
224 argc - optind - 1, (const char **) (argv + optind + 1));
/* rrd_update_r (fragment) -- re-entrant variant: delegates to
 * _rrd_update with a NULL pcdp_summary, i.e. no per-CDP info output is
 * collected.  NOTE(review): surrounding signature lines are missing
 * from this extraction. */
229 const char *filename,
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
/*
 * _rrd_update -- core implementation shared by rrd_update(),
 * rrd_update_r() and rrd_update_v(): for each "time:val[:val...]"
 * argument, fold the readings into the PDP prep area, consolidate
 * finished PDPs into CDPs / RRA rows (including Holt-Winters aberrant
 * behavior RRAs), and finally write the live header, pdp_prep,
 * cdp_prep and rra_ptr sections back to the file.
 *
 * NOTE(review): this listing is an extraction with many lines missing
 * (the embedded original line numbers jump); the code below is kept
 * byte-identical to the visible lines -- only comments were added or
 * corrected.  Do not treat it as compilable in isolation.
 */
238 const char *filename,
242 info_t *pcdp_summary)
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
261 unsigned long proc_pdp_st; /* which pdp_st was the last
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
320 rrd_set_error("Not enough arguments");
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
328 /* We are now at the beginning of the rra's */
329 rra_current = rra_start = rra_begin = rrd_file->header_len;
331 /* initialize time */
332 version = atoi(rrd.stat_head->version);
333 gettimeofday(&tmp_time, 0);
334 normalize_time(&tmp_time);
335 current_time = tmp_time.tv_sec;
337 current_time_usec = tmp_time.tv_usec;
339 current_time_usec = 0;
342 /* get exclusive lock to whole file.
343 * lock gets removed when we close the file.
345 if (LockRRD(rrd_file->fd) != 0) {
346 rrd_set_error("could not lock RRD");
351 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352 rrd_set_error("allocating updvals pointer array");
356 if ((pdp_temp = malloc(sizeof(rrd_value_t)
357 * rrd.stat_head->ds_cnt)) == NULL) {
358 rrd_set_error("allocating pdp_temp ...");
359 goto err_free_updvals;
362 if ((tmpl_idx = malloc(sizeof(unsigned long)
363 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364 rrd_set_error("allocating tmpl_idx ...");
365 goto err_free_pdp_temp;
367 /* initialize tmplt redirector */
368 /* default config example (assume DS 1 is a CDEF DS)
369 tmpl_idx[0] -> 0; (time)
370 tmpl_idx[1] -> 1; (DS 0)
371 tmpl_idx[2] -> 3; (DS 2)
372 tmpl_idx[3] -> 4; (DS 3) */
373 tmpl_idx[0] = 0; /* time */
374 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
381 /* we should work on a writeable copy here */
383 unsigned int tmpl_len;
384 char *tmplt_copy = strdup(tmplt);
387 tmpl_cnt = 1; /* the first entry is the time */
388 tmpl_len = strlen(tmplt_copy);
389 for (i = 0; i <= tmpl_len; i++) {
390 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391 tmplt_copy[i] = '\0';
392 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
394 ("tmplt contains more DS definitions than RRD");
395 goto err_free_tmpl_idx;
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398 rrd_set_error("unknown DS name '%s'", dsname);
399 goto err_free_tmpl_idx;
401 /* the first element is always the time */
402 tmpl_idx[tmpl_cnt - 1]++;
403 /* go to the next entry on the tmplt_copy */
404 dsname = &tmplt_copy[i + 1];
405 /* fix the damage we did before */
415 if ((pdp_new = malloc(sizeof(rrd_value_t)
416 * rrd.stat_head->ds_cnt)) == NULL) {
417 rrd_set_error("allocating pdp_new ...");
418 goto err_free_tmpl_idx;
420 /* loop through the arguments. */
421 for (arg_i = 0; arg_i < argc; arg_i++) {
422 char *stepper = strdup(argv[arg_i]);
423 char *step_start = stepper;
425 char *parsetime_error = NULL;
426 enum { atstyle, normal } timesyntax;
427 struct rrd_time_value ds_tv;
429 if (stepper == NULL) {
430 rrd_set_error("failed duplication argv entry");
432 goto err_free_pdp_new;
434 /* initialize all ds input to unknown except the first one
435 which has always got to be set */
436 memset(updvals + 1, 'U', rrd.stat_head->ds_cnt);
437 updvals[0] = stepper;
438 /* separate all ds elements; first must be examined separately
439 due to alternate time syntax */
440 if ((p = strchr(stepper, '@')) != NULL) {
441 timesyntax = atstyle;
444 } else if ((p = strchr(stepper, ':')) != NULL) {
450 ("expected timestamp not found in data source from %s",
456 updvals[tmpl_idx[ii]] = stepper;
458 if (*stepper == ':') {
462 updvals[tmpl_idx[ii]] = stepper + 1;
468 if (ii != tmpl_cnt - 1) {
470 ("expected %lu data source readings (got %lu) from %s",
471 tmpl_cnt - 1, ii, argv[arg_i]);
476 /* get the time from the reading ... handle N */
477 if (timesyntax == atstyle) {
478 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
479 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483 if (ds_tv.type == RELATIVE_TO_END_TIME ||
484 ds_tv.type == RELATIVE_TO_START_TIME) {
485 rrd_set_error("specifying time relative to the 'start' "
486 "or 'end' makes no sense here: %s", updvals[0]);
491 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
493 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
495 } else if (strcmp(updvals[0], "N") == 0) {
496 gettimeofday(&tmp_time, 0);
497 normalize_time(&tmp_time);
498 current_time = tmp_time.tv_sec;
499 current_time_usec = tmp_time.tv_usec;
503 tmp = strtod(updvals[0], 0);
504 current_time = floor(tmp);
506 (long) ((tmp - (double) current_time) * 1000000.0);
508 /* don't do any correction for old version RRDs */
510 current_time_usec = 0;
512 if (current_time < rrd.live_head->last_up ||
513 (current_time == rrd.live_head->last_up &&
514 (long) current_time_usec <=
515 (long) rrd.live_head->last_up_usec)) {
516 rrd_set_error("illegal attempt to update using time %ld when "
517 "last update time is %ld (minimum one second step)",
518 current_time, rrd.live_head->last_up);
523 /* seek to the beginning of the rra's */
524 if (rra_current != rra_begin) {
526 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
527 rrd_set_error("seek error in rrd");
532 rra_current = rra_begin;
534 rra_start = rra_begin;
536 /* when was the current pdp started */
537 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
538 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
540 /* when did the last pdp_st occur */
541 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
542 occu_pdp_st = current_time - occu_pdp_age;
544 /* interval = current_time - rrd.live_head->last_up; */
545 interval = (double) (current_time - rrd.live_head->last_up)
546 + (double) ((long) current_time_usec -
547 (long) rrd.live_head->last_up_usec) / 1000000.0;
549 if (occu_pdp_st > proc_pdp_st) {
550 /* OK we passed the pdp_st moment */
551 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
552 * occurred before the latest
554 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
555 post_int = occu_pdp_age; /* how much after it */
556 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
563 printf("proc_pdp_age %lu\t"
569 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
570 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
573 /* process the data sources and update the pdp_prep
574 * area accordingly */
575 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
578 dst_idx = dst_conv(rrd.ds_def[i].dst);
580 /* make sure we do not build diffs with old last_ds values */
581 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
582 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
583 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
586 /* NOTE: DST_CDEF should never enter this if block, because
587 * updvals[i+1][0] is initialized to 'U'; unless the caller
588 * accidentally specified a value for the DST_CDEF. To handle
589 * this case, an extra check is required. */
591 if ((updvals[i + 1][0] != 'U') &&
592 (dst_idx != DST_CDEF) &&
593 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
596 /* the data source type defines how to process the data */
597 /* pdp_new contains rate * time ... eg the bytes
598 * transferred during the interval. Doing it this way saves
599 * a lot of math operations */
603 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
604 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
605 if ((updvals[i + 1][ii] < '0'
606 || updvals[i + 1][ii] > '9') && (ii != 0
612 rrd_set_error("not a simple integer: '%s'",
617 if (rrd_test_error()) {
621 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
622 if (dst_idx == DST_COUNTER) {
623 /* simple overflow catcher suggested by Andres Kroonmaa */
624 /* this will fail terribly for non 32 or 64 bit counters ... */
625 /* are there any others in SNMP land ? */
626 if (pdp_new[i] < (double) 0.0)
627 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
628 if (pdp_new[i] < (double) 0.0)
629 pdp_new[i] += (double) 18446744069414584320.0;
632 rate = pdp_new[i] / interval;
639 pdp_new[i] = strtod(updvals[i + 1], &endptr);
641 rrd_set_error("converting '%s' to float: %s",
642 updvals[i + 1], rrd_strerror(errno));
645 if (endptr[0] != '\0') {
647 ("conversion of '%s' to float not complete: tail '%s'",
648 updvals[i + 1], endptr);
651 rate = pdp_new[i] / interval;
655 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
657 rrd_set_error("converting '%s' to float: %s",
658 updvals[i + 1], rrd_strerror(errno));
661 if (endptr[0] != '\0') {
663 ("conversion of '%s' to float not complete: tail '%s'",
664 updvals[i + 1], endptr);
667 rate = pdp_new[i] / interval;
670 rrd_set_error("rrd contains unknown DS type : '%s'",
674 /* break out of this for loop if the error string is set */
675 if (rrd_test_error()) {
678 /* make sure pdp_temp is neither too large or too small
679 * if any of these occur it becomes unknown ...
682 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
684 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
689 /* no news is news all the same */
694 /* make a copy of the command line argument for the next run */
701 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
703 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
704 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
706 /* break out of the argument parsing loop if the error_string is set */
707 if (rrd_test_error()) {
711 /* has a pdp_st moment occurred since the last run ? */
713 if (proc_pdp_st == occu_pdp_st) {
714 /* no we have not passed a pdp_st moment. therefore update is simple */
716 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
717 if (isnan(pdp_new[i])) {
718 /* this is not really accurate if we use subsecond data arrival time
719 should have thought of it when going subsecond resolution ...
720 sorry next format change we will have it! */
721 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
724 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
725 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
727 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
736 rrd.pdp_prep[i].scratch[PDP_val].u_val,
737 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
741 /* a pdp_st has occurred. */
743 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
744 rate*seconds which occurred up to the last run.
745 pdp_new[] contains rate*seconds from the latest run.
746 pdp_temp[] will contain the rate for cdp */
748 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
749 /* update pdp_prep to the current pdp_st. */
750 double pre_unknown = 0.0;
752 if (isnan(pdp_new[i])) {
753 /* a final bit of unknown to be added before calculation
754 we use a temporary variable for this so that we
755 don't have to turn integer lines before using the value */
756 pre_unknown = pre_int;
758 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
759 rrd.pdp_prep[i].scratch[PDP_val].u_val =
760 pdp_new[i] / interval * pre_int;
762 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
763 pdp_new[i] / interval * pre_int;
768 /* if too much of the pdp_prep is unknown we dump it */
770 /* removed because this does not agree with the
771 definition that a heartbeat can be unknown */
772 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
773 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
774 /* if the interval is larger than mrhb we get NAN */
775 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
776 (occu_pdp_st - proc_pdp_st <=
777 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
780 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
781 / ((double) (occu_pdp_st - proc_pdp_st
784 scratch[PDP_unkn_sec_cnt].u_cnt)
788 /* process CDEF data sources; remember each CDEF DS can
789 * only reference other DS with a lower index number */
790 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
794 rpn_expand((rpn_cdefds_t *) &
795 (rrd.ds_def[i].par[DS_cdef]));
796 /* substitute data values for OP_VARIABLE nodes */
797 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
798 if (rpnp[ii].op == OP_VARIABLE) {
799 rpnp[ii].op = OP_NUMBER;
800 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
803 /* run the rpn calculator */
804 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
806 break; /* exits the data sources pdp_temp loop */
810 /* make pdp_prep ready for the next run */
811 if (isnan(pdp_new[i])) {
812 /* this is not really accurate if we use subsecond data arrival time
813 should have thought of it when going subsecond resolution ...
814 sorry next format change we will have it! */
815 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
817 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
820 rrd.pdp_prep[i].scratch[PDP_val].u_val =
821 pdp_new[i] / interval * post_int;
829 "new_unkn_sec %5lu\n",
831 rrd.pdp_prep[i].scratch[PDP_val].u_val,
832 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
836 /* if there were errors during the last loop, bail out here */
837 if (rrd_test_error()) {
842 /* compute the number of elapsed pdp_st moments */
844 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
846 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
848 if (rra_step_cnt == NULL) {
849 rra_step_cnt = (unsigned long *)
850 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
853 for (i = 0, rra_start = rra_begin;
854 i < rrd.stat_head->rra_cnt;
856 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
857 sizeof(rrd_value_t), i++) {
858 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
859 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
860 (proc_pdp_st / rrd.stat_head->pdp_step) %
861 rrd.rra_def[i].pdp_cnt;
862 if (start_pdp_offset <= elapsed_pdp_st) {
863 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
864 rrd.rra_def[i].pdp_cnt + 1;
869 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
870 /* If this is a bulk update, we need to skip ahead in
871 the seasonal arrays so that they will be correct for
872 the next observed value;
873 note that for the bulk update itself, no update will
874 occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
875 and DEVPREDICT will be set to DNAN. */
876 if (rra_step_cnt[i] > 2) {
877 /* skip update by resetting rra_step_cnt[i],
878 note that this is not data source specific; this is
879 due to the bulk update, not a DNAN value for the
880 specific data source. */
882 lookup_seasonal(&rrd, i, rra_start, rrd_file,
883 elapsed_pdp_st, &last_seasonal_coef);
884 lookup_seasonal(&rrd, i, rra_start, rrd_file,
885 elapsed_pdp_st + 1, &seasonal_coef);
888 /* periodically run a smoother for seasonal effects */
889 /* Need to use first cdp parameter buffer to track
890 * burnin (burnin requires a specific smoothing schedule).
891 * The CDP_init_seasonal parameter is really an RRA level,
892 * not a data source within RRA level parameter, but the rra_def
893 * is read only for rrd_update (not flushed to disk). */
894 iii = i * (rrd.stat_head->ds_cnt);
895 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898 > rrd.rra_def[i].row_cnt - 1) {
899 /* mark off one of the burnin cycles */
900 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
905 /* someone has no doubt invented a trick to deal with this
906 * wrap around, but at least this code is clear. */
907 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
908 u_cnt > rrd.rra_ptr[i].cur_row) {
909 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
910 * mapping between PDP and CDP */
911 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
913 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
917 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
918 rrd.rra_ptr[i].cur_row,
921 par[RRA_seasonal_smooth_idx].u_cnt);
926 /* can't rely on negative numbers because we are working with
928 /* Don't need modulus here. If we've wrapped more than once, only
929 * one smooth is executed at the end. */
930 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
931 rrd.rra_def[i].row_cnt
932 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
933 rrd.rra_def[i].row_cnt >=
934 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
938 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
939 rrd.rra_ptr[i].cur_row,
942 par[RRA_seasonal_smooth_idx].u_cnt);
949 rra_current = rrd_tell(rrd_file);
951 /* if cf is DEVSEASONAL or SEASONAL */
952 if (rrd_test_error())
955 /* update CDP_PREP areas */
956 /* loop over data sources within each RRA */
957 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
959 /* iii indexes the CDP prep area for this data source within the RRA */
960 iii = i * rrd.stat_head->ds_cnt + ii;
962 if (rrd.rra_def[i].pdp_cnt > 1) {
964 if (rra_step_cnt[i] > 0) {
965 /* If we are in this block, at least 1 CDP value will be written to
966 * disk, this is the CDP_primary_val entry. If more than 1 value needs
967 * to be written, then the "fill in" value is the CDP_secondary_val
969 if (isnan(pdp_temp[ii])) {
970 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
971 u_cnt += start_pdp_offset;
972 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
975 /* CDP_secondary value is the RRA "fill in" value for intermediary
976 * CDP data entries. No matter the CF, the value is the same because
977 * the average, max, min, and last of a list of identical values is
978 * the same, namely, the value itself. */
979 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
980 u_val = pdp_temp[ii];
983 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
985 rrd.rra_def[i].pdp_cnt *
986 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
987 rrd.cdp_prep[iii].scratch[CDP_primary_val].
989 /* initialize carry over */
990 if (current_cf == CF_AVERAGE) {
991 if (isnan(pdp_temp[ii])) {
992 rrd.cdp_prep[iii].scratch[CDP_val].
995 rrd.cdp_prep[iii].scratch[CDP_val].
1000 rrd.rra_def[i].pdp_cnt);
1003 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1007 rrd_value_t cum_val, cur_val;
1009 switch (current_cf) {
1012 IFDNAN(rrd.cdp_prep[iii].
1013 scratch[CDP_val].u_val, 0.0);
1014 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1016 scratch[CDP_primary_val].u_val =
1018 cur_val * start_pdp_offset) /
1019 (rrd.rra_def[i].pdp_cnt -
1021 scratch[CDP_unkn_pdp_cnt].u_cnt);
1022 /* initialize carry over value */
1023 if (isnan(pdp_temp[ii])) {
1024 rrd.cdp_prep[iii].scratch[CDP_val].
1027 rrd.cdp_prep[iii].scratch[CDP_val].
1032 rrd.rra_def[i].pdp_cnt);
1037 IFDNAN(rrd.cdp_prep[iii].
1038 scratch[CDP_val].u_val, -DINF);
1039 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1042 (rrd.cdp_prep[iii].scratch[CDP_val].
1043 u_val) && isnan(pdp_temp[ii])) {
1045 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1050 if (cur_val > cum_val)
1052 scratch[CDP_primary_val].u_val =
1056 scratch[CDP_primary_val].u_val =
1058 /* initialize carry over value */
1059 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1064 IFDNAN(rrd.cdp_prep[iii].
1065 scratch[CDP_val].u_val, DINF);
1066 cur_val = IFDNAN(pdp_temp[ii], DINF);
1069 (rrd.cdp_prep[iii].scratch[CDP_val].
1070 u_val) && isnan(pdp_temp[ii])) {
1072 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1077 if (cur_val < cum_val)
1079 scratch[CDP_primary_val].u_val =
1083 scratch[CDP_primary_val].u_val =
1085 /* initialize carry over value */
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1092 scratch[CDP_primary_val].u_val =
1094 /* initialize carry over value */
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1099 } /* endif meets xff value requirement for a valid value */
1100 /* initialize carry over CDP_unkn_pdp_cnt, this must come after CDP_primary_val
1101 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1102 if (isnan(pdp_temp[ii]))
1103 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1107 rrd.rra_def[i].pdp_cnt;
1109 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1111 } else { /* rra_step_cnt[i] == 0 */
1115 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1117 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1121 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1123 rrd.cdp_prep[iii].scratch[CDP_val].
1127 if (isnan(pdp_temp[ii])) {
1128 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1129 u_cnt += elapsed_pdp_st;
1132 (rrd.cdp_prep[iii].scratch[CDP_val].
1134 if (current_cf == CF_AVERAGE) {
1135 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1136 pdp_temp[ii] * elapsed_pdp_st;
1138 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1143 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1145 rrd.cdp_prep[iii].scratch[CDP_val].
1149 switch (current_cf) {
1151 rrd.cdp_prep[iii].scratch[CDP_val].
1153 pdp_temp[ii] * elapsed_pdp_st;
1157 rrd.cdp_prep[iii].scratch[CDP_val].
1159 rrd.cdp_prep[iii].scratch[CDP_val].
1160 u_val = pdp_temp[ii];
1164 rrd.cdp_prep[iii].scratch[CDP_val].
1166 rrd.cdp_prep[iii].scratch[CDP_val].
1167 u_val = pdp_temp[ii];
1171 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1177 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1178 if (elapsed_pdp_st > 2) {
1179 switch (current_cf) {
1182 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1183 u_val = pdp_temp[ii];
1184 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1185 u_val = pdp_temp[ii];
1188 case CF_DEVSEASONAL:
1189 /* need to update cached seasonal values, so they are consistent
1190 * with the bulk update */
1191 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1192 * CDP_last_deviation are the same. */
1194 scratch[CDP_hw_last_seasonal].u_val =
1195 last_seasonal_coef[ii];
1196 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1197 u_val = seasonal_coef[ii];
1200 /* need to update the null_count and last_null_count.
1201 * even do this for non-DNAN pdp_temp because the
1202 * algorithm is not learning from batch updates. */
1203 rrd.cdp_prep[iii].scratch[CDP_null_count].
1204 u_cnt += elapsed_pdp_st;
1206 scratch[CDP_last_null_count].u_cnt +=
1210 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1212 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1216 /* do not count missed bulk values as failures */
1217 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1219 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1221 /* need to reset violations buffer.
1222 * could do this more carefully, but for now, just
1223 * assume a bulk update wipes away all violations. */
1224 erase_violations(&rrd, iii, i);
1228 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1230 if (rrd_test_error())
1233 } /* endif data sources loop */
1234 } /* end RRA Loop */
1236 /* this loop is only entered if elapsed_pdp_st < 3 */
1237 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1238 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1239 for (i = 0, rra_start = rra_begin;
1240 i < rrd.stat_head->rra_cnt;
1242 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1243 sizeof(rrd_value_t), i++) {
1244 if (rrd.rra_def[i].pdp_cnt > 1)
1247 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1248 if (current_cf == CF_SEASONAL
1249 || current_cf == CF_DEVSEASONAL) {
1250 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1251 elapsed_pdp_st + (scratch_idx ==
1255 rra_current = rrd_tell(rrd_file);
1257 if (rrd_test_error())
1259 /* loop over data sources within each RRA */
1260 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1261 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1262 i * (rrd.stat_head->ds_cnt) + ii,
1263 i, ii, scratch_idx, seasonal_coef);
1265 } /* end RRA Loop */
1266 if (rrd_test_error())
1268 } /* end elapsed_pdp_st loop */
1270 if (rrd_test_error())
1273 /* Ready to write to disk */
1274 /* Move sequentially through the file, writing one RRA at a time.
1275 * Note this architecture divorces the computation of CDP with
1276 * flushing updated RRA entries to disk. */
1277 for (i = 0, rra_start = rra_begin;
1278 i < rrd.stat_head->rra_cnt;
1280 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1281 sizeof(rrd_value_t), i++) {
1282 /* is there anything to write for this RRA? If not, continue. */
1283 if (rra_step_cnt[i] == 0)
1286 /* write the first row */
1288 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1290 rrd.rra_ptr[i].cur_row++;
1291 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1292 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1293 /* position on the first row */
1294 rra_pos_tmp = rra_start +
1295 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1296 sizeof(rrd_value_t);
1297 if (rra_pos_tmp != rra_current) {
1298 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1299 rrd_set_error("seek error in rrd");
1302 rra_current = rra_pos_tmp;
1305 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1307 scratch_idx = CDP_primary_val;
1308 if (pcdp_summary != NULL) {
1309 rra_time = (current_time - current_time
1310 % (rrd.rra_def[i].pdp_cnt *
1311 rrd.stat_head->pdp_step))
1314 1) * rrd.rra_def[i].pdp_cnt *
1315 rrd.stat_head->pdp_step);
1318 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1319 scratch_idx, pcdp_summary, &rra_time);
1320 if (rrd_test_error())
1323 /* write other rows of the bulk update, if any */
1324 scratch_idx = CDP_secondary_val;
1325 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1326 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1329 "Wraparound for RRA %s, %lu updates left\n",
1330 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1333 rrd.rra_ptr[i].cur_row = 0;
1334 /* seek back to beginning of current rra */
1335 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1336 rrd_set_error("seek error in rrd");
1340 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1343 rra_current = rra_start;
1345 if (pcdp_summary != NULL) {
1346 rra_time = (current_time - current_time
1347 % (rrd.rra_def[i].pdp_cnt *
1348 rrd.stat_head->pdp_step))
1351 2) * rrd.rra_def[i].pdp_cnt *
1352 rrd.stat_head->pdp_step);
1355 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1356 scratch_idx, pcdp_summary, &rra_time);
1359 if (rrd_test_error())
1363 /* break out of the argument parsing loop if error_string is set */
1364 if (rrd_test_error()) {
1369 } /* endif a pdp_st has occurred */
1370 rrd.live_head->last_up = current_time;
1371 rrd.live_head->last_up_usec = current_time_usec;
1373 } /* function argument loop */
1375 if (seasonal_coef != NULL)
1376 free(seasonal_coef);
1377 if (last_seasonal_coef != NULL)
1378 free(last_seasonal_coef);
1379 if (rra_step_cnt != NULL)
1381 rpnstack_free(&rpnstack);
1384 //rrd_flush(rrd_file); //XXX: really needed?
1386 /* if we got here and if there is an error and if the file has not been
1387 * written to, then close things up and return. */
1388 if (rrd_test_error()) {
1389 goto err_free_pdp_new;
1392 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1393 * we just need to write back the live header portion now*/
1395 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1396 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1397 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1399 rrd_set_error("seek rrd for live header writeback");
1400 goto err_free_pdp_new;
1402 /* for mmap, we did already write to the underlying mapping, so we do
1403 not need to write again. */
1406 if (rrd_write(rrd_file, rrd.live_head,
1407 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1408 rrd_set_error("rrd_write live_head to rrd");
1409 goto err_free_pdp_new;
1412 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1413 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1414 rrd_set_error("rrd_write live_head to rrd");
1415 goto err_free_pdp_new;
1420 if (rrd_write(rrd_file, rrd.pdp_prep,
1421 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1422 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1423 rrd_set_error("rrd_write pdp_prep to rrd");
1424 goto err_free_pdp_new;
1427 if (rrd_write(rrd_file, rrd.cdp_prep,
1428 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1429 rrd.stat_head->ds_cnt)
1430 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1431 rrd.stat_head->ds_cnt)) {
1433 rrd_set_error("rrd_write cdp_prep to rrd");
1434 goto err_free_pdp_new;
1437 if (rrd_write(rrd_file, rrd.rra_ptr,
1438 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1439 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1440 rrd_set_error("rrd_write rra_ptr to rrd");
1441 goto err_free_pdp_new;
1444 #ifdef HAVE_POSIX_FADVISExxx
1446 /* with update we have write ops, so they will probably not be done by now, this means
1447 the buffers will not get freed. But calling this for the whole file - header
1448 will let the data off the hook as soon as it is written when if it is from a previous
1449 update cycle. Calling fdsync to force things is much too hard here. */
1451 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1452 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1453 rrd_strerror(errno));
1454 goto err_free_pdp_new;
1457 /* rrd_flush(rrd_file); */
1459 /* calling the smoothing code here guarantees at most
1460 * one smoothing operation per rrd_update call. Unfortunately,
1461 * it is possible with bulk updates, or a long-delayed update
1462 * for smoothing to occur off-schedule. This really isn't
1463 * critical except during the burning cycles. */
1464 if (schedule_smooth) {
1466 rra_start = rra_begin;
1467 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1468 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1469 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1471 fprintf(stderr, "Running smoother for rra %ld\n", i);
1473 apply_smoother(&rrd, i, rra_start, rrd_file);
1474 if (rrd_test_error())
1477 rra_start += rrd.rra_def[i].row_cnt
1478 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1480 #ifdef HAVE_POSIX_FADVISExxx
1481 /* same procedure as above ... */
1483 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1484 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1485 rrd_strerror(errno));
1486 goto err_free_pdp_new;
1492 rrd_close(rrd_file);
1509 rrd_close(rrd_file);
1517 * get exclusive lock to whole file.
1518 * lock gets removed when we close the file
1520 * returns 0 on success
/* NOTE(review): extraction gaps -- the function header and the local
 * declarations used below (st, lock, rcstat) are not visible here. */
1528 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1531 if (_fstat(in_file, &st) == 0) {
1532 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1539 lock.l_type = F_WRLCK; /* exclusive write lock */
1540 lock.l_len = 0; /* whole file */
1541 lock.l_start = 0; /* start of file */
1542 lock.l_whence = SEEK_SET; /* offsets measured from start of file */
1544 rcstat = fcntl(in_file, F_SETLK, &lock);