1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.5 2001/05/09 05:31:01 oetiker
9 * Bug fix: when update of multiple PDP/CDP RRAs coincided
10 * with interpolation of multiple PDPs an incorrect value was
11 * stored as the CDP. Especially evident for GAUGE data sources.
12 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
14 * Revision 1.4 2001/03/10 23:54:41 oetiker
15 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
* parser and calculator from rrd_graph and puts them in a new file,
17 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
18 * clean-up of aberrant behavior stuff, including a bug fix.
19 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
20 * -- Jake Brutlag <jakeb@corp.webtv.net>
22 * Revision 1.3 2001/03/04 13:01:55 oetiker
23 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
24 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
25 * This is backwards compatible! But new files using the Aberrant stuff are not readable
26 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
27 * -- Jake Brutlag <jakeb@corp.webtv.net>
29 * Revision 1.2 2001/03/04 11:14:25 oetiker
30 * added at-style-time@value:value syntax to rrd_update
31 * -- Dave Bodenstab <imdave@mcs.net>
33 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
36 *****************************************************************************/
39 #include <sys/types.h>
42 #include "rrd_rpncalc.h"
45 #include <sys/locking.h>
/* Local prototypes */

/* Acquire an exclusive write lock covering the whole RRD file;
 * returns 0 on success (lock is released when the file is closed). */
int LockRRD(FILE *rrd_file);

/* Write one RRA row (one consolidated value per data source, taken from
 * the given CDP scratch slot) at the current file position, advancing
 * *rra_current past the bytes written. */
void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
		    unsigned short CDP_scratch_idx, FILE *rrd_file);
/* Yield Y when X is NaN, otherwise X.
 * Fixed: the original definition ended with a stray trailing semicolon,
 * which silently broke any use of IFDNAN inside a larger expression
 * (e.g. as a function argument or operand); it only "worked" in
 * statement-final positions where the extra ';' became an empty
 * statement.  All visible call sites are statement-final, so removing
 * the semicolon preserves their behavior exactly. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/*
 * Command-line entry point for the standalone "rrdupdate" binary: run
 * rrd_update() over argv and, if it flagged an error, print usage
 * information followed by the error text from rrd_get_error().
 * NOTE(review): the return type and the closing lines of this function
 * are elided in this listing; the implicit-int return is K&R-era style.
 */
