1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
30 #include <sys/timeb.h>
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
44 static int gettimeofday(
46 struct __timezone *tz)
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normalize time as returned by gettimeofday. usec part must
64 static inline void normalize_time(
69 t->tv_usec += 1000000L;
73 static inline info_t *write_RRA_row(
76 unsigned long rra_idx,
77 unsigned long *rra_current,
78 unsigned short CDP_scratch_idx,
82 unsigned long ds_idx, cdp_idx;
85 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86 /* compute the cdp index */
87 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
89 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
93 if (pcdp_summary != NULL) {
94 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95 /* append info to the return hash */
96 pcdp_summary = info_push(pcdp_summary,
97 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
99 rrd->rra_def[rra_idx].
101 rrd->rra_def[rra_idx].
104 ds_nam), RD_I_VAL, iv);
108 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
113 *rra_current += sizeof(rrd_value_t);
115 return (pcdp_summary);
119 const char *filename,
124 const char *filename,
/* IFDNAN(X,Y): yield Y when X is NaN, otherwise X.  Both arguments are
 * parenthesized against precedence surprises; note X is evaluated twice,
 * so avoid side-effecting arguments.  The trailing semicolon of the
 * original definition expanded into a stray empty statement at every use
 * site and made the macro illegal in expression position; it is dropped
 * here (as was done in later RRDtool releases). */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
133 info_t *rrd_update_v(
138 info_t *result = NULL;
140 struct option long_options[] = {
141 {"template", required_argument, 0, 't'},
147 opterr = 0; /* initialize getopt */
150 int option_index = 0;
153 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
164 rrd_set_error("unknown option '%s'", argv[optind - 1]);
169 /* need at least 2 arguments: filename, data. */
170 if (argc - optind < 2) {
171 rrd_set_error("Not enough arguments");
175 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176 rc.u_int = _rrd_update(argv[optind], tmplt,
178 (const char **) (argv + optind + 1), result);
179 result->value.u_int = rc.u_int;
188 struct option long_options[] = {
189 {"template", required_argument, 0, 't'},
192 int option_index = 0;
198 opterr = 0; /* initialize getopt */
201 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
208 tmplt = strdup(optarg);
212 rrd_set_error("unknown option '%s'", argv[optind - 1]);
217 /* need at least 2 arguments: filename, data. */
218 if (argc - optind < 2) {
219 rrd_set_error("Not enough arguments");
224 rc = rrd_update_r(argv[optind], tmplt,
225 argc - optind - 1, (const char **) (argv + optind + 1));
231 const char *filename,
236 return _rrd_update(filename, tmplt, argc, argv, NULL);
240 const char *filename,
244 info_t *pcdp_summary)
249 unsigned long i, ii, iii = 1;
251 unsigned long rra_begin; /* byte pointer to the rra
252 * area in the rrd file. this
253 * pointer never changes value */
254 unsigned long rra_start; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer changes as each rrd is
258 unsigned long rra_current; /* byte pointer to the current write
259 * spot in the rrd file. */
260 unsigned long rra_pos_tmp; /* temporary byte pointer. */
261 double interval, pre_int, post_int; /* interval between this and
263 unsigned long proc_pdp_st; /* which pdp_st was the last
265 unsigned long occu_pdp_st; /* when was the pdp_st
266 * before the last update
268 unsigned long proc_pdp_age; /* how old was the data in
269 * the pdp prep area when it
270 * was last updated */
271 unsigned long occu_pdp_age; /* how long ago was the last
273 rrd_value_t *pdp_new; /* prepare the incoming data
274 * to be added the the
276 rrd_value_t *pdp_temp; /* prepare the pdp values
277 * to be added the the
280 long *tmpl_idx; /* index representing the settings
281 transported by the tmplt index */
282 unsigned long tmpl_cnt = 2; /* time and data */
285 time_t current_time = 0;
286 time_t rra_time = 0; /* time of update for a RRA */
287 unsigned long current_time_usec = 0; /* microseconds part of current time */
288 struct timeval tmp_time; /* used for time conversion */
291 int schedule_smooth = 0;
292 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
294 /* a vector of future Holt-Winters seasonal coefs */
295 unsigned long elapsed_pdp_st;
297 /* number of elapsed PDP steps since last update */
298 unsigned long *rra_step_cnt = NULL;
300 /* number of rows to be updated in an RRA for a data
302 unsigned long start_pdp_offset;
304 /* number of PDP steps since the last update that
305 * are assigned to the first CDP to be generated
306 * since the last update. */
307 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
312 /* numeric id of the current consolidation function */
313 rpnstack_t rpnstack; /* used for COMPUTE DS */
314 int version; /* rrd version */
315 char *endptr; /* used in the conversion */
316 rrd_file_t *rrd_file;
318 rpnstack_init(&rpnstack);
320 /* need at least 1 arguments: data. */
322 rrd_set_error("Not enough arguments");
326 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327 if (rrd_file == NULL) {
330 /* We are now at the beginning of the rra's */
331 rra_current = rra_start = rra_begin = rrd_file->header_len;
333 /* initialize time */
334 version = atoi(rrd.stat_head->version);
335 gettimeofday(&tmp_time, 0);
336 normalize_time(&tmp_time);
337 current_time = tmp_time.tv_sec;
339 current_time_usec = tmp_time.tv_usec;
341 current_time_usec = 0;
344 /* get exclusive lock to whole file.
345 * lock gets removed when we close the file.
347 if (LockRRD(rrd_file->fd) != 0) {
348 rrd_set_error("could not lock RRD");
353 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354 rrd_set_error("allocating updvals pointer array");
358 if ((pdp_temp = malloc(sizeof(rrd_value_t)
359 * rrd.stat_head->ds_cnt)) == NULL) {
360 rrd_set_error("allocating pdp_temp ...");
361 goto err_free_updvals;
364 if ((tmpl_idx = malloc(sizeof(unsigned long)
365 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366 rrd_set_error("allocating tmpl_idx ...");
367 goto err_free_pdp_temp;
369 /* initialize tmplt redirector */
370 /* default config example (assume DS 1 is a CDEF DS)
371 tmpl_idx[0] -> 0; (time)
372 tmpl_idx[1] -> 1; (DS 0)
373 tmpl_idx[2] -> 3; (DS 2)
374 tmpl_idx[3] -> 4; (DS 3) */
375 tmpl_idx[0] = 0; /* time */
376 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
383 /* we should work on a writeable copy here */
385 unsigned int tmpl_len;
386 char *tmplt_copy = strdup(tmplt);
389 tmpl_cnt = 1; /* the first entry is the time */
390 tmpl_len = strlen(tmplt_copy);
391 for (i = 0; i <= tmpl_len; i++) {
392 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393 tmplt_copy[i] = '\0';
394 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
396 ("tmplt contains more DS definitions than RRD");
397 goto err_free_tmpl_idx;
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400 rrd_set_error("unknown DS name '%s'", dsname);
401 goto err_free_tmpl_idx;
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt - 1]++;
405 /* go to the next entry on the tmplt_copy */
406 dsname = &tmplt_copy[i + 1];
407 /* fix the damage we did before */
417 if ((pdp_new = malloc(sizeof(rrd_value_t)
418 * rrd.stat_head->ds_cnt)) == NULL) {
419 rrd_set_error("allocating pdp_new ...");
420 goto err_free_tmpl_idx;
422 /* loop through the arguments. */
423 for (arg_i = 0; arg_i < argc; arg_i++) {
424 char *stepper = strdup(argv[arg_i]);
425 char *step_start = stepper;
427 char *parsetime_error = NULL;
428 enum { atstyle, normal } timesyntax;
429 struct rrd_time_value ds_tv;
431 if (stepper == NULL) {
432 rrd_set_error("failed duplication argv entry");
434 goto err_free_pdp_new;
436 /* initialize all ds input to unknown except the first one
437 which has always got to be set */
438 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
440 updvals[0] = stepper;
441 /* separate all ds elements; first must be examined separately
442 due to alternate time syntax */
443 if ((p = strchr(stepper, '@')) != NULL) {
444 timesyntax = atstyle;
447 } else if ((p = strchr(stepper, ':')) != NULL) {
453 ("expected timestamp not found in data source from %s",
459 updvals[tmpl_idx[ii]] = stepper;
461 if (*stepper == ':') {
465 updvals[tmpl_idx[ii]] = stepper + 1;
471 if (ii != tmpl_cnt - 1) {
473 ("expected %lu data source readings (got %lu) from %s",
474 tmpl_cnt - 1, ii, argv[arg_i]);
479 /* get the time from the reading ... handle N */
480 if (timesyntax == atstyle) {
481 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
486 if (ds_tv.type == RELATIVE_TO_END_TIME ||
487 ds_tv.type == RELATIVE_TO_START_TIME) {
488 rrd_set_error("specifying time relative to the 'start' "
489 "or 'end' makes no sense here: %s", updvals[0]);
494 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
496 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
498 } else if (strcmp(updvals[0], "N") == 0) {
499 gettimeofday(&tmp_time, 0);
500 normalize_time(&tmp_time);
501 current_time = tmp_time.tv_sec;
502 current_time_usec = tmp_time.tv_usec;
506 old_locale = setlocale(LC_NUMERIC,"C");
507 tmp = strtod(updvals[0], 0);
508 setlocale(LC_NUMERIC,old_locale);
509 current_time = floor(tmp);
511 (long) ((tmp - (double) current_time) * 1000000.0);
513 /* dont do any correction for old version RRDs */
515 current_time_usec = 0;
517 if (current_time < rrd.live_head->last_up ||
518 (current_time == rrd.live_head->last_up &&
519 (long) current_time_usec <=
520 (long) rrd.live_head->last_up_usec)) {
521 rrd_set_error("illegal attempt to update using time %ld when "
522 "last update time is %ld (minimum one second step)",
523 current_time, rrd.live_head->last_up);
528 /* seek to the beginning of the rra's */
529 if (rra_current != rra_begin) {
531 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
532 rrd_set_error("seek error in rrd");
537 rra_current = rra_begin;
539 rra_start = rra_begin;
541 /* when was the current pdp started */
542 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
543 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
545 /* when did the last pdp_st occur */
546 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
547 occu_pdp_st = current_time - occu_pdp_age;
549 /* interval = current_time - rrd.live_head->last_up; */
550 interval = (double) (current_time - rrd.live_head->last_up)
551 + (double) ((long) current_time_usec -
552 (long) rrd.live_head->last_up_usec) / 1000000.0;
554 if (occu_pdp_st > proc_pdp_st) {
555 /* OK we passed the pdp_st moment */
556 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
557 * occurred before the latest
559 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
560 post_int = occu_pdp_age; /* how much after it */
561 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
568 printf("proc_pdp_age %lu\t"
574 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
575 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
578 /* process the data sources and update the pdp_prep
579 * area accordingly */
580 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
583 dst_idx = dst_conv(rrd.ds_def[i].dst);
585 /* make sure we do not build diffs with old last_ds values */
586 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
587 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
588 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
591 /* NOTE: DST_CDEF should never enter this if block, because
592 * updvals[i+1][0] is initialized to 'U'; unless the caller
593 * accidently specified a value for the DST_CDEF. To handle
594 * this case, an extra check is required. */
596 if ((updvals[i + 1][0] != 'U') &&
597 (dst_idx != DST_CDEF) &&
598 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
602 /* the data source type defines how to process the data */
603 /* pdp_new contains rate * time ... eg the bytes
604 * transferred during the interval. Doing it this way saves
605 * a lot of math operations */
609 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
610 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611 if ((updvals[i + 1][ii] < '0'
612 || updvals[i + 1][ii] > '9') && (ii != 0
618 rrd_set_error("not a simple integer: '%s'",
623 if (rrd_test_error()) {
627 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
628 if (dst_idx == DST_COUNTER) {
629 /* simple overflow catcher suggested by Andres Kroonmaa */
630 /* this will fail terribly for non 32 or 64 bit counters ... */
631 /* are there any others in SNMP land ? */
632 if (pdp_new[i] < (double) 0.0)
633 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
634 if (pdp_new[i] < (double) 0.0)
635 pdp_new[i] += (double) 18446744069414584320.0;
638 rate = pdp_new[i] / interval;
644 old_locale = setlocale(LC_NUMERIC,"C");
646 pdp_new[i] = strtod(updvals[i + 1], &endptr);
647 setlocale(LC_NUMERIC,old_locale);
649 rrd_set_error("converting '%s' to float: %s",
650 updvals[i + 1], rrd_strerror(errno));
653 if (endptr[0] != '\0') {
655 ("conversion of '%s' to float not complete: tail '%s'",
656 updvals[i + 1], endptr);
659 rate = pdp_new[i] / interval;
663 old_locale = setlocale(LC_NUMERIC,"C");
664 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
665 setlocale(LC_NUMERIC,old_locale);
667 rrd_set_error("converting '%s' to float: %s",
668 updvals[i + 1], rrd_strerror(errno));
671 if (endptr[0] != '\0') {
673 ("conversion of '%s' to float not complete: tail '%s'",
674 updvals[i + 1], endptr);
677 rate = pdp_new[i] / interval;
680 rrd_set_error("rrd contains unknown DS type : '%s'",
684 /* break out of this for loop if the error string is set */
685 if (rrd_test_error()) {
688 /* make sure pdp_temp is neither too large or too small
689 * if any of these occur it becomes unknown ...
692 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
693 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
694 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
695 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
699 /* no news is news all the same */
704 /* make a copy of the command line argument for the next run */
711 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
713 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
714 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
716 /* break out of the argument parsing loop if the error_string is set */
717 if (rrd_test_error()) {
721 /* has a pdp_st moment occurred since the last run ? */
723 if (proc_pdp_st == occu_pdp_st) {
724 /* no we have not passed a pdp_st moment. therefore update is simple */
726 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
727 if (isnan(pdp_new[i])) {
728 /* this is not realy accurate if we use subsecond data arival time
729 should have thought of it when going subsecond resolution ...
730 sorry next format change we will have it! */
731 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
734 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
735 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
737 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
746 rrd.pdp_prep[i].scratch[PDP_val].u_val,
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
751 /* an pdp_st has occurred. */
753 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
754 rate*seconds which occurred up to the last run.
755 pdp_new[] contains rate*seconds from the latest run.
756 pdp_temp[] will contain the rate for cdp */
758 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
759 /* update pdp_prep to the current pdp_st. */
760 double pre_unknown = 0.0;
762 if (isnan(pdp_new[i])) {
763 /* a final bit of unkonwn to be added bevore calculation
764 we use a temporary variable for this so that we
765 don't have to turn integer lines before using the value */
766 pre_unknown = pre_int;
768 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
769 rrd.pdp_prep[i].scratch[PDP_val].u_val =
770 pdp_new[i] / interval * pre_int;
772 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
773 pdp_new[i] / interval * pre_int;
778 /* if too much of the pdp_prep is unknown we dump it */
780 /* removed because this does not agree with the
781 definition that a heartbeat can be unknown */
782 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
783 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
784 /* if the interval is larger thatn mrhb we get NAN */
785 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
786 (occu_pdp_st - proc_pdp_st <=
787 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
790 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
791 / ((double) (occu_pdp_st - proc_pdp_st
794 scratch[PDP_unkn_sec_cnt].u_cnt)
798 /* process CDEF data sources; remember each CDEF DS can
799 * only reference other DS with a lower index number */
800 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
804 rpn_expand((rpn_cdefds_t *) &
805 (rrd.ds_def[i].par[DS_cdef]));
806 /* substitue data values for OP_VARIABLE nodes */
807 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
808 if (rpnp[ii].op == OP_VARIABLE) {
809 rpnp[ii].op = OP_NUMBER;
810 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
813 /* run the rpn calculator */
814 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
816 break; /* exits the data sources pdp_temp loop */
820 /* make pdp_prep ready for the next run */
821 if (isnan(pdp_new[i])) {
822 /* this is not realy accurate if we use subsecond data arival time
823 should have thought of it when going subsecond resolution ...
824 sorry next format change we will have it! */
825 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
827 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
829 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
830 rrd.pdp_prep[i].scratch[PDP_val].u_val =
831 pdp_new[i] / interval * post_int;
839 "new_unkn_sec %5lu\n",
841 rrd.pdp_prep[i].scratch[PDP_val].u_val,
842 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
846 /* if there were errors during the last loop, bail out here */
847 if (rrd_test_error()) {
852 /* compute the number of elapsed pdp_st moments */
854 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
856 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
858 if (rra_step_cnt == NULL) {
859 rra_step_cnt = (unsigned long *)
860 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
863 for (i = 0, rra_start = rra_begin;
864 i < rrd.stat_head->rra_cnt;
866 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
867 sizeof(rrd_value_t), i++) {
868 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
869 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
870 (proc_pdp_st / rrd.stat_head->pdp_step) %
871 rrd.rra_def[i].pdp_cnt;
872 if (start_pdp_offset <= elapsed_pdp_st) {
873 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
874 rrd.rra_def[i].pdp_cnt + 1;
879 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
880 /* If this is a bulk update, we need to skip ahead in
881 the seasonal arrays so that they will be correct for
882 the next observed value;
883 note that for the bulk update itself, no update will
884 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
885 and DEVPREDICT will be set to DNAN. */
886 if (rra_step_cnt[i] > 2) {
887 /* skip update by resetting rra_step_cnt[i],
888 note that this is not data source specific; this is
889 due to the bulk update, not a DNAN value for the
890 specific data source. */
892 lookup_seasonal(&rrd, i, rra_start, rrd_file,
893 elapsed_pdp_st, &last_seasonal_coef);
894 lookup_seasonal(&rrd, i, rra_start, rrd_file,
895 elapsed_pdp_st + 1, &seasonal_coef);
898 /* periodically run a smoother for seasonal effects */
899 /* Need to use first cdp parameter buffer to track
900 * burnin (burnin requires a specific smoothing schedule).
901 * The CDP_init_seasonal parameter is really an RRA level,
902 * not a data source within RRA level parameter, but the rra_def
903 * is read only for rrd_update (not flushed to disk). */
904 iii = i * (rrd.stat_head->ds_cnt);
905 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
907 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
908 > rrd.rra_def[i].row_cnt - 1) {
909 /* mark off one of the burnin cycles */
910 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
915 /* someone has no doubt invented a trick to deal with this
916 * wrap around, but at least this code is clear. */
917 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
918 u_cnt > rrd.rra_ptr[i].cur_row) {
919 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
920 * mapping between PDP and CDP */
921 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
923 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
927 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
928 rrd.rra_ptr[i].cur_row,
931 par[RRA_seasonal_smooth_idx].u_cnt);
936 /* can't rely on negative numbers because we are working with
938 /* Don't need modulus here. If we've wrapped more than once, only
939 * one smooth is executed at the end. */
940 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
941 rrd.rra_def[i].row_cnt
942 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
943 rrd.rra_def[i].row_cnt >=
944 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
948 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
949 rrd.rra_ptr[i].cur_row,
952 par[RRA_seasonal_smooth_idx].u_cnt);
959 rra_current = rrd_tell(rrd_file);
961 /* if cf is DEVSEASONAL or SEASONAL */
962 if (rrd_test_error())
965 /* update CDP_PREP areas */
966 /* loop over data soures within each RRA */
967 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
969 /* iii indexes the CDP prep area for this data source within the RRA */
970 iii = i * rrd.stat_head->ds_cnt + ii;
972 if (rrd.rra_def[i].pdp_cnt > 1) {
974 if (rra_step_cnt[i] > 0) {
975 /* If we are in this block, as least 1 CDP value will be written to
976 * disk, this is the CDP_primary_val entry. If more than 1 value needs
977 * to be written, then the "fill in" value is the CDP_secondary_val
979 if (isnan(pdp_temp[ii])) {
980 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
981 u_cnt += start_pdp_offset;
982 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
985 /* CDP_secondary value is the RRA "fill in" value for intermediary
986 * CDP data entries. No matter the CF, the value is the same because
987 * the average, max, min, and last of a list of identical values is
988 * the same, namely, the value itself. */
989 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
990 u_val = pdp_temp[ii];
993 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
995 rrd.rra_def[i].pdp_cnt *
996 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
997 rrd.cdp_prep[iii].scratch[CDP_primary_val].
999 /* initialize carry over */
1000 if (current_cf == CF_AVERAGE) {
1001 if (isnan(pdp_temp[ii])) {
1002 rrd.cdp_prep[iii].scratch[CDP_val].
1005 rrd.cdp_prep[iii].scratch[CDP_val].
1010 rrd.rra_def[i].pdp_cnt);
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1017 rrd_value_t cum_val, cur_val;
1019 switch (current_cf) {
1022 IFDNAN(rrd.cdp_prep[iii].
1023 scratch[CDP_val].u_val, 0.0);
1024 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1026 scratch[CDP_primary_val].u_val =
1028 cur_val * start_pdp_offset) /
1029 (rrd.rra_def[i].pdp_cnt -
1031 scratch[CDP_unkn_pdp_cnt].u_cnt);
1032 /* initialize carry over value */
1033 if (isnan(pdp_temp[ii])) {
1034 rrd.cdp_prep[iii].scratch[CDP_val].
1037 rrd.cdp_prep[iii].scratch[CDP_val].
1042 rrd.rra_def[i].pdp_cnt);
1047 IFDNAN(rrd.cdp_prep[iii].
1048 scratch[CDP_val].u_val, -DINF);
1049 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1052 (rrd.cdp_prep[iii].scratch[CDP_val].
1053 u_val) && isnan(pdp_temp[ii])) {
1055 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1060 if (cur_val > cum_val)
1062 scratch[CDP_primary_val].u_val =
1066 scratch[CDP_primary_val].u_val =
1068 /* initialize carry over value */
1069 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1074 IFDNAN(rrd.cdp_prep[iii].
1075 scratch[CDP_val].u_val, DINF);
1076 cur_val = IFDNAN(pdp_temp[ii], DINF);
1079 (rrd.cdp_prep[iii].scratch[CDP_val].
1080 u_val) && isnan(pdp_temp[ii])) {
1082 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1087 if (cur_val < cum_val)
1089 scratch[CDP_primary_val].u_val =
1093 scratch[CDP_primary_val].u_val =
1095 /* initialize carry over value */
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1102 scratch[CDP_primary_val].u_val =
1104 /* initialize carry over value */
1105 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1109 } /* endif meets xff value requirement for a valid value */
1110 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1111 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1112 if (isnan(pdp_temp[ii]))
1113 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1117 rrd.rra_def[i].pdp_cnt;
1119 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1121 } else { /* rra_step_cnt[i] == 0 */
1125 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1127 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1131 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1133 rrd.cdp_prep[iii].scratch[CDP_val].
1137 if (isnan(pdp_temp[ii])) {
1138 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1139 u_cnt += elapsed_pdp_st;
1142 (rrd.cdp_prep[iii].scratch[CDP_val].
1144 if (current_cf == CF_AVERAGE) {
1145 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1146 pdp_temp[ii] * elapsed_pdp_st;
1148 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1153 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1155 rrd.cdp_prep[iii].scratch[CDP_val].
1159 switch (current_cf) {
1161 rrd.cdp_prep[iii].scratch[CDP_val].
1163 pdp_temp[ii] * elapsed_pdp_st;
1167 rrd.cdp_prep[iii].scratch[CDP_val].
1169 rrd.cdp_prep[iii].scratch[CDP_val].
1170 u_val = pdp_temp[ii];
1174 rrd.cdp_prep[iii].scratch[CDP_val].
1176 rrd.cdp_prep[iii].scratch[CDP_val].
1177 u_val = pdp_temp[ii];
1181 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1187 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1188 if (elapsed_pdp_st > 2) {
1189 switch (current_cf) {
1192 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1193 u_val = pdp_temp[ii];
1194 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1195 u_val = pdp_temp[ii];
1198 case CF_DEVSEASONAL:
1199 /* need to update cached seasonal values, so they are consistent
1200 * with the bulk update */
1201 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1202 * CDP_last_deviation are the same. */
1204 scratch[CDP_hw_last_seasonal].u_val =
1205 last_seasonal_coef[ii];
1206 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1207 u_val = seasonal_coef[ii];
1211 /* need to update the null_count and last_null_count.
1212 * even do this for non-DNAN pdp_temp because the
1213 * algorithm is not learning from batch updates. */
1214 rrd.cdp_prep[iii].scratch[CDP_null_count].
1215 u_cnt += elapsed_pdp_st;
1217 scratch[CDP_last_null_count].u_cnt +=
1221 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1223 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1227 /* do not count missed bulk values as failures */
1228 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1230 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1232 /* need to reset violations buffer.
1233 * could do this more carefully, but for now, just
1234 * assume a bulk update wipes away all violations. */
1235 erase_violations(&rrd, iii, i);
1239 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1241 if (rrd_test_error())
1244 } /* endif data sources loop */
1245 } /* end RRA Loop */
1247 /* this loop is only entered if elapsed_pdp_st < 3 */
1248 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1249 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1250 for (i = 0, rra_start = rra_begin;
1251 i < rrd.stat_head->rra_cnt;
1253 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1254 sizeof(rrd_value_t), i++) {
1255 if (rrd.rra_def[i].pdp_cnt > 1)
1258 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1259 if (current_cf == CF_SEASONAL
1260 || current_cf == CF_DEVSEASONAL) {
1261 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1262 elapsed_pdp_st + (scratch_idx ==
1266 rra_current = rrd_tell(rrd_file);
1268 if (rrd_test_error())
1270 /* loop over data soures within each RRA */
1271 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1272 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1273 i * (rrd.stat_head->ds_cnt) + ii,
1274 i, ii, scratch_idx, seasonal_coef);
1276 } /* end RRA Loop */
1277 if (rrd_test_error())
1279 } /* end elapsed_pdp_st loop */
1281 if (rrd_test_error())
1284 /* Ready to write to disk */
1285 /* Move sequentially through the file, writing one RRA at a time.
1286 * Note this architecture divorces the computation of CDP with
1287 * flushing updated RRA entries to disk. */
1288 for (i = 0, rra_start = rra_begin;
1289 i < rrd.stat_head->rra_cnt;
1291 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1292 sizeof(rrd_value_t), i++) {
1293 /* is th5Aere anything to write for this RRA? If not, continue. */
1294 if (rra_step_cnt[i] == 0)
1297 /* write the first row */
1299 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1301 rrd.rra_ptr[i].cur_row++;
1302 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1303 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1304 /* positition on the first row */
1305 rra_pos_tmp = rra_start +
1306 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1307 sizeof(rrd_value_t);
1308 if (rra_pos_tmp != rra_current) {
1309 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1310 rrd_set_error("seek error in rrd");
1313 rra_current = rra_pos_tmp;
1316 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1318 scratch_idx = CDP_primary_val;
1319 if (pcdp_summary != NULL) {
1320 rra_time = (current_time - current_time
1321 % (rrd.rra_def[i].pdp_cnt *
1322 rrd.stat_head->pdp_step))
1325 1) * rrd.rra_def[i].pdp_cnt *
1326 rrd.stat_head->pdp_step);
1329 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1330 scratch_idx, pcdp_summary, &rra_time);
1331 if (rrd_test_error())
1334 /* write other rows of the bulk update, if any */
1335 scratch_idx = CDP_secondary_val;
1336 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1337 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1340 "Wraparound for RRA %s, %lu updates left\n",
1341 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1344 rrd.rra_ptr[i].cur_row = 0;
1345 /* seek back to beginning of current rra */
1346 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1347 rrd_set_error("seek error in rrd");
1351 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1354 rra_current = rra_start;
1356 if (pcdp_summary != NULL) {
1357 rra_time = (current_time - current_time
1358 % (rrd.rra_def[i].pdp_cnt *
1359 rrd.stat_head->pdp_step))
1362 2) * rrd.rra_def[i].pdp_cnt *
1363 rrd.stat_head->pdp_step);
1366 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1367 scratch_idx, pcdp_summary, &rra_time);
1370 if (rrd_test_error())
1374 /* break out of the argument parsing loop if error_string is set */
1375 if (rrd_test_error()) {
1380 } /* endif a pdp_st has occurred */
1381 rrd.live_head->last_up = current_time;
1382 rrd.live_head->last_up_usec = current_time_usec;
1384 } /* function argument loop */
1386 if (seasonal_coef != NULL)
1387 free(seasonal_coef);
1388 if (last_seasonal_coef != NULL)
1389 free(last_seasonal_coef);
1390 if (rra_step_cnt != NULL)
1392 rpnstack_free(&rpnstack);
1395 //rrd_flush(rrd_file); //XXX: really needed?
1397 /* if we got here and if there is an error and if the file has not been
1398 * written to, then close things up and return. */
1399 if (rrd_test_error()) {
1400 goto err_free_pdp_new;
1403 /* aargh ... that was tough ... so many loops ... anyway, its done.
1404 * we just need to write back the live header portion now*/
1406 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1407 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1408 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1410 rrd_set_error("seek rrd for live header writeback");
1411 goto err_free_pdp_new;
1413 /* for mmap, we did already write to the underlying mapping, so we do
1414 not need to write again. */
1417 if (rrd_write(rrd_file, rrd.live_head,
1418 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1419 rrd_set_error("rrd_write live_head to rrd");
1420 goto err_free_pdp_new;
1423 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1424 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1425 rrd_set_error("rrd_write live_head to rrd");
1426 goto err_free_pdp_new;
1431 if (rrd_write(rrd_file, rrd.pdp_prep,
1432 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1433 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1434 rrd_set_error("rrd_write pdp_prep to rrd");
1435 goto err_free_pdp_new;
1438 if (rrd_write(rrd_file, rrd.cdp_prep,
1439 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1440 rrd.stat_head->ds_cnt)
1441 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1442 rrd.stat_head->ds_cnt)) {
1444 rrd_set_error("rrd_write cdp_prep to rrd");
1445 goto err_free_pdp_new;
1448 if (rrd_write(rrd_file, rrd.rra_ptr,
1449 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1450 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1451 rrd_set_error("rrd_write rra_ptr to rrd");
1452 goto err_free_pdp_new;
1456 /* rrd_flush(rrd_file); */
1458 /* calling the smoothing code here guarantees at most
1459 * one smoothing operation per rrd_update call. Unfortunately,
1460 * it is possible with bulk updates, or a long-delayed update
1461 * for smoothing to occur off-schedule. This really isn't
1462 * critical except during the burning cycles. */
1463 if (schedule_smooth) {
1465 rra_start = rra_begin;
1466 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1467 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1468 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1470 fprintf(stderr, "Running smoother for rra %ld\n", i);
1472 apply_smoother(&rrd, i, rra_start, rrd_file);
1473 if (rrd_test_error())
1476 rra_start += rrd.rra_def[i].row_cnt
1477 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1481 /* rrd_dontneed(rrd_file,&rrd); */
1483 rrd_close(rrd_file);
1500 rrd_close(rrd_file);
1508 * get exclusive lock to whole file.
1509 * lock gets removed when we close the file
1511 * returns 0 on success
1519 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1522 if (_fstat(in_file, &st) == 0) {
1523 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1530 lock.l_type = F_WRLCK; /* exclusive write lock */
1531 lock.l_len = 0; /* whole file */
1532 lock.l_start = 0; /* start of file */
1533 lock.l_whence = SEEK_SET; /* end of file */
1535 rcstat = fcntl(in_file, F_SETLK, &lock);