1 /*****************************************************************************
2 * RRDtool 1.2rc4 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
32 #include <sys/timeb.h>
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
44 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46 struct timeb current_time;
48 _ftime(&current_time);
50 t->tv_sec = current_time.time;
51 t->tv_usec = current_time.millitm * 1000;
56 * normalize time as returned by gettimeofday. usec part must
59 static void normalize_time(struct timeval *t)
63 t->tv_usec += 1000000L;
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
71 unsigned long *rra_current,
72 unsigned short CDP_scratch_idx, FILE *rrd_file,
73 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx, FILE *rrd_file,
78 info_t *pcdp_summary, time_t *rra_time);
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise yields X.
 * The expansion is a plain parenthesized expression with no trailing
 * semicolon, so it can be used inside larger expressions and in
 * if/else bodies; the caller supplies the terminating ';'.
 * X is evaluated more than once, so do not pass arguments with
 * side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
89 main(int argc, char **argv){
90 rrd_update(argc,argv);
91 if (rrd_test_error()) {
92 printf("RRDtool 1.2rc4 Copyright by Tobi Oetiker, 1997-2005\n\n"
93 "Usage: rrdupdate filename\n"
94 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
95 "\t\t\ttime|N:value[:value...]\n\n"
96 "\t\t\tat-time@value[:value...]\n\n"
97 "\t\t\t[ time:value[:value...] ..]\n\n");
99 printf("ERROR: %s\n",rrd_get_error());
107 info_t *rrd_update_v(int argc, char **argv)
109 char *template = NULL;
110 info_t *result = NULL;
114 static struct option long_options[] =
116 {"template", required_argument, 0, 't'},
119 int option_index = 0;
121 opt = getopt_long(argc, argv, "t:",
122 long_options, &option_index);
133 rrd_set_error("unknown option '%s'",argv[optind-1]);
139 /* need at least 2 arguments: filename, data. */
140 if (argc-optind < 2) {
141 rrd_set_error("Not enough arguments");
145 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
146 rc.u_int = _rrd_update(argv[optind], template,
147 argc - optind - 1, argv + optind + 1, result);
148 result->value.u_int = rc.u_int;
154 rrd_update(int argc, char **argv)
156 char *template = NULL;
160 static struct option long_options[] =
162 {"template", required_argument, 0, 't'},
165 int option_index = 0;
167 opt = getopt_long(argc, argv, "t:",
168 long_options, &option_index);
179 rrd_set_error("unknown option '%s'",argv[optind-1]);
184 /* need at least 2 arguments: filename, data. */
185 if (argc-optind < 2) {
186 rrd_set_error("Not enough arguments");
191 rc = rrd_update_r(argv[optind], template,
192 argc - optind - 1, argv + optind + 1);
197 rrd_update_r(char *filename, char *template, int argc, char **argv)
199 return _rrd_update(filename, template, argc, argv, NULL);
203 _rrd_update(char *filename, char *template, int argc, char **argv,
204 info_t *pcdp_summary)
209 unsigned long i,ii,iii=1;
211 unsigned long rra_begin; /* byte pointer to the rra
212 * area in the rrd file. this
213 * pointer never changes value */
214 unsigned long rra_start; /* byte pointer to the rra
215 * area in the rrd file. this
216 * pointer changes as each rrd is
218 unsigned long rra_current; /* byte pointer to the current write
219 * spot in the rrd file. */
220 unsigned long rra_pos_tmp; /* temporary byte pointer. */
222 pre_int,post_int; /* interval between this and
224 unsigned long proc_pdp_st; /* which pdp_st was the last
226 unsigned long occu_pdp_st; /* when was the pdp_st
227 * before the last update
229 unsigned long proc_pdp_age; /* how old was the data in
230 * the pdp prep area when it
231 * was last updated */
232 unsigned long occu_pdp_age; /* how long ago was the last
234 rrd_value_t *pdp_new; /* prepare the incoming data
235 * to be added to the
237 rrd_value_t *pdp_temp; /* prepare the pdp values
238 * to be added to the
241 long *tmpl_idx; /* index representing the settings
242 transported by the template index */
243 unsigned long tmpl_cnt = 2; /* time and data */
248 time_t rra_time; /* time of update for a RRA */
249 unsigned long current_time_usec; /* microseconds part of current time */
250 struct timeval tmp_time; /* used for time conversion */
253 int schedule_smooth = 0;
254 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
255 /* a vector of future Holt-Winters seasonal coefs */
256 unsigned long elapsed_pdp_st;
257 /* number of elapsed PDP steps since last update */
258 unsigned long *rra_step_cnt = NULL;
259 /* number of rows to be updated in an RRA for a data
261 unsigned long start_pdp_offset;
262 /* number of PDP steps since the last update that
263 * are assigned to the first CDP to be generated
264 * since the last update. */
265 unsigned short scratch_idx;
266 /* index into the CDP scratch array */
267 enum cf_en current_cf;
268 /* numeric id of the current consolidation function */
269 rpnstack_t rpnstack; /* used for COMPUTE DS */
270 int version; /* rrd version */
271 char *endptr; /* used in the conversion */
273 void *rrd_mmaped_file;
274 unsigned long rrd_filesize;
277 rpnstack_init(&rpnstack);
279 /* need at least 1 argument: data. */
281 rrd_set_error("Not enough arguments");
287 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
290 /* initialize time */
291 version = atoi(rrd.stat_head->version);
292 gettimeofday(&tmp_time, 0);
293 normalize_time(&tmp_time);
294 current_time = tmp_time.tv_sec;
296 current_time_usec = tmp_time.tv_usec;
299 current_time_usec = 0;
302 rra_current = rra_start = rra_begin = ftell(rrd_file);
303 /* This is defined in the ANSI C standard, section 7.9.5.3:
305 When a file is opened with update mode ('+' as the second
306 or third character in the ... list of mode argument
307 variables), both input and output may be performed on the
308 associated stream. However, ... input may not be directly
309 followed by output without an intervening call to a file
310 positioning function, unless the input operation encounters
313 fseek(rrd_file, 0, SEEK_END);
314 rrd_filesize = ftell(rrd_file);
315 fseek(rrd_file, rra_current, SEEK_SET);
317 fseek(rrd_file, 0, SEEK_CUR);
321 /* get exclusive lock to whole file.
322 * lock gets removed when we close the file.
324 if (LockRRD(rrd_file) != 0) {
325 rrd_set_error("could not lock RRD");
331 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
332 rrd_set_error("allocating updvals pointer array");
338 if ((pdp_temp = malloc(sizeof(rrd_value_t)
339 *rrd.stat_head->ds_cnt))==NULL){
340 rrd_set_error("allocating pdp_temp ...");
347 if ((tmpl_idx = malloc(sizeof(unsigned long)
348 *(rrd.stat_head->ds_cnt+1)))==NULL){
349 rrd_set_error("allocating tmpl_idx ...");
356 /* initialize template redirector */
357 /* default config example (assume DS 1 is a CDEF DS)
358 tmpl_idx[0] -> 0; (time)
359 tmpl_idx[1] -> 1; (DS 0)
360 tmpl_idx[2] -> 3; (DS 2)
361 tmpl_idx[3] -> 4; (DS 3) */
362 tmpl_idx[0] = 0; /* time */
363 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
365 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
372 unsigned int tmpl_len;
374 tmpl_cnt = 1; /* the first entry is the time */
375 tmpl_len = strlen(template);
376 for(i=0;i<=tmpl_len ;i++) {
377 if (template[i] == ':' || template[i] == '\0') {
379 if (tmpl_cnt>rrd.stat_head->ds_cnt){
380 rrd_set_error("Template contains more DS definitions than RRD");
381 free(updvals); free(pdp_temp);
382 free(tmpl_idx); rrd_free(&rrd);
383 fclose(rrd_file); return(-1);
385 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
386 rrd_set_error("unknown DS name '%s'",dsname);
387 free(updvals); free(pdp_temp);
388 free(tmpl_idx); rrd_free(&rrd);
389 fclose(rrd_file); return(-1);
391 /* the first element is always the time */
392 tmpl_idx[tmpl_cnt-1]++;
393 /* go to the next entry on the template */
394 dsname = &template[i+1];
395 /* fix the damage we did before */
404 if ((pdp_new = malloc(sizeof(rrd_value_t)
405 *rrd.stat_head->ds_cnt))==NULL){
406 rrd_set_error("allocating pdp_new ...");
416 rrd_mmaped_file = mmap(0,
418 PROT_READ | PROT_WRITE,
422 if (rrd_mmaped_file == MAP_FAILED) {
423 rrd_set_error("error mmapping file %s", filename);
432 /* loop through the arguments. */
433 for(arg_i=0; arg_i<argc;arg_i++) {
434 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
435 char *step_start = stepper;
437 char *parsetime_error = NULL;
438 enum {atstyle, normal} timesyntax;
439 struct rrd_time_value ds_tv;
440 if (stepper == NULL){
441 rrd_set_error("failed duplication argv entry");
447 munmap(rrd_mmaped_file, rrd_filesize);
452 /* initialize all ds input to unknown except the first one
453 which has always got to be set */
454 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
455 strcpy(stepper,argv[arg_i]);
457 /* separate all ds elements; first must be examined separately
458 due to alternate time syntax */
459 if ((p=strchr(stepper,'@'))!=NULL) {
460 timesyntax = atstyle;
463 } else if ((p=strchr(stepper,':'))!=NULL) {
468 rrd_set_error("expected timestamp not found in data source from %s:...",
474 updvals[tmpl_idx[ii]] = stepper;
476 if (*stepper == ':') {
480 updvals[tmpl_idx[ii]] = stepper+1;
486 if (ii != tmpl_cnt-1) {
487 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
488 tmpl_cnt-1, ii, argv[arg_i]);
493 /* get the time from the reading ... handle N */
494 if (timesyntax == atstyle) {
495 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
496 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
500 if (ds_tv.type == RELATIVE_TO_END_TIME ||
501 ds_tv.type == RELATIVE_TO_START_TIME) {
502 rrd_set_error("specifying time relative to the 'start' "
503 "or 'end' makes no sense here: %s",
509 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
510 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
512 } else if (strcmp(updvals[0],"N")==0){
513 gettimeofday(&tmp_time, 0);
514 normalize_time(&tmp_time);
515 current_time = tmp_time.tv_sec;
516 current_time_usec = tmp_time.tv_usec;
519 tmp = strtod(updvals[0], 0);
520 current_time = floor(tmp);
521 current_time_usec = (long)((tmp - current_time) * 1000000L);
523 /* don't do any correction for old version RRDs */
525 current_time_usec = 0;
527 if(current_time <= rrd.live_head->last_up){
528 rrd_set_error("illegal attempt to update using time %ld when "
529 "last update time is %ld (minimum one second step)",
530 current_time, rrd.live_head->last_up);
536 /* seek to the beginning of the rra's */
537 if (rra_current != rra_begin) {
539 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
540 rrd_set_error("seek error in rrd");
545 rra_current = rra_begin;
547 rra_start = rra_begin;
549 /* when was the current pdp started */
550 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
551 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
553 /* when did the last pdp_st occur */
554 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
555 occu_pdp_st = current_time - occu_pdp_age;
556 /* interval = current_time - rrd.live_head->last_up; */
557 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
559 if (occu_pdp_st > proc_pdp_st){
560 /* OK we passed the pdp_st moment*/
561 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
562 * occurred before the latest
564 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
565 post_int = occu_pdp_age; /* how much after it */
566 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
580 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
581 occu_pdp_age, occu_pdp_st,
582 interval, pre_int, post_int);
585 /* process the data sources and update the pdp_prep
586 * area accordingly */
587 for(i=0;i<rrd.stat_head->ds_cnt;i++){
589 dst_idx= dst_conv(rrd.ds_def[i].dst);
590 /* NOTE: DST_CDEF should never enter this if block, because
591 * updvals[i+1][0] is initialized to 'U'; unless the caller
592 * accidentally specified a value for the DST_CDEF. To handle
593 * this case, an extra check is required. */
594 if((updvals[i+1][0] != 'U') &&
595 (dst_idx != DST_CDEF) &&
596 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
598 /* the data source type defines how to process the data */
599 /* pdp_new contains rate * time ... eg the bytes
600 * transferred during the interval. Doing it this way saves
601 * a lot of math operations */
607 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
608 for(ii=0;updvals[i+1][ii] != '\0';ii++){
609 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
610 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
614 if (rrd_test_error()){
617 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
618 if(dst_idx == DST_COUNTER) {
619 /* simple overflow catcher suggested by Andres Kroonmaa */
620 /* this will fail terribly for non 32 or 64 bit counters ... */
621 /* are there any others in SNMP land ? */
622 if (pdp_new[i] < (double)0.0 )
623 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
624 if (pdp_new[i] < (double)0.0 )
625 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
627 rate = pdp_new[i] / interval;
635 pdp_new[i] = strtod(updvals[i+1],&endptr);
637 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
640 if (endptr[0] != '\0'){
641 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
644 rate = pdp_new[i] / interval;
648 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
650 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
653 if (endptr[0] != '\0'){
654 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
657 rate = pdp_new[i] / interval;
660 rrd_set_error("rrd contains unknown DS type : '%s'",
664 /* break out of this for loop if the error string is set */
665 if (rrd_test_error()){
668 /* make sure pdp_temp is neither too large nor too small
669 * if any of these occur it becomes unknown ...
671 if ( ! isnan(rate) &&
672 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
673 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
674 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
675 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
679 /* no news is news all the same */
683 /* make a copy of the command line argument for the next run */
691 rrd.pdp_prep[i].last_ds,
692 updvals[i+1], pdp_new[i]);
694 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
695 strncpy(rrd.pdp_prep[i].last_ds,
696 updvals[i+1],LAST_DS_LEN-1);
697 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
700 /* break out of the argument parsing loop if the error_string is set */
701 if (rrd_test_error()){
705 /* has a pdp_st moment occurred since the last run ? */
707 if (proc_pdp_st == occu_pdp_st){
708 /* no we have not passed a pdp_st moment. therefore update is simple */
710 for(i=0;i<rrd.stat_head->ds_cnt;i++){
711 if(isnan(pdp_new[i]))
712 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
714 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
721 rrd.pdp_prep[i].scratch[PDP_val].u_val,
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
726 /* a pdp_st has occurred. */
728 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
729 * occurred up to the last run.
730 pdp_new[] contains rate*seconds from the latest run.
731 pdp_temp[] will contain the rate for cdp */
733 for(i=0;i<rrd.stat_head->ds_cnt;i++){
734 /* update pdp_prep to the current pdp_st */
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
738 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
739 pdp_new[i]/(double)interval*(double)pre_int;
741 /* if too much of the pdp_prep is unknown we dump it */
742 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
743 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
744 (occu_pdp_st-proc_pdp_st <=
745 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
748 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
749 / (double)( occu_pdp_st
751 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
754 /* process CDEF data sources; remember each CDEF DS can
755 * only reference other DS with a lower index number */
756 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
758 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
759 /* substitute data values for OP_VARIABLE nodes */
760 for (ii = 0; rpnp[ii].op != OP_END; ii++)
762 if (rpnp[ii].op == OP_VARIABLE) {
763 rpnp[ii].op = OP_NUMBER;
764 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
767 /* run the rpn calculator */
768 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
770 break; /* exits the data sources pdp_temp loop */
774 /* make pdp_prep ready for the next run */
775 if(isnan(pdp_new[i])){
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
777 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
779 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
780 rrd.pdp_prep[i].scratch[PDP_val].u_val =
781 pdp_new[i]/(double)interval*(double)post_int;
789 "new_unkn_sec %5lu\n",
791 rrd.pdp_prep[i].scratch[PDP_val].u_val,
792 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
796 /* if there were errors during the last loop, bail out here */
797 if (rrd_test_error()){
802 /* compute the number of elapsed pdp_st moments */
803 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
805 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
807 if (rra_step_cnt == NULL)
809 rra_step_cnt = (unsigned long *)
810 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
813 for(i = 0, rra_start = rra_begin;
814 i < rrd.stat_head->rra_cnt;
815 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
818 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
819 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
820 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
821 if (start_pdp_offset <= elapsed_pdp_st) {
822 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
823 rrd.rra_def[i].pdp_cnt + 1;
828 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
830 /* If this is a bulk update, we need to skip ahead in the seasonal
831 * arrays so that they will be correct for the next observed value;
832 * note that for the bulk update itself, no update will occur to
833 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
835 if (rra_step_cnt[i] > 2)
837 /* skip update by resetting rra_step_cnt[i],
838 * note that this is not data source specific; this is due
839 * to the bulk update, not a DNAN value for the specific data
842 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
843 &last_seasonal_coef);
844 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
848 /* periodically run a smoother for seasonal effects */
849 /* Need to use first cdp parameter buffer to track
850 * burnin (burnin requires a specific smoothing schedule).
851 * The CDP_init_seasonal parameter is really an RRA level,
852 * not a data source within RRA level parameter, but the rra_def
853 * is read only for rrd_update (not flushed to disk). */
854 iii = i*(rrd.stat_head -> ds_cnt);
855 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
858 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
859 > rrd.rra_def[i].row_cnt - 1) {
860 /* mark off one of the burnin cycles */
861 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
865 /* someone has no doubt invented a trick to deal with this
866 * wrap around, but at least this code is clear. */
867 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
868 rrd.rra_ptr[i].cur_row)
870 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
871 * mapping between PDP and CDP */
872 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
873 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
877 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
879 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
884 /* can't rely on negative numbers because we are working with
886 /* Don't need modulus here. If we've wrapped more than once, only
887 * one smooth is executed at the end. */
888 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
889 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
890 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
894 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
895 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
896 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
903 rra_current = ftell(rrd_file);
904 } /* if cf is DEVSEASONAL or SEASONAL */
906 if (rrd_test_error()) break;
908 /* update CDP_PREP areas */
909 /* loop over data sources within each RRA */
911 ii < rrd.stat_head->ds_cnt;
915 /* iii indexes the CDP prep area for this data source within the RRA */
916 iii=i*rrd.stat_head->ds_cnt+ii;
918 if (rrd.rra_def[i].pdp_cnt > 1) {
920 if (rra_step_cnt[i] > 0) {
921 /* If we are in this block, at least 1 CDP value will be written to
922 * disk, this is the CDP_primary_val entry. If more than 1 value needs
923 * to be written, then the "fill in" value is the CDP_secondary_val
925 if (isnan(pdp_temp[ii]))
927 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
928 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
930 /* CDP_secondary value is the RRA "fill in" value for intermediary
931 * CDP data entries. No matter the CF, the value is the same because
932 * the average, max, min, and last of a list of identical values is
933 * the same, namely, the value itself. */
934 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
937 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
938 > rrd.rra_def[i].pdp_cnt*
939 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
941 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
942 /* initialize carry over */
943 if (current_cf == CF_AVERAGE) {
944 if (isnan(pdp_temp[ii])) {
945 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
947 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
948 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
951 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
954 rrd_value_t cum_val, cur_val;
955 switch (current_cf) {
957 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
958 cur_val = IFDNAN(pdp_temp[ii],0.0);
959 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
960 (cum_val + cur_val * start_pdp_offset) /
961 (rrd.rra_def[i].pdp_cnt
962 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
963 /* initialize carry over value */
964 if (isnan(pdp_temp[ii])) {
965 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
968 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
972 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
973 cur_val = IFDNAN(pdp_temp[ii],-DINF);
975 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
976 isnan(pdp_temp[ii])) {
978 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
983 if (cur_val > cum_val)
984 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
987 /* initialize carry over value */
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
992 cur_val = IFDNAN(pdp_temp[ii],DINF);
994 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
995 isnan(pdp_temp[ii])) {
997 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1002 if (cur_val < cum_val)
1003 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1005 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1006 /* initialize carry over value */
1007 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1012 /* initialize carry over value */
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016 } /* endif meets xff value requirement for a valid value */
1017 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1018 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1019 if (isnan(pdp_temp[ii]))
1020 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1021 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1023 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1024 } else /* rra_step_cnt[i] == 0 */
1027 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1028 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1031 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1032 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1035 if (isnan(pdp_temp[ii])) {
1036 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1037 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1039 if (current_cf == CF_AVERAGE) {
1040 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1043 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1047 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1050 switch (current_cf) {
1052 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1056 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1057 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1070 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1071 if (elapsed_pdp_st > 2)
1073 switch (current_cf) {
1076 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1077 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1080 case CF_DEVSEASONAL:
1081 /* need to update cached seasonal values, so they are consistent
1082 * with the bulk update */
1083 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1084 * CDP_last_deviation are the same. */
1085 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1086 last_seasonal_coef[ii];
1087 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1091 /* need to update the null_count and last_null_count.
1092 * even do this for non-DNAN pdp_temp because the
1093 * algorithm is not learning from batch updates. */
1094 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1096 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1100 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1101 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1104 /* do not count missed bulk values as failures */
1105 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1106 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1107 /* need to reset violations buffer.
1108 * could do this more carefully, but for now, just
1109 * assume a bulk update wipes away all violations. */
1110 erase_violations(&rrd, iii, i);
1114 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1116 if (rrd_test_error()) break;
1118 } /* endif data sources loop */
1119 } /* end RRA Loop */
1121 /* this loop is only entered if elapsed_pdp_st < 3 */
1122 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1123 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1125 for(i = 0, rra_start = rra_begin;
1126 i < rrd.stat_head->rra_cnt;
1127 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1130 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1132 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1133 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1135 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1136 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1138 rra_current = ftell(rrd_file);
1140 if (rrd_test_error()) break;
1141 /* loop over data sources within each RRA */
1143 ii < rrd.stat_head->ds_cnt;
1146 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1147 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1148 scratch_idx, seasonal_coef);
1150 } /* end RRA Loop */
1151 if (rrd_test_error()) break;
1152 } /* end elapsed_pdp_st loop */
1154 if (rrd_test_error()) break;
1156 /* Ready to write to disk */
1157 /* Move sequentially through the file, writing one RRA at a time.
1158 * Note this architecture divorces the computation of CDP with
1159 * flushing updated RRA entries to disk. */
1160 for(i = 0, rra_start = rra_begin;
1161 i < rrd.stat_head->rra_cnt;
1162 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1164 /* is there anything to write for this RRA? If not, continue. */
1165 if (rra_step_cnt[i] == 0) continue;
1167 /* write the first row */
1169 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1171 rrd.rra_ptr[i].cur_row++;
1172 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1173 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1174 /* position on the first row */
1175 rra_pos_tmp = rra_start +
1176 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1177 if(rra_pos_tmp != rra_current) {
1179 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1180 rrd_set_error("seek error in rrd");
1184 rra_current = rra_pos_tmp;
1188 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1190 scratch_idx = CDP_primary_val;
1191 if (pcdp_summary != NULL)
1193 rra_time = (current_time - current_time
1194 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1195 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1198 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1199 pcdp_summary, &rra_time, rrd_mmaped_file);
1201 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1202 pcdp_summary, &rra_time);
1204 if (rrd_test_error()) break;
1206 /* write other rows of the bulk update, if any */
1207 scratch_idx = CDP_secondary_val;
1208 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1210 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1213 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1214 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1217 rrd.rra_ptr[i].cur_row = 0;
1218 /* seek back to beginning of current rra */
1219 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1221 rrd_set_error("seek error in rrd");
1225 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1227 rra_current = rra_start;
1229 if (pcdp_summary != NULL)
1231 rra_time = (current_time - current_time
1232 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1236 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1237 pcdp_summary, &rra_time, rrd_mmaped_file);
1239 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 pcdp_summary, &rra_time);
1244 if (rrd_test_error())
1248 /* break out of the argument parsing loop if error_string is set */
1249 if (rrd_test_error()){
1254 } /* endif a pdp_st has occurred */
1255 rrd.live_head->last_up = current_time;
1256 rrd.live_head->last_up_usec = current_time_usec;
1258 } /* function argument loop */
1260 if (seasonal_coef != NULL) free(seasonal_coef);
1261 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1262 if (rra_step_cnt != NULL) free(rra_step_cnt);
1263 rpnstack_free(&rpnstack);
1266 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1267 rrd_set_error("error writing(unmapping) file: %s", filename);
1270 /* if we got here and if there is an error and if the file has not been
1271 * written to, then close things up and return. */
1272 if (rrd_test_error()) {
1282 /* aargh ... that was tough ... so many loops ... anyway, its done.
1283 * we just need to write back the live header portion now*/
1285 if (fseek(rrd_file, (sizeof(stat_head_t)
1286 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1287 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1289 rrd_set_error("seek rrd for live header writeback");
1300 if(fwrite( rrd.live_head,
1301 sizeof(live_head_t), 1, rrd_file) != 1){
1302 rrd_set_error("fwrite live_head to rrd");
1313 if(fwrite( &rrd.live_head->last_up,
1314 sizeof(time_t), 1, rrd_file) != 1){
1315 rrd_set_error("fwrite live_head to rrd");
1327 if(fwrite( rrd.pdp_prep,
1329 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1330 rrd_set_error("ftwrite pdp_prep to rrd");
1340 if(fwrite( rrd.cdp_prep,
1342 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1343 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1345 rrd_set_error("ftwrite cdp_prep to rrd");
1355 if(fwrite( rrd.rra_ptr,
1357 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1358 rrd_set_error("fwrite rra_ptr to rrd");
1368 /* OK now close the files and free the memory */
1369 if(fclose(rrd_file) != 0){
1370 rrd_set_error("closing rrd");
1379 /* calling the smoothing code here guarantees at most
1380 * one smoothing operation per rrd_update call. Unfortunately,
1381 * it is possible with bulk updates, or a long-delayed update
1382 * for smoothing to occur off-schedule. This really isn't
1383 * critical except during the burning cycles. */
1384 if (schedule_smooth)
1386 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1387 rrd_file = fopen(filename,"rb+");
1389 rrd_file = fopen(filename,"r+");
1391 rra_start = rra_begin;
1392 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1394 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1395 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1398 fprintf(stderr,"Running smoother for rra %ld\n",i);
1400 apply_smoother(&rrd,i,rra_start,rrd_file);
1401 if (rrd_test_error())
1404 rra_start += rrd.rra_def[i].row_cnt
1405 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
/*
 * LockRRD - request an exclusive lock covering the whole RRD file.
 *
 * The request is non-blocking (_LK_NBLCK on native WIN32, F_SETLK
 * elsewhere): when the lock cannot be granted the call fails right away
 * instead of waiting.  The lock gets removed when the file is closed.
 *
 * rrdfile: open stream of the RRD file to lock
 *
 * returns 0 on success, otherwise the failure status of the locking call
 * (-1 on native WIN32 when the file size cannot be determined).
 */
int
LockRRD(FILE *rrdfile)
{
    int status;
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */

#if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs an explicit byte count, so look up the file size */
    if (_fstat(fd, &file_info) != 0) {
        status = -1;
    } else {
        status = _locking(fd, _LK_NBLCK, file_info.st_size);
    }
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    whole_file.l_start = 0;           /* begin at the first byte */
    whole_file.l_len = 0;             /* zero length: lock the whole file */
    whole_file.l_type = F_WRLCK;      /* exclusive write lock */

    status = fcntl(fd, F_SETLK, &whole_file);
#endif

    return status;
}
1457 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1458 unsigned short CDP_scratch_idx, FILE *rrd_file,
1459 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1462 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1463 unsigned short CDP_scratch_idx, FILE *rrd_file,
1464 info_t *pcdp_summary, time_t *rra_time)
1467 unsigned long ds_idx, cdp_idx;
1470 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1472 /* compute the cdp index */
1473 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1475 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1476 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1477 rrd -> rra_def[rra_idx].cf_nam);
1479 if (pcdp_summary != NULL)
1481 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1482 /* append info to the return hash */
1483 pcdp_summary = info_push(pcdp_summary,
1484 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1485 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1486 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1490 memcpy((char *)rrd_mmaped_file + *rra_current,
1491 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1492 sizeof(rrd_value_t));
1494 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1495 sizeof(rrd_value_t),1,rrd_file) != 1)
1497 rrd_set_error("writing rrd");
1501 *rra_current += sizeof(rrd_value_t);
1503 return (pcdp_summary);