1 /*****************************************************************************
2 * RRDtool 1.2.15 Copyright by Tobi Oetiker, 1997-2006
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(&current_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normalize time as returned by gettimeofday. usec part must
64 static void normalize_time(struct timeval *t)
68 t->tv_usec += 1000000L;
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
79 FILE UNUSED(*rrd_file),
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv,
/* Yield X unless X is NaN, in which case yield Y.
 * X appears twice in the expansion, so pass an expression without side
 * effects.  The expansion deliberately carries no trailing semicolon so the
 * macro can be used inside larger expressions as well as in statements. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 info_t *rrd_update_v(int argc, char **argv)
100 info_t *result = NULL;
103 optind = 0; opterr = 0; /* initialize getopt */
106 static struct option long_options[] =
108 {"template", required_argument, 0, 't'},
111 int option_index = 0;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, argv + optind + 1, result);
139 result->value.u_int = rc.u_int;
145 rrd_update(int argc, char **argv)
149 optind = 0; opterr = 0; /* initialize getopt */
152 static struct option long_options[] =
154 {"template", required_argument, 0, 't'},
157 int option_index = 0;
159 opt = getopt_long(argc, argv, "t:",
160 long_options, &option_index);
171 rrd_set_error("unknown option '%s'",argv[optind-1]);
176 /* need at least 2 arguments: filename, data. */
177 if (argc-optind < 2) {
178 rrd_set_error("Not enough arguments");
183 rc = rrd_update_r(argv[optind], tmplt,
184 argc - optind - 1, argv + optind + 1);
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
191 return _rrd_update(filename, tmplt, argc, argv, NULL);
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv,
196 info_t *pcdp_summary)
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
214 pre_int,post_int; /* interval between this and
216 unsigned long proc_pdp_st; /* which pdp_st was the last
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added to the
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added to the
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
239 time_t current_time = 0;
240 time_t rra_time = 0; /* time of update for a RRA */
241 unsigned long current_time_usec=0;/* microseconds part of current time */
242 struct timeval tmp_time; /* used for time conversion */
245 int schedule_smooth = 0;
246 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247 /* a vector of future Holt-Winters seasonal coefs */
248 unsigned long elapsed_pdp_st;
249 /* number of elapsed PDP steps since last update */
250 unsigned long *rra_step_cnt = NULL;
251 /* number of rows to be updated in an RRA for a data
253 unsigned long start_pdp_offset;
254 /* number of PDP steps since the last update that
255 * are assigned to the first CDP to be generated
256 * since the last update. */
257 unsigned short scratch_idx;
258 /* index into the CDP scratch array */
259 enum cf_en current_cf;
260 /* numeric id of the current consolidation function */
261 rpnstack_t rpnstack; /* used for COMPUTE DS */
262 int version; /* rrd version */
263 char *endptr; /* used in the conversion */
265 void *rrd_mmaped_file;
266 unsigned long rrd_filesize;
269 rpnstack_init(&rpnstack);
271 /* need at least 1 argument: data. */
273 rrd_set_error("Not enough arguments");
279 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
282 /* initialize time */
283 version = atoi(rrd.stat_head->version);
284 gettimeofday(&tmp_time, 0);
285 normalize_time(&tmp_time);
286 current_time = tmp_time.tv_sec;
288 current_time_usec = tmp_time.tv_usec;
291 current_time_usec = 0;
294 rra_current = rra_start = rra_begin = ftell(rrd_file);
295 /* This is defined in the ANSI C standard, section 7.9.5.3:
297 When a file is opened with update mode ('+' as the second
298 or third character in the ... list of mode argument
299 variables), both input and output may be performed on the
300 associated stream. However, ... input may not be directly
301 followed by output without an intervening call to a file
302 positioning function, unless the input operation encounters
305 fseek(rrd_file, 0, SEEK_END);
306 rrd_filesize = ftell(rrd_file);
307 fseek(rrd_file, rra_current, SEEK_SET);
309 fseek(rrd_file, 0, SEEK_CUR);
313 /* get exclusive lock to whole file.
314 * lock gets removed when we close the file.
316 if (LockRRD(rrd_file) != 0) {
317 rrd_set_error("could not lock RRD");
323 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
324 rrd_set_error("allocating updvals pointer array");
330 if ((pdp_temp = malloc(sizeof(rrd_value_t)
331 *rrd.stat_head->ds_cnt))==NULL){
332 rrd_set_error("allocating pdp_temp ...");
339 if ((tmpl_idx = malloc(sizeof(unsigned long)
340 *(rrd.stat_head->ds_cnt+1)))==NULL){
341 rrd_set_error("allocating tmpl_idx ...");
348 /* initialize tmplt redirector */
349 /* default config example (assume DS 1 is a CDEF DS)
350 tmpl_idx[0] -> 0; (time)
351 tmpl_idx[1] -> 1; (DS 0)
352 tmpl_idx[2] -> 3; (DS 2)
353 tmpl_idx[3] -> 4; (DS 3) */
354 tmpl_idx[0] = 0; /* time */
355 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
357 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
363 /* we should work on a writeable copy here */
365 unsigned int tmpl_len;
366 tmplt = strdup(tmplt);
368 tmpl_cnt = 1; /* the first entry is the time */
369 tmpl_len = strlen(tmplt);
370 for(i=0;i<=tmpl_len ;i++) {
371 if (tmplt[i] == ':' || tmplt[i] == '\0') {
373 if (tmpl_cnt>rrd.stat_head->ds_cnt){
374 rrd_set_error("tmplt contains more DS definitions than RRD");
375 free(updvals); free(pdp_temp);
376 free(tmpl_idx); rrd_free(&rrd);
377 fclose(rrd_file); return(-1);
379 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
380 rrd_set_error("unknown DS name '%s'",dsname);
381 free(updvals); free(pdp_temp);
383 free(tmpl_idx); rrd_free(&rrd);
384 fclose(rrd_file); return(-1);
386 /* the first element is always the time */
387 tmpl_idx[tmpl_cnt-1]++;
388 /* go to the next entry on the tmplt */
389 dsname = &tmplt[i+1];
390 /* fix the damage we did before */
400 if ((pdp_new = malloc(sizeof(rrd_value_t)
401 *rrd.stat_head->ds_cnt))==NULL){
402 rrd_set_error("allocating pdp_new ...");
412 rrd_mmaped_file = mmap(0,
414 PROT_READ | PROT_WRITE,
418 if (rrd_mmaped_file == MAP_FAILED) {
419 rrd_set_error("error mmapping file %s", filename);
428 /* loop through the arguments. */
429 for(arg_i=0; arg_i<argc;arg_i++) {
430 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
431 char *step_start = stepper;
433 char *parsetime_error = NULL;
434 enum {atstyle, normal} timesyntax;
435 struct rrd_time_value ds_tv;
436 if (stepper == NULL){
437 rrd_set_error("failed duplication argv entry");
443 munmap(rrd_mmaped_file, rrd_filesize);
448 /* initialize all ds input to unknown except the first one
449 which has always got to be set */
450 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
451 strcpy(stepper,argv[arg_i]);
453 /* separate all ds elements; first must be examined separately
454 due to alternate time syntax */
455 if ((p=strchr(stepper,'@'))!=NULL) {
456 timesyntax = atstyle;
459 } else if ((p=strchr(stepper,':'))!=NULL) {
464 rrd_set_error("expected timestamp not found in data source from %s:...",
470 updvals[tmpl_idx[ii]] = stepper;
472 if (*stepper == ':') {
476 updvals[tmpl_idx[ii]] = stepper+1;
482 if (ii != tmpl_cnt-1) {
483 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
484 tmpl_cnt-1, ii, argv[arg_i]);
489 /* get the time from the reading ... handle N */
490 if (timesyntax == atstyle) {
491 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
492 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
496 if (ds_tv.type == RELATIVE_TO_END_TIME ||
497 ds_tv.type == RELATIVE_TO_START_TIME) {
498 rrd_set_error("specifying time relative to the 'start' "
499 "or 'end' makes no sense here: %s",
505 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
506 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
508 } else if (strcmp(updvals[0],"N")==0){
509 gettimeofday(&tmp_time, 0);
510 normalize_time(&tmp_time);
511 current_time = tmp_time.tv_sec;
512 current_time_usec = tmp_time.tv_usec;
515 tmp = strtod(updvals[0], 0);
516 current_time = floor(tmp);
517 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
519 /* don't do any correction for old version RRDs */
521 current_time_usec = 0;
523 if(current_time < rrd.live_head->last_up ||
524 (current_time == rrd.live_head->last_up &&
525 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
526 rrd_set_error("illegal attempt to update using time %ld when "
527 "last update time is %ld (minimum one second step)",
528 current_time, rrd.live_head->last_up);
534 /* seek to the beginning of the rra's */
535 if (rra_current != rra_begin) {
537 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
538 rrd_set_error("seek error in rrd");
543 rra_current = rra_begin;
545 rra_start = rra_begin;
547 /* when was the current pdp started */
548 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
549 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
551 /* when did the last pdp_st occur */
552 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
553 occu_pdp_st = current_time - occu_pdp_age;
555 /* interval = current_time - rrd.live_head->last_up; */
556 interval = (double)(current_time - rrd.live_head->last_up)
557 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
559 if (occu_pdp_st > proc_pdp_st){
560 /* OK we passed the pdp_st moment*/
561 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
562 * occurred before the latest
564 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
565 post_int = occu_pdp_age; /* how much after it */
566 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
580 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
581 occu_pdp_age, occu_pdp_st,
582 interval, pre_int, post_int);
585 /* process the data sources and update the pdp_prep
586 * area accordingly */
587 for(i=0;i<rrd.stat_head->ds_cnt;i++){
589 dst_idx= dst_conv(rrd.ds_def[i].dst);
591 /* make sure we do not build diffs with old last_ds values */
592 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
593 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
594 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
597 /* NOTE: DST_CDEF should never enter this if block, because
598 * updvals[i+1][0] is initialized to 'U'; unless the caller
599 * accidentally specified a value for the DST_CDEF. To handle
600 * this case, an extra check is required. */
602 if((updvals[i+1][0] != 'U') &&
603 (dst_idx != DST_CDEF) &&
604 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
606 /* the data source type defines how to process the data */
607 /* pdp_new contains rate * time ... eg the bytes
608 * transferred during the interval. Doing it this way saves
609 * a lot of math operations */
615 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
616 for(ii=0;updvals[i+1][ii] != '\0';ii++){
617 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
618 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
622 if (rrd_test_error()){
625 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
626 if(dst_idx == DST_COUNTER) {
627 /* simple overflow catcher suggested by Andres Kroonmaa */
628 /* this will fail terribly for non 32 or 64 bit counters ... */
629 /* are there any others in SNMP land ? */
630 if (pdp_new[i] < (double)0.0 )
631 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
632 if (pdp_new[i] < (double)0.0 )
633 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
635 rate = pdp_new[i] / interval;
643 pdp_new[i] = strtod(updvals[i+1],&endptr);
645 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
648 if (endptr[0] != '\0'){
649 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
652 rate = pdp_new[i] / interval;
656 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
658 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661 if (endptr[0] != '\0'){
662 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665 rate = pdp_new[i] / interval;
668 rrd_set_error("rrd contains unknown DS type : '%s'",
672 /* break out of this for loop if the error string is set */
673 if (rrd_test_error()){
676 /* make sure pdp_temp is neither too large nor too small
677 * if any of these occur it becomes unknown ...
679 if ( ! isnan(rate) &&
680 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
681 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
682 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
683 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
687 /* no news is news all the same */
692 /* make a copy of the command line argument for the next run */
700 rrd.pdp_prep[i].last_ds,
701 updvals[i+1], pdp_new[i]);
703 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
704 strncpy(rrd.pdp_prep[i].last_ds,
705 updvals[i+1],LAST_DS_LEN-1);
706 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
709 /* break out of the argument parsing loop if the error_string is set */
710 if (rrd_test_error()){
714 /* has a pdp_st moment occurred since the last run ? */
716 if (proc_pdp_st == occu_pdp_st){
717 /* no we have not passed a pdp_st moment. therefore update is simple */
719 for(i=0;i<rrd.stat_head->ds_cnt;i++){
720 if(isnan(pdp_new[i])) {
721 /* this is not really accurate if we use subsecond data arrival time
722 should have thought of it when going subsecond resolution ...
723 sorry next format change we will have it! */
724 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
726 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
727 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
729 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
738 rrd.pdp_prep[i].scratch[PDP_val].u_val,
739 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
743 /* a pdp_st has occurred. */
745 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
746 * occurred up to the last run.
747 pdp_new[] contains rate*seconds from the latest run.
748 pdp_temp[] will contain the rate for cdp */
750 for(i=0;i<rrd.stat_head->ds_cnt;i++){
751 /* update pdp_prep to the current pdp_st. */
752 double pre_unknown = 0.0;
753 if(isnan(pdp_new[i]))
754 /* a final bit of unknown to be added before calculation
755 * we use a temporary variable for this so that we
756 * don't have to turn integer lines before using the value */
757 pre_unknown = pre_int;
759 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
760 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
762 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
767 /* if too much of the pdp_prep is unknown we dump it */
769 /* removed because this does not agree with the definition
770 a heart beat can be unknown */
771 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
772 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
773 /* if the interval is larger than mrhb we get NAN */
774 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775 (occu_pdp_st-proc_pdp_st <=
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780 / ((double)(occu_pdp_st - proc_pdp_st
781 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
785 /* process CDEF data sources; remember each CDEF DS can
786 * only reference other DS with a lower index number */
787 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
789 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790 /* substitute data values for OP_VARIABLE nodes */
791 for (ii = 0; rpnp[ii].op != OP_END; ii++)
793 if (rpnp[ii].op == OP_VARIABLE) {
794 rpnp[ii].op = OP_NUMBER;
795 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
798 /* run the rpn calculator */
799 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
801 break; /* exits the data sources pdp_temp loop */
805 /* make pdp_prep ready for the next run */
806 if(isnan(pdp_new[i])){
807 /* this is not really accurate if we use subsecond data arrival time
808 should have thought of it when going subsecond resolution ...
809 sorry next format change we will have it! */
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
811 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
813 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814 rrd.pdp_prep[i].scratch[PDP_val].u_val =
815 pdp_new[i]/interval*post_int;
823 "new_unkn_sec %5lu\n",
825 rrd.pdp_prep[i].scratch[PDP_val].u_val,
826 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
830 /* if there were errors during the last loop, bail out here */
831 if (rrd_test_error()){
836 /* compute the number of elapsed pdp_st moments */
837 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
839 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
841 if (rra_step_cnt == NULL)
843 rra_step_cnt = (unsigned long *)
844 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
847 for(i = 0, rra_start = rra_begin;
848 i < rrd.stat_head->rra_cnt;
849 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
852 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855 if (start_pdp_offset <= elapsed_pdp_st) {
856 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
857 rrd.rra_def[i].pdp_cnt + 1;
862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
864 /* If this is a bulk update, we need to skip ahead in the seasonal
865 * arrays so that they will be correct for the next observed value;
866 * note that for the bulk update itself, no update will occur to
867 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
869 if (rra_step_cnt[i] > 2)
871 /* skip update by resetting rra_step_cnt[i],
872 * note that this is not data source specific; this is due
873 * to the bulk update, not a DNAN value for the specific data
876 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
877 &last_seasonal_coef);
878 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
882 /* periodically run a smoother for seasonal effects */
883 /* Need to use first cdp parameter buffer to track
884 * burnin (burnin requires a specific smoothing schedule).
885 * The CDP_init_seasonal parameter is really an RRA level,
886 * not a data source within RRA level parameter, but the rra_def
887 * is read only for rrd_update (not flushed to disk). */
888 iii = i*(rrd.stat_head -> ds_cnt);
889 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
892 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
893 > rrd.rra_def[i].row_cnt - 1) {
894 /* mark off one of the burnin cycles */
895 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
899 /* someone has no doubt invented a trick to deal with this
900 * wrap around, but at least this code is clear. */
901 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902 rrd.rra_ptr[i].cur_row)
904 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905 * mapping between PDP and CDP */
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
911 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
913 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
918 /* can't rely on negative numbers because we are working with
920 /* Don't need modulus here. If we've wrapped more than once, only
921 * one smooth is executed at the end. */
922 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
928 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
930 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
937 rra_current = ftell(rrd_file);
938 } /* if cf is DEVSEASONAL or SEASONAL */
940 if (rrd_test_error()) break;
942 /* update CDP_PREP areas */
943 /* loop over data sources within each RRA */
945 ii < rrd.stat_head->ds_cnt;
949 /* iii indexes the CDP prep area for this data source within the RRA */
950 iii=i*rrd.stat_head->ds_cnt+ii;
952 if (rrd.rra_def[i].pdp_cnt > 1) {
954 if (rra_step_cnt[i] > 0) {
955 /* If we are in this block, at least 1 CDP value will be written to
956 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957 * to be written, then the "fill in" value is the CDP_secondary_val
959 if (isnan(pdp_temp[ii]))
961 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
964 /* CDP_secondary value is the RRA "fill in" value for intermediary
965 * CDP data entries. No matter the CF, the value is the same because
966 * the average, max, min, and last of a list of identical values is
967 * the same, namely, the value itself. */
968 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
971 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972 > rrd.rra_def[i].pdp_cnt*
973 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
975 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976 /* initialize carry over */
977 if (current_cf == CF_AVERAGE) {
978 if (isnan(pdp_temp[ii])) {
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
988 rrd_value_t cum_val, cur_val;
989 switch (current_cf) {
991 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992 cur_val = IFDNAN(pdp_temp[ii],0.0);
993 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994 (cum_val + cur_val * start_pdp_offset) /
995 (rrd.rra_def[i].pdp_cnt
996 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997 /* initialize carry over value */
998 if (isnan(pdp_temp[ii])) {
999 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1006 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1009 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010 isnan(pdp_temp[ii])) {
1012 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1017 if (cur_val > cum_val)
1018 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021 /* initialize carry over value */
1022 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1025 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026 cur_val = IFDNAN(pdp_temp[ii],DINF);
1028 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029 isnan(pdp_temp[ii])) {
1031 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1036 if (cur_val < cum_val)
1037 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040 /* initialize carry over value */
1041 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046 /* initialize carry over value */
1047 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1050 } /* endif meets xff value requirement for a valid value */
1051 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1052 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053 if (isnan(pdp_temp[ii]))
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1055 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1057 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058 } else /* rra_step_cnt[i] == 0 */
1061 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1065 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1069 if (isnan(pdp_temp[ii])) {
1070 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1073 if (current_cf == CF_AVERAGE) {
1074 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1080 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1084 switch (current_cf) {
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1090 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1104 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105 if (elapsed_pdp_st > 2)
1107 switch (current_cf) {
1110 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1114 case CF_DEVSEASONAL:
1115 /* need to update cached seasonal values, so they are consistent
1116 * with the bulk update */
1117 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118 * CDP_last_deviation are the same. */
1119 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120 last_seasonal_coef[ii];
1121 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1125 /* need to update the null_count and last_null_count.
1126 * even do this for non-DNAN pdp_temp because the
1127 * algorithm is not learning from batch updates. */
1128 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1130 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1134 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1138 /* do not count missed bulk values as failures */
1139 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141 /* need to reset violations buffer.
1142 * could do this more carefully, but for now, just
1143 * assume a bulk update wipes away all violations. */
1144 erase_violations(&rrd, iii, i);
1148 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1150 if (rrd_test_error()) break;
1152 } /* endif data sources loop */
1153 } /* end RRA Loop */
1155 /* this loop is only entered if elapsed_pdp_st < 3 */
1156 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1157 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1159 for(i = 0, rra_start = rra_begin;
1160 i < rrd.stat_head->rra_cnt;
1161 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1164 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1166 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1169 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1172 rra_current = ftell(rrd_file);
1174 if (rrd_test_error()) break;
1175 /* loop over data sources within each RRA */
1177 ii < rrd.stat_head->ds_cnt;
1180 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182 scratch_idx, seasonal_coef);
1184 } /* end RRA Loop */
1185 if (rrd_test_error()) break;
1186 } /* end elapsed_pdp_st loop */
1188 if (rrd_test_error()) break;
1190 /* Ready to write to disk */
1191 /* Move sequentially through the file, writing one RRA at a time.
1192 * Note this architecture divorces the computation of CDP with
1193 * flushing updated RRA entries to disk. */
1194 for(i = 0, rra_start = rra_begin;
1195 i < rrd.stat_head->rra_cnt;
1196 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1198 /* is there anything to write for this RRA? If not, continue. */
1199 if (rra_step_cnt[i] == 0) continue;
1201 /* write the first row */
1203 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1205 rrd.rra_ptr[i].cur_row++;
1206 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208 /* position on the first row */
1209 rra_pos_tmp = rra_start +
1210 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211 if(rra_pos_tmp != rra_current) {
1213 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1214 rrd_set_error("seek error in rrd");
1218 rra_current = rra_pos_tmp;
1222 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1224 scratch_idx = CDP_primary_val;
1225 if (pcdp_summary != NULL)
1227 rra_time = (current_time - current_time
1228 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1229 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1232 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1233 pcdp_summary, &rra_time, rrd_mmaped_file);
1235 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1236 pcdp_summary, &rra_time);
1238 if (rrd_test_error()) break;
1240 /* write other rows of the bulk update, if any */
1241 scratch_idx = CDP_secondary_val;
1242 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1244 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1247 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1248 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1251 rrd.rra_ptr[i].cur_row = 0;
1252 /* seek back to beginning of current rra */
1253 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1255 rrd_set_error("seek error in rrd");
1259 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1261 rra_current = rra_start;
1263 if (pcdp_summary != NULL)
1265 rra_time = (current_time - current_time
1266 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1267 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1270 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1271 pcdp_summary, &rra_time, rrd_mmaped_file);
1273 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1274 pcdp_summary, &rra_time);
1278 if (rrd_test_error())
1282 /* break out of the argument parsing loop if error_string is set */
1283 if (rrd_test_error()){
1288 } /* endif a pdp_st has occurred */
1289 rrd.live_head->last_up = current_time;
1290 rrd.live_head->last_up_usec = current_time_usec;
1292 } /* function argument loop */
1294 if (seasonal_coef != NULL) free(seasonal_coef);
1295 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1296 if (rra_step_cnt != NULL) free(rra_step_cnt);
1297 rpnstack_free(&rpnstack);
1300 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1301 rrd_set_error("error writing(unmapping) file: %s", filename);
1304 /* if we got here and if there is an error and if the file has not been
1305 * written to, then close things up and return. */
1306 if (rrd_test_error()) {
1316 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1317 * we just need to write back the live header portion now*/
1319 if (fseek(rrd_file, (sizeof(stat_head_t)
1320 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1321 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1323 rrd_set_error("seek rrd for live header writeback");
1334 if(fwrite( rrd.live_head,
1335 sizeof(live_head_t), 1, rrd_file) != 1){
1336 rrd_set_error("fwrite live_head to rrd");
1347 if(fwrite( &rrd.live_head->last_up,
1348 sizeof(time_t), 1, rrd_file) != 1){
1349 rrd_set_error("fwrite live_head to rrd");
1361 if(fwrite( rrd.pdp_prep,
1363 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1364 rrd_set_error("ftwrite pdp_prep to rrd");
1374 if(fwrite( rrd.cdp_prep,
1376 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1377 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1379 rrd_set_error("ftwrite cdp_prep to rrd");
1389 if(fwrite( rrd.rra_ptr,
1391 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1392 rrd_set_error("fwrite rra_ptr to rrd");
1402 /* OK now close the files and free the memory */
1403 if(fclose(rrd_file) != 0){
1404 rrd_set_error("closing rrd");
1413 /* calling the smoothing code here guarantees at most
1414 * one smoothing operation per rrd_update call. Unfortunately,
1415 * it is possible with bulk updates, or a long-delayed update
1416 * for smoothing to occur off-schedule. This really isn't
1417 * critical except during the burning cycles. */
1418 if (schedule_smooth)
1420 rrd_file = fopen(filename,"rb+");
1421 rra_start = rra_begin;
1422 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1424 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1425 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1428 fprintf(stderr,"Running smoother for rra %ld\n",i);
1430 apply_smoother(&rrd,i,rra_start,rrd_file);
1431 if (rrd_test_error())
1434 rra_start += rrd.rra_def[i].row_cnt
1435 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
/*
 * LockRRD - get an exclusive lock on the whole RRD file.
 *
 * The lock is requested without blocking (F_SETLK on POSIX, _LK_NBLCK on
 * native Win32), so the call fails immediately when another process
 * already holds a conflicting lock.  The lock gets removed when the file
 * is closed.
 *
 * rrdfile: open stdio stream of the RRD file to lock.
 *
 * returns 0 on success; otherwise the failure value of the underlying
 * locking call, or -1 when the file size needed for the Win32 lock range
 * could not be determined.
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;         /* File descriptor for RRD */
    int rcstat = -1;    /* stays -1 unless a locking call overwrites it */

    rrd_fd = fileno(rrdfile);

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    {
        struct _stat st;

        /* _locking() needs an explicit byte count: use the current size */
        if ( _fstat( rrd_fd, &st ) == 0 ) {
            rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
        }
    }
#else
    {
        struct flock lock;

        lock.l_type = F_WRLCK;      /* exclusive write lock */
        lock.l_len = 0;             /* whole file */
        lock.l_start = 0;           /* start of file */
        lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */

        rcstat = fcntl(rrd_fd, F_SETLK, &lock);
    }
#endif

    return rcstat;
}
1487 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1488 unsigned short CDP_scratch_idx,
1490 FILE UNUSED(*rrd_file),
1494 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1497 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1498 unsigned short CDP_scratch_idx, FILE *rrd_file,
1499 info_t *pcdp_summary, time_t *rra_time)
1502 unsigned long ds_idx, cdp_idx;
1505 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1507 /* compute the cdp index */
1508 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1510 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1511 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1512 rrd -> rra_def[rra_idx].cf_nam);
1514 if (pcdp_summary != NULL)
1516 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1517 /* append info to the return hash */
1518 pcdp_summary = info_push(pcdp_summary,
1519 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1520 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1521 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1525 memcpy((char *)rrd_mmaped_file + *rra_current,
1526 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1527 sizeof(rrd_value_t));
1529 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1530 sizeof(rrd_value_t),1,rrd_file) != 1)
1532 rrd_set_error("writing rrd");
1536 *rra_current += sizeof(rrd_value_t);
1538 return (pcdp_summary);