1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.7 2003/02/13 07:05:27 oetiker
9 * Find attached the patch I promised to send to you. Please note that there
10 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
11 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
12 * library is identical to librrd, but it contains support code for per-thread
13 * global variables currently used for error information only. This is similar
14 * to how errno per-thread variables are implemented. librrd_th must be linked
15 * alongside of libpthred
17 * There is also a new file "THREADS", holding some documentation.
19 * -- Peter Stamfest <peter@stamfest.at>
21 * Revision 1.6 2002/02/01 20:34:49 oetiker
22 * fixed version number and date/time
24 * Revision 1.5 2001/05/09 05:31:01 oetiker
25 * Bug fix: when update of multiple PDP/CDP RRAs coincided
26 * with interpolation of multiple PDPs an incorrect value was
27 * stored as the CDP. Especially evident for GAUGE data sources.
28 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
30 * Revision 1.4 2001/03/10 23:54:41 oetiker
31 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
32 * parser and calculator from rrd_graph and puts then in a new file,
33 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
34 * clean-up of aberrant behavior stuff, including a bug fix.
35 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
36 * -- Jake Brutlag <jakeb@corp.webtv.net>
38 * Revision 1.3 2001/03/04 13:01:55 oetiker
39 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
40 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
41 * This is backwards compatible! But new files using the Aberrant stuff are not readable
42 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
43 * -- Jake Brutlag <jakeb@corp.webtv.net>
45 * Revision 1.2 2001/03/04 11:14:25 oetiker
46 * added at-style-time@value:value syntax to rrd_update
47 * -- Dave Bodenstab <imdave@mcs.net>
49 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
52 *****************************************************************************/
55 #include <sys/types.h>
59 #include <sys/locking.h>
65 #include "rrd_rpncalc.h"
67 #include "rrd_is_thread_safe.h"
69 /* Local prototypes */
70 int LockRRD(FILE *rrd_file);
71 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx, FILE *rrd_file);
74 int rrd_update_r(char *filename, char *template, int argc, char **argv);
76 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
81 main(int argc, char **argv){
82 rrd_update(argc,argv);
83 if (rrd_test_error()) {
84 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
85 "Usage: rrdupdate filename\n"
86 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
87 "\t\t\ttime|N:value[:value...]\n\n"
88 "\t\t\tat-time@value[:value...]\n\n"
89 "\t\t\t[ time:value[:value...] ..]\n\n");
91 printf("ERROR: %s\n",rrd_get_error());
100 rrd_update(int argc, char **argv)
102 char *template = NULL;
106 static struct option long_options[] =
108 {"template", required_argument, 0, 't'},
111 int option_index = 0;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 /* rrd_free(&rrd); */
131 /* need at least 2 arguments: filename, data. */
132 if (argc-optind < 2) {
133 rrd_set_error("Not enough arguments");
138 rc = rrd_update_r(argv[optind], template,
139 argc - optind - 1, argv + optind + 1);
144 rrd_update_r(char *filename, char *template, int argc, char **argv)
149 unsigned long i,ii,iii=1;
151 unsigned long rra_begin; /* byte pointer to the rra
152 * area in the rrd file. this
153 * pointer never changes value */
154 unsigned long rra_start; /* byte pointer to the rra
155 * area in the rrd file. this
156 * pointer changes as each rrd is
158 unsigned long rra_current; /* byte pointer to the current write
159 * spot in the rrd file. */
160 unsigned long rra_pos_tmp; /* temporary byte pointer. */
161 unsigned long interval,
162 pre_int,post_int; /* interval between this and
164 unsigned long proc_pdp_st; /* which pdp_st was the last
166 unsigned long occu_pdp_st; /* when was the pdp_st
167 * before the last update
169 unsigned long proc_pdp_age; /* how old was the data in
170 * the pdp prep area when it
171 * was last updated */
172 unsigned long occu_pdp_age; /* how long ago was the last
174 rrd_value_t *pdp_new; /* prepare the incoming data
175 * to be added the the
177 rrd_value_t *pdp_temp; /* prepare the pdp values
178 * to be added the the
181 long *tmpl_idx; /* index representing the settings
182 transported by the template index */
183 unsigned long tmpl_cnt = 2; /* time and data */
187 time_t current_time = time(NULL);
189 int schedule_smooth = 0;
190 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
191 /* a vector of future Holt-Winters seasonal coefs */
192 unsigned long elapsed_pdp_st;
193 /* number of elapsed PDP steps since last update */
194 unsigned long *rra_step_cnt = NULL;
195 /* number of rows to be updated in an RRA for a data
197 unsigned long start_pdp_offset;
198 /* number of PDP steps since the last update that
199 * are assigned to the first CDP to be generated
200 * since the last update. */
201 unsigned short scratch_idx;
202 /* index into the CDP scratch array */
203 enum cf_en current_cf;
204 /* numeric id of the current consolidation function */
205 rpnstack_t rpnstack; /* used for COMPUTE DS */
207 rpnstack_init(&rpnstack);
209 /* need at least 1 arguments: data. */
211 rrd_set_error("Not enough arguments");
215 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
218 rra_current = rra_start = rra_begin = ftell(rrd_file);
219 /* This is defined in the ANSI C standard, section 7.9.5.3:
221 When a file is opened with udpate mode ('+' as the second
222 or third character in the ... list of mode argument
223 variables), both input and ouptut may be performed on the
224 associated stream. However, ... input may not be directly
225 followed by output without an intervening call to a file
226 positioning function, unless the input oepration encounters
228 fseek(rrd_file, 0, SEEK_CUR);
231 /* get exclusive lock to whole file.
232 * lock gets removed when we close the file.
234 if (LockRRD(rrd_file) != 0) {
235 rrd_set_error("could not lock RRD");
241 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
242 rrd_set_error("allocating updvals pointer array");
248 if ((pdp_temp = malloc(sizeof(rrd_value_t)
249 *rrd.stat_head->ds_cnt))==NULL){
250 rrd_set_error("allocating pdp_temp ...");
257 if ((tmpl_idx = malloc(sizeof(unsigned long)
258 *(rrd.stat_head->ds_cnt+1)))==NULL){
259 rrd_set_error("allocating tmpl_idx ...");
266 /* initialize template redirector */
267 /* default config example (assume DS 1 is a CDEF DS)
268 tmpl_idx[0] -> 0; (time)
269 tmpl_idx[1] -> 1; (DS 0)
270 tmpl_idx[2] -> 3; (DS 2)
271 tmpl_idx[3] -> 4; (DS 3) */
272 tmpl_idx[0] = 0; /* time */
273 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
275 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
282 unsigned int tmpl_len;
284 tmpl_cnt = 1; /* the first entry is the time */
285 tmpl_len = strlen(template);
286 for(i=0;i<=tmpl_len ;i++) {
287 if (template[i] == ':' || template[i] == '\0') {
289 if (tmpl_cnt>rrd.stat_head->ds_cnt){
290 rrd_set_error("Template contains more DS definitions than RRD");
291 free(updvals); free(pdp_temp);
292 free(tmpl_idx); rrd_free(&rrd);
293 fclose(rrd_file); return(-1);
295 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
296 rrd_set_error("unknown DS name '%s'",dsname);
297 free(updvals); free(pdp_temp);
298 free(tmpl_idx); rrd_free(&rrd);
299 fclose(rrd_file); return(-1);
301 /* the first element is always the time */
302 tmpl_idx[tmpl_cnt-1]++;
303 /* go to the next entry on the template */
304 dsname = &template[i+1];
305 /* fix the damage we did before */
314 if ((pdp_new = malloc(sizeof(rrd_value_t)
315 *rrd.stat_head->ds_cnt))==NULL){
316 rrd_set_error("allocating pdp_new ...");
325 /* loop through the arguments. */
326 for(arg_i=0; arg_i<argc;arg_i++) {
327 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
328 char *step_start = stepper;
330 char *parsetime_error = NULL;
331 enum {atstyle, normal} timesyntax;
332 struct time_value ds_tv;
333 if (stepper == NULL){
334 rrd_set_error("failed duplication argv entry");
342 /* initialize all ds input to unknown except the first one
343 which has always got to be set */
344 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
345 strcpy(stepper,argv[arg_i]);
347 /* separate all ds elements; first must be examined separately
348 due to alternate time syntax */
349 if ((p=strchr(stepper,'@'))!=NULL) {
350 timesyntax = atstyle;
353 } else if ((p=strchr(stepper,':'))!=NULL) {
358 rrd_set_error("expected timestamp not found in data source from %s:...",
364 updvals[tmpl_idx[ii]] = stepper;
366 if (*stepper == ':') {
370 updvals[tmpl_idx[ii]] = stepper+1;
376 if (ii != tmpl_cnt-1) {
377 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
378 tmpl_cnt-1, ii, argv[arg_i]);
383 /* get the time from the reading ... handle N */
384 if (timesyntax == atstyle) {
385 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
386 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
390 if (ds_tv.type == RELATIVE_TO_END_TIME ||
391 ds_tv.type == RELATIVE_TO_START_TIME) {
392 rrd_set_error("specifying time relative to the 'start' "
393 "or 'end' makes no sense here: %s",
399 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
400 } else if (strcmp(updvals[0],"N")==0){
401 current_time = time(NULL);
403 current_time = atol(updvals[0]);
406 if(current_time <= rrd.live_head->last_up){
407 rrd_set_error("illegal attempt to update using time %ld when "
408 "last update time is %ld (minimum one second step)",
409 current_time, rrd.live_head->last_up);
415 /* seek to the beginning of the rra's */
416 if (rra_current != rra_begin) {
417 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
418 rrd_set_error("seek error in rrd");
422 rra_current = rra_begin;
424 rra_start = rra_begin;
426 /* when was the current pdp started */
427 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
428 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
430 /* when did the last pdp_st occur */
431 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
432 occu_pdp_st = current_time - occu_pdp_age;
433 interval = current_time - rrd.live_head->last_up;
435 if (occu_pdp_st > proc_pdp_st){
436 /* OK we passed the pdp_st moment*/
437 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
438 * occurred before the latest
440 post_int = occu_pdp_age; /* how much after it */
454 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
455 occu_pdp_age, occu_pdp_st,
456 interval, pre_int, post_int);
459 /* process the data sources and update the pdp_prep
460 * area accordingly */
461 for(i=0;i<rrd.stat_head->ds_cnt;i++){
463 dst_idx= dst_conv(rrd.ds_def[i].dst);
464 /* NOTE: DST_CDEF should never enter this if block, because
465 * updvals[i+1][0] is initialized to 'U'; unless the caller
466 * accidently specified a value for the DST_CDEF. To handle
467 * this case, an extra check is required. */
468 if((updvals[i+1][0] != 'U') &&
469 (dst_idx != DST_CDEF) &&
470 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
472 /* the data source type defines how to process the data */
473 /* pdp_new contains rate * time ... eg the bytes
474 * transferred during the interval. Doing it this way saves
475 * a lot of math operations */
481 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
482 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
483 if(dst_idx == DST_COUNTER) {
484 /* simple overflow catcher sugestet by andres kroonmaa */
485 /* this will fail terribly for non 32 or 64 bit counters ... */
486 /* are there any others in SNMP land ? */
487 if (pdp_new[i] < (double)0.0 )
488 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
489 if (pdp_new[i] < (double)0.0 )
490 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
492 rate = pdp_new[i] / interval;
499 pdp_new[i]= atof(updvals[i+1]);
500 rate = pdp_new[i] / interval;
503 pdp_new[i] = atof(updvals[i+1]) * interval;
504 rate = pdp_new[i] / interval;
507 rrd_set_error("rrd contains unknown DS type : '%s'",
511 /* break out of this for loop if the error string is set */
512 if (rrd_test_error()){
515 /* make sure pdp_temp is neither too large or too small
516 * if any of these occur it becomes unknown ...
518 if ( ! isnan(rate) &&
519 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
520 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
521 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
522 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
526 /* no news is news all the same */
530 /* make a copy of the command line argument for the next run */
538 rrd.pdp_prep[i].last_ds,
539 updvals[i+1], pdp_new[i]);
541 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
542 strncpy(rrd.pdp_prep[i].last_ds,
543 updvals[i+1],LAST_DS_LEN-1);
544 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
547 /* break out of the argument parsing loop if the error_string is set */
548 if (rrd_test_error()){
552 /* has a pdp_st moment occurred since the last run ? */
554 if (proc_pdp_st == occu_pdp_st){
555 /* no we have not passed a pdp_st moment. therefore update is simple */
557 for(i=0;i<rrd.stat_head->ds_cnt;i++){
558 if(isnan(pdp_new[i]))
559 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
561 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
568 rrd.pdp_prep[i].scratch[PDP_val].u_val,
569 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
573 /* an pdp_st has occurred. */
575 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
576 * occurred up to the last run.
577 pdp_new[] contains rate*seconds from the latest run.
578 pdp_temp[] will contain the rate for cdp */
580 for(i=0;i<rrd.stat_head->ds_cnt;i++){
581 /* update pdp_prep to the current pdp_st */
582 if(isnan(pdp_new[i]))
583 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
585 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
586 pdp_new[i]/(double)interval*(double)pre_int;
588 /* if too much of the pdp_prep is unknown we dump it */
589 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
590 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
591 (occu_pdp_st-proc_pdp_st <=
592 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
595 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
596 / (double)( occu_pdp_st
598 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
601 /* process CDEF data sources; remember each CDEF DS can
602 * only reference other DS with a lower index number */
603 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
605 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
606 /* substitue data values for OP_VARIABLE nodes */
607 for (ii = 0; rpnp[ii].op != OP_END; ii++)
609 if (rpnp[ii].op == OP_VARIABLE) {
610 rpnp[ii].op = OP_NUMBER;
611 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
614 /* run the rpn calculator */
615 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
617 break; /* exits the data sources pdp_temp loop */
621 /* make pdp_prep ready for the next run */
622 if(isnan(pdp_new[i])){
623 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
624 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
626 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
627 rrd.pdp_prep[i].scratch[PDP_val].u_val =
628 pdp_new[i]/(double)interval*(double)post_int;
636 "new_unkn_sec %5lu\n",
638 rrd.pdp_prep[i].scratch[PDP_val].u_val,
639 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
643 /* if there were errors during the last loop, bail out here */
644 if (rrd_test_error()){
649 /* compute the number of elapsed pdp_st moments */
650 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
652 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
654 if (rra_step_cnt == NULL)
656 rra_step_cnt = (unsigned long *)
657 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
660 for(i = 0, rra_start = rra_begin;
661 i < rrd.stat_head->rra_cnt;
662 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
665 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
666 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
667 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
668 if (start_pdp_offset <= elapsed_pdp_st) {
669 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
670 rrd.rra_def[i].pdp_cnt + 1;
675 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
677 /* If this is a bulk update, we need to skip ahead in the seasonal
678 * arrays so that they will be correct for the next observed value;
679 * note that for the bulk update itself, no update will occur to
680 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
682 if (rra_step_cnt[i] > 2)
684 /* skip update by resetting rra_step_cnt[i],
685 * note that this is not data source specific; this is due
686 * to the bulk update, not a DNAN value for the specific data
689 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
690 &last_seasonal_coef);
691 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
695 /* periodically run a smoother for seasonal effects */
696 /* Need to use first cdp parameter buffer to track
697 * burnin (burnin requires a specific smoothing schedule).
698 * The CDP_init_seasonal parameter is really an RRA level,
699 * not a data source within RRA level parameter, but the rra_def
700 * is read only for rrd_update (not flushed to disk). */
701 iii = i*(rrd.stat_head -> ds_cnt);
702 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
705 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
706 > rrd.rra_def[i].row_cnt - 1) {
707 /* mark off one of the burnin cycles */
708 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
712 /* someone has no doubt invented a trick to deal with this
713 * wrap around, but at least this code is clear. */
714 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
715 rrd.rra_ptr[i].cur_row)
717 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
718 * mapping between PDP and CDP */
719 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
720 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
724 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
725 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
726 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
731 /* can't rely on negative numbers because we are working with
733 /* Don't need modulus here. If we've wrapped more than once, only
734 * one smooth is executed at the end. */
735 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
736 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
737 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
741 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
742 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
743 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
750 rra_current = ftell(rrd_file);
751 } /* if cf is DEVSEASONAL or SEASONAL */
753 if (rrd_test_error()) break;
755 /* update CDP_PREP areas */
756 /* loop over data soures within each RRA */
758 ii < rrd.stat_head->ds_cnt;
762 /* iii indexes the CDP prep area for this data source within the RRA */
763 iii=i*rrd.stat_head->ds_cnt+ii;
765 if (rrd.rra_def[i].pdp_cnt > 1) {
767 if (rra_step_cnt[i] > 0) {
768 /* If we are in this block, as least 1 CDP value will be written to
769 * disk, this is the CDP_primary_val entry. If more than 1 value needs
770 * to be written, then the "fill in" value is the CDP_secondary_val
772 if (isnan(pdp_temp[ii]))
774 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
775 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
777 /* CDP_secondary value is the RRA "fill in" value for intermediary
778 * CDP data entries. No matter the CF, the value is the same because
779 * the average, max, min, and last of a list of identical values is
780 * the same, namely, the value itself. */
781 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
784 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
785 > rrd.rra_def[i].pdp_cnt*
786 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
788 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
789 /* initialize carry over */
790 if (current_cf == CF_AVERAGE) {
791 if (isnan(pdp_temp[ii])) {
792 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
794 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
795 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
798 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
801 rrd_value_t cum_val, cur_val;
802 switch (current_cf) {
804 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
805 cur_val = IFDNAN(pdp_temp[ii],0.0);
806 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
807 (cum_val + cur_val * start_pdp_offset) /
808 (rrd.rra_def[i].pdp_cnt
809 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
810 /* initialize carry over value */
811 if (isnan(pdp_temp[ii])) {
812 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
814 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
815 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
819 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
820 cur_val = IFDNAN(pdp_temp[ii],-DINF);
822 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
823 isnan(pdp_temp[ii])) {
825 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
830 if (cur_val > cum_val)
831 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
833 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
834 /* initialize carry over value */
835 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
838 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
839 cur_val = IFDNAN(pdp_temp[ii],DINF);
841 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
842 isnan(pdp_temp[ii])) {
844 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
849 if (cur_val < cum_val)
850 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
852 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
853 /* initialize carry over value */
854 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
858 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
859 /* initialize carry over value */
860 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
863 } /* endif meets xff value requirement for a valid value */
864 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
865 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
866 if (isnan(pdp_temp[ii]))
867 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
868 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
870 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
871 } else /* rra_step_cnt[i] == 0 */
874 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
875 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
878 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
879 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
882 if (isnan(pdp_temp[ii])) {
883 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
884 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
886 if (current_cf == CF_AVERAGE) {
887 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
890 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
893 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
894 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
897 switch (current_cf) {
899 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
903 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
904 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
907 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
908 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
912 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
917 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
918 if (elapsed_pdp_st > 2)
920 switch (current_cf) {
923 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
924 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
928 /* need to update cached seasonal values, so they are consistent
929 * with the bulk update */
930 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
931 * CDP_last_deviation are the same. */
932 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
933 last_seasonal_coef[ii];
934 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
938 /* need to update the null_count and last_null_count.
939 * even do this for non-DNAN pdp_temp because the
940 * algorithm is not learning from batch updates. */
941 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
943 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
947 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
948 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
951 /* do not count missed bulk values as failures */
952 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
953 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
954 /* need to reset violations buffer.
955 * could do this more carefully, but for now, just
956 * assume a bulk update wipes away all violations. */
957 erase_violations(&rrd, iii, i);
961 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
963 if (rrd_test_error()) break;
965 } /* endif data sources loop */
968 /* this loop is only entered if elapsed_pdp_st < 3 */
969 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
970 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
972 for(i = 0, rra_start = rra_begin;
973 i < rrd.stat_head->rra_cnt;
974 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
977 if (rrd.rra_def[i].pdp_cnt > 1) continue;
979 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
980 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
982 lookup_seasonal(&rrd,i,rra_start,rrd_file,
983 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
985 rra_current = ftell(rrd_file);
987 if (rrd_test_error()) break;
988 /* loop over data soures within each RRA */
990 ii < rrd.stat_head->ds_cnt;
993 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
994 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
995 scratch_idx, seasonal_coef);
998 if (rrd_test_error()) break;
999 } /* end elapsed_pdp_st loop */
1001 if (rrd_test_error()) break;
1003 /* Ready to write to disk */
1004 /* Move sequentially through the file, writing one RRA at a time.
1005 * Note this architecture divorces the computation of CDP with
1006 * flushing updated RRA entries to disk. */
1007 for(i = 0, rra_start = rra_begin;
1008 i < rrd.stat_head->rra_cnt;
1009 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1011 /* is there anything to write for this RRA? If not, continue. */
1012 if (rra_step_cnt[i] == 0) continue;
1014 /* write the first row */
1016 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1018 rrd.rra_ptr[i].cur_row++;
1019 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1020 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1021 /* positition on the first row */
1022 rra_pos_tmp = rra_start +
1023 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1024 if(rra_pos_tmp != rra_current) {
1025 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1026 rrd_set_error("seek error in rrd");
1029 rra_current = rra_pos_tmp;
1033 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1035 scratch_idx = CDP_primary_val;
1036 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1037 if (rrd_test_error()) break;
1039 /* write other rows of the bulk update, if any */
1040 scratch_idx = CDP_secondary_val;
1041 for ( ; rra_step_cnt[i] > 1;
1042 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1044 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1047 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1048 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1051 rrd.rra_ptr[i].cur_row = 0;
1052 /* seek back to beginning of current rra */
1053 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1055 rrd_set_error("seek error in rrd");
1059 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1061 rra_current = rra_start;
1063 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1066 if (rrd_test_error())
1070 /* break out of the argument parsing loop if error_string is set */
1071 if (rrd_test_error()){
1076 } /* endif a pdp_st has occurred */
1077 rrd.live_head->last_up = current_time;
1079 } /* function argument loop */
1081 if (seasonal_coef != NULL) free(seasonal_coef);
1082 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1083 if (rra_step_cnt != NULL) free(rra_step_cnt);
1084 rpnstack_free(&rpnstack);
1086 /* if we got here and if there is an error and if the file has not been
1087 * written to, then close things up and return. */
1088 if (rrd_test_error()) {
1098 /* aargh ... that was tough ... so many loops ... anyway, its done.
1099 * we just need to write back the live header portion now*/
1101 if (fseek(rrd_file, (sizeof(stat_head_t)
1102 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1103 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1105 rrd_set_error("seek rrd for live header writeback");
1115 if(fwrite( rrd.live_head,
1116 sizeof(live_head_t), 1, rrd_file) != 1){
1117 rrd_set_error("fwrite live_head to rrd");
1127 if(fwrite( rrd.pdp_prep,
1129 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1130 rrd_set_error("ftwrite pdp_prep to rrd");
1140 if(fwrite( rrd.cdp_prep,
1142 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1143 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1145 rrd_set_error("ftwrite cdp_prep to rrd");
1155 if(fwrite( rrd.rra_ptr,
1157 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1158 rrd_set_error("fwrite rra_ptr to rrd");
1168 /* OK now close the files and free the memory */
1169 if(fclose(rrd_file) != 0){
1170 rrd_set_error("closing rrd");
1179 /* calling the smoothing code here guarantees at most
1180 * one smoothing operation per rrd_update call. Unfortunately,
1181 * it is possible with bulk updates, or a long-delayed update
1182 * for smoothing to occur off-schedule. This really isn't
1183 * critical except during the burning cycles. */
1184 if (schedule_smooth)
1187 rrd_file = fopen(filename,"r+");
1189 rrd_file = fopen(filename,"rb+");
1191 rra_start = rra_begin;
1192 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1194 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1195 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1198 fprintf(stderr,"Running smoother for rra %ld\n",i);
1200 apply_smoother(&rrd,i,rra_start,rrd_file);
1201 if (rrd_test_error())
1204 rra_start += rrd.rra_def[i].row_cnt
1205 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1218 * get exclusive lock to whole file.
1219 * lock gets removed when we close the file
1221 * returns 0 on success
1224 LockRRD(FILE *rrdfile)
1226 int rrd_fd; /* File descriptor for RRD */
1229 rrd_fd = fileno(rrdfile);
1234 lock.l_type = F_WRLCK; /* exclusive write lock */
1235 lock.l_len = 0; /* whole file */
1236 lock.l_start = 0; /* start of file */
1237 lock.l_whence = SEEK_SET; /* end of file */
1239 stat = fcntl(rrd_fd, F_SETLK, &lock);
1243 if ( _fstat( rrd_fd, &st ) == 0 ) {
1244 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1256 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1257 unsigned short CDP_scratch_idx, FILE *rrd_file)
1259 unsigned long ds_idx, cdp_idx;
1261 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1263 /* compute the cdp index */
1264 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1266 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1267 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1268 rrd -> rra_def[rra_idx].cf_nam);
1271 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1272 sizeof(rrd_value_t),1,rrd_file) != 1)
1274 rrd_set_error("writing rrd");
1277 *rra_current += sizeof(rrd_value_t);