1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
13 # include <sys/mman.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(
49 struct __timezone *tz)
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
/*
 * Normalize time as returned by gettimeofday: the usec part must
 * always be non-negative. If it is negative, borrow one second from
 * tv_sec and add it back as 1,000,000 microseconds.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
76 /* Local prototypes */
81 info_t *write_RRA_row(
83 unsigned long rra_idx,
84 unsigned long *rra_current,
85 unsigned short CDP_scratch_idx,
93 void *rrd_mmaped_file);
95 info_t *write_RRA_row(
97 unsigned long rra_idx,
98 unsigned long *rra_current,
99 unsigned short CDP_scratch_idx,
101 info_t *pcdp_summary,
105 const char *filename,
110 const char *filename,
/* Return X unless it is NaN, in which case return the fallback Y.
 * No trailing semicolon: the macro must expand to a pure expression so
 * it can be used inside larger expressions (the original stray ';'
 * broke such uses and produced empty statements elsewhere). */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
119 info_t *rrd_update_v(
124 info_t *result = NULL;
129 opterr = 0; /* initialize getopt */
132 static struct option long_options[] = {
133 {"template", required_argument, 0, 't'},
136 int option_index = 0;
139 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
150 rrd_set_error("unknown option '%s'", argv[optind - 1]);
155 /* need at least 2 arguments: filename, data. */
156 if (argc - optind < 2) {
157 rrd_set_error("Not enough arguments");
161 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162 rc.u_int = _rrd_update(argv[optind], tmplt,
164 (const char **) (argv + optind + 1), result);
165 result->value.u_int = rc.u_int;
178 opterr = 0; /* initialize getopt */
181 static struct option long_options[] = {
182 {"template", required_argument, 0, 't'},
185 int option_index = 0;
188 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
199 rrd_set_error("unknown option '%s'", argv[optind - 1]);
204 /* need at least 2 arguments: filename, data. */
205 if (argc - optind < 2) {
206 rrd_set_error("Not enough arguments");
211 rc = rrd_update_r(argv[optind], tmplt,
212 argc - optind - 1, (const char **) (argv + optind + 1));
217 const char *filename,
222 return _rrd_update(filename, tmplt, argc, argv, NULL);
226 const char *filename,
230 info_t *pcdp_summary)
235 unsigned long i, ii, iii = 1;
237 unsigned long rra_begin; /* byte pointer to the rra
238 * area in the rrd file. this
239 * pointer never changes value */
240 unsigned long rra_start; /* byte pointer to the rra
241 * area in the rrd file. this
242 * pointer changes as each rrd is
244 unsigned long rra_current; /* byte pointer to the current write
245 * spot in the rrd file. */
246 unsigned long rra_pos_tmp; /* temporary byte pointer. */
247 double interval, pre_int, post_int; /* interval between this and
249 unsigned long proc_pdp_st; /* which pdp_st was the last
251 unsigned long occu_pdp_st; /* when was the pdp_st
252 * before the last update
254 unsigned long proc_pdp_age; /* how old was the data in
255 * the pdp prep area when it
256 * was last updated */
257 unsigned long occu_pdp_age; /* how long ago was the last
259 rrd_value_t *pdp_new; /* prepare the incoming data
260 * to be added to the
262 rrd_value_t *pdp_temp; /* prepare the pdp values
263 * to be added to the
266 long *tmpl_idx; /* index representing the settings
267 transported by the tmplt index */
268 unsigned long tmpl_cnt = 2; /* time and data */
271 time_t current_time = 0;
272 time_t rra_time = 0; /* time of update for a RRA */
273 unsigned long current_time_usec = 0; /* microseconds part of current time */
274 struct timeval tmp_time; /* used for time conversion */
277 int schedule_smooth = 0;
278 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
280 /* a vector of future Holt-Winters seasonal coefs */
281 unsigned long elapsed_pdp_st;
283 /* number of elapsed PDP steps since last update */
284 unsigned long *rra_step_cnt = NULL;
286 /* number of rows to be updated in an RRA for a data
288 unsigned long start_pdp_offset;
290 /* number of PDP steps since the last update that
291 * are assigned to the first CDP to be generated
292 * since the last update. */
293 unsigned short scratch_idx;
295 /* index into the CDP scratch array */
296 enum cf_en current_cf;
298 /* numeric id of the current consolidation function */
299 rpnstack_t rpnstack; /* used for COMPUTE DS */
300 int version; /* rrd version */
301 char *endptr; /* used in the conversion */
302 rrd_file_t *rrd_file;
304 rpnstack_init(&rpnstack);
306 /* need at least 1 arguments: data. */
308 rrd_set_error("Not enough arguments");
312 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313 if (rrd_file == NULL) {
317 /* initialize time */
318 version = atoi(rrd.stat_head->version);
319 gettimeofday(&tmp_time, 0);
320 normalize_time(&tmp_time);
321 current_time = tmp_time.tv_sec;
323 current_time_usec = tmp_time.tv_usec;
325 current_time_usec = 0;
328 rra_current = rra_start = rra_begin = rrd_file->header_len;
329 /* This is defined in the ANSI C standard, section 7.9.5.3:
331 When a file is opened with update mode ('+' as the second
332 or third character in the ... list of mode argument
333 variables), both input and output may be performed on the
334 associated stream. However, ... input may not be directly
335 followed by output without an intervening call to a file
336 positioning function, unless the input operation encounters
338 #if 0 //def HAVE_MMAP
339 rrd_filesize = rrd_file->file_size;
340 fseek(rrd_file->fd, 0, SEEK_END);
341 rrd_filesize = ftell(rrd_file->fd);
342 fseek(rrd_file->fd, rra_current, SEEK_SET);
344 // fseek(rrd_file->fd, 0, SEEK_CUR);
348 /* get exclusive lock to whole file.
349 * lock gets removed when we close the file.
351 if (LockRRD(rrd_file->fd) != 0) {
352 rrd_set_error("could not lock RRD");
359 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360 rrd_set_error("allocating updvals pointer array");
366 if ((pdp_temp = malloc(sizeof(rrd_value_t)
367 * rrd.stat_head->ds_cnt)) == NULL) {
368 rrd_set_error("allocating pdp_temp ...");
375 if ((tmpl_idx = malloc(sizeof(unsigned long)
376 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377 rrd_set_error("allocating tmpl_idx ...");
384 /* initialize tmplt redirector */
385 /* default config example (assume DS 1 is a CDEF DS)
386 tmpl_idx[0] -> 0; (time)
387 tmpl_idx[1] -> 1; (DS 0)
388 tmpl_idx[2] -> 3; (DS 2)
389 tmpl_idx[3] -> 4; (DS 3) */
390 tmpl_idx[0] = 0; /* time */
391 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
398 /* we should work on a writeable copy here */
400 unsigned int tmpl_len;
401 char *tmplt_copy = strdup(tmplt);
404 tmpl_cnt = 1; /* the first entry is the time */
405 tmpl_len = strlen(tmplt_copy);
406 for (i = 0; i <= tmpl_len; i++) {
407 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408 tmplt_copy[i] = '\0';
409 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
411 ("tmplt contains more DS definitions than RRD");
419 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420 rrd_set_error("unknown DS name '%s'", dsname);
429 /* the first element is always the time */
430 tmpl_idx[tmpl_cnt - 1]++;
431 /* go to the next entry on the tmplt_copy */
432 dsname = &tmplt_copy[i + 1];
433 /* fix the damage we did before */
443 if ((pdp_new = malloc(sizeof(rrd_value_t)
444 * rrd.stat_head->ds_cnt)) == NULL) {
445 rrd_set_error("allocating pdp_new ...");
453 #if 0 //def HAVE_MMAP
454 rrd_mmaped_file = mmap(0,
456 PROT_READ | PROT_WRITE,
457 MAP_SHARED, fileno(in_file), 0);
458 if (rrd_mmaped_file == MAP_FAILED) {
459 rrd_set_error("error mmapping file %s", filename);
468 /* when we use mmaping we tell the kernel the mmap equivalent
469 of POSIX_FADV_RANDOM */
470 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
473 /* loop through the arguments. */
474 for (arg_i = 0; arg_i < argc; arg_i++) {
475 char *stepper = strdup(argv[arg_i]);
476 char *step_start = stepper;
478 char *parsetime_error = NULL;
479 enum { atstyle, normal } timesyntax;
480 struct rrd_time_value ds_tv;
482 if (stepper == NULL) {
483 rrd_set_error("failed duplication argv entry");
495 /* initialize all ds input to unknown except the first one
496 which has always got to be set */
497 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
499 updvals[0] = stepper;
500 /* separate all ds elements; first must be examined separately
501 due to alternate time syntax */
502 if ((p = strchr(stepper, '@')) != NULL) {
503 timesyntax = atstyle;
506 } else if ((p = strchr(stepper, ':')) != NULL) {
512 ("expected timestamp not found in data source from %s",
518 updvals[tmpl_idx[ii]] = stepper;
520 if (*stepper == ':') {
524 updvals[tmpl_idx[ii]] = stepper + 1;
530 if (ii != tmpl_cnt - 1) {
532 ("expected %lu data source readings (got %lu) from %s",
533 tmpl_cnt - 1, ii, argv[arg_i]);
538 /* get the time from the reading ... handle N */
539 if (timesyntax == atstyle) {
540 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
545 if (ds_tv.type == RELATIVE_TO_END_TIME ||
546 ds_tv.type == RELATIVE_TO_START_TIME) {
547 rrd_set_error("specifying time relative to the 'start' "
548 "or 'end' makes no sense here: %s", updvals[0]);
553 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
555 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
557 } else if (strcmp(updvals[0], "N") == 0) {
558 gettimeofday(&tmp_time, 0);
559 normalize_time(&tmp_time);
560 current_time = tmp_time.tv_sec;
561 current_time_usec = tmp_time.tv_usec;
565 tmp = strtod(updvals[0], 0);
566 current_time = floor(tmp);
568 (long) ((tmp - (double) current_time) * 1000000.0);
570 /* dont do any correction for old version RRDs */
572 current_time_usec = 0;
574 if (current_time < rrd.live_head->last_up ||
575 (current_time == rrd.live_head->last_up &&
576 (long) current_time_usec <=
577 (long) rrd.live_head->last_up_usec)) {
578 rrd_set_error("illegal attempt to update using time %ld when "
579 "last update time is %ld (minimum one second step)",
580 current_time, rrd.live_head->last_up);
586 /* seek to the beginning of the rra's */
587 if (rra_current != rra_begin) {
589 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
590 rrd_set_error("seek error in rrd");
595 rra_current = rra_begin;
597 rra_start = rra_begin;
599 /* when was the current pdp started */
600 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
601 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
603 /* when did the last pdp_st occur */
604 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
605 occu_pdp_st = current_time - occu_pdp_age;
607 /* interval = current_time - rrd.live_head->last_up; */
608 interval = (double) (current_time - rrd.live_head->last_up)
609 + (double) ((long) current_time_usec -
610 (long) rrd.live_head->last_up_usec) / 1000000.0;
612 if (occu_pdp_st > proc_pdp_st) {
613 /* OK we passed the pdp_st moment */
614 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
615 * occurred before the latest
617 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
618 post_int = occu_pdp_age; /* how much after it */
619 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
626 printf("proc_pdp_age %lu\t"
632 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
633 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
636 /* process the data sources and update the pdp_prep
637 * area accordingly */
638 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
641 dst_idx = dst_conv(rrd.ds_def[i].dst);
643 /* make sure we do not build diffs with old last_ds values */
644 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
645 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
646 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
649 /* NOTE: DST_CDEF should never enter this if block, because
650 * updvals[i+1][0] is initialized to 'U'; unless the caller
651 * accidently specified a value for the DST_CDEF. To handle
652 * this case, an extra check is required. */
654 if ((updvals[i + 1][0] != 'U') &&
655 (dst_idx != DST_CDEF) &&
656 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
659 /* the data source type defines how to process the data */
660 /* pdp_new contains rate * time ... eg the bytes
661 * transferred during the interval. Doing it this way saves
662 * a lot of math operations */
668 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
669 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
670 if ((updvals[i + 1][ii] < '0'
671 || updvals[i + 1][ii] > '9') && (ii != 0
677 rrd_set_error("not a simple integer: '%s'",
682 if (rrd_test_error()) {
686 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
687 if (dst_idx == DST_COUNTER) {
688 /* simple overflow catcher suggested by Andres Kroonmaa */
689 /* this will fail terribly for non 32 or 64 bit counters ... */
690 /* are there any others in SNMP land ? */
691 if (pdp_new[i] < (double) 0.0)
692 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
693 if (pdp_new[i] < (double) 0.0)
694 pdp_new[i] += (double) 18446744069414584320.0;
697 rate = pdp_new[i] / interval;
704 pdp_new[i] = strtod(updvals[i + 1], &endptr);
706 rrd_set_error("converting '%s' to float: %s",
707 updvals[i + 1], rrd_strerror(errno));
710 if (endptr[0] != '\0') {
712 ("conversion of '%s' to float not complete: tail '%s'",
713 updvals[i + 1], endptr);
716 rate = pdp_new[i] / interval;
720 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
722 rrd_set_error("converting '%s' to float: %s",
723 updvals[i + 1], rrd_strerror(errno));
726 if (endptr[0] != '\0') {
728 ("conversion of '%s' to float not complete: tail '%s'",
729 updvals[i + 1], endptr);
732 rate = pdp_new[i] / interval;
735 rrd_set_error("rrd contains unknown DS type : '%s'",
739 /* break out of this for loop if the error string is set */
740 if (rrd_test_error()) {
743 /* make sure pdp_temp is neither too large or too small
744 * if any of these occur it becomes unknown ...
747 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
748 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
749 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
750 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
754 /* no news is news all the same */
759 /* make a copy of the command line argument for the next run */
766 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
768 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
769 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
771 /* break out of the argument parsing loop if the error_string is set */
772 if (rrd_test_error()) {
776 /* has a pdp_st moment occurred since the last run ? */
778 if (proc_pdp_st == occu_pdp_st) {
779 /* no we have not passed a pdp_st moment. therefore update is simple */
781 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
782 if (isnan(pdp_new[i])) {
783 /* this is not really accurate if we use subsecond data arrival time
784 should have thought of it when going subsecond resolution ...
785 sorry next format change we will have it! */
786 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
789 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
790 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
792 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
801 rrd.pdp_prep[i].scratch[PDP_val].u_val,
802 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
806 /* an pdp_st has occurred. */
808 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
809 * occurred up to the last run.
810 pdp_new[] contains rate*seconds from the latest run.
811 pdp_temp[] will contain the rate for cdp */
813 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
814 /* update pdp_prep to the current pdp_st. */
815 double pre_unknown = 0.0;
817 if (isnan(pdp_new[i]))
818 /* a final bit of unknown to be added before calculation
819 * we use a temporary variable for this so that we
820 * don't have to turn integer lines before using the value */
821 pre_unknown = pre_int;
823 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
824 rrd.pdp_prep[i].scratch[PDP_val].u_val =
825 pdp_new[i] / interval * pre_int;
827 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
828 pdp_new[i] / interval * pre_int;
833 /* if too much of the pdp_prep is unknown we dump it */
835 /* removed because this does not agree with the definition
836 a heart beat can be unknown */
837 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
838 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
839 /* if the interval is larger than mrhb we get NAN */
840 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
841 (occu_pdp_st - proc_pdp_st <=
842 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
845 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
846 / ((double) (occu_pdp_st - proc_pdp_st
849 scratch[PDP_unkn_sec_cnt].u_cnt)
853 /* process CDEF data sources; remember each CDEF DS can
854 * only reference other DS with a lower index number */
855 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
859 rpn_expand((rpn_cdefds_t *) &
860 (rrd.ds_def[i].par[DS_cdef]));
861 /* substitue data values for OP_VARIABLE nodes */
862 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
863 if (rpnp[ii].op == OP_VARIABLE) {
864 rpnp[ii].op = OP_NUMBER;
865 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
868 /* run the rpn calculator */
869 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
871 break; /* exits the data sources pdp_temp loop */
875 /* make pdp_prep ready for the next run */
876 if (isnan(pdp_new[i])) {
877 /* this is not really accurate if we use subsecond data arrival time
878 should have thought of it when going subsecond resolution ...
879 sorry next format change we will have it! */
880 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
882 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
884 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
885 rrd.pdp_prep[i].scratch[PDP_val].u_val =
886 pdp_new[i] / interval * post_int;
894 "new_unkn_sec %5lu\n",
896 rrd.pdp_prep[i].scratch[PDP_val].u_val,
897 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
901 /* if there were errors during the last loop, bail out here */
902 if (rrd_test_error()) {
907 /* compute the number of elapsed pdp_st moments */
909 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
911 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
913 if (rra_step_cnt == NULL) {
914 rra_step_cnt = (unsigned long *)
915 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
918 for (i = 0, rra_start = rra_begin;
919 i < rrd.stat_head->rra_cnt;
921 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
922 sizeof(rrd_value_t), i++) {
923 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
924 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
925 (proc_pdp_st / rrd.stat_head->pdp_step) %
926 rrd.rra_def[i].pdp_cnt;
927 if (start_pdp_offset <= elapsed_pdp_st) {
928 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
929 rrd.rra_def[i].pdp_cnt + 1;
934 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
935 /* If this is a bulk update, we need to skip ahead in the seasonal
936 * arrays so that they will be correct for the next observed value;
937 * note that for the bulk update itself, no update will occur to
938 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
940 if (rra_step_cnt[i] > 2) {
941 /* skip update by resetting rra_step_cnt[i],
942 * note that this is not data source specific; this is due
943 * to the bulk update, not a DNAN value for the specific data
946 lookup_seasonal(&rrd, i, rra_start, rrd_file,
947 elapsed_pdp_st, &last_seasonal_coef);
948 lookup_seasonal(&rrd, i, rra_start, rrd_file,
949 elapsed_pdp_st + 1, &seasonal_coef);
952 /* periodically run a smoother for seasonal effects */
953 /* Need to use first cdp parameter buffer to track
954 * burnin (burnin requires a specific smoothing schedule).
955 * The CDP_init_seasonal parameter is really an RRA level,
956 * not a data source within RRA level parameter, but the rra_def
957 * is read only for rrd_update (not flushed to disk). */
958 iii = i * (rrd.stat_head->ds_cnt);
959 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
961 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
962 > rrd.rra_def[i].row_cnt - 1) {
963 /* mark off one of the burnin cycles */
964 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
969 /* someone has no doubt invented a trick to deal with this
970 * wrap around, but at least this code is clear. */
971 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
972 u_cnt > rrd.rra_ptr[i].cur_row) {
973 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
974 * mapping between PDP and CDP */
975 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
977 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
981 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
982 rrd.rra_ptr[i].cur_row,
985 par[RRA_seasonal_smooth_idx].u_cnt);
990 /* can't rely on negative numbers because we are working with
992 /* Don't need modulus here. If we've wrapped more than once, only
993 * one smooth is executed at the end. */
994 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
995 rrd.rra_def[i].row_cnt
996 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
997 rrd.rra_def[i].row_cnt >=
998 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
1002 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1003 rrd.rra_ptr[i].cur_row,
1006 par[RRA_seasonal_smooth_idx].u_cnt);
1008 schedule_smooth = 1;
1013 rra_current = rrd_tell(rrd_file);
1015 /* if cf is DEVSEASONAL or SEASONAL */
1016 if (rrd_test_error())
1019 /* update CDP_PREP areas */
1020 /* loop over data sources within each RRA */
1021 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1023 /* iii indexes the CDP prep area for this data source within the RRA */
1024 iii = i * rrd.stat_head->ds_cnt + ii;
1026 if (rrd.rra_def[i].pdp_cnt > 1) {
1028 if (rra_step_cnt[i] > 0) {
1029 /* If we are in this block, as least 1 CDP value will be written to
1030 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1031 * to be written, then the "fill in" value is the CDP_secondary_val
1033 if (isnan(pdp_temp[ii])) {
1034 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1035 u_cnt += start_pdp_offset;
1036 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1039 /* CDP_secondary value is the RRA "fill in" value for intermediary
1040 * CDP data entries. No matter the CF, the value is the same because
1041 * the average, max, min, and last of a list of identical values is
1042 * the same, namely, the value itself. */
1043 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1044 u_val = pdp_temp[ii];
1047 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1049 rrd.rra_def[i].pdp_cnt *
1050 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1051 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1053 /* initialize carry over */
1054 if (current_cf == CF_AVERAGE) {
1055 if (isnan(pdp_temp[ii])) {
1056 rrd.cdp_prep[iii].scratch[CDP_val].
1059 rrd.cdp_prep[iii].scratch[CDP_val].
1064 rrd.rra_def[i].pdp_cnt);
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1071 rrd_value_t cum_val, cur_val;
1073 switch (current_cf) {
1076 IFDNAN(rrd.cdp_prep[iii].
1077 scratch[CDP_val].u_val, 0.0);
1078 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1080 scratch[CDP_primary_val].u_val =
1082 cur_val * start_pdp_offset) /
1083 (rrd.rra_def[i].pdp_cnt -
1085 scratch[CDP_unkn_pdp_cnt].u_cnt);
1086 /* initialize carry over value */
1087 if (isnan(pdp_temp[ii])) {
1088 rrd.cdp_prep[iii].scratch[CDP_val].
1091 rrd.cdp_prep[iii].scratch[CDP_val].
1096 rrd.rra_def[i].pdp_cnt);
1101 IFDNAN(rrd.cdp_prep[iii].
1102 scratch[CDP_val].u_val, -DINF);
1103 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1106 (rrd.cdp_prep[iii].scratch[CDP_val].
1107 u_val) && isnan(pdp_temp[ii])) {
1109 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1114 if (cur_val > cum_val)
1116 scratch[CDP_primary_val].u_val =
1120 scratch[CDP_primary_val].u_val =
1122 /* initialize carry over value */
1123 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1128 IFDNAN(rrd.cdp_prep[iii].
1129 scratch[CDP_val].u_val, DINF);
1130 cur_val = IFDNAN(pdp_temp[ii], DINF);
1133 (rrd.cdp_prep[iii].scratch[CDP_val].
1134 u_val) && isnan(pdp_temp[ii])) {
1136 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1141 if (cur_val < cum_val)
1143 scratch[CDP_primary_val].u_val =
1147 scratch[CDP_primary_val].u_val =
1149 /* initialize carry over value */
1150 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1156 scratch[CDP_primary_val].u_val =
1158 /* initialize carry over value */
1159 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1163 } /* endif meets xff value requirement for a valid value */
1164 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1165 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1166 if (isnan(pdp_temp[ii]))
1167 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1171 rrd.rra_def[i].pdp_cnt;
1173 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1175 } else { /* rra_step_cnt[i] == 0 */
1179 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1181 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1185 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1187 rrd.cdp_prep[iii].scratch[CDP_val].
1191 if (isnan(pdp_temp[ii])) {
1192 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1193 u_cnt += elapsed_pdp_st;
1196 (rrd.cdp_prep[iii].scratch[CDP_val].
1198 if (current_cf == CF_AVERAGE) {
1199 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1200 pdp_temp[ii] * elapsed_pdp_st;
1202 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1207 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1209 rrd.cdp_prep[iii].scratch[CDP_val].
1213 switch (current_cf) {
1215 rrd.cdp_prep[iii].scratch[CDP_val].
1217 pdp_temp[ii] * elapsed_pdp_st;
1221 rrd.cdp_prep[iii].scratch[CDP_val].
1223 rrd.cdp_prep[iii].scratch[CDP_val].
1224 u_val = pdp_temp[ii];
1228 rrd.cdp_prep[iii].scratch[CDP_val].
1230 rrd.cdp_prep[iii].scratch[CDP_val].
1231 u_val = pdp_temp[ii];
1235 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1241 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1242 if (elapsed_pdp_st > 2) {
1243 switch (current_cf) {
1246 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1247 u_val = pdp_temp[ii];
1248 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1249 u_val = pdp_temp[ii];
1252 case CF_DEVSEASONAL:
1253 /* need to update cached seasonal values, so they are consistent
1254 * with the bulk update */
1255 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1256 * CDP_last_deviation are the same. */
1258 scratch[CDP_hw_last_seasonal].u_val =
1259 last_seasonal_coef[ii];
1260 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1261 u_val = seasonal_coef[ii];
1264 /* need to update the null_count and last_null_count.
1265 * even do this for non-DNAN pdp_temp because the
1266 * algorithm is not learning from batch updates. */
1267 rrd.cdp_prep[iii].scratch[CDP_null_count].
1268 u_cnt += elapsed_pdp_st;
1270 scratch[CDP_last_null_count].u_cnt +=
1274 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1276 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1280 /* do not count missed bulk values as failures */
1281 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1283 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1285 /* need to reset violations buffer.
1286 * could do this more carefully, but for now, just
1287 * assume a bulk update wipes away all violations. */
1288 erase_violations(&rrd, iii, i);
1292 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1294 if (rrd_test_error())
1297 } /* endif data sources loop */
1298 } /* end RRA Loop */
1300 /* this loop is only entered if elapsed_pdp_st < 3 */
1301 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1302 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1303 for (i = 0, rra_start = rra_begin;
1304 i < rrd.stat_head->rra_cnt;
1306 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1307 sizeof(rrd_value_t), i++) {
1308 if (rrd.rra_def[i].pdp_cnt > 1)
1311 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1312 if (current_cf == CF_SEASONAL
1313 || current_cf == CF_DEVSEASONAL) {
1314 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1315 elapsed_pdp_st + (scratch_idx ==
1319 rra_current = rrd_tell(rrd_file);
1321 if (rrd_test_error())
1323 /* loop over data sources within each RRA */
1324 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1325 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1326 i * (rrd.stat_head->ds_cnt) + ii,
1327 i, ii, scratch_idx, seasonal_coef);
1329 } /* end RRA Loop */
1330 if (rrd_test_error())
1332 } /* end elapsed_pdp_st loop */
1334 if (rrd_test_error())
1337 /* Ready to write to disk */
1338 /* Move sequentially through the file, writing one RRA at a time.
1339 * Note this architecture divorces the computation of CDP with
1340 * flushing updated RRA entries to disk. */
1341 for (i = 0, rra_start = rra_begin;
1342 i < rrd.stat_head->rra_cnt;
1344 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1345 sizeof(rrd_value_t), i++) {
1346 /* is there anything to write for this RRA? If not, continue. */
1347 if (rra_step_cnt[i] == 0)
1350 /* write the first row */
1352 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1354 rrd.rra_ptr[i].cur_row++;
1355 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1356 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1357 /* position on the first row */
/* NOTE(review): interior of the per-argument update loop -- the enclosing
 * function begins before this excerpt and several original lines (braces,
 * #ifdef/#else selectors, break statements) are elided, so control flow is
 * only partially visible here. */
/* Compute the file offset of the current row of RRA i: each row is
 * ds_cnt values of sizeof(rrd_value_t), cur_row rows past rra_start. */
1358 rra_pos_tmp = rra_start +
1359 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1360 sizeof(rrd_value_t);
/* Seek only if the cached position does not already match the row. */
1361 if (rra_pos_tmp != rra_current) {
1363 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1364 rrd_set_error("seek error in rrd");
1368 rra_current = rra_pos_tmp;
1371 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
/* First row written in this update uses the primary CDP scratch value. */
1373 scratch_idx = CDP_primary_val;
/* pcdp_summary is non-NULL only for rrd_update_v, which reports each
 * written row back to the caller together with its timestamp. */
1374 if (pcdp_summary != NULL) {
/* rra_time = current_time aligned down to a whole RRA step
 * (pdp_cnt * pdp_step); the elided lines subtract additional whole
 * steps for the older rows of a bulk update -- TODO confirm against
 * the unelided original. */
1375 rra_time = (current_time - current_time
1376 % (rrd.rra_def[i].pdp_cnt *
1377 rrd.stat_head->pdp_step))
1380 1) * rrd.rra_def[i].pdp_cnt *
1381 rrd.stat_head->pdp_step);
/* Two call variants of write_RRA_row appear below; presumably selected
 * by an elided #ifdef (mmap build passes file_start, plain build does
 * not) -- verify against the full source. */
1385 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1386 rrd_file->fd, pcdp_summary, &rra_time,
1387 rrd_file->file_start);
1390 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1391 rrd_file->fd, pcdp_summary, &rra_time);
1393 if (rrd_test_error())
1396 /* write other rows of the bulk update, if any */
/* Rows after the first use the secondary CDP scratch value. */
1397 scratch_idx = CDP_secondary_val;
1398 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
/* Advance the row pointer; wrap to row 0 at the end of the RRA. */
1399 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1402 "Wraparound for RRA %s, %lu updates left\n",
1403 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1406 rrd.rra_ptr[i].cur_row = 0;
1407 /* seek back to beginning of current rra */
1408 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1409 rrd_set_error("seek error in rrd");
1413 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1416 rra_current = rra_start;
1418 if (pcdp_summary != NULL) {
/* Same step-aligned timestamp computation as above, offset by one
 * more step for the older secondary rows (elided lines in between). */
1419 rra_time = (current_time - current_time
1420 % (rrd.rra_def[i].pdp_cnt *
1421 rrd.stat_head->pdp_step))
1424 2) * rrd.rra_def[i].pdp_cnt *
1425 rrd.stat_head->pdp_step);
1429 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1430 rrd_file->fd, pcdp_summary, &rra_time,
1431 rrd_file->file_start);
1434 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1435 rrd_file->fd, pcdp_summary, &rra_time);
1439 if (rrd_test_error())
1443 /* break out of the argument parsing loop if error_string is set */
1444 if (rrd_test_error()) {
1449 } /* endif a pdp_st has occurred */
/* Record the time of this successful update in the in-core live header
 * (written back to the file after the argument loop finishes). */
1450 rrd.live_head->last_up = current_time;
1451 rrd.live_head->last_up_usec = current_time_usec;
1453 } /* function argument loop */
/* NOTE(review): cleanup / write-back tail of the enclosing update
 * function; its opening and final closing brace lie outside this
 * excerpt, and error-path lines (returns, #endif markers) are elided. */
/* Free per-update scratch allocations.  The NULL guards are redundant
 * for free() but kept byte-identical here. */
1455 if (seasonal_coef != NULL)
1456 free(seasonal_coef);
1457 if (last_seasonal_coef != NULL)
1458 free(last_seasonal_coef);
1459 if (rra_step_cnt != NULL)
1461 rpnstack_free(&rpnstack);
/* mmap build: unmap (and thereby flush) the data region.  The guarding
 * #ifdef is elided in this excerpt. */
1464 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1465 rrd_set_error("error writing(unmapping) file: %s", filename);
1468 /* if we got here and if there is an error and if the file has not been
1469 * written to, then close things up and return. */
1470 if (rrd_test_error()) {
1476 close(rrd_file->fd);
1480 /* aargh ... that was tough ... so many loops ... anyway, its done.
1481 * we just need to write back the live header portion now*/
/* Seek past stat_head, the ds definitions and the rra definitions to
 * reach the live_head area of the on-disk header. */
1483 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1484 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1485 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1487 rrd_set_error("seek rrd for live header writeback");
1493 close(rrd_file->fd);
/* Two write-back variants appear below (whole live_head_t vs. only the
 * last_up time_t); presumably selected by an elided version/#ifdef
 * check -- verify against the full source. */
1498 if (rrd_write(rrd_file, rrd.live_head,
1499 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1500 rrd_set_error("rrd_write live_head to rrd");
1506 close(rrd_file->fd);
1510 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1511 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1512 rrd_set_error("rrd_write live_head to rrd");
1518 close(rrd_file->fd);
/* Write back the pdp_prep area (one entry per data source). */
1524 if (rrd_write(rrd_file, rrd.pdp_prep,
1525 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1526 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1527 rrd_set_error("rrd_write pdp_prep to rrd");
1533 close(rrd_file->fd);
/* Write back the cdp_prep area (rra_cnt * ds_cnt entries). */
1537 if (rrd_write(rrd_file, rrd.cdp_prep,
1538 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539 rrd.stat_head->ds_cnt)
1540 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1541 rrd.stat_head->ds_cnt)) {
1543 rrd_set_error("rrd_write cdp_prep to rrd");
1549 close(rrd_file->fd);
/* Write back the rra_ptr area (current row index per RRA). */
1553 if (rrd_write(rrd_file, rrd.rra_ptr,
1554 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1555 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1556 rrd_set_error("rrd_write rra_ptr to rrd");
1562 close(rrd_file->fd);
/* Note the "xxx" suffix: this fadvise branch is deliberately compiled
 * out (HAVE_POSIX_FADVISExxx is never defined). */
1565 #ifdef HAVE_POSIX_FADVISExxx
1567 /* with update we have write ops, so they will probably not be done by now, this means
1568 the buffers will not get freed. But calling this for the whole file - header
1569 will let the data off the hook as soon as it is written when if it is from a previous
1570 update cycle. Calling fdsync to force things is much too hard here. */
1572 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1573 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1574 rrd_strerror(errno));
1575 close(rrd_file->fd);
1579 /*XXX: ? */ rrd_flush(rrd_file);
1581 /* calling the smoothing code here guarantees at most
1582 * one smoothing operation per rrd_update call. Unfortunately,
1583 * it is possible with bulk updates, or a long-delayed update
1584 * for smoothing to occur off-schedule. This really isn't
1585 * critical except during the burning cycles. */
1586 if (schedule_smooth) {
1587 // in_file = fopen(filename,"rb+");
/* Walk every RRA, running the Holt-Winters smoother on the seasonal
 * and deviation-seasonal arrays only. */
1590 rra_start = rra_begin;
1591 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1592 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1593 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1595 fprintf(stderr, "Running smoother for rra %ld\n", i);
1597 apply_smoother(&rrd, i, rra_start, rrd_file);
1598 if (rrd_test_error())
/* Advance rra_start past this RRA's data to the next one. */
1601 rra_start += rrd.rra_def[i].row_cnt
1602 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1604 #ifdef HAVE_POSIX_FADVISExxx
1605 /* same procedure as above ... */
1607 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1608 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1609 rrd_strerror(errno));
1610 close(rrd_file->fd);
1614 close(rrd_file->fd);
1617 /* OK now close the files and free the memory */
1618 if (close(rrd_file->fd) != 0) {
1619 rrd_set_error("closing rrd");
1637 * get exclusive lock to whole file.
1638 * lock gets removed when we close the file
1640 * returns 0 on success
/* NOTE(review): the function's signature line and closing brace are
 * elided from this excerpt; only the platform-specific locking calls
 * are visible. */
1648 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* Windows: _locking() has no whole-file primitive, so lock st_size
 * bytes non-blocking (_LK_NBLCK) from the current file position. */
1651 if (_fstat(in_file, &st) == 0) {
1652 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
/* POSIX: advisory write lock covering the whole file via fcntl(). */
1659 lock.l_type = F_WRLCK; /* exclusive write lock */
1660 lock.l_len = 0; /* l_len == 0 means "to end of file", i.e. whole file */
1661 lock.l_start = 0; /* start of file */
1662 lock.l_whence = SEEK_SET; /* l_start is measured from start of file */
/* F_SETLK (not F_SETLKW): fail immediately instead of blocking if
 * another process holds the lock. */
1664 rcstat = fcntl(in_file, F_SETLK, &lock);
/* NOTE(review): parameter-list fragment of the mmap variant of
 * write_RRA_row -- the return-type/name line and the body are elided
 * from this excerpt.  in_file is marked UNUSED because this variant
 * writes through rrd_mmaped_file rather than the descriptor. */
1684 unsigned long rra_idx,
1685 unsigned long *rra_current,
1686 unsigned short CDP_scratch_idx,
1688 int UNUSED(in_file),
1692 info_t *pcdp_summary,
1694 void *rrd_mmaped_file)
/* write_RRA_row: write one row (one CDP value per data source) of RRA
 * rra_idx at file offset *rra_current, taking each value from the
 * CDP_scratch_idx slot of the cdp_prep scratch area.  When
 * pcdp_summary is non-NULL (rrd_update_v), each value is also pushed
 * onto the returned info list.  NOTE(review): the function's opening
 * line and closing brace are elided from this excerpt. */
1707 unsigned long rra_idx,
1708 unsigned long *rra_current,
1709 unsigned short CDP_scratch_idx,
1711 info_t *pcdp_summary,
1715 unsigned long ds_idx, cdp_idx;
1718 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1719 /* compute the cdp index */
1720 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1722 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1723 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1724 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1726 if (pcdp_summary != NULL) {
1727 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1728 /* append info to the return hash */
/* Key format "[time]RRA[cf][pdp_cnt]DS[name]"; the member names on the
 * continuation lines are elided -- presumably cf_nam, pdp_cnt and
 * ds_nam from the surrounding accesses; verify against full source. */
1729 pcdp_summary = info_push(pcdp_summary,
1730 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1732 rrd->rra_def[rra_idx].
1734 rrd->rra_def[rra_idx].
1736 rrd->ds_def[ds_idx].
1737 ds_nam), RD_I_VAL, iv);
/* mmap build: copy the value straight into the mapped file image. */
1740 memcpy((char *) rrd_mmaped_file + *rra_current,
1741 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1742 sizeof(rrd_value_t));
/* non-mmap build: write the value via rrd_write (the call's opening
 * line is elided); short writes are reported as an error. */
1746 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1747 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1748 rrd_set_error("writing rrd");
/* Advance the caller's cached file offset by one written value. */
1752 *rra_current += sizeof(rrd_value_t);
1754 return (pcdp_summary);