1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 #include <sys/timeb.h>
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
42 static int gettimeofday(
44 struct __timezone *tz)
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
59 * normilize time as returned by gettimeofday. usec part must
62 static void normalize_time(
67 t->tv_usec += 1000000L;
71 static info_t *write_RRA_row(
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
80 unsigned long ds_idx, cdp_idx;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
97 rrd->rra_def[rra_idx].
99 rrd->rra_def[rra_idx].
102 ds_nam), RD_I_VAL, iv);
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
108 rrd_set_error("writing rrd");
111 *rra_current += sizeof(rrd_value_t);
113 return (pcdp_summary);
117 const char *filename,
122 const char *filename,
128 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
131 info_t *rrd_update_v(
136 info_t *result = NULL;
141 opterr = 0; /* initialize getopt */
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
148 int option_index = 0;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
190 opterr = 0; /* initialize getopt */
193 static struct option long_options[] = {
194 {"template", required_argument, 0, 't'},
197 int option_index = 0;
200 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
211 rrd_set_error("unknown option '%s'", argv[optind - 1]);
216 /* need at least 2 arguments: filename, data. */
217 if (argc - optind < 2) {
218 rrd_set_error("Not enough arguments");
223 rc = rrd_update_r(argv[optind], tmplt,
224 argc - optind - 1, (const char **) (argv + optind + 1));
229 const char *filename,
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
238 const char *filename,
242 info_t *pcdp_summary)
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
261 unsigned long proc_pdp_st; /* which pdp_st was the last
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
320 rrd_set_error("Not enough arguments");
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
329 /* initialize time */
330 version = atoi(rrd.stat_head->version);
331 gettimeofday(&tmp_time, 0);
332 normalize_time(&tmp_time);
333 current_time = tmp_time.tv_sec;
335 current_time_usec = tmp_time.tv_usec;
337 current_time_usec = 0;
340 rra_current = rra_start = rra_begin = rrd_file->header_len;
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with udpate mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
350 #if 0 //def HAVE_MMAP
351 rrd_filesize = rrd_file->file_size;
352 fseek(rrd_file->fd, 0, SEEK_END);
353 rrd_filesize = ftell(rrd_file->fd);
354 fseek(rrd_file->fd, rra_current, SEEK_SET);
356 // fseek(rrd_file->fd, 0, SEEK_CUR);
360 /* get exclusive lock to whole file.
361 * lock gets removed when we close the file.
363 if (LockRRD(rrd_file->fd) != 0) {
364 rrd_set_error("could not lock RRD");
371 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
372 rrd_set_error("allocating updvals pointer array");
378 if ((pdp_temp = malloc(sizeof(rrd_value_t)
379 * rrd.stat_head->ds_cnt)) == NULL) {
380 rrd_set_error("allocating pdp_temp ...");
387 if ((tmpl_idx = malloc(sizeof(unsigned long)
388 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
389 rrd_set_error("allocating tmpl_idx ...");
396 /* initialize tmplt redirector */
397 /* default config example (assume DS 1 is a CDEF DS)
398 tmpl_idx[0] -> 0; (time)
399 tmpl_idx[1] -> 1; (DS 0)
400 tmpl_idx[2] -> 3; (DS 2)
401 tmpl_idx[3] -> 4; (DS 3) */
402 tmpl_idx[0] = 0; /* time */
403 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
404 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
410 /* we should work on a writeable copy here */
412 unsigned int tmpl_len;
413 char *tmplt_copy = strdup(tmplt);
416 tmpl_cnt = 1; /* the first entry is the time */
417 tmpl_len = strlen(tmplt_copy);
418 for (i = 0; i <= tmpl_len; i++) {
419 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
420 tmplt_copy[i] = '\0';
421 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
423 ("tmplt contains more DS definitions than RRD");
431 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
432 rrd_set_error("unknown DS name '%s'", dsname);
441 /* the first element is always the time */
442 tmpl_idx[tmpl_cnt - 1]++;
443 /* go to the next entry on the tmplt_copy */
444 dsname = &tmplt_copy[i + 1];
445 /* fix the damage we did before */
455 if ((pdp_new = malloc(sizeof(rrd_value_t)
456 * rrd.stat_head->ds_cnt)) == NULL) {
457 rrd_set_error("allocating pdp_new ...");
465 #if 0 //def HAVE_MMAP
466 rrd_mmaped_file = mmap(0,
468 PROT_READ | PROT_WRITE,
469 MAP_SHARED, fileno(in_file), 0);
470 if (rrd_mmaped_file == MAP_FAILED) {
471 rrd_set_error("error mmapping file %s", filename);
480 /* when we use mmaping we tell the kernel the mmap equivalent
481 of POSIX_FADV_RANDOM */
482 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
485 /* loop through the arguments. */
486 for (arg_i = 0; arg_i < argc; arg_i++) {
487 char *stepper = strdup(argv[arg_i]);
488 char *step_start = stepper;
490 char *parsetime_error = NULL;
491 enum { atstyle, normal } timesyntax;
492 struct rrd_time_value ds_tv;
494 if (stepper == NULL) {
495 rrd_set_error("failed duplication argv entry");
507 /* initialize all ds input to unknown except the first one
508 which has always got to be set */
509 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
511 updvals[0] = stepper;
512 /* separate all ds elements; first must be examined separately
513 due to alternate time syntax */
514 if ((p = strchr(stepper, '@')) != NULL) {
515 timesyntax = atstyle;
518 } else if ((p = strchr(stepper, ':')) != NULL) {
524 ("expected timestamp not found in data source from %s",
530 updvals[tmpl_idx[ii]] = stepper;
532 if (*stepper == ':') {
536 updvals[tmpl_idx[ii]] = stepper + 1;
542 if (ii != tmpl_cnt - 1) {
544 ("expected %lu data source readings (got %lu) from %s",
545 tmpl_cnt - 1, ii, argv[arg_i]);
550 /* get the time from the reading ... handle N */
551 if (timesyntax == atstyle) {
552 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
553 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
557 if (ds_tv.type == RELATIVE_TO_END_TIME ||
558 ds_tv.type == RELATIVE_TO_START_TIME) {
559 rrd_set_error("specifying time relative to the 'start' "
560 "or 'end' makes no sense here: %s", updvals[0]);
565 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
567 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
569 } else if (strcmp(updvals[0], "N") == 0) {
570 gettimeofday(&tmp_time, 0);
571 normalize_time(&tmp_time);
572 current_time = tmp_time.tv_sec;
573 current_time_usec = tmp_time.tv_usec;
577 tmp = strtod(updvals[0], 0);
578 current_time = floor(tmp);
580 (long) ((tmp - (double) current_time) * 1000000.0);
582 /* dont do any correction for old version RRDs */
584 current_time_usec = 0;
586 if (current_time < rrd.live_head->last_up ||
587 (current_time == rrd.live_head->last_up &&
588 (long) current_time_usec <=
589 (long) rrd.live_head->last_up_usec)) {
590 rrd_set_error("illegal attempt to update using time %ld when "
591 "last update time is %ld (minimum one second step)",
592 current_time, rrd.live_head->last_up);
598 /* seek to the beginning of the rra's */
599 if (rra_current != rra_begin) {
601 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
602 rrd_set_error("seek error in rrd");
607 rra_current = rra_begin;
609 rra_start = rra_begin;
611 /* when was the current pdp started */
612 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
613 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
615 /* when did the last pdp_st occur */
616 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
617 occu_pdp_st = current_time - occu_pdp_age;
619 /* interval = current_time - rrd.live_head->last_up; */
620 interval = (double) (current_time - rrd.live_head->last_up)
621 + (double) ((long) current_time_usec -
622 (long) rrd.live_head->last_up_usec) / 1000000.0;
624 if (occu_pdp_st > proc_pdp_st) {
625 /* OK we passed the pdp_st moment */
626 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
627 * occurred before the latest
629 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
630 post_int = occu_pdp_age; /* how much after it */
631 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
638 printf("proc_pdp_age %lu\t"
644 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
645 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
648 /* process the data sources and update the pdp_prep
649 * area accordingly */
650 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
653 dst_idx = dst_conv(rrd.ds_def[i].dst);
655 /* make sure we do not build diffs with old last_ds values */
656 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
657 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
658 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
661 /* NOTE: DST_CDEF should never enter this if block, because
662 * updvals[i+1][0] is initialized to 'U'; unless the caller
663 * accidently specified a value for the DST_CDEF. To handle
664 * this case, an extra check is required. */
666 if ((updvals[i + 1][0] != 'U') &&
667 (dst_idx != DST_CDEF) &&
668 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
671 /* the data source type defines how to process the data */
672 /* pdp_new contains rate * time ... eg the bytes
673 * transferred during the interval. Doing it this way saves
674 * a lot of math operations */
680 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
681 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
682 if ((updvals[i + 1][ii] < '0'
683 || updvals[i + 1][ii] > '9') && (ii != 0
689 rrd_set_error("not a simple integer: '%s'",
694 if (rrd_test_error()) {
698 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
699 if (dst_idx == DST_COUNTER) {
700 /* simple overflow catcher suggested by Andres Kroonmaa */
701 /* this will fail terribly for non 32 or 64 bit counters ... */
702 /* are there any others in SNMP land ? */
703 if (pdp_new[i] < (double) 0.0)
704 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
705 if (pdp_new[i] < (double) 0.0)
706 pdp_new[i] += (double) 18446744069414584320.0;
709 rate = pdp_new[i] / interval;
716 pdp_new[i] = strtod(updvals[i + 1], &endptr);
718 rrd_set_error("converting '%s' to float: %s",
719 updvals[i + 1], rrd_strerror(errno));
722 if (endptr[0] != '\0') {
724 ("conversion of '%s' to float not complete: tail '%s'",
725 updvals[i + 1], endptr);
728 rate = pdp_new[i] / interval;
732 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
734 rrd_set_error("converting '%s' to float: %s",
735 updvals[i + 1], rrd_strerror(errno));
738 if (endptr[0] != '\0') {
740 ("conversion of '%s' to float not complete: tail '%s'",
741 updvals[i + 1], endptr);
744 rate = pdp_new[i] / interval;
747 rrd_set_error("rrd contains unknown DS type : '%s'",
751 /* break out of this for loop if the error string is set */
752 if (rrd_test_error()) {
755 /* make sure pdp_temp is neither too large or too small
756 * if any of these occur it becomes unknown ...
759 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
760 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
761 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
762 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
766 /* no news is news all the same */
771 /* make a copy of the command line argument for the next run */
778 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
780 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
781 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
783 /* break out of the argument parsing loop if the error_string is set */
784 if (rrd_test_error()) {
788 /* has a pdp_st moment occurred since the last run ? */
790 if (proc_pdp_st == occu_pdp_st) {
791 /* no we have not passed a pdp_st moment. therefore update is simple */
793 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
794 if (isnan(pdp_new[i])) {
795 /* this is not realy accurate if we use subsecond data arival time
796 should have thought of it when going subsecond resolution ...
797 sorry next format change we will have it! */
798 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
801 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
802 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
804 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
813 rrd.pdp_prep[i].scratch[PDP_val].u_val,
814 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
818 /* an pdp_st has occurred. */
820 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
821 * occurred up to the last run.
822 pdp_new[] contains rate*seconds from the latest run.
823 pdp_temp[] will contain the rate for cdp */
825 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
826 /* update pdp_prep to the current pdp_st. */
827 double pre_unknown = 0.0;
829 if (isnan(pdp_new[i]))
830 /* a final bit of unkonwn to be added bevore calculation
831 * we use a tempaorary variable for this so that we
832 * don't have to turn integer lines before using the value */
833 pre_unknown = pre_int;
835 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
836 rrd.pdp_prep[i].scratch[PDP_val].u_val =
837 pdp_new[i] / interval * pre_int;
839 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
840 pdp_new[i] / interval * pre_int;
845 /* if too much of the pdp_prep is unknown we dump it */
847 /* removed because this does not agree with the definition
848 a heart beat can be unknown */
849 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
850 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
851 /* if the interval is larger thatn mrhb we get NAN */
852 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
853 (occu_pdp_st - proc_pdp_st <=
854 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
857 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
858 / ((double) (occu_pdp_st - proc_pdp_st
861 scratch[PDP_unkn_sec_cnt].u_cnt)
865 /* process CDEF data sources; remember each CDEF DS can
866 * only reference other DS with a lower index number */
867 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
871 rpn_expand((rpn_cdefds_t *) &
872 (rrd.ds_def[i].par[DS_cdef]));
873 /* substitue data values for OP_VARIABLE nodes */
874 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
875 if (rpnp[ii].op == OP_VARIABLE) {
876 rpnp[ii].op = OP_NUMBER;
877 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
880 /* run the rpn calculator */
881 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
883 break; /* exits the data sources pdp_temp loop */
887 /* make pdp_prep ready for the next run */
888 if (isnan(pdp_new[i])) {
889 /* this is not realy accurate if we use subsecond data arival time
890 should have thought of it when going subsecond resolution ...
891 sorry next format change we will have it! */
892 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
894 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
896 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
897 rrd.pdp_prep[i].scratch[PDP_val].u_val =
898 pdp_new[i] / interval * post_int;
906 "new_unkn_sec %5lu\n",
908 rrd.pdp_prep[i].scratch[PDP_val].u_val,
909 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
913 /* if there were errors during the last loop, bail out here */
914 if (rrd_test_error()) {
919 /* compute the number of elapsed pdp_st moments */
921 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
923 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
925 if (rra_step_cnt == NULL) {
926 rra_step_cnt = (unsigned long *)
927 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
930 for (i = 0, rra_start = rra_begin;
931 i < rrd.stat_head->rra_cnt;
933 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
934 sizeof(rrd_value_t), i++) {
935 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
937 (proc_pdp_st / rrd.stat_head->pdp_step) %
938 rrd.rra_def[i].pdp_cnt;
939 if (start_pdp_offset <= elapsed_pdp_st) {
940 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
941 rrd.rra_def[i].pdp_cnt + 1;
946 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
947 /* If this is a bulk update, we need to skip ahead in the seasonal
948 * arrays so that they will be correct for the next observed value;
949 * note that for the bulk update itself, no update will occur to
950 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
952 if (rra_step_cnt[i] > 2) {
953 /* skip update by resetting rra_step_cnt[i],
954 * note that this is not data source specific; this is due
955 * to the bulk update, not a DNAN value for the specific data
958 lookup_seasonal(&rrd, i, rra_start, rrd_file,
959 elapsed_pdp_st, &last_seasonal_coef);
960 lookup_seasonal(&rrd, i, rra_start, rrd_file,
961 elapsed_pdp_st + 1, &seasonal_coef);
964 /* periodically run a smoother for seasonal effects */
965 /* Need to use first cdp parameter buffer to track
966 * burnin (burnin requires a specific smoothing schedule).
967 * The CDP_init_seasonal parameter is really an RRA level,
968 * not a data source within RRA level parameter, but the rra_def
969 * is read only for rrd_update (not flushed to disk). */
970 iii = i * (rrd.stat_head->ds_cnt);
971 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
973 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
974 > rrd.rra_def[i].row_cnt - 1) {
975 /* mark off one of the burnin cycles */
976 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
981 /* someone has no doubt invented a trick to deal with this
982 * wrap around, but at least this code is clear. */
983 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
984 u_cnt > rrd.rra_ptr[i].cur_row) {
985 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
986 * mapping between PDP and CDP */
987 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
989 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
993 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
994 rrd.rra_ptr[i].cur_row,
997 par[RRA_seasonal_smooth_idx].u_cnt);
1002 /* can't rely on negative numbers because we are working with
1003 * unsigned values */
1004 /* Don't need modulus here. If we've wrapped more than once, only
1005 * one smooth is executed at the end. */
1006 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
1007 rrd.rra_def[i].row_cnt
1008 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
1009 rrd.rra_def[i].row_cnt >=
1010 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
1014 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1015 rrd.rra_ptr[i].cur_row,
1018 par[RRA_seasonal_smooth_idx].u_cnt);
1020 schedule_smooth = 1;
1025 rra_current = rrd_tell(rrd_file);
1027 /* if cf is DEVSEASONAL or SEASONAL */
1028 if (rrd_test_error())
1031 /* update CDP_PREP areas */
1032 /* loop over data soures within each RRA */
1033 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1035 /* iii indexes the CDP prep area for this data source within the RRA */
1036 iii = i * rrd.stat_head->ds_cnt + ii;
1038 if (rrd.rra_def[i].pdp_cnt > 1) {
1040 if (rra_step_cnt[i] > 0) {
1041 /* If we are in this block, as least 1 CDP value will be written to
1042 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1043 * to be written, then the "fill in" value is the CDP_secondary_val
1045 if (isnan(pdp_temp[ii])) {
1046 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1047 u_cnt += start_pdp_offset;
1048 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1051 /* CDP_secondary value is the RRA "fill in" value for intermediary
1052 * CDP data entries. No matter the CF, the value is the same because
1053 * the average, max, min, and last of a list of identical values is
1054 * the same, namely, the value itself. */
1055 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1056 u_val = pdp_temp[ii];
1059 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1061 rrd.rra_def[i].pdp_cnt *
1062 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1063 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1065 /* initialize carry over */
1066 if (current_cf == CF_AVERAGE) {
1067 if (isnan(pdp_temp[ii])) {
1068 rrd.cdp_prep[iii].scratch[CDP_val].
1071 rrd.cdp_prep[iii].scratch[CDP_val].
1076 rrd.rra_def[i].pdp_cnt);
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1083 rrd_value_t cum_val, cur_val;
1085 switch (current_cf) {
1088 IFDNAN(rrd.cdp_prep[iii].
1089 scratch[CDP_val].u_val, 0.0);
1090 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1092 scratch[CDP_primary_val].u_val =
1094 cur_val * start_pdp_offset) /
1095 (rrd.rra_def[i].pdp_cnt -
1097 scratch[CDP_unkn_pdp_cnt].u_cnt);
1098 /* initialize carry over value */
1099 if (isnan(pdp_temp[ii])) {
1100 rrd.cdp_prep[iii].scratch[CDP_val].
1103 rrd.cdp_prep[iii].scratch[CDP_val].
1108 rrd.rra_def[i].pdp_cnt);
1113 IFDNAN(rrd.cdp_prep[iii].
1114 scratch[CDP_val].u_val, -DINF);
1115 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1118 (rrd.cdp_prep[iii].scratch[CDP_val].
1119 u_val) && isnan(pdp_temp[ii])) {
1121 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1126 if (cur_val > cum_val)
1128 scratch[CDP_primary_val].u_val =
1132 scratch[CDP_primary_val].u_val =
1134 /* initialize carry over value */
1135 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1140 IFDNAN(rrd.cdp_prep[iii].
1141 scratch[CDP_val].u_val, DINF);
1142 cur_val = IFDNAN(pdp_temp[ii], DINF);
1145 (rrd.cdp_prep[iii].scratch[CDP_val].
1146 u_val) && isnan(pdp_temp[ii])) {
1148 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1153 if (cur_val < cum_val)
1155 scratch[CDP_primary_val].u_val =
1159 scratch[CDP_primary_val].u_val =
1161 /* initialize carry over value */
1162 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1168 scratch[CDP_primary_val].u_val =
1170 /* initialize carry over value */
1171 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1175 } /* endif meets xff value requirement for a valid value */
1176 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1177 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1178 if (isnan(pdp_temp[ii]))
1179 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1183 rrd.rra_def[i].pdp_cnt;
1185 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1187 } else { /* rra_step_cnt[i] == 0 */
1191 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1193 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1197 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1199 rrd.cdp_prep[iii].scratch[CDP_val].
1203 if (isnan(pdp_temp[ii])) {
1204 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1205 u_cnt += elapsed_pdp_st;
1208 (rrd.cdp_prep[iii].scratch[CDP_val].
1210 if (current_cf == CF_AVERAGE) {
1211 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1212 pdp_temp[ii] * elapsed_pdp_st;
1214 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1219 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1221 rrd.cdp_prep[iii].scratch[CDP_val].
1225 switch (current_cf) {
1227 rrd.cdp_prep[iii].scratch[CDP_val].
1229 pdp_temp[ii] * elapsed_pdp_st;
1233 rrd.cdp_prep[iii].scratch[CDP_val].
1235 rrd.cdp_prep[iii].scratch[CDP_val].
1236 u_val = pdp_temp[ii];
1240 rrd.cdp_prep[iii].scratch[CDP_val].
1242 rrd.cdp_prep[iii].scratch[CDP_val].
1243 u_val = pdp_temp[ii];
1247 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1253 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1254 if (elapsed_pdp_st > 2) {
1255 switch (current_cf) {
1258 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1259 u_val = pdp_temp[ii];
1260 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1261 u_val = pdp_temp[ii];
1264 case CF_DEVSEASONAL:
1265 /* need to update cached seasonal values, so they are consistent
1266 * with the bulk update */
1267 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1268 * CDP_last_deviation are the same. */
1270 scratch[CDP_hw_last_seasonal].u_val =
1271 last_seasonal_coef[ii];
1272 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1273 u_val = seasonal_coef[ii];
1276 /* need to update the null_count and last_null_count.
1277 * even do this for non-DNAN pdp_temp because the
1278 * algorithm is not learning from batch updates. */
1279 rrd.cdp_prep[iii].scratch[CDP_null_count].
1280 u_cnt += elapsed_pdp_st;
1282 scratch[CDP_last_null_count].u_cnt +=
1286 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1288 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1292 /* do not count missed bulk values as failures */
1293 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1295 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1297 /* need to reset violations buffer.
1298 * could do this more carefully, but for now, just
1299 * assume a bulk update wipes away all violations. */
1300 erase_violations(&rrd, iii, i);
1304 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1306 if (rrd_test_error())
1309 } /* endif data sources loop */
1310 } /* end RRA Loop */
1312 /* this loop is only entered if elapsed_pdp_st < 3 */
1313 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1314 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1315 for (i = 0, rra_start = rra_begin;
1316 i < rrd.stat_head->rra_cnt;
1318 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1319 sizeof(rrd_value_t), i++) {
1320 if (rrd.rra_def[i].pdp_cnt > 1)
1323 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1324 if (current_cf == CF_SEASONAL
1325 || current_cf == CF_DEVSEASONAL) {
1326 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1327 elapsed_pdp_st + (scratch_idx ==
1331 rra_current = rrd_tell(rrd_file);
1333 if (rrd_test_error())
1335 /* loop over data soures within each RRA */
1336 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1337 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1338 i * (rrd.stat_head->ds_cnt) + ii,
1339 i, ii, scratch_idx, seasonal_coef);
1341 } /* end RRA Loop */
1342 if (rrd_test_error())
1344 } /* end elapsed_pdp_st loop */
1346 if (rrd_test_error())
1349 /* Ready to write to disk */
1350 /* Move sequentially through the file, writing one RRA at a time.
1351 * Note this architecture divorces the computation of CDP with
1352 * flushing updated RRA entries to disk. */
1353 for (i = 0, rra_start = rra_begin;
1354 i < rrd.stat_head->rra_cnt;
1356 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1357 sizeof(rrd_value_t), i++) {
1358 /* is th5Aere anything to write for this RRA? If not, continue. */
1359 if (rra_step_cnt[i] == 0)
1362 /* write the first row */
1364 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1366 rrd.rra_ptr[i].cur_row++;
1367 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1368 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1369 /* positition on the first row */
1370 rra_pos_tmp = rra_start +
1371 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1372 sizeof(rrd_value_t);
1373 if (rra_pos_tmp != rra_current) {
1375 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1376 rrd_set_error("seek error in rrd");
1380 rra_current = rra_pos_tmp;
1383 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1385 scratch_idx = CDP_primary_val;
1386 if (pcdp_summary != NULL) {
1387 rra_time = (current_time - current_time
1388 % (rrd.rra_def[i].pdp_cnt *
1389 rrd.stat_head->pdp_step))
1392 1) * rrd.rra_def[i].pdp_cnt *
1393 rrd.stat_head->pdp_step);
1396 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1397 scratch_idx, pcdp_summary, &rra_time);
1398 if (rrd_test_error())
1401 /* write other rows of the bulk update, if any */
1402 scratch_idx = CDP_secondary_val;
1403 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1404 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1407 "Wraparound for RRA %s, %lu updates left\n",
1408 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1411 rrd.rra_ptr[i].cur_row = 0;
1412 /* seek back to beginning of current rra */
1413 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1414 rrd_set_error("seek error in rrd");
1418 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1421 rra_current = rra_start;
1423 if (pcdp_summary != NULL) {
1424 rra_time = (current_time - current_time
1425 % (rrd.rra_def[i].pdp_cnt *
1426 rrd.stat_head->pdp_step))
1429 2) * rrd.rra_def[i].pdp_cnt *
1430 rrd.stat_head->pdp_step);
1433 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1434 scratch_idx, pcdp_summary, &rra_time);
1437 if (rrd_test_error())
1441 /* break out of the argument parsing loop if error_string is set */
1442 if (rrd_test_error()) {
1447 } /* endif a pdp_st has occurred */
1448 rrd.live_head->last_up = current_time;
1449 rrd.live_head->last_up_usec = current_time_usec;
1451 } /* function argument loop */
1453 if (seasonal_coef != NULL)
1454 free(seasonal_coef);
1455 if (last_seasonal_coef != NULL)
1456 free(last_seasonal_coef);
1457 if (rra_step_cnt != NULL)
1459 rpnstack_free(&rpnstack);
1462 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1463 rrd_set_error("error writing(unmapping) file: %s", filename);
1466 /* if we got here and if there is an error and if the file has not been
1467 * written to, then close things up and return. */
1468 if (rrd_test_error()) {
1474 close(rrd_file->fd);
1478 /* aargh ... that was tough ... so many loops ... anyway, its done.
1479 * we just need to write back the live header portion now*/
1481 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1482 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1483 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1485 rrd_set_error("seek rrd for live header writeback");
1491 close(rrd_file->fd);
1496 if (rrd_write(rrd_file, rrd.live_head,
1497 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1498 rrd_set_error("rrd_write live_head to rrd");
1504 close(rrd_file->fd);
1508 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1509 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1510 rrd_set_error("rrd_write live_head to rrd");
1516 close(rrd_file->fd);
1522 if (rrd_write(rrd_file, rrd.pdp_prep,
1523 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1524 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1525 rrd_set_error("rrd_write pdp_prep to rrd");
1531 close(rrd_file->fd);
1535 if (rrd_write(rrd_file, rrd.cdp_prep,
1536 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1537 rrd.stat_head->ds_cnt)
1538 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539 rrd.stat_head->ds_cnt)) {
1541 rrd_set_error("rrd_write cdp_prep to rrd");
1547 close(rrd_file->fd);
1551 if (rrd_write(rrd_file, rrd.rra_ptr,
1552 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1553 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1554 rrd_set_error("rrd_write rra_ptr to rrd");
1560 close(rrd_file->fd);
1563 #ifdef HAVE_POSIX_FADVISExxx
1565 /* with update we have write ops, so they will probably not be done by now, this means
1566 the buffers will not get freed. But calling this for the whole file - header
1567 will let the data off the hook as soon as it is written when if it is from a previous
1568 update cycle. Calling fdsync to force things is much too hard here. */
1570 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1571 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1572 rrd_strerror(errno));
1573 close(rrd_file->fd);
1577 /*XXX: ? */ rrd_flush(rrd_file);
1579 /* calling the smoothing code here guarantees at most
1580 * one smoothing operation per rrd_update call. Unfortunately,
1581 * it is possible with bulk updates, or a long-delayed update
1582 * for smoothing to occur off-schedule. This really isn't
1583 * critical except during the burning cycles. */
1584 if (schedule_smooth) {
1585 // in_file = fopen(filename,"rb+");
1588 rra_start = rra_begin;
1589 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1590 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1591 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1593 fprintf(stderr, "Running smoother for rra %ld\n", i);
1595 apply_smoother(&rrd, i, rra_start, rrd_file);
1596 if (rrd_test_error())
1599 rra_start += rrd.rra_def[i].row_cnt
1600 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1602 #ifdef HAVE_POSIX_FADVISExxx
1603 /* same procedure as above ... */
1605 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1606 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1607 rrd_strerror(errno));
1608 close(rrd_file->fd);
1612 close(rrd_file->fd);
1615 /* OK now close the files and free the memory */
1616 if (close(rrd_file->fd) != 0) {
1617 rrd_set_error("closing rrd");
1635 * get exclusive lock to whole file.
1636 * lock gets removed when we close the file
1638 * returns 0 on success
1646 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1649 if (_fstat(in_file, &st) == 0) {
1650 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1657 lock.l_type = F_WRLCK; /* exclusive write lock */
1658 lock.l_len = 0; /* whole file */
1659 lock.l_start = 0; /* start of file */
1660 lock.l_whence = SEEK_SET; /* end of file */
1662 rcstat = fcntl(in_file, F_SETLK, &lock);