1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.6 2002/02/01 20:34:49 oetiker
9 * fixed version number and date/time
11 * Revision 1.5 2001/05/09 05:31:01 oetiker
12 * Bug fix: when update of multiple PDP/CDP RRAs coincided
13 * with interpolation of multiple PDPs an incorrect value was
14 * stored as the CDP. Especially evident for GAUGE data sources.
15 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
17 * Revision 1.4 2001/03/10 23:54:41 oetiker
18 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
19 * parser and calculator from rrd_graph and puts then in a new file,
20 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
21 * clean-up of aberrant behavior stuff, including a bug fix.
22 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
23 * -- Jake Brutlag <jakeb@corp.webtv.net>
25 * Revision 1.3 2001/03/04 13:01:55 oetiker
26 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
27 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
28 * This is backwards compatible! But new files using the Aberrant stuff are not readable
29 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
30 * -- Jake Brutlag <jakeb@corp.webtv.net>
32 * Revision 1.2 2001/03/04 11:14:25 oetiker
33 * added at-style-time@value:value syntax to rrd_update
34 * -- Dave Bodenstab <imdave@mcs.net>
36 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
39 *****************************************************************************/
42 #include <sys/types.h>
45 #include "rrd_rpncalc.h"
48 #include <sys/locking.h>
53 /* Local prototypes */
54 int LockRRD(FILE *rrd_file);
55 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
56 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* IFDNAN(X,Y): yield X unless X is NaN, in which case yield Y.
 * The expansion is a plain parenthesised expression with no trailing
 * semicolon, so it can be used inside a larger expression as well as on the
 * right-hand side of an assignment statement.
 * X is evaluated twice (once by isnan(), once as the result), so do not pass
 * an argument that has side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Command-line entry point: hands argc/argv straight to rrd_update() and,
 * if the rrd error state is set afterwards, prints the usage banner (which
 * names the tool "rrdupdate") followed by the error text.
 * The value returned by rrd_update() is not inspected here; failure is
 * detected through rrd_test_error() only.
 * NOTE(review): the return type, the exit status and the closing braces are
 * on lines that are not part of this excerpt. */
