1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
/*
 * WIN32 stand-in for gettimeofday(): reads the clock with _ftime() and copies
 * it into *t, multiplying the millisecond field by 1000 to get microseconds.
 * The tz argument is not referenced by any statement visible here.
 */
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(&current_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normalize time as returned by gettimeofday. usec part must
/*
 * NOTE(review): only the signature and one statement survive in this listing.
 * The visible statement adds 1000000 (one second expressed in microseconds)
 * to t->tv_usec; the condition guarding it and any matching tv_sec change are
 * not visible here -- presumably a borrow for a negative tv_usec; confirm.
 */
64 static void normalize_time(struct timeval *t)
68 t->tv_usec += 1000000L;
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
/*
 * NOTE(review): write_RRA_row is declared twice below with different
 * parameter lists. The first form marks rrd_file as UNUSED and takes an extra
 * rrd_mmaped_file pointer; the second takes a plain FILE *rrd_file. Presumably
 * a preprocessor conditional (not visible here) selects one of them -- confirm.
 */
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
79 FILE UNUSED(*rrd_file),
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
/* The remainder of the _rrd_update prototype is not visible in this listing. */
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv,
/*
 * IFDNAN(X, Y): evaluates to Y when X is NaN, otherwise to X.
 * X is evaluated twice, so it must be free of side effects.
 * The expansion carries no trailing semicolon, so the macro can be used
 * inside a larger expression; callers terminate the statement themselves.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/*
 * Command-line entry point. Hands argc/argv straight to rrd_update(); when
 * rrd_test_error() reports a failure it prints the usage banner followed by
 * the message obtained from rrd_get_error().
 * NOTE(review): the return-type line, the closing braces and any return
 * statements are not visible in this listing.
 */
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
/*
 * Update entry point that returns an info_t list instead of a plain status.
 * Parses the --template/-t option with getopt_long(), checks that at least a
 * filename and one data argument remain, then calls _rrd_update() with the
 * info list so it can be passed along as pcdp_summary.
 * NOTE(review): the option loop, the handling of 't', the declarations of
 * tmplt/rc/opt and the return statements are not visible in this listing.
 */
117 info_t *rrd_update_v(int argc, char **argv)
120 info_t *result = NULL;
122 optind = 0; opterr = 0; /* initialize getopt */
125 static struct option long_options[] =
127 {"template", required_argument, 0, 't'},
130 int option_index = 0;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
/* create the list head first; its value is overwritten with the real
 * status once _rrd_update() has returned */
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
/* argv[optind] is the filename; everything after it is update data */
157 rc.u_int = _rrd_update(argv[optind], tmplt,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
/*
 * Plain update entry point. Parses the --template/-t option with
 * getopt_long(), checks that at least a filename and one data argument
 * remain, and forwards them to rrd_update_r().
 * NOTE(review): the option loop, the assignment of tmplt, the declarations
 * and the return statements are not visible in this listing.
 */
165 rrd_update(int argc, char **argv)
169 optind = 0; opterr = 0; /* initialize getopt */
172 static struct option long_options[] =
174 {"template", required_argument, 0, 't'},
177 int option_index = 0;
179 opt = getopt_long(argc, argv, "t:",
180 long_options, &option_index);
191 rrd_set_error("unknown option '%s'",argv[optind-1]);
196 /* need at least 2 arguments: filename, data. */
197 if (argc-optind < 2) {
198 rrd_set_error("Not enough arguments");
/* argv[optind] is the filename; everything after it is update data */
203 rc = rrd_update_r(argv[optind], tmplt,
204 argc - optind - 1, argv + optind + 1);
/*
 * Thin wrapper around _rrd_update(): passes filename, tmplt, argc and argv
 * through unchanged and supplies NULL for the pcdp_summary argument.
 */
209 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
211 return _rrd_update(filename, tmplt, argc, argv, NULL);
215 _rrd_update(char *filename, char *tmplt, int argc, char **argv,
216 info_t *pcdp_summary)
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
234 pre_int,post_int; /* interval between this and
236 unsigned long proc_pdp_st; /* which pdp_st was the last
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added to the
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added to the
253 long *tmpl_idx; /* index representing the settings
254 transported by the tmplt index */
255 unsigned long tmpl_cnt = 2; /* time and data */
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
289 rpnstack_init(&rpnstack);
291 /* need at least 1 argument: data. */
293 rrd_set_error("Not enough arguments");
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
308 current_time_usec = tmp_time.tv_usec;
311 current_time_usec = 0;
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
315 /* This is defined in the ANSI C standard, section 7.9.5.3:
317 When a file is opened with update mode ('+' as the second
318 or third character in the ... list of mode argument
319 variables), both input and output may be performed on the
320 associated stream. However, ... input may not be directly
321 followed by output without an intervening call to a file
322 positioning function, unless the input operation encounters
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
329 fseek(rrd_file, 0, SEEK_CUR);
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
368 /* initialize tmplt redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
383 /* we should work on a writeable copy here */
385 unsigned int tmpl_len;
386 tmplt = strdup(tmplt);
388 tmpl_cnt = 1; /* the first entry is the time */
389 tmpl_len = strlen(tmplt);
390 for(i=0;i<=tmpl_len ;i++) {
391 if (tmplt[i] == ':' || tmplt[i] == '\0') {
393 if (tmpl_cnt>rrd.stat_head->ds_cnt){
394 rrd_set_error("tmplt contains more DS definitions than RRD");
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
400 rrd_set_error("unknown DS name '%s'",dsname);
401 free(updvals); free(pdp_temp);
403 free(tmpl_idx); rrd_free(&rrd);
404 fclose(rrd_file); return(-1);
406 /* the first element is always the time */
407 tmpl_idx[tmpl_cnt-1]++;
408 /* go to the next entry on the tmplt */
409 dsname = &tmplt[i+1];
410 /* fix the damage we did before */
420 if ((pdp_new = malloc(sizeof(rrd_value_t)
421 *rrd.stat_head->ds_cnt))==NULL){
422 rrd_set_error("allocating pdp_new ...");
432 rrd_mmaped_file = mmap(0,
434 PROT_READ | PROT_WRITE,
438 if (rrd_mmaped_file == MAP_FAILED) {
439 rrd_set_error("error mmapping file %s", filename);
448 /* loop through the arguments. */
449 for(arg_i=0; arg_i<argc;arg_i++) {
450 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
451 char *step_start = stepper;
453 char *parsetime_error = NULL;
454 enum {atstyle, normal} timesyntax;
455 struct rrd_time_value ds_tv;
456 if (stepper == NULL){
457 rrd_set_error("failed duplication argv entry");
463 munmap(rrd_mmaped_file, rrd_filesize);
468 /* initialize all ds input to unknown except the first one
469 which has always got to be set */
470 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471 strcpy(stepper,argv[arg_i]);
473 /* separate all ds elements; first must be examined separately
474 due to alternate time syntax */
475 if ((p=strchr(stepper,'@'))!=NULL) {
476 timesyntax = atstyle;
479 } else if ((p=strchr(stepper,':'))!=NULL) {
484 rrd_set_error("expected timestamp not found in data source from %s:...",
490 updvals[tmpl_idx[ii]] = stepper;
492 if (*stepper == ':') {
496 updvals[tmpl_idx[ii]] = stepper+1;
502 if (ii != tmpl_cnt-1) {
503 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504 tmpl_cnt-1, ii, argv[arg_i]);
509 /* get the time from the reading ... handle N */
510 if (timesyntax == atstyle) {
511 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
516 if (ds_tv.type == RELATIVE_TO_END_TIME ||
517 ds_tv.type == RELATIVE_TO_START_TIME) {
518 rrd_set_error("specifying time relative to the 'start' "
519 "or 'end' makes no sense here: %s",
525 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
528 } else if (strcmp(updvals[0],"N")==0){
529 gettimeofday(&tmp_time, 0);
530 normalize_time(&tmp_time);
531 current_time = tmp_time.tv_sec;
532 current_time_usec = tmp_time.tv_usec;
535 tmp = strtod(updvals[0], 0);
536 current_time = floor(tmp);
537 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
539 /* don't do any correction for old version RRDs */
541 current_time_usec = 0;
543 if(current_time < rrd.live_head->last_up ||
544 (current_time == rrd.live_head->last_up &&
545 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
546 rrd_set_error("illegal attempt to update using time %ld when "
547 "last update time is %ld (minimum one second step)",
548 current_time, rrd.live_head->last_up);
554 /* seek to the beginning of the rra's */
555 if (rra_current != rra_begin) {
557 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
558 rrd_set_error("seek error in rrd");
563 rra_current = rra_begin;
565 rra_start = rra_begin;
567 /* when was the current pdp started */
568 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
571 /* when did the last pdp_st occur */
572 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573 occu_pdp_st = current_time - occu_pdp_age;
575 /* interval = current_time - rrd.live_head->last_up; */
576 interval = (double)(current_time - rrd.live_head->last_up)
577 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
579 if (occu_pdp_st > proc_pdp_st){
580 /* OK we passed the pdp_st moment*/
581 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
582 * occurred before the latest
584 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
585 post_int = occu_pdp_age; /* how much after it */
586 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
600 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
601 occu_pdp_age, occu_pdp_st,
602 interval, pre_int, post_int);
605 /* process the data sources and update the pdp_prep
606 * area accordingly */
607 for(i=0;i<rrd.stat_head->ds_cnt;i++){
609 dst_idx= dst_conv(rrd.ds_def[i].dst);
611 /* make sure we do not build diffs with old last_ds values */
612 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
613 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
614 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
617 /* NOTE: DST_CDEF should never enter this if block, because
618 * updvals[i+1][0] is initialized to 'U'; unless the caller
619 * accidentally specified a value for the DST_CDEF. To handle
620 * this case, an extra check is required. */
622 if((updvals[i+1][0] != 'U') &&
623 (dst_idx != DST_CDEF) &&
624 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
626 /* the data source type defines how to process the data */
627 /* pdp_new contains rate * time ... eg the bytes
628 * transferred during the interval. Doing it this way saves
629 * a lot of math operations */
635 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636 for(ii=0;updvals[i+1][ii] != '\0';ii++){
637 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
642 if (rrd_test_error()){
645 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646 if(dst_idx == DST_COUNTER) {
647 /* simple overflow catcher suggested by Andres Kroonmaa */
648 /* this will fail terribly for non 32 or 64 bit counters ... */
649 /* are there any others in SNMP land ? */
650 if (pdp_new[i] < (double)0.0 )
651 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
652 if (pdp_new[i] < (double)0.0 )
653 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
655 rate = pdp_new[i] / interval;
663 pdp_new[i] = strtod(updvals[i+1],&endptr);
665 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
668 if (endptr[0] != '\0'){
669 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
672 rate = pdp_new[i] / interval;
676 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
678 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
681 if (endptr[0] != '\0'){
682 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
685 rate = pdp_new[i] / interval;
688 rrd_set_error("rrd contains unknown DS type : '%s'",
692 /* break out of this for loop if the error string is set */
693 if (rrd_test_error()){
696 /* make sure pdp_temp is neither too large nor too small
697 * if any of these occur it becomes unknown ...
699 if ( ! isnan(rate) &&
700 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
702 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
707 /* no news is news all the same */
711 /* make a copy of the command line argument for the next run */
719 rrd.pdp_prep[i].last_ds,
720 updvals[i+1], pdp_new[i]);
722 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723 strncpy(rrd.pdp_prep[i].last_ds,
724 updvals[i+1],LAST_DS_LEN-1);
725 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
728 /* break out of the argument parsing loop if the error_string is set */
729 if (rrd_test_error()){
733 /* has a pdp_st moment occurred since the last run ? */
735 if (proc_pdp_st == occu_pdp_st){
736 /* no we have not passed a pdp_st moment. therefore update is simple */
738 for(i=0;i<rrd.stat_head->ds_cnt;i++){
739 if(isnan(pdp_new[i]))
740 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
742 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
743 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
745 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
754 rrd.pdp_prep[i].scratch[PDP_val].u_val,
755 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
759 /* a pdp_st has occurred. */
761 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
762 * occurred up to the last run.
763 pdp_new[] contains rate*seconds from the latest run.
764 pdp_temp[] will contain the rate for cdp */
766 for(i=0;i<rrd.stat_head->ds_cnt;i++){
767 /* update pdp_prep to the current pdp_st. */
769 if(isnan(pdp_new[i]))
770 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
772 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
773 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
775 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
780 /* if too much of the pdp_prep is unknown we dump it */
782 /* removed because this does not agree with the definition
783 a heart beat can be unknown */
784 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
785 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
786 /* if the interval is larger than mrhb we get NAN */
787 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
788 (occu_pdp_st-proc_pdp_st <=
789 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
792 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
793 / (double)( occu_pdp_st
795 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
798 /* process CDEF data sources; remember each CDEF DS can
799 * only reference other DS with a lower index number */
800 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
802 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
803 /* substitute data values for OP_VARIABLE nodes */
804 for (ii = 0; rpnp[ii].op != OP_END; ii++)
806 if (rpnp[ii].op == OP_VARIABLE) {
807 rpnp[ii].op = OP_NUMBER;
808 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
811 /* run the rpn calculator */
812 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
814 break; /* exits the data sources pdp_temp loop */
818 /* make pdp_prep ready for the next run */
819 if(isnan(pdp_new[i])){
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
821 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
823 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
824 rrd.pdp_prep[i].scratch[PDP_val].u_val =
825 pdp_new[i]/interval*post_int;
833 "new_unkn_sec %5lu\n",
835 rrd.pdp_prep[i].scratch[PDP_val].u_val,
836 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
840 /* if there were errors during the last loop, bail out here */
841 if (rrd_test_error()){
846 /* compute the number of elapsed pdp_st moments */
847 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
849 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
851 if (rra_step_cnt == NULL)
853 rra_step_cnt = (unsigned long *)
854 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
857 for(i = 0, rra_start = rra_begin;
858 i < rrd.stat_head->rra_cnt;
859 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
862 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
863 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
864 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
865 if (start_pdp_offset <= elapsed_pdp_st) {
866 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
867 rrd.rra_def[i].pdp_cnt + 1;
872 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
874 /* If this is a bulk update, we need to skip ahead in the seasonal
875 * arrays so that they will be correct for the next observed value;
876 * note that for the bulk update itself, no update will occur to
877 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
879 if (rra_step_cnt[i] > 2)
881 /* skip update by resetting rra_step_cnt[i],
882 * note that this is not data source specific; this is due
883 * to the bulk update, not a DNAN value for the specific data
886 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
887 &last_seasonal_coef);
888 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
892 /* periodically run a smoother for seasonal effects */
893 /* Need to use first cdp parameter buffer to track
894 * burnin (burnin requires a specific smoothing schedule).
895 * The CDP_init_seasonal parameter is really an RRA level,
896 * not a data source within RRA level parameter, but the rra_def
897 * is read only for rrd_update (not flushed to disk). */
898 iii = i*(rrd.stat_head -> ds_cnt);
899 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
902 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
903 > rrd.rra_def[i].row_cnt - 1) {
904 /* mark off one of the burnin cycles */
905 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
909 /* someone has no doubt invented a trick to deal with this
910 * wrap around, but at least this code is clear. */
911 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
912 rrd.rra_ptr[i].cur_row)
914 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
915 * mapping between PDP and CDP */
916 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
917 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
921 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
922 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
923 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
928 /* can't rely on negative numbers because we are working with
930 /* Don't need modulus here. If we've wrapped more than once, only
931 * one smooth is executed at the end. */
932 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
933 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
934 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
938 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
939 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
940 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
947 rra_current = ftell(rrd_file);
948 } /* if cf is DEVSEASONAL or SEASONAL */
950 if (rrd_test_error()) break;
952 /* update CDP_PREP areas */
953 /* loop over data sources within each RRA */
955 ii < rrd.stat_head->ds_cnt;
959 /* iii indexes the CDP prep area for this data source within the RRA */
960 iii=i*rrd.stat_head->ds_cnt+ii;
962 if (rrd.rra_def[i].pdp_cnt > 1) {
964 if (rra_step_cnt[i] > 0) {
965 /* If we are in this block, at least 1 CDP value will be written to
966 * disk, this is the CDP_primary_val entry. If more than 1 value needs
967 * to be written, then the "fill in" value is the CDP_secondary_val
969 if (isnan(pdp_temp[ii]))
971 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
972 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
974 /* CDP_secondary value is the RRA "fill in" value for intermediary
975 * CDP data entries. No matter the CF, the value is the same because
976 * the average, max, min, and last of a list of identical values is
977 * the same, namely, the value itself. */
978 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
981 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
982 > rrd.rra_def[i].pdp_cnt*
983 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
985 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
986 /* initialize carry over */
987 if (current_cf == CF_AVERAGE) {
988 if (isnan(pdp_temp[ii])) {
989 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
991 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
992 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
995 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
998 rrd_value_t cum_val, cur_val;
999 switch (current_cf) {
1001 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1002 cur_val = IFDNAN(pdp_temp[ii],0.0);
1003 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1004 (cum_val + cur_val * start_pdp_offset) /
1005 (rrd.rra_def[i].pdp_cnt
1006 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1007 /* initialize carry over value */
1008 if (isnan(pdp_temp[ii])) {
1009 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1011 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1012 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1016 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1017 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1019 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1020 isnan(pdp_temp[ii])) {
1022 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1027 if (cur_val > cum_val)
1028 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1030 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1031 /* initialize carry over value */
1032 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1035 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1036 cur_val = IFDNAN(pdp_temp[ii],DINF);
1038 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1039 isnan(pdp_temp[ii])) {
1041 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1046 if (cur_val < cum_val)
1047 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1049 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1050 /* initialize carry over value */
1051 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1055 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1056 /* initialize carry over value */
1057 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060 } /* endif meets xff value requirement for a valid value */
1061 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1062 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1063 if (isnan(pdp_temp[ii]))
1064 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1065 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1067 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1068 } else /* rra_step_cnt[i] == 0 */
1071 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1072 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1075 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1076 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1079 if (isnan(pdp_temp[ii])) {
1080 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1081 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1083 if (current_cf == CF_AVERAGE) {
1084 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1087 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1091 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1094 switch (current_cf) {
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1100 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1101 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1104 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1105 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1109 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1114 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1115 if (elapsed_pdp_st > 2)
1117 switch (current_cf) {
1120 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1121 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1124 case CF_DEVSEASONAL:
1125 /* need to update cached seasonal values, so they are consistent
1126 * with the bulk update */
1127 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1128 * CDP_last_deviation are the same. */
1129 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1130 last_seasonal_coef[ii];
1131 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1135 /* need to update the null_count and last_null_count.
1136 * even do this for non-DNAN pdp_temp because the
1137 * algorithm is not learning from batch updates. */
1138 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1140 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1144 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1145 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1148 /* do not count missed bulk values as failures */
1149 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1150 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1151 /* need to reset violations buffer.
1152 * could do this more carefully, but for now, just
1153 * assume a bulk update wipes away all violations. */
1154 erase_violations(&rrd, iii, i);
1158 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1160 if (rrd_test_error()) break;
1162 } /* endif data sources loop */
1163 } /* end RRA Loop */
1165 /* this loop is only entered if elapsed_pdp_st < 3 */
1166 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1167 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1169 for(i = 0, rra_start = rra_begin;
1170 i < rrd.stat_head->rra_cnt;
1171 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1174 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1176 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1177 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1179 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1180 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1182 rra_current = ftell(rrd_file);
1184 if (rrd_test_error()) break;
1185 /* loop over data sources within each RRA */
1187 ii < rrd.stat_head->ds_cnt;
1190 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1191 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1192 scratch_idx, seasonal_coef);
1194 } /* end RRA Loop */
1195 if (rrd_test_error()) break;
1196 } /* end elapsed_pdp_st loop */
1198 if (rrd_test_error()) break;
1200 /* Ready to write to disk */
1201 /* Move sequentially through the file, writing one RRA at a time.
1202 * Note this architecture divorces the computation of CDP with
1203 * flushing updated RRA entries to disk. */
1204 for(i = 0, rra_start = rra_begin;
1205 i < rrd.stat_head->rra_cnt;
1206 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1208 /* is there anything to write for this RRA? If not, continue. */
1209 if (rra_step_cnt[i] == 0) continue;
1211 /* write the first row */
1213 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1215 rrd.rra_ptr[i].cur_row++;
1216 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1217 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1218 /* position on the first row */
1219 rra_pos_tmp = rra_start +
1220 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1221 if(rra_pos_tmp != rra_current) {
1223 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1224 rrd_set_error("seek error in rrd");
1228 rra_current = rra_pos_tmp;
1232 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1234 scratch_idx = CDP_primary_val;
1235 if (pcdp_summary != NULL)
1237 rra_time = (current_time - current_time
1238 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1239 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1242 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1243 pcdp_summary, &rra_time, rrd_mmaped_file);
1245 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1246 pcdp_summary, &rra_time);
1248 if (rrd_test_error()) break;
1250 /* write other rows of the bulk update, if any */
1251 scratch_idx = CDP_secondary_val;
1252 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1254 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1257 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1258 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1261 rrd.rra_ptr[i].cur_row = 0;
1262 /* seek back to beginning of current rra */
1263 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1265 rrd_set_error("seek error in rrd");
1269 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1271 rra_current = rra_start;
1273 if (pcdp_summary != NULL)
1275 rra_time = (current_time - current_time
1276 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1277 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1280 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281 pcdp_summary, &rra_time, rrd_mmaped_file);
1283 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1284 pcdp_summary, &rra_time);
1288 if (rrd_test_error())
1292 /* break out of the argument parsing loop if error_string is set */
1293 if (rrd_test_error()){
1298 } /* endif a pdp_st has occurred */
1299 rrd.live_head->last_up = current_time;
1300 rrd.live_head->last_up_usec = current_time_usec;
1302 } /* function argument loop */
1304 if (seasonal_coef != NULL) free(seasonal_coef);
1305 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1306 if (rra_step_cnt != NULL) free(rra_step_cnt);
1307 rpnstack_free(&rpnstack);
1310 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1311 rrd_set_error("error writing(unmapping) file: %s", filename);
1314 /* if we got here and if there is an error and if the file has not been
1315 * written to, then close things up and return. */
1316 if (rrd_test_error()) {
1326 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1327 * we just need to write back the live header portion now */
1329 if (fseek(rrd_file, (sizeof(stat_head_t)
1330 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1331 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1333 rrd_set_error("seek rrd for live header writeback");
1344 if(fwrite( rrd.live_head,
1345 sizeof(live_head_t), 1, rrd_file) != 1){
1346 rrd_set_error("fwrite live_head to rrd");
1357 if(fwrite( &rrd.live_head->last_up,
1358 sizeof(time_t), 1, rrd_file) != 1){
1359 rrd_set_error("fwrite live_head to rrd");
1371 if(fwrite( rrd.pdp_prep,
1373 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1374 rrd_set_error("ftwrite pdp_prep to rrd");
1384 if(fwrite( rrd.cdp_prep,
1386 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1387 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1389 rrd_set_error("ftwrite cdp_prep to rrd");
1399 if(fwrite( rrd.rra_ptr,
1401 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1402 rrd_set_error("fwrite rra_ptr to rrd");
1412 /* OK now close the files and free the memory */
1413 if(fclose(rrd_file) != 0){
1414 rrd_set_error("closing rrd");
1423 /* calling the smoothing code here guarantees at most
1424 * one smoothing operation per rrd_update call. Unfortunately,
1425 * it is possible with bulk updates, or a long-delayed update
1426 * for smoothing to occur off-schedule. This really isn't
1427 * critical except during the burning cycles. */
1428 if (schedule_smooth)
1430 rrd_file = fopen(filename,"rb+");
1431 rra_start = rra_begin;
1432 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1434 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1435 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1438 fprintf(stderr,"Running smoother for rra %ld\n",i);
1440 apply_smoother(&rrd,i,rra_start,rrd_file);
1441 if (rrd_test_error())
1444 rra_start += rrd.rra_def[i].row_cnt
1445 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1458 * get exclusive lock to whole file.
1459 * lock gets removed when we close the file
1461 * returns 0 on success
1464 LockRRD(FILE *rrdfile)
/* Requests a lock on the file behind rrdfile and stores the result of the
 * platform locking call in rcstat.
 * NOTE(review): the declarations of rcstat, st and lock, the handling of an
 * _fstat() failure and the return statement are not visible in this view. */
1466 int rrd_fd; /* File descriptor for RRD */
/* both locking calls below operate on a descriptor, not on a FILE stream */
1469 rrd_fd = fileno(rrdfile);
1472 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* native Windows branch: _locking() takes a byte count, so the current file
 * size is looked up first and passed as the length to lock */
1475 if ( _fstat( rrd_fd, &st ) == 0 ) {
/* _LK_NBLCK asks for the lock without retrying; per the CRT documentation
 * the call fails instead of waiting when the bytes are already locked */
1476 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* remaining statements fill in a lock request for fcntl(); presumably this
 * is the non-Windows branch -- the #else itself is not visible here */
1482 lock.l_type = F_WRLCK; /* exclusive write lock */
1483 lock.l_len = 0; /* whole file */
1484 lock.l_start = 0; /* start of file */
1485 lock.l_whence = SEEK_SET; /* l_start is an offset from the start of the file */
/* F_SETLK does not block: per POSIX it returns -1 right away when another
 * process already holds a conflicting lock */
1487 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1497 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1498 unsigned short CDP_scratch_idx,
1500 FILE UNUSED(*rrd_file),
1504 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
/* alternative signature for the same function, without the rrd_mmaped_file
 * parameter. NOTE(review): the preprocessor conditionals that select between
 * the two signatures are not visible in this view. */
1507 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1508 unsigned short CDP_scratch_idx, FILE *rrd_file,
1509 info_t *pcdp_summary, time_t *rra_time)
/* Stores one row of RRA rra_idx: for each data source ds_idx the value
 * cdp_prep[rra_idx * ds_cnt + ds_idx].scratch[CDP_scratch_idx].u_val is
 * written out and *rra_current is advanced by sizeof(rrd_value_t).
 * When pcdp_summary is non-NULL, each value is also handed to info_push()
 * under a key built from *rra_time, the RRA's cf_nam and pdp_cnt, and the
 * data source's ds_nam.
 * The last statement returns pcdp_summary.
 * NOTE(review): whatever follows rrd_set_error() on a failed fwrite() is not
 * visible in this view -- confirm the error return value against the file. */
1512 unsigned long ds_idx, cdp_idx;
1515 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1517 /* compute the cdp index */
1518 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* trace of the value about to be written and the stream position; presumably
 * compiled only in debug builds -- the guard is not visible here */
1520 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1521 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1522 rrd -> rra_def[rra_idx].cf_nam);
/* a NULL pcdp_summary means no summary is being collected; skip the info entry */
1524 if (pcdp_summary != NULL)
1526 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
/* NOTE(review): *rra_time has type time_t but is formatted with %d below;
 * confirm this matches on targets where time_t is wider than int */
1527 /* append info to the return hash */
1528 pcdp_summary = info_push(pcdp_summary,
1529 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1530 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1531 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* memory-mapped variant: copy the value to byte offset *rra_current inside
 * the mapping; no stream position is involved */
1535 memcpy((char *)rrd_mmaped_file + *rra_current,
1536 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1537 sizeof(rrd_value_t));
/* stream variant: write the value at the current position of rrd_file and
 * record an error if the single element could not be written */
1539 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1540 sizeof(rrd_value_t),1,rrd_file) != 1)
1542 rrd_set_error("writing rrd");
/* keep the caller's offset in step with the bytes just stored */
1546 *rra_current += sizeof(rrd_value_t);
1548 return (pcdp_summary);