1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 #include <sys/timeb.h>
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
42 static int gettimeofday(
44 struct __timezone *tz)
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
59 * normalize time as returned by gettimeofday. usec part must
62 static inline void normalize_time(
67 t->tv_usec += 1000000L;
71 static inline info_t *write_RRA_row(
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
80 unsigned long ds_idx, cdp_idx;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
97 rrd->rra_def[rra_idx].
99 rrd->rra_def[rra_idx].
102 ds_nam), RD_I_VAL, iv);
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111 *rra_current += sizeof(rrd_value_t);
113 return (pcdp_summary);
117 const char *filename,
122 const char *filename,
128 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
131 info_t *rrd_update_v(
136 info_t *result = NULL;
141 opterr = 0; /* initialize getopt */
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
148 int option_index = 0;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
190 opterr = 0; /* initialize getopt */
193 static struct option long_options[] = {
194 {"template", required_argument, 0, 't'},
197 int option_index = 0;
200 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
211 rrd_set_error("unknown option '%s'", argv[optind - 1]);
216 /* need at least 2 arguments: filename, data. */
217 if (argc - optind < 2) {
218 rrd_set_error("Not enough arguments");
223 rc = rrd_update_r(argv[optind], tmplt,
224 argc - optind - 1, (const char **) (argv + optind + 1));
229 const char *filename,
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
238 const char *filename,
242 info_t *pcdp_summary)
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
261 unsigned long proc_pdp_st; /* which pdp_st was the last
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
320 rrd_set_error("Not enough arguments");
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
329 /* initialize time */
330 version = atoi(rrd.stat_head->version);
331 gettimeofday(&tmp_time, 0);
332 normalize_time(&tmp_time);
333 current_time = tmp_time.tv_sec;
335 current_time_usec = tmp_time.tv_usec;
337 current_time_usec = 0;
340 rra_current = rra_start = rra_begin = rrd_file->header_len;
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with udpate mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
350 #if 0 //def HAVE_MMAP
351 rrd_filesize = rrd_file->file_size;
352 fseek(rrd_file->fd, 0, SEEK_END);
353 rrd_filesize = ftell(rrd_file->fd);
354 fseek(rrd_file->fd, rra_current, SEEK_SET);
356 // fseek(rrd_file->fd, 0, SEEK_CUR);
360 /* get exclusive lock to whole file.
361 * lock gets removed when we close the file.
363 if (LockRRD(rrd_file->fd) != 0) {
364 rrd_set_error("could not lock RRD");
369 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
370 rrd_set_error("allocating updvals pointer array");
374 if ((pdp_temp = malloc(sizeof(rrd_value_t)
375 * rrd.stat_head->ds_cnt)) == NULL) {
376 rrd_set_error("allocating pdp_temp ...");
377 goto err_free_updvals;
380 if ((tmpl_idx = malloc(sizeof(unsigned long)
381 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
382 rrd_set_error("allocating tmpl_idx ...");
383 goto err_free_pdp_temp;
385 /* initialize tmplt redirector */
386 /* default config example (assume DS 1 is a CDEF DS)
387 tmpl_idx[0] -> 0; (time)
388 tmpl_idx[1] -> 1; (DS 0)
389 tmpl_idx[2] -> 3; (DS 2)
390 tmpl_idx[3] -> 4; (DS 3) */
391 tmpl_idx[0] = 0; /* time */
392 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
393 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
399 /* we should work on a writeable copy here */
401 unsigned int tmpl_len;
402 char *tmplt_copy = strdup(tmplt);
405 tmpl_cnt = 1; /* the first entry is the time */
406 tmpl_len = strlen(tmplt_copy);
407 for (i = 0; i <= tmpl_len; i++) {
408 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
409 tmplt_copy[i] = '\0';
410 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
412 ("tmplt contains more DS definitions than RRD");
413 goto err_free_tmpl_idx;
415 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
416 rrd_set_error("unknown DS name '%s'", dsname);
417 goto err_free_tmpl_idx;
419 /* the first element is always the time */
420 tmpl_idx[tmpl_cnt - 1]++;
421 /* go to the next entry on the tmplt_copy */
422 dsname = &tmplt_copy[i + 1];
423 /* fix the damage we did before */
433 if ((pdp_new = malloc(sizeof(rrd_value_t)
434 * rrd.stat_head->ds_cnt)) == NULL) {
435 rrd_set_error("allocating pdp_new ...");
436 goto err_free_tmpl_idx;
438 #if 0 //def HAVE_MMAP
439 rrd_mmaped_file = mmap(0,
441 PROT_READ | PROT_WRITE,
442 MAP_SHARED, fileno(in_file), 0);
443 if (rrd_mmaped_file == MAP_FAILED) {
444 rrd_set_error("error mmapping file %s", filename);
453 /* when we use mmaping we tell the kernel the mmap equivalent
454 of POSIX_FADV_RANDOM */
455 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
458 /* loop through the arguments. */
459 for (arg_i = 0; arg_i < argc; arg_i++) {
460 char *stepper = strdup(argv[arg_i]);
461 char *step_start = stepper;
463 char *parsetime_error = NULL;
464 enum { atstyle, normal } timesyntax;
465 struct rrd_time_value ds_tv;
467 if (stepper == NULL) {
468 rrd_set_error("failed duplication argv entry");
470 goto err_free_pdp_new;
472 /* initialize all ds input to unknown except the first one
473 which has always got to be set */
474 memset(updvals + 1, 'U', rrd.stat_head->ds_cnt);
475 updvals[0] = stepper;
476 /* separate all ds elements; first must be examined separately
477 due to alternate time syntax */
478 if ((p = strchr(stepper, '@')) != NULL) {
479 timesyntax = atstyle;
482 } else if ((p = strchr(stepper, ':')) != NULL) {
488 ("expected timestamp not found in data source from %s",
494 updvals[tmpl_idx[ii]] = stepper;
496 if (*stepper == ':') {
500 updvals[tmpl_idx[ii]] = stepper + 1;
506 if (ii != tmpl_cnt - 1) {
508 ("expected %lu data source readings (got %lu) from %s",
509 tmpl_cnt - 1, ii, argv[arg_i]);
514 /* get the time from the reading ... handle N */
515 if (timesyntax == atstyle) {
516 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
517 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
521 if (ds_tv.type == RELATIVE_TO_END_TIME ||
522 ds_tv.type == RELATIVE_TO_START_TIME) {
523 rrd_set_error("specifying time relative to the 'start' "
524 "or 'end' makes no sense here: %s", updvals[0]);
529 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
531 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
533 } else if (strcmp(updvals[0], "N") == 0) {
534 gettimeofday(&tmp_time, 0);
535 normalize_time(&tmp_time);
536 current_time = tmp_time.tv_sec;
537 current_time_usec = tmp_time.tv_usec;
541 tmp = strtod(updvals[0], 0);
542 current_time = floor(tmp);
544 (long) ((tmp - (double) current_time) * 1000000.0);
546 /* dont do any correction for old version RRDs */
548 current_time_usec = 0;
550 if (current_time < rrd.live_head->last_up ||
551 (current_time == rrd.live_head->last_up &&
552 (long) current_time_usec <=
553 (long) rrd.live_head->last_up_usec)) {
554 rrd_set_error("illegal attempt to update using time %ld when "
555 "last update time is %ld (minimum one second step)",
556 current_time, rrd.live_head->last_up);
562 /* seek to the beginning of the rra's */
563 if (rra_current != rra_begin) {
565 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
566 rrd_set_error("seek error in rrd");
571 rra_current = rra_begin;
573 rra_start = rra_begin;
575 /* when was the current pdp started */
576 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
577 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
579 /* when did the last pdp_st occur */
580 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
581 occu_pdp_st = current_time - occu_pdp_age;
583 /* interval = current_time - rrd.live_head->last_up; */
584 interval = (double) (current_time - rrd.live_head->last_up)
585 + (double) ((long) current_time_usec -
586 (long) rrd.live_head->last_up_usec) / 1000000.0;
588 if (occu_pdp_st > proc_pdp_st) {
589 /* OK we passed the pdp_st moment */
590 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
591 * occurred before the latest
593 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
594 post_int = occu_pdp_age; /* how much after it */
595 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
602 printf("proc_pdp_age %lu\t"
608 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
609 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
612 /* process the data sources and update the pdp_prep
613 * area accordingly */
614 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
617 dst_idx = dst_conv(rrd.ds_def[i].dst);
619 /* make sure we do not build diffs with old last_ds values */
620 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
621 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
622 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
625 /* NOTE: DST_CDEF should never enter this if block, because
626 * updvals[i+1][0] is initialized to 'U'; unless the caller
627 * accidently specified a value for the DST_CDEF. To handle
628 * this case, an extra check is required. */
630 if ((updvals[i + 1][0] != 'U') &&
631 (dst_idx != DST_CDEF) &&
632 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
635 /* the data source type defines how to process the data */
636 /* pdp_new contains rate * time ... eg the bytes
637 * transferred during the interval. Doing it this way saves
638 * a lot of math operations */
642 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
643 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
644 if ((updvals[i + 1][ii] < '0'
645 || updvals[i + 1][ii] > '9') && (ii != 0
651 rrd_set_error("not a simple integer: '%s'",
656 if (rrd_test_error()) {
660 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
661 if (dst_idx == DST_COUNTER) {
662 /* simple overflow catcher suggested by Andres Kroonmaa */
663 /* this will fail terribly for non 32 or 64 bit counters ... */
664 /* are there any others in SNMP land ? */
665 if (pdp_new[i] < (double) 0.0)
666 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
667 if (pdp_new[i] < (double) 0.0)
668 pdp_new[i] += (double) 18446744069414584320.0;
671 rate = pdp_new[i] / interval;
678 pdp_new[i] = strtod(updvals[i + 1], &endptr);
680 rrd_set_error("converting '%s' to float: %s",
681 updvals[i + 1], rrd_strerror(errno));
684 if (endptr[0] != '\0') {
686 ("conversion of '%s' to float not complete: tail '%s'",
687 updvals[i + 1], endptr);
690 rate = pdp_new[i] / interval;
694 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
696 rrd_set_error("converting '%s' to float: %s",
697 updvals[i + 1], rrd_strerror(errno));
700 if (endptr[0] != '\0') {
702 ("conversion of '%s' to float not complete: tail '%s'",
703 updvals[i + 1], endptr);
706 rate = pdp_new[i] / interval;
709 rrd_set_error("rrd contains unknown DS type : '%s'",
713 /* break out of this for loop if the error string is set */
714 if (rrd_test_error()) {
717 /* make sure pdp_temp is neither too large or too small
718 * if any of these occur it becomes unknown ...
721 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
722 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
723 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
724 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
728 /* no news is news all the same */
733 /* make a copy of the command line argument for the next run */
740 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
742 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
743 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
745 /* break out of the argument parsing loop if the error_string is set */
746 if (rrd_test_error()) {
750 /* has a pdp_st moment occurred since the last run ? */
752 if (proc_pdp_st == occu_pdp_st) {
753 /* no we have not passed a pdp_st moment. therefore update is simple */
755 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
756 if (isnan(pdp_new[i])) {
757 /* this is not realy accurate if we use subsecond data arival time
758 should have thought of it when going subsecond resolution ...
759 sorry next format change we will have it! */
760 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
763 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
764 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
766 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
775 rrd.pdp_prep[i].scratch[PDP_val].u_val,
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
780 /* an pdp_st has occurred. */
782 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
783 rate*seconds which occurred up to the last run.
784 pdp_new[] contains rate*seconds from the latest run.
785 pdp_temp[] will contain the rate for cdp */
787 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
788 /* update pdp_prep to the current pdp_st. */
789 double pre_unknown = 0.0;
791 if (isnan(pdp_new[i])) {
792 /* a final bit of unkonwn to be added bevore calculation
793 we use a temporary variable for this so that we
794 don't have to turn integer lines before using the value */
795 pre_unknown = pre_int;
797 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
798 rrd.pdp_prep[i].scratch[PDP_val].u_val =
799 pdp_new[i] / interval * pre_int;
801 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
802 pdp_new[i] / interval * pre_int;
807 /* if too much of the pdp_prep is unknown we dump it */
809 /* removed because this does not agree with the
810 definition that a heartbeat can be unknown */
811 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
812 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
813 /* if the interval is larger thatn mrhb we get NAN */
814 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
815 (occu_pdp_st - proc_pdp_st <=
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
819 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
820 / ((double) (occu_pdp_st - proc_pdp_st
823 scratch[PDP_unkn_sec_cnt].u_cnt)
827 /* process CDEF data sources; remember each CDEF DS can
828 * only reference other DS with a lower index number */
829 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
833 rpn_expand((rpn_cdefds_t *) &
834 (rrd.ds_def[i].par[DS_cdef]));
835 /* substitue data values for OP_VARIABLE nodes */
836 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
837 if (rpnp[ii].op == OP_VARIABLE) {
838 rpnp[ii].op = OP_NUMBER;
839 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
842 /* run the rpn calculator */
843 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
845 break; /* exits the data sources pdp_temp loop */
849 /* make pdp_prep ready for the next run */
850 if (isnan(pdp_new[i])) {
851 /* this is not realy accurate if we use subsecond data arival time
852 should have thought of it when going subsecond resolution ...
853 sorry next format change we will have it! */
854 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
856 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
858 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
859 rrd.pdp_prep[i].scratch[PDP_val].u_val =
860 pdp_new[i] / interval * post_int;
868 "new_unkn_sec %5lu\n",
870 rrd.pdp_prep[i].scratch[PDP_val].u_val,
871 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
875 /* if there were errors during the last loop, bail out here */
876 if (rrd_test_error()) {
881 /* compute the number of elapsed pdp_st moments */
883 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
885 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
887 if (rra_step_cnt == NULL) {
888 rra_step_cnt = (unsigned long *)
889 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
892 for (i = 0, rra_start = rra_begin;
893 i < rrd.stat_head->rra_cnt;
895 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
896 sizeof(rrd_value_t), i++) {
897 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
898 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
899 (proc_pdp_st / rrd.stat_head->pdp_step) %
900 rrd.rra_def[i].pdp_cnt;
901 if (start_pdp_offset <= elapsed_pdp_st) {
902 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
903 rrd.rra_def[i].pdp_cnt + 1;
908 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
909 /* If this is a bulk update, we need to skip ahead in
910 the seasonal arrays so that they will be correct for
911 the next observed value;
912 note that for the bulk update itself, no update will
913 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
914 and DEVPREDICT will be set to DNAN. */
915 if (rra_step_cnt[i] > 2) {
916 /* skip update by resetting rra_step_cnt[i],
917 note that this is not data source specific; this is
918 due to the bulk update, not a DNAN value for the
919 specific data source. */
921 lookup_seasonal(&rrd, i, rra_start, rrd_file,
922 elapsed_pdp_st, &last_seasonal_coef);
923 lookup_seasonal(&rrd, i, rra_start, rrd_file,
924 elapsed_pdp_st + 1, &seasonal_coef);
927 /* periodically run a smoother for seasonal effects */
928 /* Need to use first cdp parameter buffer to track
929 * burnin (burnin requires a specific smoothing schedule).
930 * The CDP_init_seasonal parameter is really an RRA level,
931 * not a data source within RRA level parameter, but the rra_def
932 * is read only for rrd_update (not flushed to disk). */
933 iii = i * (rrd.stat_head->ds_cnt);
934 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
936 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
937 > rrd.rra_def[i].row_cnt - 1) {
938 /* mark off one of the burnin cycles */
939 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
944 /* someone has no doubt invented a trick to deal with this
945 * wrap around, but at least this code is clear. */
946 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
947 u_cnt > rrd.rra_ptr[i].cur_row) {
948 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
949 * mapping between PDP and CDP */
950 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
952 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
956 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
957 rrd.rra_ptr[i].cur_row,
960 par[RRA_seasonal_smooth_idx].u_cnt);
965 /* can't rely on negative numbers because we are working with
967 /* Don't need modulus here. If we've wrapped more than once, only
968 * one smooth is executed at the end. */
969 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
970 rrd.rra_def[i].row_cnt
971 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
972 rrd.rra_def[i].row_cnt >=
973 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
977 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
978 rrd.rra_ptr[i].cur_row,
981 par[RRA_seasonal_smooth_idx].u_cnt);
988 rra_current = rrd_tell(rrd_file);
990 /* if cf is DEVSEASONAL or SEASONAL */
991 if (rrd_test_error())
994 /* update CDP_PREP areas */
995 /* loop over data soures within each RRA */
996 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
998 /* iii indexes the CDP prep area for this data source within the RRA */
999 iii = i * rrd.stat_head->ds_cnt + ii;
1001 if (rrd.rra_def[i].pdp_cnt > 1) {
1003 if (rra_step_cnt[i] > 0) {
1004 /* If we are in this block, as least 1 CDP value will be written to
1005 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1006 * to be written, then the "fill in" value is the CDP_secondary_val
1008 if (isnan(pdp_temp[ii])) {
1009 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1010 u_cnt += start_pdp_offset;
1011 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1014 /* CDP_secondary value is the RRA "fill in" value for intermediary
1015 * CDP data entries. No matter the CF, the value is the same because
1016 * the average, max, min, and last of a list of identical values is
1017 * the same, namely, the value itself. */
1018 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1019 u_val = pdp_temp[ii];
1022 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1024 rrd.rra_def[i].pdp_cnt *
1025 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1026 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1028 /* initialize carry over */
1029 if (current_cf == CF_AVERAGE) {
1030 if (isnan(pdp_temp[ii])) {
1031 rrd.cdp_prep[iii].scratch[CDP_val].
1034 rrd.cdp_prep[iii].scratch[CDP_val].
1039 rrd.rra_def[i].pdp_cnt);
1042 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1046 rrd_value_t cum_val, cur_val;
1048 switch (current_cf) {
1051 IFDNAN(rrd.cdp_prep[iii].
1052 scratch[CDP_val].u_val, 0.0);
1053 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1055 scratch[CDP_primary_val].u_val =
1057 cur_val * start_pdp_offset) /
1058 (rrd.rra_def[i].pdp_cnt -
1060 scratch[CDP_unkn_pdp_cnt].u_cnt);
1061 /* initialize carry over value */
1062 if (isnan(pdp_temp[ii])) {
1063 rrd.cdp_prep[iii].scratch[CDP_val].
1066 rrd.cdp_prep[iii].scratch[CDP_val].
1071 rrd.rra_def[i].pdp_cnt);
1076 IFDNAN(rrd.cdp_prep[iii].
1077 scratch[CDP_val].u_val, -DINF);
1078 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1081 (rrd.cdp_prep[iii].scratch[CDP_val].
1082 u_val) && isnan(pdp_temp[ii])) {
1084 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1089 if (cur_val > cum_val)
1091 scratch[CDP_primary_val].u_val =
1095 scratch[CDP_primary_val].u_val =
1097 /* initialize carry over value */
1098 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1103 IFDNAN(rrd.cdp_prep[iii].
1104 scratch[CDP_val].u_val, DINF);
1105 cur_val = IFDNAN(pdp_temp[ii], DINF);
1108 (rrd.cdp_prep[iii].scratch[CDP_val].
1109 u_val) && isnan(pdp_temp[ii])) {
1111 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1116 if (cur_val < cum_val)
1118 scratch[CDP_primary_val].u_val =
1122 scratch[CDP_primary_val].u_val =
1124 /* initialize carry over value */
1125 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1131 scratch[CDP_primary_val].u_val =
1133 /* initialize carry over value */
1134 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1138 } /* endif meets xff value requirement for a valid value */
1139 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1140 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1141 if (isnan(pdp_temp[ii]))
1142 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1146 rrd.rra_def[i].pdp_cnt;
1148 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1150 } else { /* rra_step_cnt[i] == 0 */
1154 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1156 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1160 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1162 rrd.cdp_prep[iii].scratch[CDP_val].
1166 if (isnan(pdp_temp[ii])) {
1167 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1168 u_cnt += elapsed_pdp_st;
1171 (rrd.cdp_prep[iii].scratch[CDP_val].
1173 if (current_cf == CF_AVERAGE) {
1174 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1175 pdp_temp[ii] * elapsed_pdp_st;
1177 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1182 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1184 rrd.cdp_prep[iii].scratch[CDP_val].
1188 switch (current_cf) {
1190 rrd.cdp_prep[iii].scratch[CDP_val].
1192 pdp_temp[ii] * elapsed_pdp_st;
1196 rrd.cdp_prep[iii].scratch[CDP_val].
1198 rrd.cdp_prep[iii].scratch[CDP_val].
1199 u_val = pdp_temp[ii];
1203 rrd.cdp_prep[iii].scratch[CDP_val].
1205 rrd.cdp_prep[iii].scratch[CDP_val].
1206 u_val = pdp_temp[ii];
1210 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1216 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1217 if (elapsed_pdp_st > 2) {
1218 switch (current_cf) {
1221 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222 u_val = pdp_temp[ii];
1223 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1224 u_val = pdp_temp[ii];
1227 case CF_DEVSEASONAL:
1228 /* need to update cached seasonal values, so they are consistent
1229 * with the bulk update */
1230 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1231 * CDP_last_deviation are the same. */
1233 scratch[CDP_hw_last_seasonal].u_val =
1234 last_seasonal_coef[ii];
1235 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1236 u_val = seasonal_coef[ii];
1239 /* need to update the null_count and last_null_count.
1240 * even do this for non-DNAN pdp_temp because the
1241 * algorithm is not learning from batch updates. */
1242 rrd.cdp_prep[iii].scratch[CDP_null_count].
1243 u_cnt += elapsed_pdp_st;
1245 scratch[CDP_last_null_count].u_cnt +=
1249 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1251 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1255 /* do not count missed bulk values as failures */
1256 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1258 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1260 /* need to reset violations buffer.
1261 * could do this more carefully, but for now, just
1262 * assume a bulk update wipes away all violations. */
1263 erase_violations(&rrd, iii, i);
1267 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1269 if (rrd_test_error())
1272 } /* endif data sources loop */
1273 } /* end RRA Loop */
1275 /* this loop is only entered if elapsed_pdp_st < 3 */
1276 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1277 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1278 for (i = 0, rra_start = rra_begin;
1279 i < rrd.stat_head->rra_cnt;
1281 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1282 sizeof(rrd_value_t), i++) {
1283 if (rrd.rra_def[i].pdp_cnt > 1)
1286 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1287 if (current_cf == CF_SEASONAL
1288 || current_cf == CF_DEVSEASONAL) {
1289 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1290 elapsed_pdp_st + (scratch_idx ==
1294 rra_current = rrd_tell(rrd_file);
1296 if (rrd_test_error())
1298 /* loop over data soures within each RRA */
1299 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1300 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1301 i * (rrd.stat_head->ds_cnt) + ii,
1302 i, ii, scratch_idx, seasonal_coef);
1304 } /* end RRA Loop */
1305 if (rrd_test_error())
1307 } /* end elapsed_pdp_st loop */
1309 if (rrd_test_error())
1312 /* Ready to write to disk */
1313 /* Move sequentially through the file, writing one RRA at a time.
1314 * Note this architecture divorces the computation of CDP with
1315 * flushing updated RRA entries to disk. */
1316 for (i = 0, rra_start = rra_begin;
1317 i < rrd.stat_head->rra_cnt;
1319 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1320 sizeof(rrd_value_t), i++) {
1321 /* is th5Aere anything to write for this RRA? If not, continue. */
1322 if (rra_step_cnt[i] == 0)
1325 /* write the first row */
1327 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1329 rrd.rra_ptr[i].cur_row++;
1330 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1331 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1332 /* positition on the first row */
1333 rra_pos_tmp = rra_start +
1334 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1335 sizeof(rrd_value_t);
1336 if (rra_pos_tmp != rra_current) {
1337 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1338 rrd_set_error("seek error in rrd");
1341 rra_current = rra_pos_tmp;
1344 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1346 scratch_idx = CDP_primary_val;
1347 if (pcdp_summary != NULL) {
1348 rra_time = (current_time - current_time
1349 % (rrd.rra_def[i].pdp_cnt *
1350 rrd.stat_head->pdp_step))
1353 1) * rrd.rra_def[i].pdp_cnt *
1354 rrd.stat_head->pdp_step);
1357 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1358 scratch_idx, pcdp_summary, &rra_time);
1359 if (rrd_test_error())
1362 /* write other rows of the bulk update, if any */
1363 scratch_idx = CDP_secondary_val;
1364 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1365 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1368 "Wraparound for RRA %s, %lu updates left\n",
1369 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1372 rrd.rra_ptr[i].cur_row = 0;
1373 /* seek back to beginning of current rra */
1374 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1375 rrd_set_error("seek error in rrd");
1379 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1382 rra_current = rra_start;
1384 if (pcdp_summary != NULL) {
1385 rra_time = (current_time - current_time
1386 % (rrd.rra_def[i].pdp_cnt *
1387 rrd.stat_head->pdp_step))
1390 2) * rrd.rra_def[i].pdp_cnt *
1391 rrd.stat_head->pdp_step);
1394 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1395 scratch_idx, pcdp_summary, &rra_time);
1398 if (rrd_test_error())
1402 /* break out of the argument parsing loop if error_string is set */
1403 if (rrd_test_error()) {
1408 } /* endif a pdp_st has occurred */
1409 rrd.live_head->last_up = current_time;
1410 rrd.live_head->last_up_usec = current_time_usec;
1412 } /* function argument loop */
1414 if (seasonal_coef != NULL)
1415 free(seasonal_coef);
1416 if (last_seasonal_coef != NULL)
1417 free(last_seasonal_coef);
1418 if (rra_step_cnt != NULL)
1420 rpnstack_free(&rpnstack);
1422 #if 0 //def HAVE_MMAP
1423 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1424 rrd_set_error("error writing(unmapping) file: %s", filename);
1427 //rrd_flush(rrd_file); //XXX: really needed?
1429 /* if we got here and if there is an error and if the file has not been
1430 * written to, then close things up and return. */
1431 if (rrd_test_error()) {
1432 goto err_free_pdp_new;
1435 /* aargh ... that was tough ... so many loops ... anyway, its done.
1436 * we just need to write back the live header portion now*/
1438 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1439 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1440 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1442 rrd_set_error("seek rrd for live header writeback");
1443 goto err_free_pdp_new;
1447 if (rrd_write(rrd_file, rrd.live_head,
1448 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1449 rrd_set_error("rrd_write live_head to rrd");
1450 goto err_free_pdp_new;
1453 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1454 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1455 rrd_set_error("rrd_write live_head to rrd");
1456 goto err_free_pdp_new;
1461 if (rrd_write(rrd_file, rrd.pdp_prep,
1462 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1463 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1464 rrd_set_error("rrd_write pdp_prep to rrd");
1465 goto err_free_pdp_new;
1468 if (rrd_write(rrd_file, rrd.cdp_prep,
1469 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1470 rrd.stat_head->ds_cnt)
1471 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1472 rrd.stat_head->ds_cnt)) {
1474 rrd_set_error("rrd_write cdp_prep to rrd");
1475 goto err_free_pdp_new;
1478 if (rrd_write(rrd_file, rrd.rra_ptr,
1479 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1480 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1481 rrd_set_error("rrd_write rra_ptr to rrd");
1482 goto err_free_pdp_new;
1485 #ifdef HAVE_POSIX_FADVISExxx
1487 /* with update we have write ops, so they will probably not be done by now, this means
1488 the buffers will not get freed. But calling this for the whole file - header
1489 will let the data off the hook as soon as it is written when if it is from a previous
1490 update cycle. Calling fdsync to force things is much too hard here. */
1492 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1493 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1494 rrd_strerror(errno));
1495 goto err_free_pdp_new;
1498 /* rrd_flush(rrd_file); */
1500 /* calling the smoothing code here guarantees at most
1501 * one smoothing operation per rrd_update call. Unfortunately,
1502 * it is possible with bulk updates, or a long-delayed update
1503 * for smoothing to occur off-schedule. This really isn't
1504 * critical except during the burning cycles. */
1505 if (schedule_smooth) {
1507 rra_start = rra_begin;
1508 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1509 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1510 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1512 fprintf(stderr, "Running smoother for rra %ld\n", i);
1514 apply_smoother(&rrd, i, rra_start, rrd_file);
1515 if (rrd_test_error())
1518 rra_start += rrd.rra_def[i].row_cnt
1519 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1521 #ifdef HAVE_POSIX_FADVISExxx
1522 /* same procedure as above ... */
1524 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1525 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1526 rrd_strerror(errno));
1527 goto err_free_pdp_new;
1533 rrd_close(rrd_file);
1550 rrd_close(rrd_file);
1558 * get exclusive lock to whole file.
1559 * lock gets removed when we close the file
1561 * returns 0 on success
1569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1572 if (_fstat(in_file, &st) == 0) {
1573 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1580 lock.l_type = F_WRLCK; /* exclusive write lock */
1581 lock.l_len = 0; /* whole file */
1582 lock.l_start = 0; /* start of file */
1583 lock.l_whence = SEEK_SET; /* end of file */
1585 rcstat = fcntl(in_file, F_SETLK, &lock);