1 /*****************************************************************************
2 * RRDtool 1.2rc6 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.3 2001/03/04 13:01:56 oetiker
9 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11 * This is backwards compatible! But new files using the Aberrant stuff are not readable
12 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13 * -- Jake Brutlag <jakeb@corp.webtv.net>
15 * Revision 1.2 2001/03/04 11:14:25 oetiker
16 * added at-style-time@value:value syntax to rrd_update
17 * -- Dave Bodenstab <imdave@mcs.net>
18 * Revision 1.1 2001/02/25 22:25:06 oetiker
21 *****************************************************************************/
24 #include <sys/types.h>
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 #include <sys/locking.h>
34 int LockRRD(FILE *rrd_file);
35 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
36 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* IFDNAN(X,Y): evaluate to X unless X is NaN, in which case Y.
 * Fix: removed the stray trailing semicolon from the original
 * definition.  The semicolon silently injected an empty statement at
 * every expansion site, and would break any use of the macro inside a
 * larger expression or an unbraced if/else.
 * NOTE: X is evaluated twice -- do not pass arguments with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Command-line entry point of the stand-alone "rrdupdate" binary:
 * delegates all work to rrd_update() and, if the library flagged an
 * error, prints usage information followed by the error text obtained
 * from rrd_get_error().
 * NOTE(review): this listing elides lines (the embedded original line
 * numbers jump); the function's return statement and closing brace are
 * not visible here. */