63 main(int argc, char **argv){
64 rrd_update(argc,argv);
65 if (rrd_test_error()) { /* an error was recorded during the update */
66 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
67 "Usage: rrdupdate filename\n"
68 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
69 "\t\t\ttime|N:value[:value...]\n\n"
70 "\t\t\tat-time@value[:value...]\n\n"
71 "\t\t\t[ time:value[:value...] ..]\n\n");
/* the specific failure message follows the generic usage text */
73 printf("ERROR: %s\n",rrd_get_error());
82 rrd_update(int argc, char **argv)
89 unsigned long rra_begin; /* byte pointer to the rra
90 * area in the rrd file. this
91 * pointer never changes value */
92 unsigned long rra_start; /* byte pointer to the rra
93 * area in the rrd file. this
94 * pointer changes as each rrd is
96 unsigned long rra_current; /* byte pointer to the current write
97 * spot in the rrd file. */
98 unsigned long rra_pos_tmp; /* temporary byte pointer. */
99 unsigned long interval,
100 pre_int,post_int; /* interval between this and
102 unsigned long proc_pdp_st; /* which pdp_st was the last
104 unsigned long occu_pdp_st; /* when was the pdp_st
105 * before the last update
107 unsigned long proc_pdp_age; /* how old was the data in
108 * the pdp prep area when it
109 * was last updated */
110 unsigned long occu_pdp_age; /* how long ago was the last
112 rrd_value_t *pdp_new; /* prepare the incoming data
113 * to be added to the
115 rrd_value_t *pdp_temp; /* prepare the pdp values
116 * to be added to the
119 long *tmpl_idx; /* index representing the settings
120 transported by the template index */
121 long tmpl_cnt = 2; /* time and data */
125 time_t current_time = time(NULL);
127 int schedule_smooth = 0;
128 char *template = NULL;
129 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
130 /* a vector of future Holt-Winters seasonal coefs */
131 unsigned long elapsed_pdp_st;
132 /* number of elapsed PDP steps since last update */
133 unsigned long *rra_step_cnt = NULL;
134 /* number of rows to be updated in an RRA for a data
136 unsigned long start_pdp_offset;
137 /* number of PDP steps since the last update that
138 * are assigned to the first CDP to be generated
139 * since the last update. */
140 unsigned short scratch_idx;
141 /* index into the CDP scratch array */
142 enum cf_en current_cf;
143 /* numeric id of the current consolidation function */
144 rpnstack_t rpnstack; /* used for COMPUTE DS */
146 rpnstack_init(&rpnstack);
149 static struct option long_options[] =
151 {"template", required_argument, 0, 't'},
154 int option_index = 0;
156 opt = getopt_long(argc, argv, "t:",
157 long_options, &option_index);
168 rrd_set_error("unknown option '%s'",argv[optind-1]);
174 /* need at least 2 arguments: filename, data. */
175 if (argc-optind < 2) {
176 rrd_set_error("Not enough arguments");
180 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
183 rra_current = rra_start = rra_begin = ftell(rrd_file);
184 /* This is defined in the ANSI C standard, section 7.9.5.3:
186 When a file is opened with update mode ('+' as the second
187 or third character in the ... list of mode argument
188 variables), both input and output may be performed on the
189 associated stream. However, ... input may not be directly
190 followed by output without an intervening call to a file
191 positioning function, unless the input operation encounters
193 fseek(rrd_file, 0, SEEK_CUR);
196 /* get exclusive lock to whole file.
197 * lock gets removed when we close the file.
199 if (LockRRD(rrd_file) != 0) {
200 rrd_set_error("could not lock RRD");
206 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
207 rrd_set_error("allocating updvals pointer array");
213 if ((pdp_temp = malloc(sizeof(rrd_value_t)
214 *rrd.stat_head->ds_cnt))==NULL){
215 rrd_set_error("allocating pdp_temp ...");
222 if ((tmpl_idx = malloc(sizeof(unsigned long)
223 *(rrd.stat_head->ds_cnt+1)))==NULL){
224 rrd_set_error("allocating tmpl_idx ...");
231 /* initialize template redirector */
232 /* default config example (assume DS 1 is a CDEF DS)
233 tmpl_idx[0] -> 0; (time)
234 tmpl_idx[1] -> 1; (DS 0)
235 tmpl_idx[2] -> 3; (DS 2)
236 tmpl_idx[3] -> 4; (DS 3) */
237 tmpl_idx[0] = 0; /* time */
238 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
240 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
249 tmpl_cnt = 1; /* the first entry is the time */
250 tmpl_len = strlen(template);
251 for(i=0;i<=tmpl_len ;i++) {
252 if (template[i] == ':' || template[i] == '\0') {
254 if (tmpl_cnt>rrd.stat_head->ds_cnt){
255 rrd_set_error("Template contains more DS definitions than RRD");
256 free(updvals); free(pdp_temp);
257 free(tmpl_idx); rrd_free(&rrd);
258 fclose(rrd_file); return(-1);
260 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
261 rrd_set_error("unknown DS name '%s'",dsname);
262 free(updvals); free(pdp_temp);
263 free(tmpl_idx); rrd_free(&rrd);
264 fclose(rrd_file); return(-1);
266 /* the first element is always the time */
267 tmpl_idx[tmpl_cnt-1]++;
268 /* go to the next entry on the template */
269 dsname = &template[i+1];
270 /* fix the damage we did before */
279 if ((pdp_new = malloc(sizeof(rrd_value_t)
280 *rrd.stat_head->ds_cnt))==NULL){
281 rrd_set_error("allocating pdp_new ...");
290 /* loop through the arguments. */
291 for(arg_i=optind+1; arg_i<argc;arg_i++) {
292 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
293 char *step_start = stepper;
295 char *parsetime_error = NULL;
296 enum {atstyle, normal} timesyntax;
297 struct time_value ds_tv;
298 if (stepper == NULL){
299 rrd_set_error("failed duplication argv entry");
307 /* initialize all ds input to unknown except the first one
308 which has always got to be set */
309 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
310 strcpy(stepper,argv[arg_i]);
312 /* separate all ds elements; first must be examined separately
313 due to alternate time syntax */
314 if ((p=strchr(stepper,'@'))!=NULL) {
315 timesyntax = atstyle;
318 } else if ((p=strchr(stepper,':'))!=NULL) {
323 rrd_set_error("expected timestamp not found in data source from %s:...",
329 updvals[tmpl_idx[ii]] = stepper;
331 if (*stepper == ':') {
335 updvals[tmpl_idx[ii]] = stepper+1;
341 if (ii != tmpl_cnt-1) {
342 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
343 tmpl_cnt-1, ii, argv[arg_i]);
348 /* get the time from the reading ... handle N */
349 if (timesyntax == atstyle) {
350 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
351 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
355 if (ds_tv.type == RELATIVE_TO_END_TIME ||
356 ds_tv.type == RELATIVE_TO_START_TIME) {
357 rrd_set_error("specifying time relative to the 'start' "
358 "or 'end' makes no sense here: %s",
364 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
365 } else if (strcmp(updvals[0],"N")==0){
366 current_time = time(NULL);
368 current_time = atol(updvals[0]);
371 if(current_time <= rrd.live_head->last_up){
372 rrd_set_error("illegal attempt to update using time %ld when "
373 "last update time is %ld (minimum one second step)",
374 current_time, rrd.live_head->last_up);
380 /* seek to the beginning of the rra's */
381 if (rra_current != rra_begin) {
382 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
383 rrd_set_error("seek error in rrd");
387 rra_current = rra_begin;
389 rra_start = rra_begin;
391 /* when was the current pdp started */
392 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
393 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
395 /* when did the last pdp_st occur */
396 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
397 occu_pdp_st = current_time - occu_pdp_age;
398 interval = current_time - rrd.live_head->last_up;
400 if (occu_pdp_st > proc_pdp_st){
401 /* OK we passed the pdp_st moment*/
402 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
403 * occurred before the latest
405 post_int = occu_pdp_age; /* how much after it */
419 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
420 occu_pdp_age, occu_pdp_st,
421 interval, pre_int, post_int);
424 /* process the data sources and update the pdp_prep
425 * area accordingly */
426 for(i=0;i<rrd.stat_head->ds_cnt;i++){
428 dst_idx= dst_conv(rrd.ds_def[i].dst);
429 /* NOTE: DST_CDEF should never enter this if block, because
430 * updvals[i+1][0] is initialized to 'U'; unless the caller
431 * accidentally specified a value for the DST_CDEF. To handle
432 * this case, an extra check is required. */
433 if((updvals[i+1][0] != 'U') &&
434 (dst_idx != DST_CDEF) &&
435 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
437 /* the data source type defines how to process the data */
438 /* pdp_new contains rate * time ... eg the bytes
439 * transferred during the interval. Doing it this way saves
440 * a lot of math operations */
446 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
447 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
448 if(dst_idx == DST_COUNTER) {
449 /* simple overflow catcher suggested by andres kroonmaa */
450 /* this will fail terribly for non 32 or 64 bit counters ... */
451 /* are there any others in SNMP land ? */
452 if (pdp_new[i] < (double)0.0 )
453 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
454 if (pdp_new[i] < (double)0.0 )
455 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
457 rate = pdp_new[i] / interval;
464 pdp_new[i]= atof(updvals[i+1]);
465 rate = pdp_new[i] / interval;
468 pdp_new[i] = atof(updvals[i+1]) * interval;
469 rate = pdp_new[i] / interval;
472 rrd_set_error("rrd contains unknown DS type : '%s'",
476 /* break out of this for loop if the error string is set */
477 if (rrd_test_error()){
480 /* make sure pdp_temp is neither too large nor too small
481 * if any of these occur it becomes unknown ...
483 if ( ! isnan(rate) &&
484 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
485 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
486 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
487 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
491 /* no news is news all the same */
495 /* make a copy of the command line argument for the next run */
503 rrd.pdp_prep[i].last_ds,
504 updvals[i+1], pdp_new[i]);
506 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
507 strncpy(rrd.pdp_prep[i].last_ds,
508 updvals[i+1],LAST_DS_LEN-1);
509 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
512 /* break out of the argument parsing loop if the error_string is set */
513 if (rrd_test_error()){
517 /* has a pdp_st moment occurred since the last run ? */
519 if (proc_pdp_st == occu_pdp_st){
520 /* no we have not passed a pdp_st moment. therefore update is simple */
522 for(i=0;i<rrd.stat_head->ds_cnt;i++){
523 if(isnan(pdp_new[i]))
524 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
526 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
533 rrd.pdp_prep[i].scratch[PDP_val].u_val,
534 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
538 /* a pdp_st has occurred. */
540 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
541 * occurred up to the last run.
542 pdp_new[] contains rate*seconds from the latest run.
543 pdp_temp[] will contain the rate for cdp */
545 for(i=0;i<rrd.stat_head->ds_cnt;i++){
546 /* update pdp_prep to the current pdp_st */
547 if(isnan(pdp_new[i]))
548 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
550 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
551 pdp_new[i]/(double)interval*(double)pre_int;
553 /* if too much of the pdp_prep is unknown we dump it */
554 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
555 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
556 (occu_pdp_st-proc_pdp_st <=
557 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
560 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
561 / (double)( occu_pdp_st
563 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
566 /* process CDEF data sources; remember each CDEF DS can
567 * only reference other DS with a lower index number */
568 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
570 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
571 /* substitute data values for OP_VARIABLE nodes */
572 for (ii = 0; rpnp[ii].op != OP_END; ii++)
574 if (rpnp[ii].op == OP_VARIABLE) {
575 rpnp[ii].op = OP_NUMBER;
576 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
579 /* run the rpn calculator */
580 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
582 break; /* exits the data sources pdp_temp loop */
586 /* make pdp_prep ready for the next run */
587 if(isnan(pdp_new[i])){
588 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
589 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
591 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
592 rrd.pdp_prep[i].scratch[PDP_val].u_val =
593 pdp_new[i]/(double)interval*(double)post_int;
601 "new_unkn_sec %5lu\n",
603 rrd.pdp_prep[i].scratch[PDP_val].u_val,
604 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
608 /* if there were errors during the last loop, bail out here */
609 if (rrd_test_error()){
614 /* compute the number of elapsed pdp_st moments */
615 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
617 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
619 if (rra_step_cnt == NULL)
621 rra_step_cnt = (unsigned long *)
622 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
625 for(i = 0, rra_start = rra_begin;
626 i < rrd.stat_head->rra_cnt;
627 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
630 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
631 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
632 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
633 if (start_pdp_offset <= elapsed_pdp_st) {
634 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
635 rrd.rra_def[i].pdp_cnt + 1;
640 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
642 /* If this is a bulk update, we need to skip ahead in the seasonal
643 * arrays so that they will be correct for the next observed value;
644 * note that for the bulk update itself, no update will occur to
645 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
647 if (rra_step_cnt[i] > 2)
649 /* skip update by resetting rra_step_cnt[i],
650 * note that this is not data source specific; this is due
651 * to the bulk update, not a DNAN value for the specific data
654 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
655 &last_seasonal_coef);
656 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
660 /* periodically run a smoother for seasonal effects */
661 /* Need to use first cdp parameter buffer to track
662 * burnin (burnin requires a specific smoothing schedule).
663 * The CDP_init_seasonal parameter is really an RRA level,
664 * not a data source within RRA level parameter, but the rra_def
665 * is read only for rrd_update (not flushed to disk). */
666 iii = i*(rrd.stat_head -> ds_cnt);
667 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
670 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
671 > rrd.rra_def[i].row_cnt - 1) {
672 /* mark off one of the burnin cycles */
673 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
677 /* someone has no doubt invented a trick to deal with this
678 * wrap around, but at least this code is clear. */
679 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
680 rrd.rra_ptr[i].cur_row)
682 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
683 * mapping between PDP and CDP */
684 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
685 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
689 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
690 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
691 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
696 /* can't rely on negative numbers because we are working with
698 /* Don't need modulus here. If we've wrapped more than once, only
699 * one smooth is executed at the end. */
700 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
701 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
702 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
706 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
707 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
708 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
715 rra_current = ftell(rrd_file);
716 } /* if cf is DEVSEASONAL or SEASONAL */
718 if (rrd_test_error()) break;
720 /* update CDP_PREP areas */
721 /* loop over data sources within each RRA */
723 ii < rrd.stat_head->ds_cnt;
727 /* iii indexes the CDP prep area for this data source within the RRA */
728 iii=i*rrd.stat_head->ds_cnt+ii;
730 if (rrd.rra_def[i].pdp_cnt > 1) {
732 if (rra_step_cnt[i] > 0) {
733 /* If we are in this block, at least 1 CDP value will be written to
734 * disk, this is the CDP_primary_val entry. If more than 1 value needs
735 * to be written, then the "fill in" value is the CDP_secondary_val
737 if (isnan(pdp_temp[ii]))
739 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
740 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
742 /* CDP_secondary value is the RRA "fill in" value for intermediary
743 * CDP data entries. No matter the CF, the value is the same because
744 * the average, max, min, and last of a list of identical values is
745 * the same, namely, the value itself. */
746 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
749 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
750 > rrd.rra_def[i].pdp_cnt*
751 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
753 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
754 /* initialize carry over */
755 if (current_cf == CF_AVERAGE) {
756 if (isnan(pdp_temp[ii])) {
757 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
759 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
760 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
763 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
766 rrd_value_t cum_val, cur_val;
767 switch (current_cf) {
769 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
770 cur_val = IFDNAN(pdp_temp[ii],0.0);
771 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
772 (cum_val + cur_val * start_pdp_offset) /
773 (rrd.rra_def[i].pdp_cnt
774 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
775 /* initialize carry over value */
776 if (isnan(pdp_temp[ii])) {
777 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
779 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
780 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
784 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
785 cur_val = IFDNAN(pdp_temp[ii],-DINF);
787 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
788 isnan(pdp_temp[ii])) {
790 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
795 if (cur_val > cum_val)
796 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
798 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
799 /* initialize carry over value */
800 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
803 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
804 cur_val = IFDNAN(pdp_temp[ii],DINF);
806 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
807 isnan(pdp_temp[ii])) {
809 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
814 if (cur_val < cum_val)
815 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
817 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
818 /* initialize carry over value */
819 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
823 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
824 /* initialize carry over value */
825 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
828 } /* endif meets xff value requirement for a valid value */
829 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
830 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
831 if (isnan(pdp_temp[ii]))
832 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
833 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
835 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
836 } else /* rra_step_cnt[i] == 0 */
839 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
840 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
843 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
844 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
847 if (isnan(pdp_temp[ii])) {
848 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
849 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
851 if (current_cf == CF_AVERAGE) {
852 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
855 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
858 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
859 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
862 switch (current_cf) {
864 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
868 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
869 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
872 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
873 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
877 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
882 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
883 if (elapsed_pdp_st > 2)
885 switch (current_cf) {
888 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
889 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
893 /* need to update cached seasonal values, so they are consistent
894 * with the bulk update */
895 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
896 * CDP_last_deviation are the same. */
897 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
898 last_seasonal_coef[ii];
899 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
903 /* need to update the null_count and last_null_count.
904 * even do this for non-DNAN pdp_temp because the
905 * algorithm is not learning from batch updates. */
906 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
908 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
912 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
913 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
916 /* do not count missed bulk values as failures */
917 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
918 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
919 /* need to reset violations buffer.
920 * could do this more carefully, but for now, just
921 * assume a bulk update wipes away all violations. */
922 erase_violations(&rrd, iii, i);
926 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
928 if (rrd_test_error()) break;
930 } /* endif data sources loop */
933 /* this loop is only entered if elapsed_pdp_st < 3 */
934 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
935 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
937 for(i = 0, rra_start = rra_begin;
938 i < rrd.stat_head->rra_cnt;
939 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
942 if (rrd.rra_def[i].pdp_cnt > 1) continue;
944 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
945 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
947 lookup_seasonal(&rrd,i,rra_start,rrd_file,
948 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
950 rra_current = ftell(rrd_file);
952 if (rrd_test_error()) break;
953 /* loop over data sources within each RRA */
955 ii < rrd.stat_head->ds_cnt;
958 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
959 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
960 scratch_idx, seasonal_coef);
963 if (rrd_test_error()) break;
964 } /* end elapsed_pdp_st loop */
966 if (rrd_test_error()) break;
968 /* Ready to write to disk */
969 /* Move sequentially through the file, writing one RRA at a time.
970 * Note this architecture divorces the computation of CDP with
971 * flushing updated RRA entries to disk. */
972 for(i = 0, rra_start = rra_begin;
973 i < rrd.stat_head->rra_cnt;
974 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
976 /* is there anything to write for this RRA? If not, continue. */
977 if (rra_step_cnt[i] == 0) continue;
979 /* write the first row */
981 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
983 rrd.rra_ptr[i].cur_row++;
984 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
985 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
986 /* position on the first row */
987 rra_pos_tmp = rra_start +
988 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
989 if(rra_pos_tmp != rra_current) {
990 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
991 rrd_set_error("seek error in rrd");
994 rra_current = rra_pos_tmp;
998 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1000 scratch_idx = CDP_primary_val;
1001 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1002 if (rrd_test_error()) break;
1004 /* write other rows of the bulk update, if any */
1005 scratch_idx = CDP_secondary_val;
1006 for ( ; rra_step_cnt[i] > 1;
1007 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1009 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1012 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1013 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1016 rrd.rra_ptr[i].cur_row = 0;
1017 /* seek back to beginning of current rra */
1018 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1020 rrd_set_error("seek error in rrd");
1024 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1026 rra_current = rra_start;
1028 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1031 if (rrd_test_error())
1035 /* break out of the argument parsing loop if error_string is set */
1036 if (rrd_test_error()){
1041 } /* endif a pdp_st has occurred */
1042 rrd.live_head->last_up = current_time;
1044 } /* function argument loop */
1046 if (seasonal_coef != NULL) free(seasonal_coef);
1047 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1048 if (rra_step_cnt != NULL) free(rra_step_cnt);
1049 rpnstack_free(&rpnstack);
1051 /* if we got here and if there is an error and if the file has not been
1052 * written to, then close things up and return. */
1053 if (rrd_test_error()) {
1063 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1064 * we just need to write back the live header portion now*/
1066 if (fseek(rrd_file, (sizeof(stat_head_t)
1067 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1068 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1070 rrd_set_error("seek rrd for live header writeback");
1080 if(fwrite( rrd.live_head,
1081 sizeof(live_head_t), 1, rrd_file) != 1){
1082 rrd_set_error("fwrite live_head to rrd");
1092 if(fwrite( rrd.pdp_prep,
1094 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1095 rrd_set_error("ftwrite pdp_prep to rrd");
1105 if(fwrite( rrd.cdp_prep,
1107 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1108 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1110 rrd_set_error("ftwrite cdp_prep to rrd");
1120 if(fwrite( rrd.rra_ptr,
1122 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1123 rrd_set_error("fwrite rra_ptr to rrd");
1133 /* OK now close the files and free the memory */
1134 if(fclose(rrd_file) != 0){
1135 rrd_set_error("closing rrd");
1144 /* calling the smoothing code here guarantees at most
1145 * one smoothing operation per rrd_update call. Unfortunately,
1146 * it is possible with bulk updates, or a long-delayed update
1147 * for smoothing to occur off-schedule. This really isn't
1148 * critical except during the burn-in cycles. */
1149 if (schedule_smooth)
1152 rrd_file = fopen(argv[optind],"r+");
1154 rrd_file = fopen(argv[optind],"rb+");
1156 rra_start = rra_begin;
1157 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1159 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1160 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1163 fprintf(stderr,"Running smoother for rra %ld\n",i);
1165 apply_smoother(&rrd,i,rra_start,rrd_file);
1166 if (rrd_test_error())
1169 rra_start += rrd.rra_def[i].row_cnt
1170 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1183 * get exclusive lock to whole file.
1184 * lock gets removed when we close the file
1186 * returns 0 on success
/* Ask for an exclusive lock covering the file behind rrdfile.
 * Two locking calls are visible below: an fcntl(F_SETLK) write lock described
 * by a struct flock, and an _fstat()/_locking(_LK_NBLCK) pair.
 * NOTE(review): presumably these are alternative platform branches selected by
 * preprocessor lines that are missing from this excerpt -- confirm against the
 * full file. The declarations of lock/stat/st and the return statement are
 * likewise not visible here. */
1189 LockRRD(FILE *rrdfile)
1191 int rrd_fd; /* File descriptor for RRD */
1194 rrd_fd = fileno(rrdfile);
1199 lock.l_type = F_WRLCK; /* exclusive write lock */
1200 lock.l_len = 0; /* whole file */
1201 lock.l_start = 0; /* start of file */
1202 lock.l_whence = SEEK_SET; /* l_start is an offset from the start of the file */
1204 stat = fcntl(rrd_fd, F_SETLK, &lock); /* F_SETLK fails immediately instead of waiting if the lock is held elsewhere */
1208 if ( _fstat( rrd_fd, &st ) == 0 ) {
1209 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size ); /* request a lock over st_size bytes, the size reported by _fstat() */
/* Write one RRA row to rrd_file at the stream's current position.
 * For every data source (0 .. stat_head->ds_cnt-1) the value stored in
 * cdp_prep[rra_idx * ds_cnt + ds_idx].scratch[CDP_scratch_idx] is written as
 * a single rrd_value_t.
 *   rrd             - in-memory RRD whose cdp_prep area supplies the values
 *   rra_idx         - index of the RRA the row belongs to
 *   rra_current     - caller's byte position; advanced by sizeof(rrd_value_t)
 *                     (see the += below)
 *   CDP_scratch_idx - which scratch slot of each CDP to write
 *   rrd_file        - output stream; no seek is performed in the visible code
 * A short fwrite() sets the rrd error string "writing rrd".
 * NOTE(review): the return type, braces and whatever follows the error call
 * are on lines missing from this excerpt. */
1221 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1222 unsigned short CDP_scratch_idx, FILE *rrd_file)
1224 unsigned long ds_idx, cdp_idx;
1226 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1228 /* compute the cdp index */
1229 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* trace output: value, file offset and CF name -- presumably compiled only
 * in debug builds; the guarding lines are not visible here (TODO confirm) */
1231 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1232 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1233 rrd -> rra_def[rra_idx].cf_nam);
/* exactly one rrd_value_t per data source; anything else is a write failure */
1236 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1237 sizeof(rrd_value_t),1,rrd_file) != 1)
1239 rrd_set_error("writing rrd");
/* keep the caller's notion of the file position in step with the stream */
1242 *rra_current += sizeof(rrd_value_t);