1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.4 2001/03/10 23:54:41 oetiker
9 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
10 * parser and calculator from rrd_graph and puts them in a new file,
11 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
12 * clean-up of aberrant behavior stuff, including a bug fix.
13 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
14 * -- Jake Brutlag <jakeb@corp.webtv.net>
16 * Revision 1.3 2001/03/04 13:01:55 oetiker
17 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
18 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
19 * This is backwards compatible! But new files using the Aberrant stuff are not readable
20 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
21 * -- Jake Brutlag <jakeb@corp.webtv.net>
23 * Revision 1.2 2001/03/04 11:14:25 oetiker
24 * added at-style-time@value:value syntax to rrd_update
25 * -- Dave Bodenstab <imdave@mcs.net>
27 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
30 *****************************************************************************/
33 #include <sys/types.h>
36 #include "rrd_rpncalc.h"
39 #include <sys/locking.h>
44 /* Local prototypes */
45 int LockRRD(FILE *rrd_file);
46 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
47 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * Note that X is evaluated twice, so do not pass expressions with side
 * effects.  The expansion deliberately carries no trailing semicolon so the
 * macro can be used inside larger expressions as well as in plain
 * assignment statements. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Stand-alone command line entry point: hands argc/argv straight to
 * rrd_update() and, when rrd_test_error() reports a failure, prints the
 * version/usage text and the message returned by rrd_get_error(). */