45 main(int argc, char **argv){
46     rrd_update(argc,argv);
47     if (rrd_test_error()) {
48         printf("RRDtool 1.2rc6 Copyright 1997-2005 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
49 "Usage: rrdupdate filename\n"
50 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
51 "\t\t\ttime|N:value[:value...]\n\n"
52 "\t\t\tat-time@value[:value...]\n\n"
53 "\t\t\t[ time:value[:value...] ..]\n\n");
55         printf("ERROR: %s\n",rrd_get_error());
/* rrd_update -- the core update routine.
 * Parses the command line (optional --template, then one or more
 * time:value[:value...] arguments), locks and reads the RRD file,
 * converts each reading into a rate according to its DS type, rolls
 * the rates into the PDP/CDP preparation areas, writes any completed
 * RRA rows back to disk, and finally writes back the live header,
 * pdp_prep, cdp_prep and rra_ptr areas.  Returns 0 on success, -1 on
 * error (error text via rrd_set_error()).
 * NOTE(review): this listing elides many lines (the embedded original
 * line numbers jump), so several statements, braces and cleanup paths
 * are not visible here; code below is kept exactly as listed. */
64 rrd_update(int argc, char **argv)
71 unsigned long rra_begin; /* byte pointer to the rra
72 * area in the rrd file. this
73 * pointer never changes value */
74 unsigned long rra_start; /* byte pointer to the rra
75 * area in the rrd file. this
76 * pointer changes as each rrd is
78 unsigned long rra_current; /* byte pointer to the current write
79 * spot in the rrd file. */
80 unsigned long rra_pos_tmp; /* temporary byte pointer. */
81 unsigned long interval,
82 pre_int,post_int; /* interval between this and
84 unsigned long proc_pdp_st; /* which pdp_st was the last
86 unsigned long occu_pdp_st; /* when was the pdp_st
87 * before the last update
89 unsigned long proc_pdp_age; /* how old was the data in
90 * the pdp prep area when it
92 unsigned long occu_pdp_age; /* how long ago was the last
94 rrd_value_t *pdp_new; /* prepare the incoming data
97 rrd_value_t *pdp_temp; /* prepare the pdp values
101 long *tmpl_idx; /* index representing the settings
102 transported by the template index */
103 long tmpl_cnt = 2; /* time and data */
107 time_t current_time = time(NULL);
109 int schedule_smooth = 0;
110 char *template = NULL;
111 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
112 /* a vector of future Holt-Winters seasonal coefs */
113 unsigned long elapsed_pdp_st;
114 /* number of elapsed PDP steps since last update */
115 unsigned long *rra_step_cnt = NULL;
116 /* number of rows to be updated in an RRA for a data
118 unsigned long start_pdp_offset;
119 /* number of PDP steps since the last update that
120 * are assigned to the first CDP to be generated
121 * since the last update. */
122 unsigned short scratch_idx;
123 /* index into the CDP scratch array */
124 enum cf_en current_cf;
125 /* numeric id of the current consolidation function */
/* --- option parsing: only --template/-t is recognized --- */
128 static struct option long_options[] =
130 {"template", required_argument, 0, 't'},
133 int option_index = 0;
135 opt = getopt_long(argc, argv, "t:",
136 long_options, &option_index);
147 rrd_set_error("unknown option '%s'",argv[optind-1]);
153 /* need at least 2 arguments: filename, data. */
154 if (argc-optind < 2) {
155 rrd_set_error("Not enough arguments");
/* --- open and lock the RRD file, allocate the work buffers --- */
159 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
162 rra_current = rra_start = rra_begin = ftell(rrd_file);
163 /* This is defined in the ANSI C standard, section 7.9.5.3:
165 When a file is opened with update mode ('+' as the second
166 or third character in the ... list of mode argument
167 variables), both input and output may be performed on the
168 associated stream. However, ... input may not be directly
169 followed by output without an intervening call to a file
170 positioning function, unless the input operation encounters
172 fseek(rrd_file, 0, SEEK_CUR);
175 /* get exclusive lock to whole file.
176 * lock gets removed when we close the file.
178 if (LockRRD(rrd_file) != 0) {
179 rrd_set_error("could not lock RRD");
185 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
186 rrd_set_error("allocating updvals pointer array");
192 if ((pdp_temp = malloc(sizeof(rrd_value_t)
193 *rrd.stat_head->ds_cnt))==NULL){
194 rrd_set_error("allocating pdp_temp ...");
201 if ((tmpl_idx = malloc(sizeof(unsigned long)
202 *(rrd.stat_head->ds_cnt+1)))==NULL){
203 rrd_set_error("allocating tmpl_idx ...");
210 /* initialize template redirector */
212 tmpl_idx[0] -> 0; (time)
213 tmpl_idx[1] -> 1; (DS 0)
214 tmpl_idx[2] -> 2; (DS 1)
215 tmpl_idx[3] -> 3; (DS 2)
217 for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
218 tmpl_cnt=rrd.stat_head->ds_cnt+1;
/* --- a --template argument remaps input columns onto DS indices --- */
223 tmpl_cnt = 1; /* the first entry is the time */
224 tmpl_len = strlen(template);
225 for(i=0;i<=tmpl_len ;i++) {
226 if (template[i] == ':' || template[i] == '\0') {
228 if (tmpl_cnt>rrd.stat_head->ds_cnt){
229 rrd_set_error("Template contains more DS definitions than RRD");
230 free(updvals); free(pdp_temp);
231 free(tmpl_idx); rrd_free(&rrd);
232 fclose(rrd_file); return(-1);
234 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
235 rrd_set_error("unknown DS name '%s'",dsname);
236 free(updvals); free(pdp_temp);
237 free(tmpl_idx); rrd_free(&rrd);
238 fclose(rrd_file); return(-1);
240 /* the first element is always the time */
241 tmpl_idx[tmpl_cnt-1]++;
242 /* go to the next entry on the template */
243 dsname = &template[i+1];
244 /* fix the damage we did before */
253 if ((pdp_new = malloc(sizeof(rrd_value_t)
254 *rrd.stat_head->ds_cnt))==NULL){
255 rrd_set_error("allocating pdp_new ...");
264 /* loop through the arguments. */
265 for(arg_i=optind+1; arg_i<argc;arg_i++) {
266 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
267 char *step_start = stepper;
269 char *parsetime_error = NULL;
270 enum {atstyle, normal} timesyntax;
271 struct time_value ds_tv;
272 if (stepper == NULL){
273 rrd_set_error("failed duplication argv entry");
281 /* initialize all ds input to unknown except the first one
282 which has always got to be set */
283 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
284 strcpy(stepper,argv[arg_i]);
286 /* separate all ds elements; first must be examined separately
287 due to alternate time syntax */
288 if ((p=strchr(stepper,'@'))!=NULL) {
289 timesyntax = atstyle;
292 } else if ((p=strchr(stepper,':'))!=NULL) {
297 rrd_set_error("expected timestamp not found in data source from %s:...",
303 updvals[tmpl_idx[ii]] = stepper;
305 if (*stepper == ':') {
309 updvals[tmpl_idx[ii]] = stepper+1;
315 if (ii != tmpl_cnt-1) {
316 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
317 tmpl_cnt-1, ii, argv[arg_i]);
322 /* get the time from the reading ... handle N */
323 if (timesyntax == atstyle) {
324 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
325 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
329 if (ds_tv.type == RELATIVE_TO_END_TIME ||
330 ds_tv.type == RELATIVE_TO_START_TIME) {
331 rrd_set_error("specifying time relative to the 'start' "
332 "or 'end' makes no sense here: %s",
338 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
339 } else if (strcmp(updvals[0],"N")==0){
340 current_time = time(NULL);
342 current_time = atol(updvals[0]);
345 if(current_time <= rrd.live_head->last_up){
346 rrd_set_error("illegal attempt to update using time %ld when "
347 "last update time is %ld (minimum one second step)",
348 current_time, rrd.live_head->last_up);
354 /* seek to the beginning of the rra's */
355 if (rra_current != rra_begin) {
356 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
357 rrd_set_error("seek error in rrd");
361 rra_current = rra_begin;
363 rra_start = rra_begin;
365 /* when was the current pdp started */
366 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
367 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
369 /* when did the last pdp_st occur */
370 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
371 occu_pdp_st = current_time - occu_pdp_age;
372 interval = current_time - rrd.live_head->last_up;
374 if (occu_pdp_st > proc_pdp_st){
375 /* OK we passed the pdp_st moment*/
376 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
377 * occurred before the latest
379 post_int = occu_pdp_age; /* how much after it */
393 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
394 occu_pdp_age, occu_pdp_st,
395 interval, pre_int, post_int);
398 /* process the data sources and update the pdp_prep
399 * area accordingly */
400 for(i=0;i<rrd.stat_head->ds_cnt;i++){
402 dst_idx= dst_conv(rrd.ds_def[i].dst);
403 if((updvals[i+1][0] != 'U') &&
404 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
406 /* the data source type defines how to process the data */
407 /* pdp_temp contains rate * time ... eg the bytes
408 * transferred during the interval. Doing it this way saves
409 * a lot of math operations */
415 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
416 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
417 if(dst_idx == DST_COUNTER) {
418 /* simple overflow catcher suggested by andres kroonmaa */
419 /* this will fail terribly for non 32 or 64 bit counters ... */
420 /* are there any others in SNMP land ? */
421 if (pdp_new[i] < (double)0.0 )
422 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
423 if (pdp_new[i] < (double)0.0 )
424 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
426 rate = pdp_new[i] / interval;
433 pdp_new[i]= atof(updvals[i+1]);
434 rate = pdp_new[i] / interval;
437 pdp_new[i] = atof(updvals[i+1]) * interval;
438 rate = pdp_new[i] / interval;
441 rrd_set_error("rrd contains unknown DS type : '%s'",
445 /* break out of this for loop if the error string is set */
446 if (rrd_test_error()){
449 /* make sure pdp_temp is neither too large nor too small
450 * if any of these occur it becomes unknown ...
452 if ( ! isnan(rate) &&
453 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
454 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
455 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
456 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
460 /* no news is news all the same */
464 /* make a copy of the command line argument for the next run */
472 rrd.pdp_prep[i].last_ds,
473 updvals[i+1], pdp_new[i]);
475 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
476 strncpy(rrd.pdp_prep[i].last_ds,
477 updvals[i+1],LAST_DS_LEN-1);
478 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
481 /* break out of the argument parsing loop if the error_string is set */
482 if (rrd_test_error()){
486 /* has a pdp_st moment occurred since the last run ? */
488 if (proc_pdp_st == occu_pdp_st){
489 /* no we have not passed a pdp_st moment. therefore update is simple */
491 for(i=0;i<rrd.stat_head->ds_cnt;i++){
492 if(isnan(pdp_new[i]))
493 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
495 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
502 rrd.pdp_prep[i].scratch[PDP_val].u_val,
503 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
507 /* a pdp_st has occurred. */
509 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
510 * occurred up to the last run.
511 pdp_new[] contains rate*seconds from the latest run.
512 pdp_temp[] will contain the rate for cdp */
515 for(i=0;i<rrd.stat_head->ds_cnt;i++){
516 /* update pdp_prep to the current pdp_st */
517 if(isnan(pdp_new[i]))
518 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
520 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
521 pdp_new[i]/(double)interval*(double)pre_int;
523 /* if too much of the pdp_prep is unknown we dump it */
524 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
525 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
526 (occu_pdp_st-proc_pdp_st <=
527 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
530 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
531 / (double)( occu_pdp_st
533 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
535 /* make pdp_prep ready for the next run */
536 if(isnan(pdp_new[i])){
537 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
538 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
540 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
541 rrd.pdp_prep[i].scratch[PDP_val].u_val =
542 pdp_new[i]/(double)interval*(double)post_int;
550 "new_unkn_sec %5lu\n",
552 rrd.pdp_prep[i].scratch[PDP_val].u_val,
553 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
557 /* compute the number of elapsed pdp_st moments */
558 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
560 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
562 if (rra_step_cnt == NULL)
564 rra_step_cnt = (unsigned long *)
565 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
/* --- pass 1 over RRAs: compute rows to emit per RRA and handle
 * Holt-Winters seasonal bookkeeping / smoothing schedule --- */
568 for(i = 0, rra_start = rra_begin;
569 i < rrd.stat_head->rra_cnt;
570 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
573 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
574 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
575 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
576 if (start_pdp_offset <= elapsed_pdp_st) {
577 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
578 rrd.rra_def[i].pdp_cnt + 1;
583 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
585 /* If this is a bulk update, we need to skip ahead in the seasonal
586 * arrays so that they will be correct for the next observed value;
587 * note that for the bulk update itself, no update will occur to
588 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
590 if (rra_step_cnt[i] > 2)
592 /* skip update by resetting rra_step_cnt[i],
593 * note that this is not data source specific; this is due
594 * to the bulk update, not a DNAN value for the specific data
597 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
598 &last_seasonal_coef);
599 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
603 /* periodically run a smoother for seasonal effects */
604 /* Need to use first cdp parameter buffer to track
605 * burnin (burnin requires a specific smoothing schedule).
606 * The CDP_init_seasonal parameter is really an RRA level,
607 * not a data source within RRA level parameter, but the rra_def
608 * is read only for rrd_update (not flushed to disk). */
609 iii = i*(rrd.stat_head -> ds_cnt);
610 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
613 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
614 > rrd.rra_def[i].row_cnt - 1) {
615 /* mark off one of the burnin cycles */
616 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
620 /* someone has no doubt invented a trick to deal with this
621 * wrap around, but at least this code is clear. */
622 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
623 rrd.rra_ptr[i].cur_row)
625 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
626 * mapping between PDP and CDP */
627 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
628 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
632 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
633 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
634 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
639 /* can't rely on negative numbers because we are working with
641 /* Don't need modulus here. If we've wrapped more than once, only
642 * one smooth is executed at the end. */
643 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
644 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
645 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
649 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
650 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
651 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
658 rra_current = ftell(rrd_file);
659 } /* if cf is DEVSEASONAL or SEASONAL */
661 if (rrd_test_error()) break;
663 /* update CDP_PREP areas */
664 /* loop over data sources within each RRA */
666 ii < rrd.stat_head->ds_cnt;
670 /* iii indexes the CDP prep area for this data source within the RRA */
671 iii=i*rrd.stat_head->ds_cnt+ii;
673 if (rrd.rra_def[i].pdp_cnt > 1) {
675 if (rra_step_cnt[i] > 0) {
676 /* If we are in this block, at least 1 CDP value will be written to
677 * disk, this is the CDP_primary_val entry. If more than 1 value needs
678 * to be written, then the "fill in" value is the CDP_secondary_val
680 if (isnan(pdp_temp[ii]))
682 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
683 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
685 /* CDP_secondary value is the RRA "fill in" value for intermediary
686 * CDP data entries. No matter the CF, the value is the same because
687 * the average, max, min, and last of a list of identical values is
688 * the same, namely, the value itself. */
689 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
692 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
693 > rrd.rra_def[i].pdp_cnt*
694 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
696 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
697 /* initialize carry over */
698 if (current_cf == CF_AVERAGE) {
699 if (isnan(pdp_temp[ii])) {
700 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
702 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
703 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
706 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
709 rrd_value_t cum_val, cur_val;
710 switch (current_cf) {
712 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
713 cur_val = IFDNAN(pdp_temp[ii],0.0);
714 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
715 (cum_val + cur_val) /
716 (rrd.rra_def[i].pdp_cnt
717 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
718 /* initialize carry over value */
719 if (isnan(pdp_temp[ii])) {
720 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
722 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
723 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
727 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
728 cur_val = IFDNAN(pdp_temp[ii],-DINF);
730 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
731 isnan(pdp_temp[ii])) {
733 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
738 if (cur_val > cum_val)
739 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
741 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
742 /* initialize carry over value */
743 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
746 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
747 cur_val = IFDNAN(pdp_temp[ii],DINF);
749 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
750 isnan(pdp_temp[ii])) {
752 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
757 if (cur_val < cum_val)
758 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
760 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
761 /* initialize carry over value */
762 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
766 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
767 /* initialize carry over value */
768 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
771 } /* endif meets xff value requirement for a valid value */
772 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
773 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
774 if (isnan(pdp_temp[ii]))
775 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
776 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
778 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
779 } else /* rra_step_cnt[i] == 0 */
782 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
783 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
786 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
787 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
790 if (isnan(pdp_temp[ii])) {
791 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
792 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
794 if (current_cf == CF_AVERAGE) {
795 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
798 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
801 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
802 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
805 switch (current_cf) {
807 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
811 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
812 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
815 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
816 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
820 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
825 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
826 if (elapsed_pdp_st > 2)
828 switch (current_cf) {
831 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
832 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
836 /* need to update cached seasonal values, so they are consistent
837 * with the bulk update */
838 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
839 * CDP_last_deviation are the same. */
840 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
841 last_seasonal_coef[ii];
842 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
846 /* need to update the null_count and last_null_count.
847 * even do this for non-DNAN pdp_temp because the
848 * algorithm is not learning from batch updates. */
849 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
851 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
855 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
856 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
859 /* do not count missed bulk values as failures */
860 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
861 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
862 /* need to reset violations buffer.
863 * could do this more carefully, but for now, just
864 * assume a bulk update wipes away all violations. */
865 erase_violations(&rrd, iii, i);
869 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
871 if (rrd_test_error()) break;
873 } /* endif data sources loop */
876 /* this loop is only entered if elapsed_pdp_st < 3 */
877 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
878 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
880 for(i = 0, rra_start = rra_begin;
881 i < rrd.stat_head->rra_cnt;
882 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
885 if (rrd.rra_def[i].pdp_cnt > 1) continue;
887 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
888 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
890 lookup_seasonal(&rrd,i,rra_start,rrd_file,
891 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
894 if (rrd_test_error()) break;
895 /* loop over data sources within each RRA */
897 ii < rrd.stat_head->ds_cnt;
900 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
901 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
902 scratch_idx, seasonal_coef);
905 if (rrd_test_error()) break;
906 } /* end elapsed_pdp_st loop */
908 if (rrd_test_error()) break;
910 /* Ready to write to disk */
911 /* Move sequentially through the file, writing one RRA at a time.
912 * Note this architecture divorces the computation of CDP from
913 * flushing updated RRA entries to disk. */
914 for(i = 0, rra_start = rra_begin;
915 i < rrd.stat_head->rra_cnt;
916 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
918 /* is there anything to write for this RRA? If not, continue. */
919 if (rra_step_cnt[i] == 0) continue;
921 /* write the first row */
923 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
925 rrd.rra_ptr[i].cur_row++;
926 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
927 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
928 /* position on the first row */
929 rra_pos_tmp = rra_start +
930 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
931 if(rra_pos_tmp != rra_current) {
932 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
933 rrd_set_error("seek error in rrd");
936 rra_current = rra_pos_tmp;
939 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
941 scratch_idx = CDP_primary_val;
942 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
943 if (rrd_test_error()) break;
945 /* write other rows of the bulk update, if any */
946 scratch_idx = CDP_secondary_val;
947 for ( ; rra_step_cnt[i] > 1;
948 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
950 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
953 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
954 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
957 rrd.rra_ptr[i].cur_row = 0;
958 /* seek back to beginning of current rra */
959 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
961 rrd_set_error("seek error in rrd");
965 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
967 rra_current = rra_start;
969 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
972 if (rrd_test_error())
976 /* break out of the argument parsing loop if error_string is set */
977 if (rrd_test_error()){
982 } /* endif a pdp_st has occurred */
983 rrd.live_head->last_up = current_time;
985 } /* function argument loop */
987 if (seasonal_coef != NULL) free(seasonal_coef);
988 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
989 if (rra_step_cnt != NULL) free(rra_step_cnt);
991 /* if we got here and if there is an error and if the file has not been
992 * written to, then close things up and return. */
993 if (rrd_test_error()) {
1003 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1004 * we just need to write back the live header portion now*/
1006 if (fseek(rrd_file, (sizeof(stat_head_t)
1007 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1008 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1010 rrd_set_error("seek rrd for live header writeback");
1020 if(fwrite( rrd.live_head,
1021 sizeof(live_head_t), 1, rrd_file) != 1){
1022 rrd_set_error("fwrite live_head to rrd");
1032 if(fwrite( rrd.pdp_prep,
1034 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
/* NOTE(review): "ftwrite" in the next error message looks like a typo
 * for "fwrite" -- it is a runtime string, so it is left untouched here. */
1035 rrd_set_error("ftwrite pdp_prep to rrd");
1045 if(fwrite( rrd.cdp_prep,
1047 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1048 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1050 rrd_set_error("ftwrite cdp_prep to rrd");
1060 if(fwrite( rrd.rra_ptr,
1062 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1063 rrd_set_error("fwrite rra_ptr to rrd");
1073 /* OK now close the files and free the memory */
1074 if(fclose(rrd_file) != 0){
1075 rrd_set_error("closing rrd");
1084 /* calling the smoothing code here guarantees at most
1085 * one smoothing operation per rrd_update call. Unfortunately,
1086 * it is possible with bulk updates, or a long-delayed update
1087 * for smoothing to occur off-schedule. This really isn't
1088 * critical except during the burnin cycles. */
1089 if (schedule_smooth)
1091 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1092 rrd_file = fopen(argv[optind],"rb+");
1094 rrd_file = fopen(argv[optind],"r+");
1096 rra_start = rra_begin;
1097 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1099 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1100 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1103 fprintf(stderr,"Running smoother for rra %ld\n",i);
1105 apply_smoother(&rrd,i,rra_start,rrd_file);
1106 if (rrd_test_error())
1109 rra_start += rrd.rra_def[i].row_cnt
1110 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1123 * get exclusive lock to whole file.
1124 * lock gets removed when we close the file
1126 * returns 0 on success
/* NOTE(review): lines are elided in this listing; the return type line,
 * the local struct declarations and the function's return/closing brace
 * are not visible here. */
1129 LockRRD(FILE *rrdfile)
1131 int rrd_fd; /* File descriptor for RRD */
1134 rrd_fd = fileno(rrdfile);
1137 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* Windows branch: lock the whole file with _locking(), non-blocking. */
1140 if ( _fstat( rrd_fd, &st ) == 0 ) {
1141 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* POSIX branch: exclusive advisory record lock over the whole file. */
1147 lock.l_type = F_WRLCK; /* exclusive write lock */
1148 lock.l_len = 0; /* whole file */
1149 lock.l_start = 0; /* start of file */
1150 lock.l_whence = SEEK_SET; /* offsets are relative to start of file */
1152 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* Write one RRA row: for every data source of RRA rra_idx, write the
 * CDP scratch value selected by CDP_scratch_idx to rrd_file at the
 * current file position, advancing *rra_current by the bytes written.
 * On a short fwrite the library error string is set via rrd_set_error().
 * NOTE(review): lines are elided in this listing; the loop and function
 * closing braces are not visible here. */
1161 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1162 unsigned short CDP_scratch_idx, FILE *rrd_file)
1164 unsigned long ds_idx, cdp_idx;
1166 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1168 /* compute the cdp index */
1169 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1171 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1172 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1173 rrd -> rra_def[rra_idx].cf_nam);
1176 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1177 sizeof(rrd_value_t),1,rrd_file) != 1)
1179 rrd_set_error("writing rrd");
1182 *rra_current += sizeof(rrd_value_t);