main(int argc, char **argv){
    rrd_update(argc,argv);
    if (rrd_test_error()) {
	printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
	       "Usage: rrdupdate filename\n"
	       "\t\t\t[--template|-t ds-name:ds-name:...]\n"
	       "\t\t\ttime|N:value[:value...]\n\n"
	       "\t\t\tat-time@value[:value...]\n\n"
	       "\t\t\t[ time:value[:value...] ..]\n\n");
	printf("ERROR: %s\n",rrd_get_error());
/*
 * rrd_update -- apply one or more "time:value[:value...]" (or
 * at-time@value[:value...]) readings to an RRD file: parse the command
 * line (optionally remapped through a --template DS-name list), update
 * the PDP prep area of every data source, consolidate completed primary
 * data points into the CDP prep area of each RRA, write finished RRA
 * rows to disk, and finally write back the live header, pdp_prep,
 * cdp_prep and rra_ptr sections.  Returns 0 on success, -1 with the
 * rrd error string set on failure.
 *
 * NOTE(review): this listing is elided -- numerous lines (variable
 * declarations, braces, #ifdef alternatives, else-branches and debug
 * blocks) are not shown; the comments below describe only what is
 * visible.
 */
rrd_update(int argc, char **argv)
    unsigned long    rra_begin;          /* byte pointer to the rra
					  * area in the rrd file. this
					  * pointer never changes value */
    unsigned long    rra_start;          /* byte pointer to the rra
					  * area in the rrd file. this
					  * pointer changes as each rrd is
					  * processed (one RRA at a time) */
    unsigned long    rra_current;        /* byte pointer to the current write
					  * spot in the rrd file. */
    unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
    unsigned long    interval,
	pre_int,post_int;                /* interval between this and
					  * the last run */
    unsigned long    proc_pdp_st;        /* which pdp_st was the last
					  * one processed */
    unsigned long    occu_pdp_st;        /* when was the pdp_st
					  * before the last update
					  * moment */
    unsigned long    proc_pdp_age;       /* how old was the data in
					  * the pdp prep area when it
					  * was last updated */
    unsigned long    occu_pdp_age;       /* how long ago was the last
					  * pdp_st moment */
    rrd_value_t      *pdp_new;           /* prepare the incoming data
					  * to be added to the
					  * existing entry */
    rrd_value_t      *pdp_temp;          /* prepare the pdp values
					  * to be added to the
					  * cdp values */
    long             *tmpl_idx;          /* index representing the settings
					    transported by the template index */
    long             tmpl_cnt = 2;       /* time and data */
    time_t           current_time = time(NULL);
    int              schedule_smooth = 0;
    char             *template = NULL;
    rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
					 /* a vector of future Holt-Winters seasonal coefs */
    unsigned long    elapsed_pdp_st;
					 /* number of elapsed PDP steps since last update */
    unsigned long    *rra_step_cnt = NULL;
					 /* number of rows to be updated in an RRA for a data
					  * value */
    unsigned long    start_pdp_offset;
					 /* number of PDP steps since the last update that
					  * are assigned to the first CDP to be generated
					  * since the last update. */
    unsigned short   scratch_idx;
					 /* index into the CDP scratch array */
    enum cf_en       current_cf;
					 /* numeric id of the current consolidation function */
    rpnstack_t       rpnstack;           /* used for COMPUTE DS */

    rpnstack_init(&rpnstack);

    /* --template|-t lets the caller name which DSes (and in what order)
     * the readings on the command line refer to */
    static struct option long_options[] =
	{"template", required_argument, 0, 't'},
    int option_index = 0;

    opt = getopt_long(argc, argv, "t:",
		      long_options, &option_index);

    rrd_set_error("unknown option '%s'",argv[optind-1]);

    /* need at least 2 arguments: filename, data. */
    if (argc-optind < 2) {
	rrd_set_error("Not enough arguments");

    if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){

    /* rrd_open leaves the stream positioned at the start of the RRAs */
    rra_current = rra_start = rra_begin = ftell(rrd_file);
    /* This is defined in the ANSI C standard, section 7.9.5.3:

        When a file is opened with update mode ('+' as the second
        or third character in the ... list of mode argument
        variables), both input and output may be performed on the
        associated stream. However, ... input may not be directly
        followed by output without an intervening call to a file
        positioning function, unless the input operation encounters
        end-of-file. */
    fseek(rrd_file, 0, SEEK_CUR);

    /* get exclusive lock to whole file.
     * lock gets removed when we close the file.
     */
    if (LockRRD(rrd_file) != 0) {
	rrd_set_error("could not lock RRD");

    /* updvals[0] holds the time string, updvals[1..ds_cnt] the values */
    if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
	rrd_set_error("allocating updvals pointer array");

    if ((pdp_temp = malloc(sizeof(rrd_value_t)
			   *rrd.stat_head->ds_cnt))==NULL){
	rrd_set_error("allocating pdp_temp ...");

    if ((tmpl_idx = malloc(sizeof(unsigned long)
			   *(rrd.stat_head->ds_cnt+1)))==NULL){
	rrd_set_error("allocating tmpl_idx ...");

    /* initialize template redirector */
    /* default config example (assume DS 1 is a CDEF DS)
       tmpl_idx[0] -> 0; (time)
       tmpl_idx[1] -> 1; (DS 0)
       tmpl_idx[2] -> 3; (DS 2)
       tmpl_idx[3] -> 4; (DS 3) */
    tmpl_idx[0] = 0; /* time */
    for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
	if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)

    /* a --template was supplied: rebuild tmpl_idx from the ds-name list
     * (the enclosing guard is elided in this listing) */
    tmpl_cnt = 1; /* the first entry is the time */
    tmpl_len = strlen(template);
    for(i=0;i<=tmpl_len ;i++) {
	if (template[i] == ':' || template[i] == '\0') {

	    if (tmpl_cnt>rrd.stat_head->ds_cnt){
		rrd_set_error("Template contains more DS definitions than RRD");
		free(updvals); free(pdp_temp);
		free(tmpl_idx); rrd_free(&rrd);
		fclose(rrd_file); return(-1);

	    if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
		rrd_set_error("unknown DS name '%s'",dsname);
		free(updvals); free(pdp_temp);
		free(tmpl_idx); rrd_free(&rrd);
		fclose(rrd_file); return(-1);

	    /* the first element is always the time */
	    tmpl_idx[tmpl_cnt-1]++;
	    /* go to the next entry on the template */
	    dsname = &template[i+1];
	    /* fix the damage we did before */

    if ((pdp_new = malloc(sizeof(rrd_value_t)
			  *rrd.stat_head->ds_cnt))==NULL){
	rrd_set_error("allocating pdp_new ...");

    /* loop through the arguments. */
    for(arg_i=optind+1; arg_i<argc;arg_i++) {
	char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
	char *step_start = stepper;
	char *parsetime_error = NULL;
	enum {atstyle, normal} timesyntax;
	struct time_value ds_tv;
	if (stepper == NULL){
	    rrd_set_error("failed duplication argv entry");

	/* initialize all ds input to unknown except the first one
	   which has always got to be set */
	for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
	strcpy(stepper,argv[arg_i]);

	/* separate all ds elements; first must be examined separately
	   due to alternate time syntax */
	if ((p=strchr(stepper,'@'))!=NULL) {
	    timesyntax = atstyle;
	} else if ((p=strchr(stepper,':'))!=NULL) {

	rrd_set_error("expected timestamp not found in data source from %s:...",

	updvals[tmpl_idx[ii]] = stepper;

	/* split the remaining colon-separated values in place */
	if (*stepper == ':') {
	    updvals[tmpl_idx[ii]] = stepper+1;

	if (ii != tmpl_cnt-1) {
	    rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
			  tmpl_cnt-1, ii, argv[arg_i]);

	/* get the time from the reading ... handle N */
	if (timesyntax == atstyle) {
	    if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
		rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );

	    if (ds_tv.type == RELATIVE_TO_END_TIME ||
		ds_tv.type == RELATIVE_TO_START_TIME) {
		rrd_set_error("specifying time relative to the 'start' "
			      "or 'end' makes no sense here: %s",

	    current_time = mktime(&ds_tv.tm) + ds_tv.offset;
	} else if (strcmp(updvals[0],"N")==0){
	    current_time = time(NULL);
	    current_time = atol(updvals[0]);

	/* updates must move strictly forward in time */
	if(current_time <= rrd.live_head->last_up){
	    rrd_set_error("illegal attempt to update using time %ld when "
			  "last update time is %ld (minimum one second step)",
			  current_time, rrd.live_head->last_up);

	/* seek to the beginning of the rra's */
	if (rra_current != rra_begin) {
	    if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
		rrd_set_error("seek error in rrd");
	    rra_current = rra_begin;
	rra_start = rra_begin;

	/* when was the current pdp started */
	proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
	proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;

	/* when did the last pdp_st occur */
	occu_pdp_age = current_time % rrd.stat_head->pdp_step;
	occu_pdp_st = current_time - occu_pdp_age;
	interval = current_time - rrd.live_head->last_up;

	if (occu_pdp_st > proc_pdp_st){
	    /* OK we passed the pdp_st moment*/
	    pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
							     * occurred before the latest
							     * pdp_st moment */
	    post_int = occu_pdp_age;			    /* how much after it */

	/* (debug output of the step bookkeeping; fprintf head elided) */
		"post_int %lu\n", proc_pdp_age, proc_pdp_st,
		occu_pdp_age, occu_pdp_st,
		interval, pre_int, post_int);

	/* process the data sources and update the pdp_prep
	 * area accordingly */
	for(i=0;i<rrd.stat_head->ds_cnt;i++){
	    dst_idx= dst_conv(rrd.ds_def[i].dst);
	    /* NOTE: DST_CDEF should never enter this if block, because
	     * updvals[i+1][0] is initialized to 'U'; unless the caller
	     * accidentally specified a value for the DST_CDEF. To handle
	     * this case, an extra check is required. */
	    if((updvals[i+1][0] != 'U') &&
	       (dst_idx != DST_CDEF) &&
	       rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {

		/* the data source type defines how to process the data */
		/* pdp_new contains rate * time ... eg the bytes
		 * transferred during the interval. Doing it this way saves
		 * a lot of math operations */

		if(rrd.pdp_prep[i].last_ds[0] != 'U'){
		    pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
		    if(dst_idx == DST_COUNTER) {
			/* simple overflow catcher suggested by andres kroonmaa */
			/* this will fail terribly for non 32 or 64 bit counters ... */
			/* are there any others in SNMP land ? */
			if (pdp_new[i] < (double)0.0 )
			    pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
			if (pdp_new[i] < (double)0.0 )
			    pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
		    rate = pdp_new[i] / interval;

		    /* (elided case labels: ABSOLUTE and GAUGE follow) */
		    pdp_new[i]= atof(updvals[i+1]);
		    rate = pdp_new[i] / interval;

		    pdp_new[i] = atof(updvals[i+1]) * interval;
		    rate = pdp_new[i] / interval;

		    rrd_set_error("rrd contains unknown DS type : '%s'",

	    /* break out of this for loop if the error string is set */
	    if (rrd_test_error()){

	    /* make sure pdp_temp is neither too large or too small
	     * if any of these occur it becomes unknown ...
	     */
	    if ( ! isnan(rate) &&
		 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
		    rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
		  ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
		    rate < rrd.ds_def[i].par[DS_min_val].u_val ))){

	    /* no news is news all the same */

	    /* make a copy of the command line argument for the next run */
		   rrd.pdp_prep[i].last_ds,
		   updvals[i+1], pdp_new[i]);

	    /* COUNTER/DERIVE need last_ds to compute the next diff */
	    if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
		strncpy(rrd.pdp_prep[i].last_ds,
			updvals[i+1],LAST_DS_LEN-1);
		rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';

	/* break out of the argument parsing loop if the error_string is set */
	if (rrd_test_error()){

	/* has a pdp_st moment occurred since the last run ? */
	if (proc_pdp_st == occu_pdp_st){
	    /* no we have not passed a pdp_st moment. therefore update is simple */
	    for(i=0;i<rrd.stat_head->ds_cnt;i++){
		if(isnan(pdp_new[i]))
		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
		    rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];

		       rrd.pdp_prep[i].scratch[PDP_val].u_val,
		       rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);

	    /* a pdp_st has occurred. */

	    /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
	     * occurred up to the last run.
	       pdp_new[] contains rate*seconds from the latest run.
	       pdp_temp[] will contain the rate for cdp */

	    for(i=0;i<rrd.stat_head->ds_cnt;i++){
		/* update pdp_prep to the current pdp_st */
		if(isnan(pdp_new[i]))
		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
		    rrd.pdp_prep[i].scratch[PDP_val].u_val +=
			pdp_new[i]/(double)interval*(double)pre_int;

		/* if too much of the pdp_prep is unknown we dump it */
		if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
		     > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
		    (occu_pdp_st-proc_pdp_st <=
		     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {

		    /* known seconds only: divide by the covered span */
		    pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
			/ (double)( occu_pdp_st
				    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);

		/* process CDEF data sources; remember each CDEF DS can
		 * only reference other DS with a lower index number */
		if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
		    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
		    /* substitute data values for OP_VARIABLE nodes */
		    for (ii = 0; rpnp[ii].op != OP_END; ii++)
			if (rpnp[ii].op == OP_VARIABLE) {
			    rpnp[ii].op = OP_NUMBER;
			    rpnp[ii].val = pdp_temp[rpnp[ii].ptr];

		    /* run the rpn calculator */
		    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
			break; /* exits the data sources pdp_temp loop */

		/* make pdp_prep ready for the next run */
		if(isnan(pdp_new[i])){
		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
		    rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
		    rrd.pdp_prep[i].scratch[PDP_val].u_val =
			pdp_new[i]/(double)interval*(double)post_int;

		       "new_unkn_sec %5lu\n",
		       rrd.pdp_prep[i].scratch[PDP_val].u_val,
		       rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);

	    /* if there were errors during the last loop, bail out here */
	    if (rrd_test_error()){

	    /* compute the number of elapsed pdp_st moments */
	    elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
	    fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
	    if (rra_step_cnt == NULL)
		rra_step_cnt = (unsigned long *)
		    malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));

	    /* per-RRA pass: how many rows must each RRA write, plus
	     * Holt-Winters (SEASONAL/DEVSEASONAL) bookkeeping */
	    for(i = 0, rra_start = rra_begin;
		i < rrd.stat_head->rra_cnt;
		rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),

		current_cf = cf_conv(rrd.rra_def[i].cf_nam);
		start_pdp_offset = rrd.rra_def[i].pdp_cnt -
		    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
		if (start_pdp_offset <= elapsed_pdp_st) {
		    rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
			rrd.rra_def[i].pdp_cnt + 1;

		if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
		    /* If this is a bulk update, we need to skip ahead in the seasonal
		     * arrays so that they will be correct for the next observed value;
		     * note that for the bulk update itself, no update will occur to
		     * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
		     */
		    if (rra_step_cnt[i] > 2)
			/* skip update by resetting rra_step_cnt[i],
			 * note that this is not data source specific; this is due
			 * to the bulk update, not a DNAN value for the specific data
			 * source */
			lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
					&last_seasonal_coef);
			lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,

		    /* periodically run a smoother for seasonal effects */
		    /* Need to use first cdp parameter buffer to track
		     * burnin (burnin requires a specific smoothing schedule).
		     * The CDP_init_seasonal parameter is really an RRA level,
		     * not a data source within RRA level parameter, but the rra_def
		     * is read only for rrd_update (not flushed to disk). */
		    iii = i*(rrd.stat_head -> ds_cnt);
		    if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt

			if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
			    > rrd.rra_def[i].row_cnt - 1) {
			    /* mark off one of the burnin cycles */
			    ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);

			/* someone has no doubt invented a trick to deal with this
			 * wrap around, but at least this code is clear. */
			if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
			    rrd.rra_ptr[i].cur_row)
			    /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
			     * mapping between PDP and CDP */
			    if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
				>= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)

				   "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
				   rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
				   rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);

			/* can't rely on negative numbers because we are working with
			 * unsigned values */
			/* Don't need modulus here. If we've wrapped more than once, only
			 * one smooth is executed at the end. */
			if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
			    && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
			    >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)

			       "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
			       rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
			       rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);

		    rra_current = ftell(rrd_file);
		} /* if cf is DEVSEASONAL or SEASONAL */

		if (rrd_test_error()) break;

		/* update CDP_PREP areas */
		/* loop over data sources within each RRA */
		    ii < rrd.stat_head->ds_cnt;

		    /* iii indexes the CDP prep area for this data source within the RRA */
		    iii=i*rrd.stat_head->ds_cnt+ii;

		    if (rrd.rra_def[i].pdp_cnt > 1) {

			if (rra_step_cnt[i] > 0) {
			    /* If we are in this block, at least 1 CDP value will be written to
			     * disk, this is the CDP_primary_val entry. If more than 1 value needs
			     * to be written, then the "fill in" value is the CDP_secondary_val
			     * entry. */
			    if (isnan(pdp_temp[ii]))
				rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
				rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;

				/* CDP_secondary value is the RRA "fill in" value for intermediary
				 * CDP data entries. No matter the CF, the value is the same because
				 * the average, max, min, and last of a list of identical values is
				 * the same, namely, the value itself. */
				rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];

			    /* too many unknown PDPs in this CDP: xff exceeded */
			    if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
				> rrd.rra_def[i].pdp_cnt*
				rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)

				rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
				/* initialize carry over */
				if (current_cf == CF_AVERAGE) {
				    if (isnan(pdp_temp[ii])) {
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
					    ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);

				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				rrd_value_t cum_val, cur_val;
				switch (current_cf) {
				    cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
				    cur_val = IFDNAN(pdp_temp[ii],0.0);
				    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
					(cum_val + cur_val * start_pdp_offset) /
					(rrd.rra_def[i].pdp_cnt
					 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
				    /* initialize carry over value */
				    if (isnan(pdp_temp[ii])) {
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
					    ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);

				    cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
				    cur_val = IFDNAN(pdp_temp[ii],-DINF);

				    if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
					isnan(pdp_temp[ii])) {
					   "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",

				    if (cur_val > cum_val)
					rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
					rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
				    /* initialize carry over value */
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				    cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
				    cur_val = IFDNAN(pdp_temp[ii],DINF);

				    if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
					isnan(pdp_temp[ii])) {
					   "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",

				    if (cur_val < cum_val)
					rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
					rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
				    /* initialize carry over value */
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				    /* default (CF_LAST-style): primary is the latest rate */
				    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
				    /* initialize carry over value */
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

			    } /* endif meets xff value requirement for a valid value */
			    /* initialize carry over CDP_unkn_pdp_cnt; this must happen after CDP_primary_val
			     * is set because CDP_unkn_pdp_cnt is required to compute that value. */
			    if (isnan(pdp_temp[ii]))
				rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
				    (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
				rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
			} else /* rra_step_cnt[i] == 0 */

			    if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
				fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
				fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
					i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);

			    if (isnan(pdp_temp[ii])) {
				rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
			    } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))

				if (current_cf == CF_AVERAGE) {
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
					i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);

				/* accumulate the new rate into the carry-over CDP_val */
				switch (current_cf) {
				    rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *

				    if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				    if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
					rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

				    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];

		    } else { /* rrd.rra_def[i].pdp_cnt == 1 */
			if (elapsed_pdp_st > 2)

			    switch (current_cf) {
				rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
				rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];

				/* need to update cached seasonal values, so they are consistent
				 * with the bulk update */
				/* WARNING: code relies on the fact that CDP_hw_last_seasonal and
				 * CDP_last_deviation are the same. */
				rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
				    last_seasonal_coef[ii];
				rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =

				/* need to update the null_count and last_null_count.
				 * even do this for non-DNAN pdp_temp because the
				 * algorithm is not learning from batch updates. */
				rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
				rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=

				rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
				rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;

				/* do not count missed bulk values as failures */
				rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
				rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
				/* need to reset violations buffer.
				 * could do this more carefully, but for now, just
				 * assume a bulk update wipes away all violations. */
				erase_violations(&rrd, iii, i);

		    } /* endif rrd.rra_def[i].pdp_cnt == 1 */

		    if (rrd_test_error()) break;

		} /* endif data sources loop */

	    /* this loop is only entered if elapsed_pdp_st < 3 */
	    for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
		 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)

		for(i = 0, rra_start = rra_begin;
		    i < rrd.stat_head->rra_cnt;
		    rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),

		    if (rrd.rra_def[i].pdp_cnt > 1) continue;

		    current_cf = cf_conv(rrd.rra_def[i].cf_nam);
		    if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
			lookup_seasonal(&rrd,i,rra_start,rrd_file,
					elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
			rra_current = ftell(rrd_file);

		    if (rrd_test_error()) break;
		    /* loop over data sources within each RRA */
			ii < rrd.stat_head->ds_cnt;

			update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
					   i*(rrd.stat_head->ds_cnt) + ii,i,ii,
					   scratch_idx, seasonal_coef);

		if (rrd_test_error()) break;
	    } /* end elapsed_pdp_st loop */

	    if (rrd_test_error()) break;

	    /* Ready to write to disk */
	    /* Move sequentially through the file, writing one RRA at a time.
	     * Note this architecture divorces the computation of CDP with
	     * flushing updated RRA entries to disk. */
	    for(i = 0, rra_start = rra_begin;
		i < rrd.stat_head->rra_cnt;
		rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),

		/* is there anything to write for this RRA? If not, continue. */
		if (rra_step_cnt[i] == 0) continue;

		/* write the first row */
		fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
		rrd.rra_ptr[i].cur_row++;
		if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
		    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
		/* position on the first row */
		rra_pos_tmp = rra_start +
		    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
		if(rra_pos_tmp != rra_current) {
		    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
			rrd_set_error("seek error in rrd");
		    rra_current = rra_pos_tmp;

		fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
		scratch_idx = CDP_primary_val;
		write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
		if (rrd_test_error()) break;

		/* write other rows of the bulk update, if any */
		scratch_idx = CDP_secondary_val;
		for ( ; rra_step_cnt[i] > 1;
		      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)

		    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)

			fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
				rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
			rrd.rra_ptr[i].cur_row = 0;
			/* seek back to beginning of current rra */
			if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
			    rrd_set_error("seek error in rrd");

			fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
			rra_current = rra_start;

		    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);

		if (rrd_test_error())

	    /* break out of the argument parsing loop if error_string is set */
	    if (rrd_test_error()){

	} /* endif a pdp_st has occurred */
	rrd.live_head->last_up = current_time;
    } /* function argument loop */

    if (seasonal_coef != NULL) free(seasonal_coef);
    if (last_seasonal_coef != NULL) free(last_seasonal_coef);
    if (rra_step_cnt != NULL) free(rra_step_cnt);
    rpnstack_free(&rpnstack);

    /* if we got here and if there is an error and if the file has not been
     * written to, then close things up and return. */
    if (rrd_test_error()) {

    /* aargh ... that was tough ... so many loops ... anyway, it's done.
     * we just need to write back the live header portion now */

    if (fseek(rrd_file, (sizeof(stat_head_t)
			 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
			 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
	rrd_set_error("seek rrd for live header writeback");

    if(fwrite( rrd.live_head,
	       sizeof(live_head_t), 1, rrd_file) != 1){
	rrd_set_error("fwrite live_head to rrd");

    if(fwrite( rrd.pdp_prep,
	       rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
	rrd_set_error("ftwrite pdp_prep to rrd");

    if(fwrite( rrd.cdp_prep,
	       rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
       != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
	rrd_set_error("ftwrite cdp_prep to rrd");

    if(fwrite( rrd.rra_ptr,
	       rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
	rrd_set_error("fwrite rra_ptr to rrd");

    /* OK now close the files and free the memory */
    if(fclose(rrd_file) != 0){
	rrd_set_error("closing rrd");

    /* calling the smoothing code here guarantees at most
     * one smoothing operation per rrd_update call. Unfortunately,
     * it is possible with bulk updates, or a long-delayed update
     * for smoothing to occur off-schedule. This really isn't
     * critical except during the burning cycles. */
    if (schedule_smooth)

	/* reopen read-write; the "r+"/"rb+" pair are the platform
	 * alternatives (the selecting #ifdef lines are elided) */
	rrd_file = fopen(argv[optind],"r+");
	rrd_file = fopen(argv[optind],"rb+");

	rra_start = rra_begin;
	for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
	    if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
		cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)

		fprintf(stderr,"Running smoother for rra %ld\n",i);
		apply_smoother(&rrd,i,rra_start,rrd_file);
		if (rrd_test_error())

		rra_start += rrd.rra_def[i].row_cnt
		    *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
/*
 * get exclusive lock to whole file.
 * lock gets removed when we close the file
 *
 * returns 0 on success
 *
 * NOTE(review): the #if/#else platform-selection lines are elided in
 * this listing; fcntl(F_SETLK) is the POSIX path, _locking() the
 * Windows path.  The struct flock / struct _stat declarations are
 * elided as well.
 */
LockRRD(FILE *rrdfile)
    int	rrd_fd;		/* File descriptor for RRD */

    rrd_fd = fileno(rrdfile);

    lock.l_type = F_WRLCK;    /* exclusive write lock */
    lock.l_len = 0;	      /* whole file */
    lock.l_start = 0;	      /* start of file */
    lock.l_whence = SEEK_SET; /* offsets are relative to the start of the file */

    stat = fcntl(rrd_fd, F_SETLK, &lock);

    if ( _fstat( rrd_fd, &st ) == 0 ) {
	stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/*
 * write_RRA_row -- write one row of RRA rra_idx (one consolidated value
 * per data source, taken from the CDP_scratch_idx slot of each cdp_prep
 * entry) at the current position of rrd_file, advancing *rra_current by
 * the number of bytes written.  Sets the rrd error string on a short
 * write.  NOTE(review): loop braces are elided in this listing.
 */
write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
	       unsigned short CDP_scratch_idx, FILE *rrd_file)
    unsigned long ds_idx, cdp_idx;

    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
	/* compute the cdp index */
	cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
	/* (debug trace of each value written) */
	fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
		rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
		rrd -> rra_def[rra_idx].cf_nam);

	if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
		  sizeof(rrd_value_t),1,rrd_file) != 1)
	    rrd_set_error("writing rrd");
	*rra_current += sizeof(rrd_value_t);