54 main(int argc, char **argv){
55 rrd_update(argc,argv);
56 if (rrd_test_error()) {
57 printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
58 "Usage: rrdupdate filename\n"
59 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
60 "\t\t\ttime|N:value[:value...]\n\n"
61 "\t\t\tat-time@value[:value...]\n\n"
62 "\t\t\t[ time:value[:value...] ..]\n\n");
64 printf("ERROR: %s\n",rrd_get_error());
/* rrd_update: feed one or more timestamped readings into an RRD file.
 *
 * argv is parsed with getopt_long; the only option is --template / -t, which
 * takes a colon separated list of DS names.  After the options, at least two
 * arguments are required: the RRD file name followed by one or more update
 * strings.
 *
 * The file is opened with rrd_open(..., RRD_READWRITE) and locked with
 * LockRRD().  Each update string is then split into a timestamp and DS
 * values.  The timestamp is parsed with parsetime() when the string contains
 * '@', is taken from time(NULL) when it is "N", and is converted with atol()
 * otherwise; it must be strictly greater than live_head->last_up.
 *
 * For every update the pdp_prep scratch areas are advanced.  When a PDP step
 * boundary has been crossed, pdp_temp[] is computed (COMPUTE/CDEF data
 * sources via rpn_calc), the cdp_prep areas are updated and the affected RRA
 * rows are written with write_RRA_row().
 *
 * Once all arguments are processed, live_head, pdp_prep, cdp_prep and
 * rra_ptr are written back and the file is closed.  If schedule_smooth is
 * set, the file is reopened and apply_smoother() is run on SEASONAL and
 * DEVSEASONAL RRAs.
 *
 * Errors are reported through rrd_set_error(); the template error paths
 * shown below return -1.
 * NOTE(review): the return statements of the other paths are not visible in
 * this listing - confirm the success return value against the full source. */
73 rrd_update(int argc, char **argv)
80 unsigned long rra_begin; /* byte pointer to the rra
81 * area in the rrd file. this
82 * pointer never changes value */
83 unsigned long rra_start; /* byte pointer to the rra
84 * area in the rrd file. this
85 * pointer changes as each rrd is
87 unsigned long rra_current; /* byte pointer to the current write
88 * spot in the rrd file. */
89 unsigned long rra_pos_tmp; /* temporary byte pointer. */
90 unsigned long interval,
91 pre_int,post_int; /* interval between this and
93 unsigned long proc_pdp_st; /* which pdp_st was the last
95 unsigned long occu_pdp_st; /* when was the pdp_st
96 * before the last update
98 unsigned long proc_pdp_age; /* how old was the data in
99 * the pdp prep area when it
100 * was last updated */
101 unsigned long occu_pdp_age; /* how long ago was the last
103 rrd_value_t *pdp_new; /* prepare the incoming data
104 * to be added to the
106 rrd_value_t *pdp_temp; /* prepare the pdp values
107 * to be added to the
110 long *tmpl_idx; /* index representing the settings
111 transported by the template index */
112 long tmpl_cnt = 2; /* time and data */
116 time_t current_time = time(NULL);
118 int schedule_smooth = 0;
119 char *template = NULL;
120 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
121 /* a vector of future Holt-Winters seasonal coefs */
122 unsigned long elapsed_pdp_st;
123 /* number of elapsed PDP steps since last update */
124 unsigned long *rra_step_cnt = NULL;
125 /* number of rows to be updated in an RRA for a data
127 unsigned long start_pdp_offset;
128 /* number of PDP steps since the last update that
129 * are assigned to the first CDP to be generated
130 * since the last update. */
131 unsigned short scratch_idx;
132 /* index into the CDP scratch array */
133 enum cf_en current_cf;
134 /* numeric id of the current consolidation function */
135 rpnstack_t rpnstack; /* used for COMPUTE DS */
137 rpnstack_init(&rpnstack);
140 static struct option long_options[] =
142 {"template", required_argument, 0, 't'},
145 int option_index = 0;
147 opt = getopt_long(argc, argv, "t:",
148 long_options, &option_index);
159 rrd_set_error("unknown option '%s'",argv[optind-1]);
165 /* need at least 2 arguments: filename, data. */
166 if (argc-optind < 2) {
167 rrd_set_error("Not enough arguments");
171 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
174 rra_current = rra_start = rra_begin = ftell(rrd_file);
175 /* This is defined in the ANSI C standard, section 7.9.5.3:
177 When a file is opened with update mode ('+' as the second
178 or third character in the ... list of mode argument
179 variables), both input and output may be performed on the
180 associated stream. However, ... input may not be directly
181 followed by output without an intervening call to a file
182 positioning function, unless the input operation encounters
184 fseek(rrd_file, 0, SEEK_CUR);
187 /* get exclusive lock to whole file.
188 * lock gets removed when we close the file.
190 if (LockRRD(rrd_file) != 0) {
191 rrd_set_error("could not lock RRD");
197 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
198 rrd_set_error("allocating updvals pointer array");
204 if ((pdp_temp = malloc(sizeof(rrd_value_t)
205 *rrd.stat_head->ds_cnt))==NULL){
206 rrd_set_error("allocating pdp_temp ...");
213 if ((tmpl_idx = malloc(sizeof(unsigned long)
214 *(rrd.stat_head->ds_cnt+1)))==NULL){
215 rrd_set_error("allocating tmpl_idx ...");
222 /* initialize template redirector */
223 /* default config example (assume DS 1 is a CDEF DS)
224 tmpl_idx[0] -> 0; (time)
225 tmpl_idx[1] -> 1; (DS 0)
226 tmpl_idx[2] -> 3; (DS 2)
227 tmpl_idx[3] -> 4; (DS 3) */
228 tmpl_idx[0] = 0; /* time */
229 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
231 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
240 tmpl_cnt = 1; /* the first entry is the time */
241 tmpl_len = strlen(template);
242 for(i=0;i<=tmpl_len ;i++) {
243 if (template[i] == ':' || template[i] == '\0') {
245 if (tmpl_cnt>rrd.stat_head->ds_cnt){
246 rrd_set_error("Template contains more DS definitions than RRD");
247 free(updvals); free(pdp_temp);
248 free(tmpl_idx); rrd_free(&rrd);
249 fclose(rrd_file); return(-1);
251 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
252 rrd_set_error("unknown DS name '%s'",dsname);
253 free(updvals); free(pdp_temp);
254 free(tmpl_idx); rrd_free(&rrd);
255 fclose(rrd_file); return(-1);
257 /* the first element is always the time */
258 tmpl_idx[tmpl_cnt-1]++;
259 /* go to the next entry on the template */
260 dsname = &template[i+1];
261 /* fix the damage we did before */
270 if ((pdp_new = malloc(sizeof(rrd_value_t)
271 *rrd.stat_head->ds_cnt))==NULL){
272 rrd_set_error("allocating pdp_new ...");
281 /* loop through the arguments. */
282 for(arg_i=optind+1; arg_i<argc;arg_i++) {
283 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
284 char *step_start = stepper;
286 char *parsetime_error = NULL;
287 enum {atstyle, normal} timesyntax;
288 struct time_value ds_tv;
289 if (stepper == NULL){
290 rrd_set_error("failed duplication argv entry");
298 /* initialize all ds input to unknown except the first one
299 which has always got to be set */
300 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
301 strcpy(stepper,argv[arg_i]);
303 /* separate all ds elements; first must be examined separately
304 due to alternate time syntax */
305 if ((p=strchr(stepper,'@'))!=NULL) {
306 timesyntax = atstyle;
309 } else if ((p=strchr(stepper,':'))!=NULL) {
314 rrd_set_error("expected timestamp not found in data source from %s:...",
320 updvals[tmpl_idx[ii]] = stepper;
322 if (*stepper == ':') {
326 updvals[tmpl_idx[ii]] = stepper+1;
332 if (ii != tmpl_cnt-1) {
333 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
334 tmpl_cnt-1, ii, argv[arg_i]);
339 /* get the time from the reading ... handle N */
340 if (timesyntax == atstyle) {
341 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
342 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
346 if (ds_tv.type == RELATIVE_TO_END_TIME ||
347 ds_tv.type == RELATIVE_TO_START_TIME) {
348 rrd_set_error("specifying time relative to the 'start' "
349 "or 'end' makes no sense here: %s",
355 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
356 } else if (strcmp(updvals[0],"N")==0){
357 current_time = time(NULL);
359 current_time = atol(updvals[0]);
362 if(current_time <= rrd.live_head->last_up){
363 rrd_set_error("illegal attempt to update using time %ld when "
364 "last update time is %ld (minimum one second step)",
365 current_time, rrd.live_head->last_up);
371 /* seek to the beginning of the rra's */
372 if (rra_current != rra_begin) {
373 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
374 rrd_set_error("seek error in rrd");
378 rra_current = rra_begin;
380 rra_start = rra_begin;
382 /* when was the current pdp started */
383 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
384 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
386 /* when did the last pdp_st occur */
387 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
388 occu_pdp_st = current_time - occu_pdp_age;
389 interval = current_time - rrd.live_head->last_up;
391 if (occu_pdp_st > proc_pdp_st){
392 /* OK we passed the pdp_st moment*/
393 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
394 * occurred before the latest
396 post_int = occu_pdp_age; /* how much after it */
410 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
411 occu_pdp_age, occu_pdp_st,
412 interval, pre_int, post_int);
415 /* process the data sources and update the pdp_prep
416 * area accordingly */
417 for(i=0;i<rrd.stat_head->ds_cnt;i++){
419 dst_idx= dst_conv(rrd.ds_def[i].dst);
420 /* NOTE: DST_CDEF should never enter this if block, because
421 * updvals[i+1][0] is initialized to 'U'; unless the caller
422 * accidentally specified a value for the DST_CDEF. To handle
423 * this case, an extra check is required. */
424 if((updvals[i+1][0] != 'U') &&
425 (dst_idx != DST_CDEF) &&
426 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
428 /* the data source type defines how to process the data */
429 /* pdp_temp contains rate * time ... eg the bytes
430 * transferred during the interval. Doing it this way saves
431 * a lot of math operations */
437 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
438 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
439 if(dst_idx == DST_COUNTER) {
440 /* simple overflow catcher suggested by andres kroonmaa */
441 /* this will fail terribly for non 32 or 64 bit counters ... */
442 /* are there any others in SNMP land ? */
443 if (pdp_new[i] < (double)0.0 )
444 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
445 if (pdp_new[i] < (double)0.0 )
446 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
448 rate = pdp_new[i] / interval;
455 pdp_new[i]= atof(updvals[i+1]);
456 rate = pdp_new[i] / interval;
459 pdp_new[i] = atof(updvals[i+1]) * interval;
460 rate = pdp_new[i] / interval;
463 rrd_set_error("rrd contains unknown DS type : '%s'",
467 /* break out of this for loop if the error string is set */
468 if (rrd_test_error()){
471 /* make sure pdp_temp is neither too large or too small
472 * if any of these occur it becomes unknown ...
474 if ( ! isnan(rate) &&
475 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
476 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
477 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
478 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
482 /* no news is news all the same */
486 /* make a copy of the command line argument for the next run */
494 rrd.pdp_prep[i].last_ds,
495 updvals[i+1], pdp_new[i]);
497 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
498 strncpy(rrd.pdp_prep[i].last_ds,
499 updvals[i+1],LAST_DS_LEN-1);
500 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
503 /* break out of the argument parsing loop if the error_string is set */
504 if (rrd_test_error()){
508 /* has a pdp_st moment occurred since the last run ? */
510 if (proc_pdp_st == occu_pdp_st){
511 /* no we have not passed a pdp_st moment. therefore update is simple */
513 for(i=0;i<rrd.stat_head->ds_cnt;i++){
514 if(isnan(pdp_new[i]))
515 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
517 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
524 rrd.pdp_prep[i].scratch[PDP_val].u_val,
525 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
529 /* a pdp_st has occurred. */
531 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
532 * occurred up to the last run.
533 pdp_new[] contains rate*seconds from the latest run.
534 pdp_temp[] will contain the rate for cdp */
536 for(i=0;i<rrd.stat_head->ds_cnt;i++){
537 /* update pdp_prep to the current pdp_st */
538 if(isnan(pdp_new[i]))
539 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
541 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
542 pdp_new[i]/(double)interval*(double)pre_int;
544 /* if too much of the pdp_prep is unknown we dump it */
545 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
546 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
547 (occu_pdp_st-proc_pdp_st <=
548 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
551 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
552 / (double)( occu_pdp_st
554 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
557 /* process CDEF data sources; remember each CDEF DS can
558 * only reference other DS with a lower index number */
559 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
561 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
562 /* substitute data values for OP_VARIABLE nodes */
563 for (ii = 0; rpnp[ii].op != OP_END; ii++)
565 if (rpnp[ii].op == OP_VARIABLE) {
566 rpnp[ii].op = OP_NUMBER;
567 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
570 /* run the rpn calculator */
571 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
573 break; /* exits the data sources pdp_temp loop */
577 /* make pdp_prep ready for the next run */
578 if(isnan(pdp_new[i])){
579 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
580 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
582 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
583 rrd.pdp_prep[i].scratch[PDP_val].u_val =
584 pdp_new[i]/(double)interval*(double)post_int;
592 "new_unkn_sec %5lu\n",
594 rrd.pdp_prep[i].scratch[PDP_val].u_val,
595 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
599 /* if there were errors during the last loop, bail out here */
600 if (rrd_test_error()){
605 /* compute the number of elapsed pdp_st moments */
606 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
608 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
610 if (rra_step_cnt == NULL)
612 rra_step_cnt = (unsigned long *)
613 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
616 for(i = 0, rra_start = rra_begin;
617 i < rrd.stat_head->rra_cnt;
618 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
621 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
622 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
623 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
624 if (start_pdp_offset <= elapsed_pdp_st) {
625 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
626 rrd.rra_def[i].pdp_cnt + 1;
631 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
633 /* If this is a bulk update, we need to skip ahead in the seasonal
634 * arrays so that they will be correct for the next observed value;
635 * note that for the bulk update itself, no update will occur to
636 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
638 if (rra_step_cnt[i] > 2)
640 /* skip update by resetting rra_step_cnt[i],
641 * note that this is not data source specific; this is due
642 * to the bulk update, not a DNAN value for the specific data
645 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
646 &last_seasonal_coef);
647 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
651 /* periodically run a smoother for seasonal effects */
652 /* Need to use first cdp parameter buffer to track
653 * burnin (burnin requires a specific smoothing schedule).
654 * The CDP_init_seasonal parameter is really an RRA level,
655 * not a data source within RRA level parameter, but the rra_def
656 * is read only for rrd_update (not flushed to disk). */
657 iii = i*(rrd.stat_head -> ds_cnt);
658 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
661 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
662 > rrd.rra_def[i].row_cnt - 1) {
663 /* mark off one of the burnin cycles */
664 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
668 /* someone has no doubt invented a trick to deal with this
669 * wrap around, but at least this code is clear. */
670 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
671 rrd.rra_ptr[i].cur_row)
673 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
674 * mapping between PDP and CDP */
675 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
676 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
680 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
681 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
682 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
687 /* can't rely on negative numbers because we are working with
689 /* Don't need modulus here. If we've wrapped more than once, only
690 * one smooth is executed at the end. */
691 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
692 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
693 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
697 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
698 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
699 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
706 rra_current = ftell(rrd_file);
707 } /* if cf is DEVSEASONAL or SEASONAL */
709 if (rrd_test_error()) break;
711 /* update CDP_PREP areas */
712 /* loop over data sources within each RRA */
714 ii < rrd.stat_head->ds_cnt;
718 /* iii indexes the CDP prep area for this data source within the RRA */
719 iii=i*rrd.stat_head->ds_cnt+ii;
721 if (rrd.rra_def[i].pdp_cnt > 1) {
723 if (rra_step_cnt[i] > 0) {
724 /* If we are in this block, at least 1 CDP value will be written to
725 * disk, this is the CDP_primary_val entry. If more than 1 value needs
726 * to be written, then the "fill in" value is the CDP_secondary_val
728 if (isnan(pdp_temp[ii]))
730 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
731 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
733 /* CDP_secondary value is the RRA "fill in" value for intermediary
734 * CDP data entries. No matter the CF, the value is the same because
735 * the average, max, min, and last of a list of identical values is
736 * the same, namely, the value itself. */
737 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
740 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
741 > rrd.rra_def[i].pdp_cnt*
742 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
744 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
745 /* initialize carry over */
746 if (current_cf == CF_AVERAGE) {
747 if (isnan(pdp_temp[ii])) {
748 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
750 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
751 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
754 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
757 rrd_value_t cum_val, cur_val;
758 switch (current_cf) {
760 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
761 cur_val = IFDNAN(pdp_temp[ii],0.0);
762 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
763 (cum_val + cur_val) /
764 (rrd.rra_def[i].pdp_cnt
765 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
766 /* initialize carry over value */
767 if (isnan(pdp_temp[ii])) {
768 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
770 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
771 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
775 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
776 cur_val = IFDNAN(pdp_temp[ii],-DINF);
778 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
779 isnan(pdp_temp[ii])) {
781 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
786 if (cur_val > cum_val)
787 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
789 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
790 /* initialize carry over value */
791 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
794 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
795 cur_val = IFDNAN(pdp_temp[ii],DINF);
797 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
798 isnan(pdp_temp[ii])) {
800 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
805 if (cur_val < cum_val)
806 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
808 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
809 /* initialize carry over value */
810 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
814 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
815 /* initialize carry over value */
816 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
819 } /* endif meets xff value requirement for a valid value */
820 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
821 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
822 if (isnan(pdp_temp[ii]))
823 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
824 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
826 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
827 } else /* rra_step_cnt[i] == 0 */
830 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
831 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
834 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
835 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
838 if (isnan(pdp_temp[ii])) {
839 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
840 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
842 if (current_cf == CF_AVERAGE) {
843 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
846 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
849 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
850 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
853 switch (current_cf) {
855 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
859 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
860 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
863 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
864 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
868 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
873 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
874 if (elapsed_pdp_st > 2)
876 switch (current_cf) {
879 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
880 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
884 /* need to update cached seasonal values, so they are consistent
885 * with the bulk update */
886 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
887 * CDP_last_deviation are the same. */
888 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
889 last_seasonal_coef[ii];
890 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
894 /* need to update the null_count and last_null_count.
895 * even do this for non-DNAN pdp_temp because the
896 * algorithm is not learning from batch updates. */
897 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
899 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
903 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
904 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
907 /* do not count missed bulk values as failures */
908 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
909 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
910 /* need to reset violations buffer.
911 * could do this more carefully, but for now, just
912 * assume a bulk update wipes away all violations. */
913 erase_violations(&rrd, iii, i);
917 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
919 if (rrd_test_error()) break;
921 } /* end of data sources loop */
924 /* this loop is only entered if elapsed_pdp_st < 3 */
925 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
926 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
928 for(i = 0, rra_start = rra_begin;
929 i < rrd.stat_head->rra_cnt;
930 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
933 if (rrd.rra_def[i].pdp_cnt > 1) continue;
935 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
938 lookup_seasonal(&rrd,i,rra_start,rrd_file,
939 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
941 rra_current = ftell(rrd_file);
943 if (rrd_test_error()) break;
944 /* loop over data sources within each RRA */
946 ii < rrd.stat_head->ds_cnt;
949 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
950 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
951 scratch_idx, seasonal_coef);
954 if (rrd_test_error()) break;
955 } /* end elapsed_pdp_st loop */
957 if (rrd_test_error()) break;
959 /* Ready to write to disk */
960 /* Move sequentially through the file, writing one RRA at a time.
961 * Note this architecture divorces the computation of CDP with
962 * flushing updated RRA entries to disk. */
963 for(i = 0, rra_start = rra_begin;
964 i < rrd.stat_head->rra_cnt;
965 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
967 /* is there anything to write for this RRA? If not, continue. */
968 if (rra_step_cnt[i] == 0) continue;
970 /* write the first row */
972 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
974 rrd.rra_ptr[i].cur_row++;
975 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
976 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
977 /* position on the first row */
978 rra_pos_tmp = rra_start +
979 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
980 if(rra_pos_tmp != rra_current) {
981 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
982 rrd_set_error("seek error in rrd");
985 rra_current = rra_pos_tmp;
989 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
991 scratch_idx = CDP_primary_val;
992 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
993 if (rrd_test_error()) break;
995 /* write other rows of the bulk update, if any */
996 scratch_idx = CDP_secondary_val;
997 for ( ; rra_step_cnt[i] > 1;
998 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1000 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1003 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1004 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1007 rrd.rra_ptr[i].cur_row = 0;
1008 /* seek back to beginning of current rra */
1009 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1011 rrd_set_error("seek error in rrd");
1015 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1017 rra_current = rra_start;
1019 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1022 if (rrd_test_error())
1026 /* break out of the argument parsing loop if error_string is set */
1027 if (rrd_test_error()){
1032 } /* endif a pdp_st has occurred */
1033 rrd.live_head->last_up = current_time;
1035 } /* function argument loop */
1037 if (seasonal_coef != NULL) free(seasonal_coef);
1038 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1039 if (rra_step_cnt != NULL) free(rra_step_cnt);
1040 rpnstack_free(&rpnstack);
1042 /* if we got here and if there is an error and if the file has not been
1043 * written to, then close things up and return. */
1044 if (rrd_test_error()) {
1054 /* aargh ... that was tough ... so many loops ... anyway, its done.
1055 * we just need to write back the live header portion now*/
1057 if (fseek(rrd_file, (sizeof(stat_head_t)
1058 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1059 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1061 rrd_set_error("seek rrd for live header writeback");
1071 if(fwrite( rrd.live_head,
1072 sizeof(live_head_t), 1, rrd_file) != 1){
1073 rrd_set_error("fwrite live_head to rrd");
1083 if(fwrite( rrd.pdp_prep,
1085 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1086 rrd_set_error("ftwrite pdp_prep to rrd");
1096 if(fwrite( rrd.cdp_prep,
1098 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1099 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1101 rrd_set_error("ftwrite cdp_prep to rrd");
1111 if(fwrite( rrd.rra_ptr,
1113 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1114 rrd_set_error("fwrite rra_ptr to rrd");
1124 /* OK now close the files and free the memory */
1125 if(fclose(rrd_file) != 0){
1126 rrd_set_error("closing rrd");
1135 /* calling the smoothing code here guarantees at most
1136 * one smoothing operation per rrd_update call. Unfortunately,
1137 * it is possible with bulk updates, or a long-delayed update
1138 * for smoothing to occur off-schedule. This really isn't
1139 * critical except during the burn-in cycles. */
1140 if (schedule_smooth)
1143 rrd_file = fopen(argv[optind],"r+");
1145 rrd_file = fopen(argv[optind],"rb+");
1147 rra_start = rra_begin;
1148 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1150 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1151 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1154 fprintf(stderr,"Running smoother for rra %ld\n",i);
1156 apply_smoother(&rrd,i,rra_start,rrd_file);
1157 if (rrd_test_error())
1160 rra_start += rrd.rra_def[i].row_cnt
1161 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1174 * get exclusive lock to whole file.
1175 * lock gets removed when we close the file
1177 * returns 0 on success
/* LockRRD: try to take a non-blocking exclusive lock on the whole file
 * behind rrdfile.  Two variants are visible below: one fills a lock record
 * (F_WRLCK, l_start 0, l_len 0, SEEK_SET) and applies it with
 * fcntl(F_SETLK); the other obtains the file size with _fstat() and calls
 * _locking(_LK_NBLCK) over st.st_size bytes.
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants and the return statement are not visible in this listing; the
 * caller treats any non-zero result as "could not lock RRD". */
1180 LockRRD(FILE *rrdfile)
1182 int rrd_fd; /* File descriptor for RRD */
1185 rrd_fd = fileno(rrdfile);
1190 lock.l_type = F_WRLCK; /* exclusive write lock */
1191 lock.l_len = 0; /* whole file */
1192 lock.l_start = 0; /* start of file */
1193 lock.l_whence = SEEK_SET; /* l_start is measured from the start of the file */
1195 stat = fcntl(rrd_fd, F_SETLK, &lock);
1199 if ( _fstat( rrd_fd, &st ) == 0 ) {
1200 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* write_RRA_row: write one row of RRA number rra_idx to rrd_file at the
 * stream's current position.  For every data source the value stored in
 * cdp_prep[rra_idx * ds_cnt + ds_idx].scratch[CDP_scratch_idx].u_val is
 * written with fwrite(); a short write records the error "writing rrd" via
 * rrd_set_error().  *rra_current, the caller's notion of the file position,
 * is advanced by sizeof(rrd_value_t) as values are written.  The caller is
 * expected to have positioned the stream beforehand. */
1212 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1213 unsigned short CDP_scratch_idx, FILE *rrd_file)
1215 unsigned long ds_idx, cdp_idx;
1217 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1219 /* compute the cdp index */
1220 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1222 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1223 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1224 rrd -> rra_def[rra_idx].cf_nam);
1227 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1228 sizeof(rrd_value_t),1,rrd_file) != 1)
1230 rrd_set_error("writing rrd");
1233 *rra_current += sizeof(rrd_value_t);