1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(&current_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normalize time as returned by gettimeofday. usec part must
64 static void normalize_time(struct timeval *t)
68 t->tv_usec += 1000000L;
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
79 FILE UNUSED(*rrd_file),
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
90 int rrd_update_r(char *filename, char *template, int argc, char **argv);
91 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * The expansion carries no trailing semicolon, so the macro can be used as an
 * operand inside a larger expression; callers terminate the statement
 * themselves. X is evaluated twice, so it must be free of side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
117 info_t *rrd_update_v(int argc, char **argv)
119 char *template = NULL;
120 info_t *result = NULL;
122 optind = 0; opterr = 0; /* initialize getopt */
125 static struct option long_options[] =
127 {"template", required_argument, 0, 't'},
130 int option_index = 0;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157 rc.u_int = _rrd_update(argv[optind], template,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
165 rrd_update(int argc, char **argv)
167 char *template = NULL;
169 optind = 0; opterr = 0; /* initialize getopt */
172 static struct option long_options[] =
174 {"template", required_argument, 0, 't'},
177 int option_index = 0;
179 opt = getopt_long(argc, argv, "t:",
180 long_options, &option_index);
191 rrd_set_error("unknown option '%s'",argv[optind-1]);
196 /* need at least 2 arguments: filename, data. */
197 if (argc-optind < 2) {
198 rrd_set_error("Not enough arguments");
203 rc = rrd_update_r(argv[optind], template,
204 argc - optind - 1, argv + optind + 1);
209 rrd_update_r(char *filename, char *template, int argc, char **argv)
211 return _rrd_update(filename, template, argc, argv, NULL);
215 _rrd_update(char *filename, char *template, int argc, char **argv,
216 info_t *pcdp_summary)
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
234 pre_int,post_int; /* interval between this and
236 unsigned long proc_pdp_st; /* which pdp_st was the last
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added to the
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added to the
253 long *tmpl_idx; /* index representing the settings
254 transported by the template index */
255 unsigned long tmpl_cnt = 2; /* time and data */
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
289 rpnstack_init(&rpnstack);
291 /* need at least 1 argument: data. */
293 rrd_set_error("Not enough arguments");
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
308 current_time_usec = tmp_time.tv_usec;
311 current_time_usec = 0;
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
315 /* This is defined in the ANSI C standard, section 7.9.5.3:
317 When a file is opened with update mode ('+' as the second
318 or third character in the ... list of mode argument
319 variables), both input and output may be performed on the
320 associated stream. However, ... input may not be directly
321 followed by output without an intervening call to a file
322 positioning function, unless the input operation encounters
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
329 fseek(rrd_file, 0, SEEK_CUR);
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
368 /* initialize template redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
384 unsigned int tmpl_len;
386 tmpl_cnt = 1; /* the first entry is the time */
387 tmpl_len = strlen(template);
388 for(i=0;i<=tmpl_len ;i++) {
389 if (template[i] == ':' || template[i] == '\0') {
391 if (tmpl_cnt>rrd.stat_head->ds_cnt){
392 rrd_set_error("Template contains more DS definitions than RRD");
393 free(updvals); free(pdp_temp);
394 free(tmpl_idx); rrd_free(&rrd);
395 fclose(rrd_file); return(-1);
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
398 rrd_set_error("unknown DS name '%s'",dsname);
399 free(updvals); free(pdp_temp);
400 free(tmpl_idx); rrd_free(&rrd);
401 fclose(rrd_file); return(-1);
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt-1]++;
405 /* go to the next entry on the template */
406 dsname = &template[i+1];
407 /* fix the damage we did before */
416 if ((pdp_new = malloc(sizeof(rrd_value_t)
417 *rrd.stat_head->ds_cnt))==NULL){
418 rrd_set_error("allocating pdp_new ...");
428 rrd_mmaped_file = mmap(0,
430 PROT_READ | PROT_WRITE,
434 if (rrd_mmaped_file == MAP_FAILED) {
435 rrd_set_error("error mmapping file %s", filename);
444 /* loop through the arguments. */
445 for(arg_i=0; arg_i<argc;arg_i++) {
446 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
447 char *step_start = stepper;
449 char *parsetime_error = NULL;
450 enum {atstyle, normal} timesyntax;
451 struct rrd_time_value ds_tv;
452 if (stepper == NULL){
453 rrd_set_error("failed duplication argv entry");
459 munmap(rrd_mmaped_file, rrd_filesize);
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
486 updvals[tmpl_idx[ii]] = stepper;
488 if (*stepper == ':') {
492 updvals[tmpl_idx[ii]] = stepper+1;
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
535 /* don't do any correction for old version RRDs */
537 current_time_usec = 0;
539 if(current_time < rrd.live_head->last_up ||
540 (current_time == rrd.live_head->last_up &&
541 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
542 rrd_set_error("illegal attempt to update using time %ld when "
543 "last update time is %ld (minimum one second step)",
544 current_time, rrd.live_head->last_up);
550 /* seek to the beginning of the rra's */
551 if (rra_current != rra_begin) {
553 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
554 rrd_set_error("seek error in rrd");
559 rra_current = rra_begin;
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
571 /* interval = current_time - rrd.live_head->last_up; */
572 interval = (double)(current_time - rrd.live_head->last_up)
573 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
575 if (occu_pdp_st > proc_pdp_st){
576 /* OK we passed the pdp_st moment*/
577 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
578 * occurred before the latest
580 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
581 post_int = occu_pdp_age; /* how much after it */
582 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
596 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
597 occu_pdp_age, occu_pdp_st,
598 interval, pre_int, post_int);
601 /* process the data sources and update the pdp_prep
602 * area accordingly */
603 for(i=0;i<rrd.stat_head->ds_cnt;i++){
605 dst_idx= dst_conv(rrd.ds_def[i].dst);
607 /* make sure we do not build diffs with old last_ds values */
608 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
609 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
610 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
613 /* NOTE: DST_CDEF should never enter this if block, because
614 * updvals[i+1][0] is initialized to 'U'; unless the caller
615 * accidentally specified a value for the DST_CDEF. To handle
616 * this case, an extra check is required. */
618 if((updvals[i+1][0] != 'U') &&
619 (dst_idx != DST_CDEF) &&
620 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
622 /* the data source type defines how to process the data */
623 /* pdp_new contains rate * time ... eg the bytes
624 * transferred during the interval. Doing it this way saves
625 * a lot of math operations */
631 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
632 for(ii=0;updvals[i+1][ii] != '\0';ii++){
633 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
634 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
638 if (rrd_test_error()){
641 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
642 if(dst_idx == DST_COUNTER) {
643 /* simple overflow catcher suggested by Andres Kroonmaa */
644 /* this will fail terribly for non 32 or 64 bit counters ... */
645 /* are there any others in SNMP land ? */
646 if (pdp_new[i] < (double)0.0 )
647 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
648 if (pdp_new[i] < (double)0.0 )
649 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
651 rate = pdp_new[i] / interval;
659 pdp_new[i] = strtod(updvals[i+1],&endptr);
661 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
664 if (endptr[0] != '\0'){
665 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
668 rate = pdp_new[i] / interval;
672 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
674 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
677 if (endptr[0] != '\0'){
678 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
681 rate = pdp_new[i] / interval;
684 rrd_set_error("rrd contains unknown DS type : '%s'",
688 /* break out of this for loop if the error string is set */
689 if (rrd_test_error()){
692 /* make sure pdp_temp is neither too large nor too small
693 * if any of these occur it becomes unknown ...
695 if ( ! isnan(rate) &&
696 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
697 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
698 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
699 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
703 /* no news is news all the same */
707 /* make a copy of the command line argument for the next run */
715 rrd.pdp_prep[i].last_ds,
716 updvals[i+1], pdp_new[i]);
718 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
719 strncpy(rrd.pdp_prep[i].last_ds,
720 updvals[i+1],LAST_DS_LEN-1);
721 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
724 /* break out of the argument parsing loop if the error_string is set */
725 if (rrd_test_error()){
729 /* has a pdp_st moment occurred since the last run ? */
731 if (proc_pdp_st == occu_pdp_st){
732 /* no we have not passed a pdp_st moment. therefore update is simple */
734 for(i=0;i<rrd.stat_head->ds_cnt;i++){
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval-0.5);
738 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
745 rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
750 /* a pdp_st has occurred. */
752 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
753 * occurred up to the last run.
754 pdp_new[] contains rate*seconds from the latest run.
755 pdp_temp[] will contain the rate for cdp */
757 for(i=0;i<rrd.stat_head->ds_cnt;i++){
758 /* update pdp_prep to the current pdp_st */
759 if(isnan(pdp_new[i]))
760 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
762 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
763 pdp_new[i]/interval*pre_int;
765 /* if too much of the pdp_prep is unknown we dump it */
766 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
767 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
768 (occu_pdp_st-proc_pdp_st <=
769 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
772 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
773 / (double)( occu_pdp_st
775 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
778 /* process CDEF data sources; remember each CDEF DS can
779 * only reference other DS with a lower index number */
780 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
782 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
783 /* substitute data values for OP_VARIABLE nodes */
784 for (ii = 0; rpnp[ii].op != OP_END; ii++)
786 if (rpnp[ii].op == OP_VARIABLE) {
787 rpnp[ii].op = OP_NUMBER;
788 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
791 /* run the rpn calculator */
792 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
794 break; /* exits the data sources pdp_temp loop */
798 /* make pdp_prep ready for the next run */
799 if(isnan(pdp_new[i])){
800 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
801 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
803 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
804 rrd.pdp_prep[i].scratch[PDP_val].u_val =
805 pdp_new[i]/interval*post_int;
813 "new_unkn_sec %5lu\n",
815 rrd.pdp_prep[i].scratch[PDP_val].u_val,
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
820 /* if there were errors during the last loop, bail out here */
821 if (rrd_test_error()){
826 /* compute the number of elapsed pdp_st moments */
827 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
829 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
831 if (rra_step_cnt == NULL)
833 rra_step_cnt = (unsigned long *)
834 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
837 for(i = 0, rra_start = rra_begin;
838 i < rrd.stat_head->rra_cnt;
839 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
842 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
843 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
844 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
845 if (start_pdp_offset <= elapsed_pdp_st) {
846 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
847 rrd.rra_def[i].pdp_cnt + 1;
852 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
854 /* If this is a bulk update, we need to skip ahead in the seasonal
855 * arrays so that they will be correct for the next observed value;
856 * note that for the bulk update itself, no update will occur to
857 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
859 if (rra_step_cnt[i] > 2)
861 /* skip update by resetting rra_step_cnt[i],
862 * note that this is not data source specific; this is due
863 * to the bulk update, not a DNAN value for the specific data
866 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
867 &last_seasonal_coef);
868 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
872 /* periodically run a smoother for seasonal effects */
873 /* Need to use first cdp parameter buffer to track
874 * burnin (burnin requires a specific smoothing schedule).
875 * The CDP_init_seasonal parameter is really an RRA level,
876 * not a data source within RRA level parameter, but the rra_def
877 * is read only for rrd_update (not flushed to disk). */
878 iii = i*(rrd.stat_head -> ds_cnt);
879 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
882 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
883 > rrd.rra_def[i].row_cnt - 1) {
884 /* mark off one of the burnin cycles */
885 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
889 /* someone has no doubt invented a trick to deal with this
890 * wrap around, but at least this code is clear. */
891 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
892 rrd.rra_ptr[i].cur_row)
894 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
895 * mapping between PDP and CDP */
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
897 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
901 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
902 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
903 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
908 /* can't rely on negative numbers because we are working with
910 /* Don't need modulus here. If we've wrapped more than once, only
911 * one smooth is executed at the end. */
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
913 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
914 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
918 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
920 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
927 rra_current = ftell(rrd_file);
928 } /* if cf is DEVSEASONAL or SEASONAL */
930 if (rrd_test_error()) break;
932 /* update CDP_PREP areas */
933 /* loop over data sources within each RRA */
935 ii < rrd.stat_head->ds_cnt;
939 /* iii indexes the CDP prep area for this data source within the RRA */
940 iii=i*rrd.stat_head->ds_cnt+ii;
942 if (rrd.rra_def[i].pdp_cnt > 1) {
944 if (rra_step_cnt[i] > 0) {
945 /* If we are in this block, at least 1 CDP value will be written to
946 * disk, this is the CDP_primary_val entry. If more than 1 value needs
947 * to be written, then the "fill in" value is the CDP_secondary_val
949 if (isnan(pdp_temp[ii]))
951 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
952 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
954 /* CDP_secondary value is the RRA "fill in" value for intermediary
955 * CDP data entries. No matter the CF, the value is the same because
956 * the average, max, min, and last of a list of identical values is
957 * the same, namely, the value itself. */
958 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
961 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
962 > rrd.rra_def[i].pdp_cnt*
963 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
965 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
966 /* initialize carry over */
967 if (current_cf == CF_AVERAGE) {
968 if (isnan(pdp_temp[ii])) {
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
971 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
972 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
978 rrd_value_t cum_val, cur_val;
979 switch (current_cf) {
981 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
982 cur_val = IFDNAN(pdp_temp[ii],0.0);
983 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
984 (cum_val + cur_val * start_pdp_offset) /
985 (rrd.rra_def[i].pdp_cnt
986 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
987 /* initialize carry over value */
988 if (isnan(pdp_temp[ii])) {
989 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
991 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
992 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
996 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
997 cur_val = IFDNAN(pdp_temp[ii],-DINF);
999 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1000 isnan(pdp_temp[ii])) {
1002 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1007 if (cur_val > cum_val)
1008 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1010 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1011 /* initialize carry over value */
1012 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1015 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1016 cur_val = IFDNAN(pdp_temp[ii],DINF);
1018 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1019 isnan(pdp_temp[ii])) {
1021 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1026 if (cur_val < cum_val)
1027 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1029 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1030 /* initialize carry over value */
1031 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1035 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1036 /* initialize carry over value */
1037 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1040 } /* endif meets xff value requirement for a valid value */
1041 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1042 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1043 if (isnan(pdp_temp[ii]))
1044 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1045 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1047 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1048 } else /* rra_step_cnt[i] == 0 */
1051 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1052 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1055 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1056 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1059 if (isnan(pdp_temp[ii])) {
1060 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1061 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1063 if (current_cf == CF_AVERAGE) {
1064 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1070 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1071 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1074 switch (current_cf) {
1076 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1080 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1084 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1095 if (elapsed_pdp_st > 2)
1097 switch (current_cf) {
1100 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1101 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1104 case CF_DEVSEASONAL:
1105 /* need to update cached seasonal values, so they are consistent
1106 * with the bulk update */
1107 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1108 * CDP_last_deviation are the same. */
1109 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1110 last_seasonal_coef[ii];
1111 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1115 /* need to update the null_count and last_null_count.
1116 * even do this for non-DNAN pdp_temp because the
1117 * algorithm is not learning from batch updates. */
1118 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1120 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1124 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1125 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1128 /* do not count missed bulk values as failures */
1129 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1130 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1131 /* need to reset violations buffer.
1132 * could do this more carefully, but for now, just
1133 * assume a bulk update wipes away all violations. */
1134 erase_violations(&rrd, iii, i);
1138 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1140 if (rrd_test_error()) break;
1142 } /* endif data sources loop */
1143 } /* end RRA Loop */
1145 /* this loop is only entered if elapsed_pdp_st < 3 */
1146 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1147 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1149 for(i = 0, rra_start = rra_begin;
1150 i < rrd.stat_head->rra_cnt;
1151 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1154 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1156 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1157 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1159 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1160 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1162 rra_current = ftell(rrd_file);
1164 if (rrd_test_error()) break;
1165 /* loop over data sources within each RRA */
1167 ii < rrd.stat_head->ds_cnt;
1170 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1171 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1172 scratch_idx, seasonal_coef);
1174 } /* end RRA Loop */
1175 if (rrd_test_error()) break;
1176 } /* end elapsed_pdp_st loop */
1178 if (rrd_test_error()) break;
1180 /* Ready to write to disk */
1181 /* Move sequentially through the file, writing one RRA at a time.
1182 * Note this architecture divorces the computation of CDP with
1183 * flushing updated RRA entries to disk. */
1184 for(i = 0, rra_start = rra_begin;
1185 i < rrd.stat_head->rra_cnt;
1186 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1188 /* is there anything to write for this RRA? If not, continue. */
1189 if (rra_step_cnt[i] == 0) continue;
1191 /* write the first row */
1193 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1195 rrd.rra_ptr[i].cur_row++;
1196 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1197 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1198 /* position on the first row */
1199 rra_pos_tmp = rra_start +
1200 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1201 if(rra_pos_tmp != rra_current) {
1203 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1204 rrd_set_error("seek error in rrd");
1208 rra_current = rra_pos_tmp;
1212 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1214 scratch_idx = CDP_primary_val;
1215 if (pcdp_summary != NULL)
1217 rra_time = (current_time - current_time
1218 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1219 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1222 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1223 pcdp_summary, &rra_time, rrd_mmaped_file);
1225 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1226 pcdp_summary, &rra_time);
1228 if (rrd_test_error()) break;
1230 /* write other rows of the bulk update, if any */
1231 scratch_idx = CDP_secondary_val;
1232 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1234 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1237 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1238 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1241 rrd.rra_ptr[i].cur_row = 0;
1242 /* seek back to beginning of current rra */
1243 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1245 rrd_set_error("seek error in rrd");
1249 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1251 rra_current = rra_start;
1253 if (pcdp_summary != NULL)
1255 rra_time = (current_time - current_time
1256 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1257 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1260 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1261 pcdp_summary, &rra_time, rrd_mmaped_file);
1263 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1264 pcdp_summary, &rra_time);
1268 if (rrd_test_error())
1272 /* break out of the argument parsing loop if error_string is set */
1273 if (rrd_test_error()){
1278 } /* endif a pdp_st has occurred */
1279 rrd.live_head->last_up = current_time;
1280 rrd.live_head->last_up_usec = current_time_usec;
1282 } /* function argument loop */
1284 if (seasonal_coef != NULL) free(seasonal_coef);
1285 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1286 if (rra_step_cnt != NULL) free(rra_step_cnt);
1287 rpnstack_free(&rpnstack);
1290 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1291 rrd_set_error("error writing(unmapping) file: %s", filename);
1294 /* if we got here and if there is an error and if the file has not been
1295 * written to, then close things up and return. */
1296 if (rrd_test_error()) {
1306 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1307 * we just need to write back the live header portion now */
1309 if (fseek(rrd_file, (sizeof(stat_head_t)
1310 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1311 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1313 rrd_set_error("seek rrd for live header writeback");
1324 if(fwrite( rrd.live_head,
1325 sizeof(live_head_t), 1, rrd_file) != 1){
1326 rrd_set_error("fwrite live_head to rrd");
1337 if(fwrite( &rrd.live_head->last_up,
1338 sizeof(time_t), 1, rrd_file) != 1){
1339 rrd_set_error("fwrite live_head to rrd");
1351 if(fwrite( rrd.pdp_prep,
1353 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1354 rrd_set_error("ftwrite pdp_prep to rrd");
1364 if(fwrite( rrd.cdp_prep,
1366 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1367 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1369 rrd_set_error("ftwrite cdp_prep to rrd");
1379 if(fwrite( rrd.rra_ptr,
1381 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1382 rrd_set_error("fwrite rra_ptr to rrd");
1392 /* OK now close the files and free the memory */
1393 if(fclose(rrd_file) != 0){
1394 rrd_set_error("closing rrd");
1403 /* calling the smoothing code here guarantees at most
1404 * one smoothing operation per rrd_update call. Unfortunately,
1405 * it is possible with bulk updates, or a long-delayed update
1406 * for smoothing to occur off-schedule. This really isn't
1407 * critical except during the burning cycles. */
1408 if (schedule_smooth)
1410 rrd_file = fopen(filename,"rb+");
1411 rra_start = rra_begin;
1412 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1414 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1415 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1418 fprintf(stderr,"Running smoother for rra %ld\n",i);
1420 apply_smoother(&rrd,i,rra_start,rrd_file);
1421 if (rrd_test_error())
1424 rra_start += rrd.rra_def[i].row_cnt
1425 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1438 * get exclusive lock to whole file.
1439 * lock gets removed when we close the file
1441 * returns 0 on success
/* LockRRD: request an exclusive lock covering the RRD file that backs the
 * stdio stream rrdfile.  The lock is taken on the descriptor returned by
 * fileno().  Both platform branches use a non-waiting request (_LK_NBLCK
 * with _locking() on native Windows, F_SETLK with fcntl() elsewhere), so
 * the call reports failure instead of waiting when the lock is not
 * available.  The status of the locking call is kept in rcstat. */
1444 LockRRD(FILE *rrdfile)
1446 int rrd_fd; /* File descriptor for RRD */
/* the locking primitives below work on descriptors, not FILE streams */
1449 rrd_fd = fileno(rrdfile);
/* native Windows build (Cygwin is excluded and takes the fcntl() path) */
1452 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* _locking() needs a byte count, so the current file size is looked up */
1455 if ( _fstat( rrd_fd, &st ) == 0 ) {
1456 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* fcntl() path: describe the region to lock in a struct flock */
1462 lock.l_type = F_WRLCK; /* exclusive write lock */
1463 lock.l_len = 0; /* zero length: the lock extends to the end of the file */
1464 lock.l_start = 0; /* start of file */
1465 lock.l_whence = SEEK_SET; /* l_start is measured from the start of the file */
/* F_SETLK does not wait: it fails with -1 if a conflicting lock exists */
1467 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
/* write_RRA_row: store one row of the round robin archive rra_idx.
 *
 * For every data source of the RRD the value held in scratch slot
 * CDP_scratch_idx of the matching cdp_prep entry is written out, and
 * *rra_current is advanced by sizeof(rrd_value_t) per value so that it
 * keeps tracking the write position inside the file.
 *
 * Two signatures are visible: one taking the memory-mapped file
 * (rrd_mmaped_file), where values are stored with memcpy() at offset
 * *rra_current, and one taking only the stdio stream, where values are
 * stored with fwrite().  Presumably the variant is selected by
 * preprocessor conditionals that are not part of this excerpt.
 *
 * When pcdp_summary is not NULL, an entry keyed
 * "[<rra_time>]RRA[<cf>][<pdp_cnt>]DS[<ds_nam>]" is appended to it through
 * info_push() for every value written.
 *
 * Returns the (possibly extended) pcdp_summary list.  A failed fwrite()
 * is reported through rrd_set_error("writing rrd"). */
1477 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1478 unsigned short CDP_scratch_idx,
1480 FILE UNUSED(*rrd_file),
1484 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1487 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1488 unsigned short CDP_scratch_idx, FILE *rrd_file,
1489 info_t *pcdp_summary, time_t *rra_time)
1492 unsigned long ds_idx, cdp_idx;
/* one value per data source makes up a complete row */
1495 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1497 /* compute the cdp index: cdp_prep is laid out RRA-major, DS-minor */
1498 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* debug trace of the value and the current stream position */
1500 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1501 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1502 rrd -> rra_def[rra_idx].cf_nam);
1504 if (pcdp_summary != NULL)
1506 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1507 /* append info to the return hash */
1508 pcdp_summary = info_push(pcdp_summary,
/* time_t is not guaranteed to be an int: convert explicitly to long and
 * print with %ld so the variadic argument matches its conversion
 * (assumes sprintf_alloc follows printf conventions -- confirm) */
1509 sprintf_alloc("[%ld]RRA[%s][%lu]DS[%s]",
1510 (long)*rra_time, rrd->rra_def[rra_idx].cf_nam,
1511 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* memory-mapped variant: copy the value to offset *rra_current */
1515 memcpy((char *)rrd_mmaped_file + *rra_current,
1516 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1517 sizeof(rrd_value_t));
/* stdio variant: write the value at the current stream position */
1519 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1520 sizeof(rrd_value_t),1,rrd_file) != 1)
1522 rrd_set_error("writing rrd");
/* keep the caller's notion of the file position in step with the write */
1526 *rra_current += sizeof(rrd_value_t);
1528 return (pcdp_summary);