1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
30 #include <sys/timeb.h>
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
44 static int gettimeofday(
46 struct __timezone *tz)
49 struct _timeb current_time;
51     _ftime(&current_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normalize time as returned by gettimeofday. usec part must
64 static inline void normalize_time(
69 t->tv_usec += 1000000L;
73 static inline info_t *write_RRA_row(
76 unsigned long rra_idx,
77 unsigned long *rra_current,
78 unsigned short CDP_scratch_idx,
82 unsigned long ds_idx, cdp_idx;
85 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86 /* compute the cdp index */
87 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
89 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
93 if (pcdp_summary != NULL) {
94 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95 /* append info to the return hash */
96 pcdp_summary = info_push(pcdp_summary,
97 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
99 rrd->rra_def[rra_idx].
101 rrd->rra_def[rra_idx].
104 ds_nam), RD_I_VAL, iv);
108 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
113 *rra_current += sizeof(rrd_value_t);
115 return (pcdp_summary);
119 const char *filename,
124 const char *filename,
130 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
133 info_t *rrd_update_v(
138 info_t *result = NULL;
140 struct option long_options[] = {
141 {"template", required_argument, 0, 't'},
147 opterr = 0; /* initialize getopt */
150 int option_index = 0;
153 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
164 rrd_set_error("unknown option '%s'", argv[optind - 1]);
169 /* need at least 2 arguments: filename, data. */
170 if (argc - optind < 2) {
171 rrd_set_error("Not enough arguments");
175 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176 rc.u_int = _rrd_update(argv[optind], tmplt,
178 (const char **) (argv + optind + 1), result);
179 result->value.u_int = rc.u_int;
188 struct option long_options[] = {
189 {"template", required_argument, 0, 't'},
192 int option_index = 0;
198 opterr = 0; /* initialize getopt */
201 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
208 tmplt = strdup(optarg);
212 rrd_set_error("unknown option '%s'", argv[optind - 1]);
217 /* need at least 2 arguments: filename, data. */
218 if (argc - optind < 2) {
219 rrd_set_error("Not enough arguments");
224 rc = rrd_update_r(argv[optind], tmplt,
225 argc - optind - 1, (const char **) (argv + optind + 1));
231 const char *filename,
236 return _rrd_update(filename, tmplt, argc, argv, NULL);
240 const char *filename,
244 info_t *pcdp_summary)
249 unsigned long i, ii, iii = 1;
251 unsigned long rra_begin; /* byte pointer to the rra
252 * area in the rrd file. this
253 * pointer never changes value */
254 unsigned long rra_start; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer changes as each rrd is
258 unsigned long rra_current; /* byte pointer to the current write
259 * spot in the rrd file. */
260 unsigned long rra_pos_tmp; /* temporary byte pointer. */
261 double interval, pre_int, post_int; /* interval between this and
263 unsigned long proc_pdp_st; /* which pdp_st was the last
265 unsigned long occu_pdp_st; /* when was the pdp_st
266 * before the last update
268 unsigned long proc_pdp_age; /* how old was the data in
269 * the pdp prep area when it
270 * was last updated */
271 unsigned long occu_pdp_age; /* how long ago was the last
273 rrd_value_t *pdp_new; /* prepare the incoming data
274 * to be added the the
276 rrd_value_t *pdp_temp; /* prepare the pdp values
277 * to be added the the
280 long *tmpl_idx; /* index representing the settings
281 transported by the tmplt index */
282 unsigned long tmpl_cnt = 2; /* time and data */
285 time_t current_time = 0;
286 time_t rra_time = 0; /* time of update for a RRA */
287 unsigned long current_time_usec = 0; /* microseconds part of current time */
288 struct timeval tmp_time; /* used for time conversion */
291 int schedule_smooth = 0;
292 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
294 /* a vector of future Holt-Winters seasonal coefs */
295 unsigned long elapsed_pdp_st;
297 /* number of elapsed PDP steps since last update */
298 unsigned long *rra_step_cnt = NULL;
300 /* number of rows to be updated in an RRA for a data
302 unsigned long start_pdp_offset;
304 /* number of PDP steps since the last update that
305 * are assigned to the first CDP to be generated
306 * since the last update. */
307 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
312 /* numeric id of the current consolidation function */
313 rpnstack_t rpnstack; /* used for COMPUTE DS */
314 int version; /* rrd version */
315 char *endptr; /* used in the conversion */
316 rrd_file_t *rrd_file;
318 rpnstack_init(&rpnstack);
320 /* need at least 1 arguments: data. */
322 rrd_set_error("Not enough arguments");
326 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327 if (rrd_file == NULL) {
330 /* We are now at the beginning of the rra's */
331 rra_current = rra_start = rra_begin = rrd_file->header_len;
333 /* initialize time */
334 version = atoi(rrd.stat_head->version);
335 gettimeofday(&tmp_time, 0);
336 normalize_time(&tmp_time);
337 current_time = tmp_time.tv_sec;
339 current_time_usec = tmp_time.tv_usec;
341 current_time_usec = 0;
344 /* get exclusive lock to whole file.
345 * lock gets removed when we close the file.
347 if (LockRRD(rrd_file->fd) != 0) {
348 rrd_set_error("could not lock RRD");
353 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354 rrd_set_error("allocating updvals pointer array");
358 if ((pdp_temp = malloc(sizeof(rrd_value_t)
359 * rrd.stat_head->ds_cnt)) == NULL) {
360 rrd_set_error("allocating pdp_temp ...");
361 goto err_free_updvals;
364 if ((tmpl_idx = malloc(sizeof(unsigned long)
365 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366 rrd_set_error("allocating tmpl_idx ...");
367 goto err_free_pdp_temp;
369 /* initialize tmplt redirector */
370 /* default config example (assume DS 1 is a CDEF DS)
371 tmpl_idx[0] -> 0; (time)
372 tmpl_idx[1] -> 1; (DS 0)
373 tmpl_idx[2] -> 3; (DS 2)
374 tmpl_idx[3] -> 4; (DS 3) */
375 tmpl_idx[0] = 0; /* time */
376 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
383 /* we should work on a writeable copy here */
385 unsigned int tmpl_len;
386 char *tmplt_copy = strdup(tmplt);
389 tmpl_cnt = 1; /* the first entry is the time */
390 tmpl_len = strlen(tmplt_copy);
391 for (i = 0; i <= tmpl_len; i++) {
392 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393 tmplt_copy[i] = '\0';
394 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
396 ("tmplt contains more DS definitions than RRD");
397 goto err_free_tmpl_idx;
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400 rrd_set_error("unknown DS name '%s'", dsname);
401 goto err_free_tmpl_idx;
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt - 1]++;
405 /* go to the next entry on the tmplt_copy */
406 dsname = &tmplt_copy[i + 1];
407 /* fix the damage we did before */
417 if ((pdp_new = malloc(sizeof(rrd_value_t)
418 * rrd.stat_head->ds_cnt)) == NULL) {
419 rrd_set_error("allocating pdp_new ...");
420 goto err_free_tmpl_idx;
422 /* loop through the arguments. */
423 for (arg_i = 0; arg_i < argc; arg_i++) {
424 char *stepper = strdup(argv[arg_i]);
425 char *step_start = stepper;
427 char *parsetime_error = NULL;
428 enum { atstyle, normal } timesyntax;
429 struct rrd_time_value ds_tv;
431 if (stepper == NULL) {
432 rrd_set_error("failed duplication argv entry");
434 goto err_free_pdp_new;
436 /* initialize all ds input to unknown except the first one
437 which has always got to be set */
438 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
440 updvals[0] = stepper;
441 /* separate all ds elements; first must be examined separately
442 due to alternate time syntax */
443 if ((p = strchr(stepper, '@')) != NULL) {
444 timesyntax = atstyle;
447 } else if ((p = strchr(stepper, ':')) != NULL) {
453 ("expected timestamp not found in data source from %s",
459 updvals[tmpl_idx[ii]] = stepper;
461 if (*stepper == ':') {
465 updvals[tmpl_idx[ii]] = stepper + 1;
471 if (ii != tmpl_cnt - 1) {
473 ("expected %lu data source readings (got %lu) from %s",
474 tmpl_cnt - 1, ii, argv[arg_i]);
479 /* get the time from the reading ... handle N */
480 if (timesyntax == atstyle) {
481 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
486 if (ds_tv.type == RELATIVE_TO_END_TIME ||
487 ds_tv.type == RELATIVE_TO_START_TIME) {
488 rrd_set_error("specifying time relative to the 'start' "
489 "or 'end' makes no sense here: %s", updvals[0]);
494 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
496 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
498 } else if (strcmp(updvals[0], "N") == 0) {
499 gettimeofday(&tmp_time, 0);
500 normalize_time(&tmp_time);
501 current_time = tmp_time.tv_sec;
502 current_time_usec = tmp_time.tv_usec;
507 old_locale = setlocale(LC_NUMERIC, "C");
508 tmp = strtod(updvals[0], 0);
509 setlocale(LC_NUMERIC, old_locale);
510 current_time = floor(tmp);
512 (long) ((tmp - (double) current_time) * 1000000.0);
514 /* dont do any correction for old version RRDs */
516 current_time_usec = 0;
518 if (current_time < rrd.live_head->last_up ||
519 (current_time == rrd.live_head->last_up &&
520 (long) current_time_usec <=
521 (long) rrd.live_head->last_up_usec)) {
522 rrd_set_error("illegal attempt to update using time %ld when "
523 "last update time is %ld (minimum one second step)",
524 current_time, rrd.live_head->last_up);
529 /* seek to the beginning of the rra's */
530 if (rra_current != rra_begin) {
532 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
533 rrd_set_error("seek error in rrd");
538 rra_current = rra_begin;
540 rra_start = rra_begin;
542 /* when was the current pdp started */
543 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
544 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
546 /* when did the last pdp_st occur */
547 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
548 occu_pdp_st = current_time - occu_pdp_age;
550 /* interval = current_time - rrd.live_head->last_up; */
551 interval = (double) (current_time - rrd.live_head->last_up)
552 + (double) ((long) current_time_usec -
553 (long) rrd.live_head->last_up_usec) / 1000000.0;
555 if (occu_pdp_st > proc_pdp_st) {
556 /* OK we passed the pdp_st moment */
557 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
558 * occurred before the latest
560 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
561 post_int = occu_pdp_age; /* how much after it */
562 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
569 printf("proc_pdp_age %lu\t"
575 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
576 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
579 /* process the data sources and update the pdp_prep
580 * area accordingly */
581 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
584 dst_idx = dst_conv(rrd.ds_def[i].dst);
586 /* make sure we do not build diffs with old last_ds values */
587 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
588 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
589 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
592 /* NOTE: DST_CDEF should never enter this if block, because
593 * updvals[i+1][0] is initialized to 'U'; unless the caller
594          * accidently specified a value for the DST_CDEF. To handle
595 * this case, an extra check is required. */
597 if ((updvals[i + 1][0] != 'U') &&
598 (dst_idx != DST_CDEF) &&
599 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
603 /* the data source type defines how to process the data */
604 /* pdp_new contains rate * time ... eg the bytes
605 * transferred during the interval. Doing it this way saves
606 * a lot of math operations */
610 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611 if ((updvals[i + 1][ii] < '0'
612 || updvals[i + 1][ii] > '9') && (ii != 0
616 rrd_set_error("not a simple integer: '%s'",
621 if (rrd_test_error()) {
624 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
626 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
627 if (dst_idx == DST_COUNTER) {
628 /* simple overflow catcher suggested by Andres Kroonmaa */
629 /* this will fail terribly for non 32 or 64 bit counters ... */
630 /* are there any others in SNMP land ? */
631 if (pdp_new[i] < (double) 0.0)
632 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
633 if (pdp_new[i] < (double) 0.0)
634 pdp_new[i] += (double) 18446744069414584320.0;
637 rate = pdp_new[i] / interval;
643 old_locale = setlocale(LC_NUMERIC, "C");
645 pdp_new[i] = strtod(updvals[i + 1], &endptr);
646 setlocale(LC_NUMERIC, old_locale);
648 rrd_set_error("converting '%s' to float: %s",
649 updvals[i + 1], rrd_strerror(errno));
652 if (endptr[0] != '\0') {
654 ("conversion of '%s' to float not complete: tail '%s'",
655 updvals[i + 1], endptr);
658 rate = pdp_new[i] / interval;
662 old_locale = setlocale(LC_NUMERIC, "C");
663 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
664 setlocale(LC_NUMERIC, old_locale);
666 rrd_set_error("converting '%s' to float: %s",
667 updvals[i + 1], rrd_strerror(errno));
670 if (endptr[0] != '\0') {
672 ("conversion of '%s' to float not complete: tail '%s'",
673 updvals[i + 1], endptr);
676 rate = pdp_new[i] / interval;
679 rrd_set_error("rrd contains unknown DS type : '%s'",
683 /* break out of this for loop if the error string is set */
684 if (rrd_test_error()) {
687 /* make sure pdp_temp is neither too large or too small
688 * if any of these occur it becomes unknown ...
691 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
692 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
693 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
694 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
698 /* no news is news all the same */
703 /* make a copy of the command line argument for the next run */
710 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
712 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
713 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
715 /* break out of the argument parsing loop if the error_string is set */
716 if (rrd_test_error()) {
720 /* has a pdp_st moment occurred since the last run ? */
722 if (proc_pdp_st == occu_pdp_st) {
723 /* no we have not passed a pdp_st moment. therefore update is simple */
725 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
726 if (isnan(pdp_new[i])) {
727                 /* this is not really accurate if we use subsecond data arrival time
728 should have thought of it when going subsecond resolution ...
729 sorry next format change we will have it! */
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
733 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
734 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
736 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
745 rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
750 /* an pdp_st has occurred. */
752 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
753 rate*seconds which occurred up to the last run.
754 pdp_new[] contains rate*seconds from the latest run.
755 pdp_temp[] will contain the rate for cdp */
757 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
758 /* update pdp_prep to the current pdp_st. */
759 double pre_unknown = 0.0;
761 if (isnan(pdp_new[i])) {
762             /* a final bit of unknown to be added before calculation
763 we use a temporary variable for this so that we
764 don't have to turn integer lines before using the value */
765 pre_unknown = pre_int;
767 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
768 rrd.pdp_prep[i].scratch[PDP_val].u_val =
769 pdp_new[i] / interval * pre_int;
771 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
772 pdp_new[i] / interval * pre_int;
777 /* if too much of the pdp_prep is unknown we dump it */
779 /* removed because this does not agree with the
780 definition that a heartbeat can be unknown */
781 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
782 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
783                 /* if the interval is larger than mrhb we get NAN */
784 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
785 (occu_pdp_st - proc_pdp_st <=
786 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
789 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
790 / ((double) (occu_pdp_st - proc_pdp_st
793 scratch[PDP_unkn_sec_cnt].u_cnt)
797 /* process CDEF data sources; remember each CDEF DS can
798 * only reference other DS with a lower index number */
799 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
803 rpn_expand((rpn_cdefds_t *) &
804 (rrd.ds_def[i].par[DS_cdef]));
805 /* substitue data values for OP_VARIABLE nodes */
806 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
807 if (rpnp[ii].op == OP_VARIABLE) {
808 rpnp[ii].op = OP_NUMBER;
809 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
812 /* run the rpn calculator */
813 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
815 break; /* exits the data sources pdp_temp loop */
819 /* make pdp_prep ready for the next run */
820 if (isnan(pdp_new[i])) {
821                 /* this is not really accurate if we use subsecond data arrival time
822 should have thought of it when going subsecond resolution ...
823 sorry next format change we will have it! */
824 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
826 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
828 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
829 rrd.pdp_prep[i].scratch[PDP_val].u_val =
830 pdp_new[i] / interval * post_int;
838 "new_unkn_sec %5lu\n",
840 rrd.pdp_prep[i].scratch[PDP_val].u_val,
841 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
845 /* if there were errors during the last loop, bail out here */
846 if (rrd_test_error()) {
851 /* compute the number of elapsed pdp_st moments */
853 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
855 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
857 if (rra_step_cnt == NULL) {
858 rra_step_cnt = (unsigned long *)
859 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
862 for (i = 0, rra_start = rra_begin;
863 i < rrd.stat_head->rra_cnt;
865 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
866 sizeof(rrd_value_t), i++) {
867 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
868 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
869 (proc_pdp_st / rrd.stat_head->pdp_step) %
870 rrd.rra_def[i].pdp_cnt;
871 if (start_pdp_offset <= elapsed_pdp_st) {
872 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
873 rrd.rra_def[i].pdp_cnt + 1;
878 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
879 /* If this is a bulk update, we need to skip ahead in
880 the seasonal arrays so that they will be correct for
881 the next observed value;
882 note that for the bulk update itself, no update will
883                occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
884 and DEVPREDICT will be set to DNAN. */
885 if (rra_step_cnt[i] > 2) {
886 /* skip update by resetting rra_step_cnt[i],
887 note that this is not data source specific; this is
888 due to the bulk update, not a DNAN value for the
889 specific data source. */
891 lookup_seasonal(&rrd, i, rra_start, rrd_file,
892 elapsed_pdp_st, &last_seasonal_coef);
893 lookup_seasonal(&rrd, i, rra_start, rrd_file,
894 elapsed_pdp_st + 1, &seasonal_coef);
897 /* periodically run a smoother for seasonal effects */
898 /* Need to use first cdp parameter buffer to track
899 * burnin (burnin requires a specific smoothing schedule).
900 * The CDP_init_seasonal parameter is really an RRA level,
901 * not a data source within RRA level parameter, but the rra_def
902 * is read only for rrd_update (not flushed to disk). */
903 iii = i * (rrd.stat_head->ds_cnt);
904 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907 > rrd.rra_def[i].row_cnt - 1) {
908 /* mark off one of the burnin cycles */
909 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
914 /* someone has no doubt invented a trick to deal with this
915 * wrap around, but at least this code is clear. */
916 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
917 u_cnt > rrd.rra_ptr[i].cur_row) {
918 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
919 * mapping between PDP and CDP */
920 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
922 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
926 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927 rrd.rra_ptr[i].cur_row,
930 par[RRA_seasonal_smooth_idx].u_cnt);
935 /* can't rely on negative numbers because we are working with
937 /* Don't need modulus here. If we've wrapped more than once, only
938 * one smooth is executed at the end. */
939 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
940 rrd.rra_def[i].row_cnt
941 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
942 rrd.rra_def[i].row_cnt >=
943 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
947 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
948 rrd.rra_ptr[i].cur_row,
951 par[RRA_seasonal_smooth_idx].u_cnt);
958 rra_current = rrd_tell(rrd_file);
960 /* if cf is DEVSEASONAL or SEASONAL */
961 if (rrd_test_error())
964 /* update CDP_PREP areas */
965         /* loop over data sources within each RRA */
966 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
968 /* iii indexes the CDP prep area for this data source within the RRA */
969 iii = i * rrd.stat_head->ds_cnt + ii;
971 if (rrd.rra_def[i].pdp_cnt > 1) {
973 if (rra_step_cnt[i] > 0) {
974 /* If we are in this block, as least 1 CDP value will be written to
975 * disk, this is the CDP_primary_val entry. If more than 1 value needs
976 * to be written, then the "fill in" value is the CDP_secondary_val
978 if (isnan(pdp_temp[ii])) {
979 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
980 u_cnt += start_pdp_offset;
981 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
984 /* CDP_secondary value is the RRA "fill in" value for intermediary
985 * CDP data entries. No matter the CF, the value is the same because
986 * the average, max, min, and last of a list of identical values is
987 * the same, namely, the value itself. */
988 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
989 u_val = pdp_temp[ii];
992 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
994 rrd.rra_def[i].pdp_cnt *
995 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
996 rrd.cdp_prep[iii].scratch[CDP_primary_val].
998 /* initialize carry over */
999 if (current_cf == CF_AVERAGE) {
1000 if (isnan(pdp_temp[ii])) {
1001 rrd.cdp_prep[iii].scratch[CDP_val].
1004 rrd.cdp_prep[iii].scratch[CDP_val].
1009 rrd.rra_def[i].pdp_cnt);
1012 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1016 rrd_value_t cum_val, cur_val;
1018 switch (current_cf) {
1021 IFDNAN(rrd.cdp_prep[iii].
1022 scratch[CDP_val].u_val, 0.0);
1023 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1025 scratch[CDP_primary_val].u_val =
1027 cur_val * start_pdp_offset) /
1028 (rrd.rra_def[i].pdp_cnt -
1030 scratch[CDP_unkn_pdp_cnt].u_cnt);
1031 /* initialize carry over value */
1032 if (isnan(pdp_temp[ii])) {
1033 rrd.cdp_prep[iii].scratch[CDP_val].
1036 rrd.cdp_prep[iii].scratch[CDP_val].
1041 rrd.rra_def[i].pdp_cnt);
1046 IFDNAN(rrd.cdp_prep[iii].
1047 scratch[CDP_val].u_val, -DINF);
1048 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1051 (rrd.cdp_prep[iii].scratch[CDP_val].
1052 u_val) && isnan(pdp_temp[ii])) {
1054 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1059 if (cur_val > cum_val)
1061 scratch[CDP_primary_val].u_val =
1065 scratch[CDP_primary_val].u_val =
1067 /* initialize carry over value */
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1073 IFDNAN(rrd.cdp_prep[iii].
1074 scratch[CDP_val].u_val, DINF);
1075 cur_val = IFDNAN(pdp_temp[ii], DINF);
1078 (rrd.cdp_prep[iii].scratch[CDP_val].
1079 u_val) && isnan(pdp_temp[ii])) {
1081 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1086 if (cur_val < cum_val)
1088 scratch[CDP_primary_val].u_val =
1092 scratch[CDP_primary_val].u_val =
1094 /* initialize carry over value */
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1101 scratch[CDP_primary_val].u_val =
1103 /* initialize carry over value */
1104 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1108 } /* endif meets xff value requirement for a valid value */
1109 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1110 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1111 if (isnan(pdp_temp[ii]))
1112 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1116 rrd.rra_def[i].pdp_cnt;
1118 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1120 } else { /* rra_step_cnt[i] == 0 */
1124 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1126 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1130 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1132 rrd.cdp_prep[iii].scratch[CDP_val].
1136 if (isnan(pdp_temp[ii])) {
1137 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1138 u_cnt += elapsed_pdp_st;
1141 (rrd.cdp_prep[iii].scratch[CDP_val].
1143 if (current_cf == CF_AVERAGE) {
1144 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1145 pdp_temp[ii] * elapsed_pdp_st;
1147 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1152 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1154 rrd.cdp_prep[iii].scratch[CDP_val].
1158 switch (current_cf) {
1160 rrd.cdp_prep[iii].scratch[CDP_val].
1162 pdp_temp[ii] * elapsed_pdp_st;
1166 rrd.cdp_prep[iii].scratch[CDP_val].
1168 rrd.cdp_prep[iii].scratch[CDP_val].
1169 u_val = pdp_temp[ii];
1173 rrd.cdp_prep[iii].scratch[CDP_val].
1175 rrd.cdp_prep[iii].scratch[CDP_val].
1176 u_val = pdp_temp[ii];
1180 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1186 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1187 if (elapsed_pdp_st > 2) {
1188 switch (current_cf) {
1191 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1192 u_val = pdp_temp[ii];
1193 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1194 u_val = pdp_temp[ii];
1197 case CF_DEVSEASONAL:
1198 /* need to update cached seasonal values, so they are consistent
1199 * with the bulk update */
1200 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1201 * CDP_last_deviation are the same. */
1203 scratch[CDP_hw_last_seasonal].u_val =
1204 last_seasonal_coef[ii];
1205 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1206 u_val = seasonal_coef[ii];
1210 /* need to update the null_count and last_null_count.
1211 * even do this for non-DNAN pdp_temp because the
1212 * algorithm is not learning from batch updates. */
1213 rrd.cdp_prep[iii].scratch[CDP_null_count].
1214 u_cnt += elapsed_pdp_st;
1216 scratch[CDP_last_null_count].u_cnt +=
1220 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1226 /* do not count missed bulk values as failures */
1227 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1229 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1231 /* need to reset violations buffer.
1232 * could do this more carefully, but for now, just
1233 * assume a bulk update wipes away all violations. */
1234 erase_violations(&rrd, iii, i);
1238 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1240 if (rrd_test_error())
1243 } /* endif data sources loop */
1244 } /* end RRA Loop */
1246 /* this loop is only entered if elapsed_pdp_st < 3 */
1247 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1248 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1249 for (i = 0, rra_start = rra_begin;
1250 i < rrd.stat_head->rra_cnt;
1252 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1253 sizeof(rrd_value_t), i++) {
1254 if (rrd.rra_def[i].pdp_cnt > 1)
1257 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1258 if (current_cf == CF_SEASONAL
1259 || current_cf == CF_DEVSEASONAL) {
1260 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1261 elapsed_pdp_st + (scratch_idx ==
1265 rra_current = rrd_tell(rrd_file);
1267 if (rrd_test_error())
1269             /* loop over data sources within each RRA */
1270 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1271 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1272 i * (rrd.stat_head->ds_cnt) + ii,
1273 i, ii, scratch_idx, seasonal_coef);
1275 } /* end RRA Loop */
1276 if (rrd_test_error())
1278 } /* end elapsed_pdp_st loop */
1280 if (rrd_test_error())
1283 /* Ready to write to disk */
1284 /* Move sequentially through the file, writing one RRA at a time.
1285 * Note this architecture divorces the computation of CDP with
1286 * flushing updated RRA entries to disk. */
1287 for (i = 0, rra_start = rra_begin;
1288 i < rrd.stat_head->rra_cnt;
1290 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1291 sizeof(rrd_value_t), i++) {
1292         /* is there anything to write for this RRA? If not, continue. */
1293 if (rra_step_cnt[i] == 0)
1296 /* write the first row */
1298 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1300 rrd.rra_ptr[i].cur_row++;
1301 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1302 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1303         /* position on the first row */
1304 rra_pos_tmp = rra_start +
1305 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1306 sizeof(rrd_value_t);
1307 if (rra_pos_tmp != rra_current) {
1308 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1309 rrd_set_error("seek error in rrd");
1312 rra_current = rra_pos_tmp;
1315 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1317 scratch_idx = CDP_primary_val;
1318 if (pcdp_summary != NULL) {
1319 rra_time = (current_time - current_time
1320 % (rrd.rra_def[i].pdp_cnt *
1321 rrd.stat_head->pdp_step))
1324 1) * rrd.rra_def[i].pdp_cnt *
1325 rrd.stat_head->pdp_step);
1328 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1329 scratch_idx, pcdp_summary, &rra_time);
1330 if (rrd_test_error())
1333 /* write other rows of the bulk update, if any */
1334 scratch_idx = CDP_secondary_val;
1335 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1336 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1339 "Wraparound for RRA %s, %lu updates left\n",
1340 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1343 rrd.rra_ptr[i].cur_row = 0;
1344 /* seek back to beginning of current rra */
1345 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1346 rrd_set_error("seek error in rrd");
1350 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1353 rra_current = rra_start;
1355 if (pcdp_summary != NULL) {
1356 rra_time = (current_time - current_time
1357 % (rrd.rra_def[i].pdp_cnt *
1358 rrd.stat_head->pdp_step))
1361 2) * rrd.rra_def[i].pdp_cnt *
1362 rrd.stat_head->pdp_step);
1365 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1366 scratch_idx, pcdp_summary, &rra_time);
1369 if (rrd_test_error())
1373 /* break out of the argument parsing loop if error_string is set */
1374 if (rrd_test_error()) {
1379 } /* endif a pdp_st has occurred */
1380 rrd.live_head->last_up = current_time;
1381 rrd.live_head->last_up_usec = current_time_usec;
1383 } /* function argument loop */
1385 if (seasonal_coef != NULL)
1386 free(seasonal_coef);
1387 if (last_seasonal_coef != NULL)
1388 free(last_seasonal_coef);
1389 if (rra_step_cnt != NULL)
1391 rpnstack_free(&rpnstack);
1394 //rrd_flush(rrd_file); //XXX: really needed?
1396 /* if we got here and if there is an error and if the file has not been
1397 * written to, then close things up and return. */
1398 if (rrd_test_error()) {
1399 goto err_free_pdp_new;
1402 /* aargh ... that was tough ... so many loops ... anyway, its done.
1403 * we just need to write back the live header portion now*/
1405 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1406 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1407 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1409 rrd_set_error("seek rrd for live header writeback");
1410 goto err_free_pdp_new;
1412 /* for mmap, we did already write to the underlying mapping, so we do
1413 not need to write again. */
1416 if (rrd_write(rrd_file, rrd.live_head,
1417 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1418 rrd_set_error("rrd_write live_head to rrd");
1419 goto err_free_pdp_new;
1422 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1423 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1424 rrd_set_error("rrd_write live_head to rrd");
1425 goto err_free_pdp_new;
1430 if (rrd_write(rrd_file, rrd.pdp_prep,
1431 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1432 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1433 rrd_set_error("rrd_write pdp_prep to rrd");
1434 goto err_free_pdp_new;
1437 if (rrd_write(rrd_file, rrd.cdp_prep,
1438 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1439 rrd.stat_head->ds_cnt)
1440 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1441 rrd.stat_head->ds_cnt)) {
1443 rrd_set_error("rrd_write cdp_prep to rrd");
1444 goto err_free_pdp_new;
1447 if (rrd_write(rrd_file, rrd.rra_ptr,
1448 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1449 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1450 rrd_set_error("rrd_write rra_ptr to rrd");
1451 goto err_free_pdp_new;
1455 /* rrd_flush(rrd_file); */
1457 /* calling the smoothing code here guarantees at most
1458 * one smoothing operation per rrd_update call. Unfortunately,
1459 * it is possible with bulk updates, or a long-delayed update
1460 * for smoothing to occur off-schedule. This really isn't
1461 * critical except during the burning cycles. */
1462 if (schedule_smooth) {
1464 rra_start = rra_begin;
1465 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1466 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1467 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1469 fprintf(stderr, "Running smoother for rra %ld\n", i);
1471 apply_smoother(&rrd, i, rra_start, rrd_file);
1472 if (rrd_test_error())
1475 rra_start += rrd.rra_def[i].row_cnt
1476 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1480 /* rrd_dontneed(rrd_file,&rrd); */
1482 rrd_close(rrd_file);
1499 rrd_close(rrd_file);
1507 * get exclusive lock to whole file.
1508 * lock gets removed when we close the file
1510 * returns 0 on success
1518 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1521 if (_fstat(in_file, &st) == 0) {
1522 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1529 lock.l_type = F_WRLCK; /* exclusive write lock */
1530 lock.l_len = 0; /* whole file */
1531 lock.l_start = 0; /* start of file */
1532     lock.l_whence = SEEK_SET;   /* offset relative to start of file; with l_start=0, l_len=0 this locks the whole file */
1534 rcstat = fcntl(in_file, F_SETLK, &lock);