1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.3 2001/03/04 13:01:55 oetiker
9 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11 * This is backwards compatible! But new files using the Aberrant stuff are not readable
12 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13 * -- Jake Brutlag <jakeb@corp.webtv.net>
15 * Revision 1.2 2001/03/04 11:14:25 oetiker
16 * added at-style-time@value:value syntax to rrd_update
17 * -- Dave Bodenstab <imdave@mcs.net>
19 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
22 *****************************************************************************/
25 #include <sys/types.h>
29 #include <sys/locking.h>
35 int LockRRD(FILE *rrd_file);
36 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
37 unsigned short CDP_scratch_idx, FILE *rrd_file);
41 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
46 main(int argc, char **argv){
47 rrd_update(argc,argv);
48 if (rrd_test_error()) {
49 printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
50 "Usage: rrdupdate filename\n"
51 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
52 "\t\t\ttime|N:value[:value...]\n\n"
53 "\t\t\tat-time@value[:value...]\n\n"
54 "\t\t\t[ time:value[:value...] ..]\n\n");
56 printf("ERROR: %s\n",rrd_get_error());
65 rrd_update(int argc, char **argv)
72 unsigned long rra_begin; /* byte pointer to the rra
73 * area in the rrd file. this
74 * pointer never changes value */
75 unsigned long rra_start; /* byte pointer to the rra
76 * area in the rrd file. this
77 * pointer changes as each rrd is
79 unsigned long rra_current; /* byte pointer to the current write
80 * spot in the rrd file. */
81 unsigned long rra_pos_tmp; /* temporary byte pointer. */
82 unsigned long interval,
83 pre_int,post_int; /* interval between this and
85 unsigned long proc_pdp_st; /* which pdp_st was the last
87 unsigned long occu_pdp_st; /* when was the pdp_st
88 * before the last update
90 unsigned long proc_pdp_age; /* how old was the data in
91 * the pdp prep area when it
93 unsigned long occu_pdp_age; /* how long ago was the last
95 rrd_value_t *pdp_new; /* prepare the incoming data
98 rrd_value_t *pdp_temp; /* prepare the pdp values
102 long *tmpl_idx; /* index representing the settings
103 transported by the template index */
104 long tmpl_cnt = 2; /* time and data */
108 time_t current_time = time(NULL);
110 int schedule_smooth = 0;
111 char *template = NULL;
112 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
113 /* a vector of future Holt-Winters seasonal coefs */
114 unsigned long elapsed_pdp_st;
115 /* number of elapsed PDP steps since last update */
116 unsigned long *rra_step_cnt = NULL;
117 /* number of rows to be updated in an RRA for a data
119 unsigned long start_pdp_offset;
120 /* number of PDP steps since the last update that
121 * are assigned to the first CDP to be generated
122 * since the last update. */
123 unsigned short scratch_idx;
124 /* index into the CDP scratch array */
125 enum cf_en current_cf;
126 /* numeric id of the current consolidation function */
129 static struct option long_options[] =
131 {"template", required_argument, 0, 't'},
134 int option_index = 0;
136 opt = getopt_long(argc, argv, "t:",
137 long_options, &option_index);
148 rrd_set_error("unknown option '%s'",argv[optind-1]);
154 /* need at least 2 arguments: filename, data. */
155 if (argc-optind < 2) {
156 rrd_set_error("Not enough arguments");
160 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
163 rra_current = rra_start = rra_begin = ftell(rrd_file);
164 /* This is defined in the ANSI C standard, section 7.9.5.3:
166 When a file is opened with udpate mode ('+' as the second
167 or third character in the ... list of mode argument
168 variables), both input and ouptut may be performed on the
169 associated stream. However, ... input may not be directly
170 followed by output without an intervening call to a file
171 positioning function, unless the input oepration encounters
173 fseek(rrd_file, 0, SEEK_CUR);
176 /* get exclusive lock to whole file.
177 * lock gets removed when we close the file.
179 if (LockRRD(rrd_file) != 0) {
180 rrd_set_error("could not lock RRD");
186 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
187 rrd_set_error("allocating updvals pointer array");
193 if ((pdp_temp = malloc(sizeof(rrd_value_t)
194 *rrd.stat_head->ds_cnt))==NULL){
195 rrd_set_error("allocating pdp_temp ...");
202 if ((tmpl_idx = malloc(sizeof(unsigned long)
203 *(rrd.stat_head->ds_cnt+1)))==NULL){
204 rrd_set_error("allocating tmpl_idx ...");
211 /* initialize template redirector */
213 tmpl_idx[0] -> 0; (time)
214 tmpl_idx[1] -> 1; (DS 0)
215 tmpl_idx[2] -> 2; (DS 1)
216 tmpl_idx[3] -> 3; (DS 2)
218 for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
219 tmpl_cnt=rrd.stat_head->ds_cnt+1;
224 tmpl_cnt = 1; /* the first entry is the time */
225 tmpl_len = strlen(template);
226 for(i=0;i<=tmpl_len ;i++) {
227 if (template[i] == ':' || template[i] == '\0') {
229 if (tmpl_cnt>rrd.stat_head->ds_cnt){
230 rrd_set_error("Template contains more DS definitions than RRD");
231 free(updvals); free(pdp_temp);
232 free(tmpl_idx); rrd_free(&rrd);
233 fclose(rrd_file); return(-1);
235 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
236 rrd_set_error("unknown DS name '%s'",dsname);
237 free(updvals); free(pdp_temp);
238 free(tmpl_idx); rrd_free(&rrd);
239 fclose(rrd_file); return(-1);
241 /* the first element is always the time */
242 tmpl_idx[tmpl_cnt-1]++;
243 /* go to the next entry on the template */
244 dsname = &template[i+1];
245 /* fix the damage we did before */
254 if ((pdp_new = malloc(sizeof(rrd_value_t)
255 *rrd.stat_head->ds_cnt))==NULL){
256 rrd_set_error("allocating pdp_new ...");
265 /* loop through the arguments. */
266 for(arg_i=optind+1; arg_i<argc;arg_i++) {
267 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
268 char *step_start = stepper;
270 char *parsetime_error = NULL;
271 enum {atstyle, normal} timesyntax;
272 struct time_value ds_tv;
273 if (stepper == NULL){
274 rrd_set_error("failed duplication argv entry");
282 /* initialize all ds input to unknown except the first one
283 which has always got to be set */
284 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
285 strcpy(stepper,argv[arg_i]);
287 /* separate all ds elements; first must be examined separately
288 due to alternate time syntax */
289 if ((p=strchr(stepper,'@'))!=NULL) {
290 timesyntax = atstyle;
293 } else if ((p=strchr(stepper,':'))!=NULL) {
298 rrd_set_error("expected timestamp not found in data source from %s:...",
304 updvals[tmpl_idx[ii]] = stepper;
306 if (*stepper == ':') {
310 updvals[tmpl_idx[ii]] = stepper+1;
316 if (ii != tmpl_cnt-1) {
317 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
318 tmpl_cnt-1, ii, argv[arg_i]);
323 /* get the time from the reading ... handle N */
324 if (timesyntax == atstyle) {
325 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
326 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
330 if (ds_tv.type == RELATIVE_TO_END_TIME ||
331 ds_tv.type == RELATIVE_TO_START_TIME) {
332 rrd_set_error("specifying time relative to the 'start' "
333 "or 'end' makes no sense here: %s",
339 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
340 } else if (strcmp(updvals[0],"N")==0){
341 current_time = time(NULL);
343 current_time = atol(updvals[0]);
346 if(current_time <= rrd.live_head->last_up){
347 rrd_set_error("illegal attempt to update using time %ld when "
348 "last update time is %ld (minimum one second step)",
349 current_time, rrd.live_head->last_up);
355 /* seek to the beginning of the rra's */
356 if (rra_current != rra_begin) {
357 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
358 rrd_set_error("seek error in rrd");
362 rra_current = rra_begin;
364 rra_start = rra_begin;
366 /* when was the current pdp started */
367 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
368 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
370 /* when did the last pdp_st occur */
371 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
372 occu_pdp_st = current_time - occu_pdp_age;
373 interval = current_time - rrd.live_head->last_up;
375 if (occu_pdp_st > proc_pdp_st){
376 /* OK we passed the pdp_st moment*/
377 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
378 * occurred before the latest
380 post_int = occu_pdp_age; /* how much after it */
394 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
395 occu_pdp_age, occu_pdp_st,
396 interval, pre_int, post_int);
399 /* process the data sources and update the pdp_prep
400 * area accordingly */
401 for(i=0;i<rrd.stat_head->ds_cnt;i++){
403 dst_idx= dst_conv(rrd.ds_def[i].dst);
404 if((updvals[i+1][0] != 'U') &&
405 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
407 /* the data source type defines how to process the data */
408 /* pdp_temp contains rate * time ... eg the bytes
409 * transferred during the interval. Doing it this way saves
410 * a lot of math operations */
416 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
417 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
418 if(dst_idx == DST_COUNTER) {
419 /* simple overflow catcher sugestet by andres kroonmaa */
420 /* this will fail terribly for non 32 or 64 bit counters ... */
421 /* are there any others in SNMP land ? */
422 if (pdp_new[i] < (double)0.0 )
423 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
424 if (pdp_new[i] < (double)0.0 )
425 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
427 rate = pdp_new[i] / interval;
434 pdp_new[i]= atof(updvals[i+1]);
435 rate = pdp_new[i] / interval;
438 pdp_new[i] = atof(updvals[i+1]) * interval;
439 rate = pdp_new[i] / interval;
442 rrd_set_error("rrd contains unknown DS type : '%s'",
446 /* break out of this for loop if the error string is set */
447 if (rrd_test_error()){
450 /* make sure pdp_temp is neither too large or too small
451 * if any of these occur it becomes unknown ...
453 if ( ! isnan(rate) &&
454 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
455 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
456 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
457 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
461 /* no news is news all the same */
465 /* make a copy of the command line argument for the next run */
473 rrd.pdp_prep[i].last_ds,
474 updvals[i+1], pdp_new[i]);
476 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
477 strncpy(rrd.pdp_prep[i].last_ds,
478 updvals[i+1],LAST_DS_LEN-1);
479 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
482 /* break out of the argument parsing loop if the error_string is set */
483 if (rrd_test_error()){
487 /* has a pdp_st moment occurred since the last run ? */
489 if (proc_pdp_st == occu_pdp_st){
490 /* no we have not passed a pdp_st moment. therefore update is simple */
492 for(i=0;i<rrd.stat_head->ds_cnt;i++){
493 if(isnan(pdp_new[i]))
494 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
496 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
503 rrd.pdp_prep[i].scratch[PDP_val].u_val,
504 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
508 /* an pdp_st has occurred. */
510 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
511 * occurred up to the last run.
512 pdp_new[] contains rate*seconds from the latest run.
513 pdp_temp[] will contain the rate for cdp */
516 for(i=0;i<rrd.stat_head->ds_cnt;i++){
517 /* update pdp_prep to the current pdp_st */
518 if(isnan(pdp_new[i]))
519 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
521 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
522 pdp_new[i]/(double)interval*(double)pre_int;
524 /* if too much of the pdp_prep is unknown we dump it */
525 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
526 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
527 (occu_pdp_st-proc_pdp_st <=
528 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
531 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
532 / (double)( occu_pdp_st
534 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
536 /* make pdp_prep ready for the next run */
537 if(isnan(pdp_new[i])){
538 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
539 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
541 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
542 rrd.pdp_prep[i].scratch[PDP_val].u_val =
543 pdp_new[i]/(double)interval*(double)post_int;
551 "new_unkn_sec %5lu\n",
553 rrd.pdp_prep[i].scratch[PDP_val].u_val,
554 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
558 /* compute the number of elapsed pdp_st moments */
559 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
561 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
563 if (rra_step_cnt == NULL)
565 rra_step_cnt = (unsigned long *)
566 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
569 for(i = 0, rra_start = rra_begin;
570 i < rrd.stat_head->rra_cnt;
571 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
574 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
575 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
576 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
577 if (start_pdp_offset <= elapsed_pdp_st) {
578 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
579 rrd.rra_def[i].pdp_cnt + 1;
584 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
586 /* If this is a bulk update, we need to skip ahead in the seasonal
587 * arrays so that they will be correct for the next observed value;
588 * note that for the bulk update itself, no update will occur to
589 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
591 if (rra_step_cnt[i] > 2)
593 /* skip update by resetting rra_step_cnt[i],
594 * note that this is not data source specific; this is due
595 * to the bulk update, not a DNAN value for the specific data
598 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
599 &last_seasonal_coef);
600 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
604 /* periodically run a smoother for seasonal effects */
605 /* Need to use first cdp parameter buffer to track
606 * burnin (burnin requires a specific smoothing schedule).
607 * The CDP_init_seasonal parameter is really an RRA level,
608 * not a data source within RRA level parameter, but the rra_def
609 * is read only for rrd_update (not flushed to disk). */
610 iii = i*(rrd.stat_head -> ds_cnt);
611 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
614 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
615 > rrd.rra_def[i].row_cnt - 1) {
616 /* mark off one of the burnin cycles */
617 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
621 /* someone has no doubt invented a trick to deal with this
622 * wrap around, but at least this code is clear. */
623 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
624 rrd.rra_ptr[i].cur_row)
626 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
627 * mapping between PDP and CDP */
628 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
629 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
633 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
634 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
635 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
640 /* can't rely on negative numbers because we are working with
642 /* Don't need modulus here. If we've wrapped more than once, only
643 * one smooth is executed at the end. */
644 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
645 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
646 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
650 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
651 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
652 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
659 rra_current = ftell(rrd_file);
660 } /* if cf is DEVSEASONAL or SEASONAL */
662 if (rrd_test_error()) break;
664 /* update CDP_PREP areas */
665 /* loop over data soures within each RRA */
667 ii < rrd.stat_head->ds_cnt;
671 /* iii indexes the CDP prep area for this data source within the RRA */
672 iii=i*rrd.stat_head->ds_cnt+ii;
674 if (rrd.rra_def[i].pdp_cnt > 1) {
676 if (rra_step_cnt[i] > 0) {
677 /* If we are in this block, as least 1 CDP value will be written to
678 * disk, this is the CDP_primary_val entry. If more than 1 value needs
679 * to be written, then the "fill in" value is the CDP_secondary_val
681 if (isnan(pdp_temp[ii]))
683 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
684 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
686 /* CDP_secondary value is the RRA "fill in" value for intermediary
687 * CDP data entries. No matter the CF, the value is the same because
688 * the average, max, min, and last of a list of identical values is
689 * the same, namely, the value itself. */
690 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
693 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
694 > rrd.rra_def[i].pdp_cnt*
695 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
697 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
698 /* initialize carry over */
699 if (current_cf == CF_AVERAGE) {
700 if (isnan(pdp_temp[ii])) {
701 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
703 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
704 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
707 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
710 rrd_value_t cum_val, cur_val;
711 switch (current_cf) {
713 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
714 cur_val = IFDNAN(pdp_temp[ii],0.0);
715 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
716 (cum_val + cur_val) /
717 (rrd.rra_def[i].pdp_cnt
718 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
719 /* initialize carry over value */
720 if (isnan(pdp_temp[ii])) {
721 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
723 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
724 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
728 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
729 cur_val = IFDNAN(pdp_temp[ii],-DINF);
731 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
732 isnan(pdp_temp[ii])) {
734 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
739 if (cur_val > cum_val)
740 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
742 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
743 /* initialize carry over value */
744 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
747 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
748 cur_val = IFDNAN(pdp_temp[ii],DINF);
750 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
751 isnan(pdp_temp[ii])) {
753 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
758 if (cur_val < cum_val)
759 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
761 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
762 /* initialize carry over value */
763 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
767 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
768 /* initialize carry over value */
769 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
772 } /* endif meets xff value requirement for a valid value */
773 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
774 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
775 if (isnan(pdp_temp[ii]))
776 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
777 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
779 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
780 } else /* rra_step_cnt[i] == 0 */
783 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
784 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
787 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
788 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
791 if (isnan(pdp_temp[ii])) {
792 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
793 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
795 if (current_cf == CF_AVERAGE) {
796 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
799 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
802 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
803 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
806 switch (current_cf) {
808 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
812 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
813 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
816 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
817 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
821 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
826 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
827 if (elapsed_pdp_st > 2)
829 switch (current_cf) {
832 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
833 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
837 /* need to update cached seasonal values, so they are consistent
838 * with the bulk update */
839 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
840 * CDP_last_deviation are the same. */
841 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
842 last_seasonal_coef[ii];
843 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
847 /* need to update the null_count and last_null_count.
848 * even do this for non-DNAN pdp_temp because the
849 * algorithm is not learning from batch updates. */
850 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
852 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
856 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
857 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
860 /* do not count missed bulk values as failures */
861 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
862 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
863 /* need to reset violations buffer.
864 * could do this more carefully, but for now, just
865 * assume a bulk update wipes away all violations. */
866 erase_violations(&rrd, iii, i);
870 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
872 if (rrd_test_error()) break;
874 } /* endif data sources loop */
877 /* this loop is only entered if elapsed_pdp_st < 3 */
878 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
879 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
881 for(i = 0, rra_start = rra_begin;
882 i < rrd.stat_head->rra_cnt;
883 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
886 if (rrd.rra_def[i].pdp_cnt > 1) continue;
888 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
889 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
891 lookup_seasonal(&rrd,i,rra_start,rrd_file,
892 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
895 if (rrd_test_error()) break;
896 /* loop over data soures within each RRA */
898 ii < rrd.stat_head->ds_cnt;
901 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
902 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
903 scratch_idx, seasonal_coef);
906 if (rrd_test_error()) break;
907 } /* end elapsed_pdp_st loop */
909 if (rrd_test_error()) break;
911 /* Ready to write to disk */
912 /* Move sequentially through the file, writing one RRA at a time.
913 * Note this architecture divorces the computation of CDP with
914 * flushing updated RRA entries to disk. */
915 for(i = 0, rra_start = rra_begin;
916 i < rrd.stat_head->rra_cnt;
917 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
919 /* is there anything to write for this RRA? If not, continue. */
920 if (rra_step_cnt[i] == 0) continue;
922 /* write the first row */
924 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
926 rrd.rra_ptr[i].cur_row++;
927 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
928 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
929 /* positition on the first row */
930 rra_pos_tmp = rra_start +
931 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
932 if(rra_pos_tmp != rra_current) {
933 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
934 rrd_set_error("seek error in rrd");
937 rra_current = rra_pos_tmp;
940 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
942 scratch_idx = CDP_primary_val;
943 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
944 if (rrd_test_error()) break;
946 /* write other rows of the bulk update, if any */
947 scratch_idx = CDP_secondary_val;
948 for ( ; rra_step_cnt[i] > 1;
949 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
951 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
954 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
955 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
958 rrd.rra_ptr[i].cur_row = 0;
959 /* seek back to beginning of current rra */
960 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
962 rrd_set_error("seek error in rrd");
966 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
968 rra_current = rra_start;
970 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
973 if (rrd_test_error())
977 /* break out of the argument parsing loop if error_string is set */
978 if (rrd_test_error()){
983 } /* endif a pdp_st has occurred */
984 rrd.live_head->last_up = current_time;
986 } /* function argument loop */
988 if (seasonal_coef != NULL) free(seasonal_coef);
989 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
990 if (rra_step_cnt != NULL) free(rra_step_cnt);
992 /* if we got here and if there is an error and if the file has not been
993 * written to, then close things up and return. */
994 if (rrd_test_error()) {
1004 /* aargh ... that was tough ... so many loops ... anyway, its done.
1005 * we just need to write back the live header portion now*/
1007 if (fseek(rrd_file, (sizeof(stat_head_t)
1008 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1009 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1011 rrd_set_error("seek rrd for live header writeback");
1021 if(fwrite( rrd.live_head,
1022 sizeof(live_head_t), 1, rrd_file) != 1){
1023 rrd_set_error("fwrite live_head to rrd");
1033 if(fwrite( rrd.pdp_prep,
1035 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1036 rrd_set_error("ftwrite pdp_prep to rrd");
1046 if(fwrite( rrd.cdp_prep,
1048 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1049 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1051 rrd_set_error("ftwrite cdp_prep to rrd");
1061 if(fwrite( rrd.rra_ptr,
1063 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1064 rrd_set_error("fwrite rra_ptr to rrd");
1074 /* OK now close the files and free the memory */
1075 if(fclose(rrd_file) != 0){
1076 rrd_set_error("closing rrd");
1085 /* calling the smoothing code here guarantees at most
1086 * one smoothing operation per rrd_update call. Unfortunately,
1087 * it is possible with bulk updates, or a long-delayed update
1088 * for smoothing to occur off-schedule. This really isn't
1089 * critical except during the burning cycles. */
1090 if (schedule_smooth)
1093 rrd_file = fopen(argv[optind],"r+");
1095 rrd_file = fopen(argv[optind],"rb+");
1097 rra_start = rra_begin;
1098 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1100 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1101 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1104 fprintf(stderr,"Running smoother for rra %ld\n",i);
1106 apply_smoother(&rrd,i,rra_start,rrd_file);
1107 if (rrd_test_error())
1110 rra_start += rrd.rra_def[i].row_cnt
1111 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1124 * get exclusive lock to whole file.
1125 * lock gets removed when we close the file
1127 * returns 0 on success
1130 LockRRD(FILE *rrdfile)
1132 int rrd_fd; /* File descriptor for RRD */
1135 rrd_fd = fileno(rrdfile);
1140 lock.l_type = F_WRLCK; /* exclusive write lock */
1141 lock.l_len = 0; /* whole file */
1142 lock.l_start = 0; /* start of file */
1143 lock.l_whence = SEEK_SET; /* end of file */
1145 stat = fcntl(rrd_fd, F_SETLK, &lock);
1149 if ( _fstat( rrd_fd, &st ) == 0 ) {
1150 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1162 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1163 unsigned short CDP_scratch_idx, FILE *rrd_file)
1165 unsigned long ds_idx, cdp_idx;
1167 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1169 /* compute the cdp index */
1170 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1172 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1173 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1174 rrd -> rra_def[rra_idx].cf_nam);
1177 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1178 sizeof(rrd_value_t),1,rrd_file) != 1)
1180 rrd_set_error("writing rrd");
1183 *rra_current += sizeof(rrd_value_t);