1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normilize time as returned by gettimeofday. usec part must
64 static void normalize_time(struct timeval *t)
68 t->tv_usec += 1000000L;
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
79 FILE UNUSED(*rrd_file),
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
90 int rrd_update_r(char *filename, char *template, int argc, char **argv);
91 int _rrd_update(char *filename, char *template, int argc, char **argv,
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
117 info_t *rrd_update_v(int argc, char **argv)
119 char *template = NULL;
120 info_t *result = NULL;
122 optind = 0; opterr = 0; /* initialize getopt */
125 static struct option long_options[] =
127 {"template", required_argument, 0, 't'},
130 int option_index = 0;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157 rc.u_int = _rrd_update(argv[optind], template,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
165 rrd_update(int argc, char **argv)
167 char *template = NULL;
169 optind = 0; opterr = 0; /* initialize getopt */
172 static struct option long_options[] =
174 {"template", required_argument, 0, 't'},
177 int option_index = 0;
179 opt = getopt_long(argc, argv, "t:",
180 long_options, &option_index);
191 rrd_set_error("unknown option '%s'",argv[optind-1]);
196 /* need at least 2 arguments: filename, data. */
197 if (argc-optind < 2) {
198 rrd_set_error("Not enough arguments");
203 rc = rrd_update_r(argv[optind], template,
204 argc - optind - 1, argv + optind + 1);
209 rrd_update_r(char *filename, char *template, int argc, char **argv)
211 return _rrd_update(filename, template, argc, argv, NULL);
215 _rrd_update(char *filename, char *template, int argc, char **argv,
216 info_t *pcdp_summary)
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
234 pre_int,post_int; /* interval between this and
236 unsigned long proc_pdp_st; /* which pdp_st was the last
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added the the
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added the the
253 long *tmpl_idx; /* index representing the settings
254 transported by the template index */
255 unsigned long tmpl_cnt = 2; /* time and data */
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
289 rpnstack_init(&rpnstack);
291 /* need at least 1 arguments: data. */
293 rrd_set_error("Not enough arguments");
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
308 current_time_usec = tmp_time.tv_usec;
311 current_time_usec = 0;
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
315 /* This is defined in the ANSI C standard, section 7.9.5.3:
317 When a file is opened with udpate mode ('+' as the second
318 or third character in the ... list of mode argument
319 variables), both input and ouptut may be performed on the
320 associated stream. However, ... input may not be directly
321 followed by output without an intervening call to a file
322 positioning function, unless the input oepration encounters
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
329 fseek(rrd_file, 0, SEEK_CUR);
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
368 /* initialize template redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
384 unsigned int tmpl_len;
386 tmpl_cnt = 1; /* the first entry is the time */
387 tmpl_len = strlen(template);
388 for(i=0;i<=tmpl_len ;i++) {
389 if (template[i] == ':' || template[i] == '\0') {
391 if (tmpl_cnt>rrd.stat_head->ds_cnt){
392 rrd_set_error("Template contains more DS definitions than RRD");
393 free(updvals); free(pdp_temp);
394 free(tmpl_idx); rrd_free(&rrd);
395 fclose(rrd_file); return(-1);
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
398 rrd_set_error("unknown DS name '%s'",dsname);
399 free(updvals); free(pdp_temp);
400 free(tmpl_idx); rrd_free(&rrd);
401 fclose(rrd_file); return(-1);
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt-1]++;
405 /* go to the next entry on the template */
406 dsname = &template[i+1];
407 /* fix the damage we did before */
416 if ((pdp_new = malloc(sizeof(rrd_value_t)
417 *rrd.stat_head->ds_cnt))==NULL){
418 rrd_set_error("allocating pdp_new ...");
428 rrd_mmaped_file = mmap(0,
430 PROT_READ | PROT_WRITE,
434 if (rrd_mmaped_file == MAP_FAILED) {
435 rrd_set_error("error mmapping file %s", filename);
444 /* loop through the arguments. */
445 for(arg_i=0; arg_i<argc;arg_i++) {
446 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
447 char *step_start = stepper;
449 char *parsetime_error = NULL;
450 enum {atstyle, normal} timesyntax;
451 struct rrd_time_value ds_tv;
452 if (stepper == NULL){
453 rrd_set_error("failed duplication argv entry");
459 munmap(rrd_mmaped_file, rrd_filesize);
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
486 updvals[tmpl_idx[ii]] = stepper;
488 if (*stepper == ':') {
492 updvals[tmpl_idx[ii]] = stepper+1;
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
535 /* dont do any correction for old version RRDs */
537 current_time_usec = 0;
539 if(current_time < rrd.live_head->last_up ||
540 (current_time == rrd.live_head->last_up &&
541 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
542 rrd_set_error("illegal attempt to update using time %ld when "
543 "last update time is %ld (minimum one second step)",
544 current_time, rrd.live_head->last_up);
550 /* seek to the beginning of the rra's */
551 if (rra_current != rra_begin) {
553 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
554 rrd_set_error("seek error in rrd");
559 rra_current = rra_begin;
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
571 /* interval = current_time - rrd.live_head->last_up; */
572 interval = (double)(current_time - rrd.live_head->last_up)
573 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
575 if (occu_pdp_st > proc_pdp_st){
576 /* OK we passed the pdp_st moment*/
577 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
578 * occurred before the latest
580 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
581 post_int = occu_pdp_age; /* how much after it */
582 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
596 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
597 occu_pdp_age, occu_pdp_st,
598 interval, pre_int, post_int);
601 /* process the data sources and update the pdp_prep
602 * area accordingly */
603 for(i=0;i<rrd.stat_head->ds_cnt;i++){
605 dst_idx= dst_conv(rrd.ds_def[i].dst);
607 /* make sure we do not build diffs with old last_ds values */
608 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
609 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
610 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
613 /* NOTE: DST_CDEF should never enter this if block, because
614 * updvals[i+1][0] is initialized to 'U'; unless the caller
615 * accidently specified a value for the DST_CDEF. To handle
616 * this case, an extra check is required. */
618 if((updvals[i+1][0] != 'U') &&
619 (dst_idx != DST_CDEF) &&
620 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
622 /* the data source type defines how to process the data */
623 /* pdp_new contains rate * time ... eg the bytes
624 * transferred during the interval. Doing it this way saves
625 * a lot of math operations */
631 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
632 for(ii=0;updvals[i+1][ii] != '\0';ii++){
633 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
634 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
638 if (rrd_test_error()){
641 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
642 if(dst_idx == DST_COUNTER) {
643 /* simple overflow catcher suggested by Andres Kroonmaa */
644 /* this will fail terribly for non 32 or 64 bit counters ... */
645 /* are there any others in SNMP land ? */
646 if (pdp_new[i] < (double)0.0 )
647 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
648 if (pdp_new[i] < (double)0.0 )
649 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
651 rate = pdp_new[i] / interval;
659 pdp_new[i] = strtod(updvals[i+1],&endptr);
661 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
664 if (endptr[0] != '\0'){
665 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
668 rate = pdp_new[i] / interval;
672 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
674 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
677 if (endptr[0] != '\0'){
678 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
681 rate = pdp_new[i] / interval;
684 rrd_set_error("rrd contains unknown DS type : '%s'",
688 /* break out of this for loop if the error string is set */
689 if (rrd_test_error()){
692 /* make sure pdp_temp is neither too large or too small
693 * if any of these occur it becomes unknown ...
695 if ( ! isnan(rate) &&
696 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
697 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
698 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
699 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
703 /* no news is news all the same */
707 /* make a copy of the command line argument for the next run */
715 rrd.pdp_prep[i].last_ds,
716 updvals[i+1], pdp_new[i]);
718 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
719 strncpy(rrd.pdp_prep[i].last_ds,
720 updvals[i+1],LAST_DS_LEN-1);
721 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
724 /* break out of the argument parsing loop if the error_string is set */
725 if (rrd_test_error()){
729 /* has a pdp_st moment occurred since the last run ? */
731 if (proc_pdp_st == occu_pdp_st){
732 /* no we have not passed a pdp_st moment. therefore update is simple */
734 for(i=0;i<rrd.stat_head->ds_cnt;i++){
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
738 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
739 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
741 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
750 rrd.pdp_prep[i].scratch[PDP_val].u_val,
751 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
755 /* an pdp_st has occurred. */
757 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
758 * occurred up to the last run.
759 pdp_new[] contains rate*seconds from the latest run.
760 pdp_temp[] will contain the rate for cdp */
762 for(i=0;i<rrd.stat_head->ds_cnt;i++){
763 /* update pdp_prep to the current pdp_st. */
765 if(isnan(pdp_new[i]))
766 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
768 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
769 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
771 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
776 /* if too much of the pdp_prep is unknown we dump it */
778 /* removed because this does not agree with the definition
779 a heart beat can be unknown */
780 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
781 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
782 (occu_pdp_st-proc_pdp_st <=
783 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
786 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787 / (double)( occu_pdp_st
789 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
792 /* process CDEF data sources; remember each CDEF DS can
793 * only reference other DS with a lower index number */
794 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
796 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797 /* substitue data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++)
800 if (rpnp[ii].op == OP_VARIABLE) {
801 rpnp[ii].op = OP_NUMBER;
802 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
805 /* run the rpn calculator */
806 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
808 break; /* exits the data sources pdp_temp loop */
812 /* make pdp_prep ready for the next run */
813 if(isnan(pdp_new[i])){
814 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
815 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
817 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
818 rrd.pdp_prep[i].scratch[PDP_val].u_val =
819 pdp_new[i]/interval*post_int;
827 "new_unkn_sec %5lu\n",
829 rrd.pdp_prep[i].scratch[PDP_val].u_val,
830 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 /* if there were errors during the last loop, bail out here */
835 if (rrd_test_error()){
840 /* compute the number of elapsed pdp_st moments */
841 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
843 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
845 if (rra_step_cnt == NULL)
847 rra_step_cnt = (unsigned long *)
848 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
851 for(i = 0, rra_start = rra_begin;
852 i < rrd.stat_head->rra_cnt;
853 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
856 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
857 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
858 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
859 if (start_pdp_offset <= elapsed_pdp_st) {
860 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
861 rrd.rra_def[i].pdp_cnt + 1;
866 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
868 /* If this is a bulk update, we need to skip ahead in the seasonal
869 * arrays so that they will be correct for the next observed value;
870 * note that for the bulk update itself, no update will occur to
871 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
873 if (rra_step_cnt[i] > 2)
875 /* skip update by resetting rra_step_cnt[i],
876 * note that this is not data source specific; this is due
877 * to the bulk update, not a DNAN value for the specific data
880 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
881 &last_seasonal_coef);
882 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
886 /* periodically run a smoother for seasonal effects */
887 /* Need to use first cdp parameter buffer to track
888 * burnin (burnin requires a specific smoothing schedule).
889 * The CDP_init_seasonal parameter is really an RRA level,
890 * not a data source within RRA level parameter, but the rra_def
891 * is read only for rrd_update (not flushed to disk). */
892 iii = i*(rrd.stat_head -> ds_cnt);
893 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
897 > rrd.rra_def[i].row_cnt - 1) {
898 /* mark off one of the burnin cycles */
899 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
903 /* someone has no doubt invented a trick to deal with this
904 * wrap around, but at least this code is clear. */
905 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
906 rrd.rra_ptr[i].cur_row)
908 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
909 * mapping between PDP and CDP */
910 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
911 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
916 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
917 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
922 /* can't rely on negative numbers because we are working with
924 /* Don't need modulus here. If we've wrapped more than once, only
925 * one smooth is executed at the end. */
926 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
927 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
928 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
932 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
933 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
934 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
941 rra_current = ftell(rrd_file);
942 } /* if cf is DEVSEASONAL or SEASONAL */
944 if (rrd_test_error()) break;
946 /* update CDP_PREP areas */
947 /* loop over data soures within each RRA */
949 ii < rrd.stat_head->ds_cnt;
953 /* iii indexes the CDP prep area for this data source within the RRA */
954 iii=i*rrd.stat_head->ds_cnt+ii;
956 if (rrd.rra_def[i].pdp_cnt > 1) {
958 if (rra_step_cnt[i] > 0) {
959 /* If we are in this block, as least 1 CDP value will be written to
960 * disk, this is the CDP_primary_val entry. If more than 1 value needs
961 * to be written, then the "fill in" value is the CDP_secondary_val
963 if (isnan(pdp_temp[ii]))
965 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
966 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
968 /* CDP_secondary value is the RRA "fill in" value for intermediary
969 * CDP data entries. No matter the CF, the value is the same because
970 * the average, max, min, and last of a list of identical values is
971 * the same, namely, the value itself. */
972 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
975 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
976 > rrd.rra_def[i].pdp_cnt*
977 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
979 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
980 /* initialize carry over */
981 if (current_cf == CF_AVERAGE) {
982 if (isnan(pdp_temp[ii])) {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
989 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
992 rrd_value_t cum_val, cur_val;
993 switch (current_cf) {
995 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
996 cur_val = IFDNAN(pdp_temp[ii],0.0);
997 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
998 (cum_val + cur_val * start_pdp_offset) /
999 (rrd.rra_def[i].pdp_cnt
1000 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1001 /* initialize carry over value */
1002 if (isnan(pdp_temp[ii])) {
1003 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1005 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1006 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1010 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1011 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1013 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1014 isnan(pdp_temp[ii])) {
1016 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1021 if (cur_val > cum_val)
1022 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1024 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1025 /* initialize carry over value */
1026 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1029 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1030 cur_val = IFDNAN(pdp_temp[ii],DINF);
1032 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1033 isnan(pdp_temp[ii])) {
1035 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1040 if (cur_val < cum_val)
1041 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1043 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1044 /* initialize carry over value */
1045 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1050 /* initialize carry over value */
1051 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1054 } /* endif meets xff value requirement for a valid value */
1055 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1056 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1057 if (isnan(pdp_temp[ii]))
1058 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1059 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1061 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1062 } else /* rra_step_cnt[i] == 0 */
1065 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1066 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1069 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1070 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1073 if (isnan(pdp_temp[ii])) {
1074 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1075 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1077 if (current_cf == CF_AVERAGE) {
1078 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1084 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1085 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1088 switch (current_cf) {
1090 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1094 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1099 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1108 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1109 if (elapsed_pdp_st > 2)
1111 switch (current_cf) {
1114 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1115 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1118 case CF_DEVSEASONAL:
1119 /* need to update cached seasonal values, so they are consistent
1120 * with the bulk update */
1121 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1122 * CDP_last_deviation are the same. */
1123 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1124 last_seasonal_coef[ii];
1125 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1129 /* need to update the null_count and last_null_count.
1130 * even do this for non-DNAN pdp_temp because the
1131 * algorithm is not learning from batch updates. */
1132 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1134 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1138 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1139 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1142 /* do not count missed bulk values as failures */
1143 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1144 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1145 /* need to reset violations buffer.
1146 * could do this more carefully, but for now, just
1147 * assume a bulk update wipes away all violations. */
1148 erase_violations(&rrd, iii, i);
1152 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1154 if (rrd_test_error()) break;
1156 } /* endif data sources loop */
1157 } /* end RRA Loop */
1159 /* this loop is only entered if elapsed_pdp_st < 3 */
1160 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1161 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1163 for(i = 0, rra_start = rra_begin;
1164 i < rrd.stat_head->rra_cnt;
1165 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1168 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1170 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1171 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1173 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1174 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1176 rra_current = ftell(rrd_file);
1178 if (rrd_test_error()) break;
1179 /* loop over data soures within each RRA */
1181 ii < rrd.stat_head->ds_cnt;
1184 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1185 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1186 scratch_idx, seasonal_coef);
1188 } /* end RRA Loop */
1189 if (rrd_test_error()) break;
1190 } /* end elapsed_pdp_st loop */
1192 if (rrd_test_error()) break;
1194 /* Ready to write to disk */
1195 /* Move sequentially through the file, writing one RRA at a time.
1196 * Note this architecture divorces the computation of CDP with
1197 * flushing updated RRA entries to disk. */
1198 for(i = 0, rra_start = rra_begin;
1199 i < rrd.stat_head->rra_cnt;
1200 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1202 /* is there anything to write for this RRA? If not, continue. */
1203 if (rra_step_cnt[i] == 0) continue;
1205 /* write the first row */
1207 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1209 rrd.rra_ptr[i].cur_row++;
1210 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1211 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1212 /* positition on the first row */
1213 rra_pos_tmp = rra_start +
1214 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1215 if(rra_pos_tmp != rra_current) {
1217 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1218 rrd_set_error("seek error in rrd");
1222 rra_current = rra_pos_tmp;
1226 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1228 scratch_idx = CDP_primary_val;
1229 if (pcdp_summary != NULL)
1231 rra_time = (current_time - current_time
1232 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1236 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1237 pcdp_summary, &rra_time, rrd_mmaped_file);
1239 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 pcdp_summary, &rra_time);
1242 if (rrd_test_error()) break;
1244 /* write other rows of the bulk update, if any */
1245 scratch_idx = CDP_secondary_val;
1246 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1248 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1251 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1252 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1255 rrd.rra_ptr[i].cur_row = 0;
1256 /* seek back to beginning of current rra */
1257 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1259 rrd_set_error("seek error in rrd");
1263 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1265 rra_current = rra_start;
1267 if (pcdp_summary != NULL)
1269 rra_time = (current_time - current_time
1270 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1271 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1274 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1275 pcdp_summary, &rra_time, rrd_mmaped_file);
1277 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278 pcdp_summary, &rra_time);
1282 if (rrd_test_error())
1286 /* break out of the argument parsing loop if error_string is set */
1287 if (rrd_test_error()){
1292 } /* endif a pdp_st has occurred */
1293 rrd.live_head->last_up = current_time;
1294 rrd.live_head->last_up_usec = current_time_usec;
1296 } /* function argument loop */
1298 if (seasonal_coef != NULL) free(seasonal_coef);
1299 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1300 if (rra_step_cnt != NULL) free(rra_step_cnt);
1301 rpnstack_free(&rpnstack);
1304 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1305 rrd_set_error("error writing(unmapping) file: %s", filename);
1308 /* if we got here and if there is an error and if the file has not been
1309 * written to, then close things up and return. */
1310 if (rrd_test_error()) {
1320 /* aargh ... that was tough ... so many loops ... anyway, its done.
1321 * we just need to write back the live header portion now*/
1323 if (fseek(rrd_file, (sizeof(stat_head_t)
1324 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1325 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1327 rrd_set_error("seek rrd for live header writeback");
1338 if(fwrite( rrd.live_head,
1339 sizeof(live_head_t), 1, rrd_file) != 1){
1340 rrd_set_error("fwrite live_head to rrd");
1351 if(fwrite( &rrd.live_head->last_up,
1352 sizeof(time_t), 1, rrd_file) != 1){
1353 rrd_set_error("fwrite live_head to rrd");
1365 if(fwrite( rrd.pdp_prep,
1367 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1368 rrd_set_error("ftwrite pdp_prep to rrd");
1378 if(fwrite( rrd.cdp_prep,
1380 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1381 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1383 rrd_set_error("ftwrite cdp_prep to rrd");
1393 if(fwrite( rrd.rra_ptr,
1395 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1396 rrd_set_error("fwrite rra_ptr to rrd");
1406 /* OK now close the files and free the memory */
1407 if(fclose(rrd_file) != 0){
1408 rrd_set_error("closing rrd");
1417 /* calling the smoothing code here guarantees at most
1418 * one smoothing operation per rrd_update call. Unfortunately,
1419 * it is possible with bulk updates, or a long-delayed update
1420 * for smoothing to occur off-schedule. This really isn't
1421 * critical except during the burning cycles. */
1422 if (schedule_smooth)
1424 rrd_file = fopen(filename,"rb+");
1425 rra_start = rra_begin;
1426 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1428 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1432 fprintf(stderr,"Running smoother for rra %ld\n",i);
1434 apply_smoother(&rrd,i,rra_start,rrd_file);
1435 if (rrd_test_error())
1438 rra_start += rrd.rra_def[i].row_cnt
1439 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1452 * get exclusive lock to whole file.
1453 * lock gets removed when we close the file
1455 * returns 0 on success
1458 LockRRD(FILE *rrdfile)
1460 int rrd_fd; /* File descriptor for RRD */
1463 rrd_fd = fileno(rrdfile);
1466 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1469 if ( _fstat( rrd_fd, &st ) == 0 ) {
1470 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1476 lock.l_type = F_WRLCK; /* exclusive write lock */
1477 lock.l_len = 0; /* whole file */
1478 lock.l_start = 0; /* start of file */
1479 lock.l_whence = SEEK_SET; /* end of file */
1481 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1491 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1492 unsigned short CDP_scratch_idx,
1494 FILE UNUSED(*rrd_file),
1498 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1501 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1502 unsigned short CDP_scratch_idx, FILE *rrd_file,
1503 info_t *pcdp_summary, time_t *rra_time)
1506 unsigned long ds_idx, cdp_idx;
1509 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1511 /* compute the cdp index */
1512 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1514 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1515 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1516 rrd -> rra_def[rra_idx].cf_nam);
1518 if (pcdp_summary != NULL)
1520 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1521 /* append info to the return hash */
1522 pcdp_summary = info_push(pcdp_summary,
1523 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1524 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1525 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1529 memcpy((char *)rrd_mmaped_file + *rra_current,
1530 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1531 sizeof(rrd_value_t));
1533 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1534 sizeof(rrd_value_t),1,rrd_file) != 1)
1536 rrd_set_error("writing rrd");
1540 *rra_current += sizeof(rrd_value_t);
1542 return (pcdp_summary);