1 /*****************************************************************************
2 * RRDtool 1.2.6 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
36 time_t tv_sec; /* seconds */
37 long tv_usec; /* microseconds */
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
47 struct timeb current_time;
49 _ftime(&current_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
57 * normalize time as returned by gettimeofday. usec part must
60 static void normalize_time(struct timeval *t)
64 t->tv_usec += 1000000L;
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx,
75 FILE UNUSED(*rrd_file),
79 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
82 unsigned long *rra_current,
83 unsigned short CDP_scratch_idx, FILE *rrd_file,
84 info_t *pcdp_summary, time_t *rra_time);
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise yields X.
 * X may be evaluated twice (once by isnan, once as the result), so do not
 * pass an argument with side effects.
 * The expansion deliberately carries no trailing semicolon, so the macro
 * can be used inside larger expressions; callers terminate the statement. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
95 main(int argc, char **argv){
96 rrd_update(argc,argv);
97 if (rrd_test_error()) {
98 printf("RRDtool 1.2.6 Copyright by Tobi Oetiker, 1997-2005\n\n"
99 "Usage: rrdupdate filename\n"
100 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101 "\t\t\ttime|N:value[:value...]\n\n"
102 "\t\t\tat-time@value[:value...]\n\n"
103 "\t\t\t[ time:value[:value...] ..]\n\n");
105 printf("ERROR: %s\n",rrd_get_error());
113 info_t *rrd_update_v(int argc, char **argv)
115 char *template = NULL;
116 info_t *result = NULL;
118 optind = 0; opterr = 0; /* initialize getopt */
121 static struct option long_options[] =
123 {"template", required_argument, 0, 't'},
126 int option_index = 0;
128 opt = getopt_long(argc, argv, "t:",
129 long_options, &option_index);
140 rrd_set_error("unknown option '%s'",argv[optind-1]);
146 /* need at least 2 arguments: filename, data. */
147 if (argc-optind < 2) {
148 rrd_set_error("Not enough arguments");
152 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153 rc.u_int = _rrd_update(argv[optind], template,
154 argc - optind - 1, argv + optind + 1, result);
155 result->value.u_int = rc.u_int;
161 rrd_update(int argc, char **argv)
163 char *template = NULL;
165 optind = 0; opterr = 0; /* initialize getopt */
168 static struct option long_options[] =
170 {"template", required_argument, 0, 't'},
173 int option_index = 0;
175 opt = getopt_long(argc, argv, "t:",
176 long_options, &option_index);
187 rrd_set_error("unknown option '%s'",argv[optind-1]);
192 /* need at least 2 arguments: filename, data. */
193 if (argc-optind < 2) {
194 rrd_set_error("Not enough arguments");
199 rc = rrd_update_r(argv[optind], template,
200 argc - optind - 1, argv + optind + 1);
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
207 return _rrd_update(filename, template, argc, argv, NULL);
211 _rrd_update(char *filename, char *template, int argc, char **argv,
212 info_t *pcdp_summary)
217 unsigned long i,ii,iii=1;
219 unsigned long rra_begin; /* byte pointer to the rra
220 * area in the rrd file. this
221 * pointer never changes value */
222 unsigned long rra_start; /* byte pointer to the rra
223 * area in the rrd file. this
224 * pointer changes as each rrd is
226 unsigned long rra_current; /* byte pointer to the current write
227 * spot in the rrd file. */
228 unsigned long rra_pos_tmp; /* temporary byte pointer. */
230 pre_int,post_int; /* interval between this and
232 unsigned long proc_pdp_st; /* which pdp_st was the last
234 unsigned long occu_pdp_st; /* when was the pdp_st
235 * before the last update
237 unsigned long proc_pdp_age; /* how old was the data in
238 * the pdp prep area when it
239 * was last updated */
240 unsigned long occu_pdp_age; /* how long ago was the last
242 rrd_value_t *pdp_new; /* prepare the incoming data
243 * to be added to the
245 rrd_value_t *pdp_temp; /* prepare the pdp values
246 * to be added to the
249 long *tmpl_idx; /* index representing the settings
250 transported by the template index */
251 unsigned long tmpl_cnt = 2; /* time and data */
256 time_t rra_time; /* time of update for a RRA */
257 unsigned long current_time_usec; /* microseconds part of current time */
258 struct timeval tmp_time; /* used for time conversion */
261 int schedule_smooth = 0;
262 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263 /* a vector of future Holt-Winters seasonal coefs */
264 unsigned long elapsed_pdp_st;
265 /* number of elapsed PDP steps since last update */
266 unsigned long *rra_step_cnt = NULL;
267 /* number of rows to be updated in an RRA for a data
269 unsigned long start_pdp_offset;
270 /* number of PDP steps since the last update that
271 * are assigned to the first CDP to be generated
272 * since the last update. */
273 unsigned short scratch_idx;
274 /* index into the CDP scratch array */
275 enum cf_en current_cf;
276 /* numeric id of the current consolidation function */
277 rpnstack_t rpnstack; /* used for COMPUTE DS */
278 int version; /* rrd version */
279 char *endptr; /* used in the conversion */
281 void *rrd_mmaped_file;
282 unsigned long rrd_filesize;
285 rpnstack_init(&rpnstack);
287 /* need at least 1 argument: data. */
289 rrd_set_error("Not enough arguments");
295 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
298 /* initialize time */
299 version = atoi(rrd.stat_head->version);
300 gettimeofday(&tmp_time, 0);
301 normalize_time(&tmp_time);
302 current_time = tmp_time.tv_sec;
304 current_time_usec = tmp_time.tv_usec;
307 current_time_usec = 0;
310 rra_current = rra_start = rra_begin = ftell(rrd_file);
311 /* This is defined in the ANSI C standard, section 7.9.5.3:
313 When a file is opened with update mode ('+' as the second
314 or third character in the ... list of mode argument
315 variables), both input and output may be performed on the
316 associated stream. However, ... input may not be directly
317 followed by output without an intervening call to a file
318 positioning function, unless the input operation encounters
321 fseek(rrd_file, 0, SEEK_END);
322 rrd_filesize = ftell(rrd_file);
323 fseek(rrd_file, rra_current, SEEK_SET);
325 fseek(rrd_file, 0, SEEK_CUR);
329 /* get exclusive lock to whole file.
330 * lock gets removed when we close the file.
332 if (LockRRD(rrd_file) != 0) {
333 rrd_set_error("could not lock RRD");
339 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340 rrd_set_error("allocating updvals pointer array");
346 if ((pdp_temp = malloc(sizeof(rrd_value_t)
347 *rrd.stat_head->ds_cnt))==NULL){
348 rrd_set_error("allocating pdp_temp ...");
355 if ((tmpl_idx = malloc(sizeof(unsigned long)
356 *(rrd.stat_head->ds_cnt+1)))==NULL){
357 rrd_set_error("allocating tmpl_idx ...");
364 /* initialize template redirector */
365 /* default config example (assume DS 1 is a CDEF DS)
366 tmpl_idx[0] -> 0; (time)
367 tmpl_idx[1] -> 1; (DS 0)
368 tmpl_idx[2] -> 3; (DS 2)
369 tmpl_idx[3] -> 4; (DS 3) */
370 tmpl_idx[0] = 0; /* time */
371 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
373 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
380 unsigned int tmpl_len;
382 tmpl_cnt = 1; /* the first entry is the time */
383 tmpl_len = strlen(template);
384 for(i=0;i<=tmpl_len ;i++) {
385 if (template[i] == ':' || template[i] == '\0') {
387 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388 rrd_set_error("Template contains more DS definitions than RRD");
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
393 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394 rrd_set_error("unknown DS name '%s'",dsname);
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
399 /* the first element is always the time */
400 tmpl_idx[tmpl_cnt-1]++;
401 /* go to the next entry on the template */
402 dsname = &template[i+1];
403 /* fix the damage we did before */
412 if ((pdp_new = malloc(sizeof(rrd_value_t)
413 *rrd.stat_head->ds_cnt))==NULL){
414 rrd_set_error("allocating pdp_new ...");
424 rrd_mmaped_file = mmap(0,
426 PROT_READ | PROT_WRITE,
430 if (rrd_mmaped_file == MAP_FAILED) {
431 rrd_set_error("error mmapping file %s", filename);
440 /* loop through the arguments. */
441 for(arg_i=0; arg_i<argc;arg_i++) {
442 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443 char *step_start = stepper;
445 char *parsetime_error = NULL;
446 enum {atstyle, normal} timesyntax;
447 struct rrd_time_value ds_tv;
448 if (stepper == NULL){
449 rrd_set_error("failed duplication argv entry");
455 munmap(rrd_mmaped_file, rrd_filesize);
460 /* initialize all ds input to unknown except the first one
461 which has always got to be set */
462 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463 strcpy(stepper,argv[arg_i]);
465 /* separate all ds elements; first must be examined separately
466 due to alternate time syntax */
467 if ((p=strchr(stepper,'@'))!=NULL) {
468 timesyntax = atstyle;
471 } else if ((p=strchr(stepper,':'))!=NULL) {
476 rrd_set_error("expected timestamp not found in data source from %s:...",
482 updvals[tmpl_idx[ii]] = stepper;
484 if (*stepper == ':') {
488 updvals[tmpl_idx[ii]] = stepper+1;
494 if (ii != tmpl_cnt-1) {
495 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496 tmpl_cnt-1, ii, argv[arg_i]);
501 /* get the time from the reading ... handle N */
502 if (timesyntax == atstyle) {
503 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
508 if (ds_tv.type == RELATIVE_TO_END_TIME ||
509 ds_tv.type == RELATIVE_TO_START_TIME) {
510 rrd_set_error("specifying time relative to the 'start' "
511 "or 'end' makes no sense here: %s",
517 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
520 } else if (strcmp(updvals[0],"N")==0){
521 gettimeofday(&tmp_time, 0);
522 normalize_time(&tmp_time);
523 current_time = tmp_time.tv_sec;
524 current_time_usec = tmp_time.tv_usec;
527 tmp = strtod(updvals[0], 0);
528 current_time = floor(tmp);
529 current_time_usec = (long)((tmp - current_time) * 1000000L);
531 /* don't do any correction for old version RRDs */
533 current_time_usec = 0;
535 if(current_time <= rrd.live_head->last_up){
536 rrd_set_error("illegal attempt to update using time %ld when "
537 "last update time is %ld (minimum one second step)",
538 current_time, rrd.live_head->last_up);
544 /* seek to the beginning of the rra's */
545 if (rra_current != rra_begin) {
547 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 rrd_set_error("seek error in rrd");
553 rra_current = rra_begin;
555 rra_start = rra_begin;
557 /* when was the current pdp started */
558 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561 /* when did the last pdp_st occur */
562 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 occu_pdp_st = current_time - occu_pdp_age;
564 /* interval = current_time - rrd.live_head->last_up; */
565 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
567 if (occu_pdp_st > proc_pdp_st){
568 /* OK we passed the pdp_st moment*/
569 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570 * occurred before the latest
572 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573 post_int = occu_pdp_age; /* how much after it */
574 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
588 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
589 occu_pdp_age, occu_pdp_st,
590 interval, pre_int, post_int);
593 /* process the data sources and update the pdp_prep
594 * area accordingly */
595 for(i=0;i<rrd.stat_head->ds_cnt;i++){
597 dst_idx= dst_conv(rrd.ds_def[i].dst);
598 /* NOTE: DST_CDEF should never enter this if block, because
599 * updvals[i+1][0] is initialized to 'U'; unless the caller
600 * accidentally specified a value for the DST_CDEF. To handle
601 * this case, an extra check is required. */
602 if((updvals[i+1][0] != 'U') &&
603 (dst_idx != DST_CDEF) &&
604 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
606 /* the data source type defines how to process the data */
607 /* pdp_new contains rate * time ... eg the bytes
608 * transferred during the interval. Doing it this way saves
609 * a lot of math operations */
615 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
616 for(ii=0;updvals[i+1][ii] != '\0';ii++){
617 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
618 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
622 if (rrd_test_error()){
625 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
626 if(dst_idx == DST_COUNTER) {
627 /* simple overflow catcher suggested by Andres Kroonmaa */
628 /* this will fail terribly for non 32 or 64 bit counters ... */
629 /* are there any others in SNMP land ? */
630 if (pdp_new[i] < (double)0.0 )
631 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
632 if (pdp_new[i] < (double)0.0 )
633 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
635 rate = pdp_new[i] / interval;
643 pdp_new[i] = strtod(updvals[i+1],&endptr);
645 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
648 if (endptr[0] != '\0'){
649 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
652 rate = pdp_new[i] / interval;
656 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
658 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661 if (endptr[0] != '\0'){
662 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665 rate = pdp_new[i] / interval;
668 rrd_set_error("rrd contains unknown DS type : '%s'",
672 /* break out of this for loop if the error string is set */
673 if (rrd_test_error()){
676 /* make sure pdp_temp is neither too large nor too small
677 * if any of these occur it becomes unknown ...
679 if ( ! isnan(rate) &&
680 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
681 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
682 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
683 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
687 /* no news is news all the same */
691 /* make a copy of the command line argument for the next run */
699 rrd.pdp_prep[i].last_ds,
700 updvals[i+1], pdp_new[i]);
702 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
703 strncpy(rrd.pdp_prep[i].last_ds,
704 updvals[i+1],LAST_DS_LEN-1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
708 /* break out of the argument parsing loop if the error_string is set */
709 if (rrd_test_error()){
713 /* has a pdp_st moment occurred since the last run ? */
715 if (proc_pdp_st == occu_pdp_st){
716 /* no we have not passed a pdp_st moment. therefore update is simple */
718 for(i=0;i<rrd.stat_head->ds_cnt;i++){
719 if(isnan(pdp_new[i]))
720 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
722 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
729 rrd.pdp_prep[i].scratch[PDP_val].u_val,
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
734 /* a pdp_st has occurred. */
736 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
737 * occurred up to the last run.
738 pdp_new[] contains rate*seconds from the latest run.
739 pdp_temp[] will contain the rate for cdp */
741 for(i=0;i<rrd.stat_head->ds_cnt;i++){
742 /* update pdp_prep to the current pdp_st */
743 if(isnan(pdp_new[i]))
744 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
746 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
747 pdp_new[i]/(double)interval*(double)pre_int;
749 /* if too much of the pdp_prep is unknown we dump it */
750 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
751 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
752 (occu_pdp_st-proc_pdp_st <=
753 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
756 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
757 / (double)( occu_pdp_st
759 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
762 /* process CDEF data sources; remember each CDEF DS can
763 * only reference other DS with a lower index number */
764 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
766 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
767 /* substitute data values for OP_VARIABLE nodes */
768 for (ii = 0; rpnp[ii].op != OP_END; ii++)
770 if (rpnp[ii].op == OP_VARIABLE) {
771 rpnp[ii].op = OP_NUMBER;
772 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
775 /* run the rpn calculator */
776 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
778 break; /* exits the data sources pdp_temp loop */
782 /* make pdp_prep ready for the next run */
783 if(isnan(pdp_new[i])){
784 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
785 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
787 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
788 rrd.pdp_prep[i].scratch[PDP_val].u_val =
789 pdp_new[i]/(double)interval*(double)post_int;
797 "new_unkn_sec %5lu\n",
799 rrd.pdp_prep[i].scratch[PDP_val].u_val,
800 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
804 /* if there were errors during the last loop, bail out here */
805 if (rrd_test_error()){
810 /* compute the number of elapsed pdp_st moments */
811 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
813 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
815 if (rra_step_cnt == NULL)
817 rra_step_cnt = (unsigned long *)
818 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
821 for(i = 0, rra_start = rra_begin;
822 i < rrd.stat_head->rra_cnt;
823 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
826 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
827 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
828 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
829 if (start_pdp_offset <= elapsed_pdp_st) {
830 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
831 rrd.rra_def[i].pdp_cnt + 1;
836 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
838 /* If this is a bulk update, we need to skip ahead in the seasonal
839 * arrays so that they will be correct for the next observed value;
840 * note that for the bulk update itself, no update will occur to
841 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
843 if (rra_step_cnt[i] > 2)
845 /* skip update by resetting rra_step_cnt[i],
846 * note that this is not data source specific; this is due
847 * to the bulk update, not a DNAN value for the specific data
850 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
851 &last_seasonal_coef);
852 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
856 /* periodically run a smoother for seasonal effects */
857 /* Need to use first cdp parameter buffer to track
858 * burnin (burnin requires a specific smoothing schedule).
859 * The CDP_init_seasonal parameter is really an RRA level,
860 * not a data source within RRA level parameter, but the rra_def
861 * is read only for rrd_update (not flushed to disk). */
862 iii = i*(rrd.stat_head -> ds_cnt);
863 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
866 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
867 > rrd.rra_def[i].row_cnt - 1) {
868 /* mark off one of the burnin cycles */
869 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
873 /* someone has no doubt invented a trick to deal with this
874 * wrap around, but at least this code is clear. */
875 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
876 rrd.rra_ptr[i].cur_row)
878 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
879 * mapping between PDP and CDP */
880 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
881 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
885 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
886 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
887 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
892 /* can't rely on negative numbers because we are working with
894 /* Don't need modulus here. If we've wrapped more than once, only
895 * one smooth is executed at the end. */
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
897 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
898 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
902 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
904 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
911 rra_current = ftell(rrd_file);
912 } /* if cf is DEVSEASONAL or SEASONAL */
914 if (rrd_test_error()) break;
916 /* update CDP_PREP areas */
917 /* loop over data sources within each RRA */
919 ii < rrd.stat_head->ds_cnt;
923 /* iii indexes the CDP prep area for this data source within the RRA */
924 iii=i*rrd.stat_head->ds_cnt+ii;
926 if (rrd.rra_def[i].pdp_cnt > 1) {
928 if (rra_step_cnt[i] > 0) {
929 /* If we are in this block, at least 1 CDP value will be written to
930 * disk, this is the CDP_primary_val entry. If more than 1 value needs
931 * to be written, then the "fill in" value is the CDP_secondary_val
933 if (isnan(pdp_temp[ii]))
935 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
936 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
938 /* CDP_secondary value is the RRA "fill in" value for intermediary
939 * CDP data entries. No matter the CF, the value is the same because
940 * the average, max, min, and last of a list of identical values is
941 * the same, namely, the value itself. */
942 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
945 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
946 > rrd.rra_def[i].pdp_cnt*
947 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
949 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
950 /* initialize carry over */
951 if (current_cf == CF_AVERAGE) {
952 if (isnan(pdp_temp[ii])) {
953 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
955 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
956 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
959 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
962 rrd_value_t cum_val, cur_val;
963 switch (current_cf) {
965 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
966 cur_val = IFDNAN(pdp_temp[ii],0.0);
967 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
968 (cum_val + cur_val * start_pdp_offset) /
969 (rrd.rra_def[i].pdp_cnt
970 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
971 /* initialize carry over value */
972 if (isnan(pdp_temp[ii])) {
973 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
980 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
981 cur_val = IFDNAN(pdp_temp[ii],-DINF);
983 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
984 isnan(pdp_temp[ii])) {
986 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
991 if (cur_val > cum_val)
992 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
994 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
995 /* initialize carry over value */
996 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
999 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1000 cur_val = IFDNAN(pdp_temp[ii],DINF);
1002 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1003 isnan(pdp_temp[ii])) {
1005 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1010 if (cur_val < cum_val)
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1013 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1014 /* initialize carry over value */
1015 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1019 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1020 /* initialize carry over value */
1021 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1024 } /* endif meets xff value requirement for a valid value */
1025 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1026 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1027 if (isnan(pdp_temp[ii]))
1028 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1029 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1031 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1032 } else /* rra_step_cnt[i] == 0 */
1035 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1036 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1039 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1040 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1043 if (isnan(pdp_temp[ii])) {
1044 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1045 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1047 if (current_cf == CF_AVERAGE) {
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1051 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1054 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1055 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1058 switch (current_cf) {
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1064 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1069 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1079 if (elapsed_pdp_st > 2)
1081 switch (current_cf) {
1084 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1085 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1088 case CF_DEVSEASONAL:
1089 /* need to update cached seasonal values, so they are consistent
1090 * with the bulk update */
1091 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1092 * CDP_last_deviation are the same. */
1093 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1094 last_seasonal_coef[ii];
1095 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1099 /* need to update the null_count and last_null_count.
1100 * even do this for non-DNAN pdp_temp because the
1101 * algorithm is not learning from batch updates. */
1102 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1104 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1108 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1109 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1112 /* do not count missed bulk values as failures */
1113 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1114 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1115 /* need to reset violations buffer.
1116 * could do this more carefully, but for now, just
1117 * assume a bulk update wipes away all violations. */
1118 erase_violations(&rrd, iii, i);
1122 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1124 if (rrd_test_error()) break;
1126 } /* endif data sources loop */
1127 } /* end RRA Loop */
1129 /* this loop is only entered if elapsed_pdp_st < 3 */
1130 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1131 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1133 for(i = 0, rra_start = rra_begin;
1134 i < rrd.stat_head->rra_cnt;
1135 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1138 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1140 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1141 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1143 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1144 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1146 rra_current = ftell(rrd_file);
1148 if (rrd_test_error()) break;
1149 /* loop over data sources within each RRA */
1151 ii < rrd.stat_head->ds_cnt;
1154 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1155 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1156 scratch_idx, seasonal_coef);
1158 } /* end RRA Loop */
1159 if (rrd_test_error()) break;
1160 } /* end elapsed_pdp_st loop */
1162 if (rrd_test_error()) break;
1164 /* Ready to write to disk */
1165 /* Move sequentially through the file, writing one RRA at a time.
1166 * Note this architecture divorces the computation of CDP with
1167 * flushing updated RRA entries to disk. */
1168 for(i = 0, rra_start = rra_begin;
1169 i < rrd.stat_head->rra_cnt;
1170 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1172 /* is there anything to write for this RRA? If not, continue. */
1173 if (rra_step_cnt[i] == 0) continue;
1175 /* write the first row */
1177 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1179 rrd.rra_ptr[i].cur_row++;
1180 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1181 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1182 /* position on the first row */
1183 rra_pos_tmp = rra_start +
1184 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1185 if(rra_pos_tmp != rra_current) {
1187 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1188 rrd_set_error("seek error in rrd");
1192 rra_current = rra_pos_tmp;
1196 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1198 scratch_idx = CDP_primary_val;
1199 if (pcdp_summary != NULL)
1201 rra_time = (current_time - current_time
1202 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1203 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1206 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1207 pcdp_summary, &rra_time, rrd_mmaped_file);
1209 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1210 pcdp_summary, &rra_time);
1212 if (rrd_test_error()) break;
1214 /* write other rows of the bulk update, if any */
1215 scratch_idx = CDP_secondary_val;
1216 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1218 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1221 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1222 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1225 rrd.rra_ptr[i].cur_row = 0;
1226 /* seek back to beginning of current rra */
1227 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1229 rrd_set_error("seek error in rrd");
1233 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1235 rra_current = rra_start;
1237 if (pcdp_summary != NULL)
1239 rra_time = (current_time - current_time
1240 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1241 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1244 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1245 pcdp_summary, &rra_time, rrd_mmaped_file);
1247 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248 pcdp_summary, &rra_time);
1252 if (rrd_test_error())
1256 /* break out of the argument parsing loop if error_string is set */
1257 if (rrd_test_error()){
1262 } /* endif a pdp_st has occurred */
1263 rrd.live_head->last_up = current_time;
1264 rrd.live_head->last_up_usec = current_time_usec;
1266 } /* function argument loop */
1268 if (seasonal_coef != NULL) free(seasonal_coef);
1269 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1270 if (rra_step_cnt != NULL) free(rra_step_cnt);
1271 rpnstack_free(&rpnstack);
1274 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1275 rrd_set_error("error writing(unmapping) file: %s", filename);
1278 /* if we got here and if there is an error and if the file has not been
1279 * written to, then close things up and return. */
1280 if (rrd_test_error()) {
1290 /* aargh ... that was tough ... so many loops ... anyway, its done.
1291 * we just need to write back the live header portion now*/
1293 if (fseek(rrd_file, (sizeof(stat_head_t)
1294 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1295 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1297 rrd_set_error("seek rrd for live header writeback");
1308 if(fwrite( rrd.live_head,
1309 sizeof(live_head_t), 1, rrd_file) != 1){
1310 rrd_set_error("fwrite live_head to rrd");
1321 if(fwrite( &rrd.live_head->last_up,
1322 sizeof(time_t), 1, rrd_file) != 1){
1323 rrd_set_error("fwrite live_head to rrd");
1335 if(fwrite( rrd.pdp_prep,
1337 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338 rrd_set_error("ftwrite pdp_prep to rrd");
1348 if(fwrite( rrd.cdp_prep,
1350 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1351 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1353 rrd_set_error("ftwrite cdp_prep to rrd");
1363 if(fwrite( rrd.rra_ptr,
1365 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366 rrd_set_error("fwrite rra_ptr to rrd");
1376 /* OK now close the files and free the memory */
1377 if(fclose(rrd_file) != 0){
1378 rrd_set_error("closing rrd");
1387 /* calling the smoothing code here guarantees at most
1388 * one smoothing operation per rrd_update call. Unfortunately,
1389 * it is possible with bulk updates, or a long-delayed update
1390 * for smoothing to occur off-schedule. This really isn't
1391 * critical except during the burning cycles. */
1392 if (schedule_smooth)
1394 rrd_file = fopen(filename,"rb+");
1395 rra_start = rra_begin;
1396 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1398 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1399 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1402 fprintf(stderr,"Running smoother for rra %ld\n",i);
1404 apply_smoother(&rrd,i,rra_start,rrd_file);
1405 if (rrd_test_error())
1408 rra_start += rrd.rra_def[i].row_cnt
1409 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1422 * Get an exclusive lock on the whole file.
1423 * The lock is released when the file is closed.
1425 * Returns 0 on success.
/* LockRRD: request an exclusive lock covering the file behind the stdio
 * stream rrdfile.  Only part of this function is visible in this listing
 * (declarations of st/lock/rcstat, some preprocessor lines and the return
 * are not shown); the notes below describe the visible statements only. */
1428 LockRRD(FILE *rrdfile)
1430 int rrd_fd; /* file descriptor underlying the rrdfile stream */
/* the locking calls below work on a descriptor, not on a FILE* */
1433 rrd_fd = fileno(rrdfile);
/* native WIN32 builds (Cygwin explicitly excluded) use _fstat()/_locking() */
1436 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* _locking() needs a byte count, so the current file size is fetched first;
 * the lock is only attempted when _fstat() succeeds */
1439 if ( _fstat( rrd_fd, &st ) == 0 ) {
/* _LK_NBLCK is the non-blocking lock mode; st.st_size bytes are locked */
1440 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* NOTE(review): the statements below are presumably the non-WIN32 branch;
 * the #else separating them is not visible here -- confirm.
 * They fill in a record-lock description for fcntl(). */
1446 lock.l_type = F_WRLCK; /* exclusive write lock */
1447 lock.l_len = 0; /* length 0: lock extends to the end of the file */
1448 lock.l_start = 0; /* offset 0 relative to l_whence */
1449 lock.l_whence = SEEK_SET; /* l_start is measured from the start of the file */
/* F_SETLK does not wait: the call fails at once if the lock cannot be taken */
1451 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
/* write_RRA_row: store one value per data source for the archive rra_idx.
 *
 * For every data source the value is read from
 * rrd->cdp_prep[...].scratch[CDP_scratch_idx].u_val and *rra_current is
 * advanced by sizeof(rrd_value_t).  When pcdp_summary is not NULL an info
 * entry is appended per value via info_push(); the (possibly updated)
 * pcdp_summary is returned at the end.
 *
 * Two alternative signatures are visible; the first takes an additional
 * rrd_mmaped_file pointer.  NOTE(review): the preprocessor conditionals that
 * select between them (and between the memcpy/fwrite store paths below) are
 * not visible in this listing -- presumably a memory-mapped build vs. a
 * stdio build; confirm against the full file. */
1461 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1462 unsigned short CDP_scratch_idx,
1464 FILE UNUSED(*rrd_file),
1468 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1471 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1472 unsigned short CDP_scratch_idx, FILE *rrd_file,
1473 info_t *pcdp_summary, time_t *rra_time)
1476 unsigned long ds_idx, cdp_idx;
/* one value is handled per data source (ds_cnt iterations) */
1479 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1481 /* cdp_prep is indexed as rra_idx * ds_cnt + ds_idx */
1482 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* trace output: the value, the current stream position and the CF name;
 * presumably compiled only in debug builds -- the guard is not visible */
1484 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1485 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1486 rrd -> rra_def[rra_idx].cf_nam);
/* the summary list is optional; nothing is recorded when it is NULL */
1488 if (pcdp_summary != NULL)
1490 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1491 /* append an entry keyed "[time]RRA[cf][pdp_cnt]DS[ds name]" to the returned list */
/* NOTE(review): "%d" is paired with *rra_time, which is a time_t; this is a
 * format/argument mismatch on platforms where time_t is wider than int --
 * confirm and consider an explicit cast with a matching conversion. */
1492 pcdp_summary = info_push(pcdp_summary,
1493 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1494 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1495 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* store path 1: copy the value into the mapped region at byte offset *rra_current */
1499 memcpy((char *)rrd_mmaped_file + *rra_current,
1500 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1501 sizeof(rrd_value_t));
/* store path 2: write the value at the stream's current position; a short
 * write is reported through rrd_set_error() (what follows the error call is
 * not visible in this listing) */
1503 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1504 sizeof(rrd_value_t),1,rrd_file) != 1)
1506 rrd_set_error("writing rrd");
/* keep the caller's byte offset in step with the value just stored */
1510 *rra_current += sizeof(rrd_value_t);
/* hand back the summary list, which may have grown (or still be NULL) */
1512 return (pcdp_summary);