1 /*****************************************************************************
2 * RRDtool 1.2.10 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
36 time_t tv_sec; /* seconds */
37 long tv_usec; /* microseconds */
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
/* Win32 replacement for gettimeofday(): fills *t from the value returned by
 * _ftime().  tv_usec is derived from the millisecond field (millitm * 1000),
 * so the result has millisecond resolution.  The tz argument is not used by
 * the lines visible here.  The return type is spelled out as int because
 * implicit int is not valid C since C99. */
45 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
47 struct timeb current_time;
49 _ftime(&current_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
57 * normalize time as returned by gettimeofday. usec part must
/* Adjusts *t in place.  Only the microsecond correction is visible in this
 * listing; the condition guarding it and any matching tv_sec change are on
 * lines that are not shown. */
60 static void normalize_time(struct timeval *t)
/* add one million microseconds to the usec field */
64 t->tv_usec += 1000000L;
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
/* write_RRA_row is declared twice below.  The first declaration marks
 * rrd_file as UNUSED and takes an extra trailing rrd_mmaped_file pointer;
 * the second has no such parameter.
 * NOTE(review): the preprocessor conditional that selects one of the two is
 * not visible in this listing -- presumably it depends on mmap support. */
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx,
75 FILE UNUSED(*rrd_file),
79 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
82 unsigned long *rra_current,
83 unsigned short CDP_scratch_idx, FILE *rrd_file,
84 info_t *pcdp_summary, time_t *rra_time);
/* rrd_update_r passes its arguments straight to _rrd_update (see its body). */
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): evaluates to Y when X is NaN, otherwise to X.
 * X is evaluated twice, so do not pass arguments with side effects.
 * The expansion deliberately has no trailing semicolon, so the macro can be
 * used inside larger expressions and in unbraced if/else arms; callers
 * terminate the statement themselves. */
90 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Program entry point: hands the command-line arguments to rrd_update().
 * When rrd_test_error() then reports an error, the usage text is printed,
 * followed by an "ERROR: ..." line built from rrd_get_error().
 * (The closing braces and the return statement are not visible in this
 * listing.) */
95 main(int argc, char **argv){
96 rrd_update(argc,argv);
97 if (rrd_test_error()) {
98 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
99 "Usage: rrdupdate filename\n"
100 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101 "\t\t\ttime|N:value[:value...]\n\n"
102 "\t\t\tat-time@value[:value...]\n\n"
103 "\t\t\t[ time:value[:value...] ..]\n\n");
105 printf("ERROR: %s\n",rrd_get_error());
/* rrd_update_v: update entry point that works with an info_t list.
 * Visible steps:
 *   - reset the getopt state (optind, opterr);
 *   - scan options with getopt_long(); only -t/--template is declared, and
 *     an unrecognised option sets the "unknown option" error;
 *   - require at least two remaining arguments (filename and data);
 *   - create a "return_value" entry with info_push(), call _rrd_update()
 *     with that list as its summary argument, and store the return code in
 *     the entry.
 * The statement that returns the list is not visible in this listing. */
113 info_t *rrd_update_v(int argc, char **argv)
115 char *template = NULL;
116 info_t *result = NULL;
118 optind = 0; opterr = 0; /* initialize getopt */
121 static struct option long_options[] =
123 {"template", required_argument, 0, 't'},
126 int option_index = 0;
128 opt = getopt_long(argc, argv, "t:",
129 long_options, &option_index);
140 rrd_set_error("unknown option '%s'",argv[optind-1]);
146 /* need at least 2 arguments: filename, data. */
147 if (argc-optind < 2) {
148 rrd_set_error("Not enough arguments");
/* rc is pushed as a placeholder first and overwritten below with the value
 * returned by _rrd_update().
 * NOTE(review): the declaration/initialisation of rc is not visible here --
 * confirm it is set before being passed to info_push(). */
152 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
/* argv[optind] is the filename; the arguments after it are the data */
153 rc.u_int = _rrd_update(argv[optind], template,
154 argc - optind - 1, argv + optind + 1, result);
155 result->value.u_int = rc.u_int;
/* rrd_update: command-line style entry point.
 * Resets the getopt state, scans options with getopt_long() (only
 * -t/--template is declared; an unrecognised option sets the "unknown
 * option" error), requires at least two remaining arguments, and then
 * calls rrd_update_r() with the filename, the template and the remaining
 * data arguments.  The handling of rc after the call is not visible in this
 * listing. */
161 rrd_update(int argc, char **argv)
163 char *template = NULL;
165 optind = 0; opterr = 0; /* initialize getopt */
168 static struct option long_options[] =
170 {"template", required_argument, 0, 't'},
173 int option_index = 0;
175 opt = getopt_long(argc, argv, "t:",
176 long_options, &option_index);
187 rrd_set_error("unknown option '%s'",argv[optind-1]);
192 /* need at least 2 arguments: filename, data. */
193 if (argc-optind < 2) {
194 rrd_set_error("Not enough arguments");
/* argv[optind] is the filename; the arguments after it are the data */
199 rc = rrd_update_r(argv[optind], template,
200 argc - optind - 1, argv + optind + 1);
/* Forwards all arguments to _rrd_update() and passes NULL as pcdp_summary,
 * i.e. no info_t summary list is supplied.  Returns whatever _rrd_update()
 * returns. */
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
207 return _rrd_update(filename, template, argc, argv, NULL);
211 _rrd_update(char *filename, char *template, int argc, char **argv,
212 info_t *pcdp_summary)
217 unsigned long i,ii,iii=1;
219 unsigned long rra_begin; /* byte pointer to the rra
220 * area in the rrd file. this
221 * pointer never changes value */
222 unsigned long rra_start; /* byte pointer to the rra
223 * area in the rrd file. this
224 * pointer changes as each rrd is
226 unsigned long rra_current; /* byte pointer to the current write
227 * spot in the rrd file. */
228 unsigned long rra_pos_tmp; /* temporary byte pointer. */
230 pre_int,post_int; /* interval between this and
232 unsigned long proc_pdp_st; /* which pdp_st was the last
234 unsigned long occu_pdp_st; /* when was the pdp_st
235 * before the last update
237 unsigned long proc_pdp_age; /* how old was the data in
238 * the pdp prep area when it
239 * was last updated */
240 unsigned long occu_pdp_age; /* how long ago was the last
242 rrd_value_t *pdp_new; /* prepare the incoming data
243 * to be added to the
245 rrd_value_t *pdp_temp; /* prepare the pdp values
246 * to be added to the
249 long *tmpl_idx; /* index representing the settings
250 transported by the template index */
251 unsigned long tmpl_cnt = 2; /* time and data */
255 time_t current_time = 0;
256 time_t rra_time = 0; /* time of update for a RRA */
257 unsigned long current_time_usec=0;/* microseconds part of current time */
258 struct timeval tmp_time; /* used for time conversion */
261 int schedule_smooth = 0;
262 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263 /* a vector of future Holt-Winters seasonal coefs */
264 unsigned long elapsed_pdp_st;
265 /* number of elapsed PDP steps since last update */
266 unsigned long *rra_step_cnt = NULL;
267 /* number of rows to be updated in an RRA for a data
269 unsigned long start_pdp_offset;
270 /* number of PDP steps since the last update that
271 * are assigned to the first CDP to be generated
272 * since the last update. */
273 unsigned short scratch_idx;
274 /* index into the CDP scratch array */
275 enum cf_en current_cf;
276 /* numeric id of the current consolidation function */
277 rpnstack_t rpnstack; /* used for COMPUTE DS */
278 int version; /* rrd version */
279 char *endptr; /* used in the conversion */
281 void *rrd_mmaped_file;
282 unsigned long rrd_filesize;
285 rpnstack_init(&rpnstack);
287 /* need at least 1 argument: data. */
289 rrd_set_error("Not enough arguments");
295 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
298 /* initialize time */
299 version = atoi(rrd.stat_head->version);
300 gettimeofday(&tmp_time, 0);
301 normalize_time(&tmp_time);
302 current_time = tmp_time.tv_sec;
304 current_time_usec = tmp_time.tv_usec;
307 current_time_usec = 0;
310 rra_current = rra_start = rra_begin = ftell(rrd_file);
311 /* This is defined in the ANSI C standard, section 7.9.5.3:
313 When a file is opened with update mode ('+' as the second
314 or third character in the ... list of mode argument
315 variables), both input and output may be performed on the
316 associated stream. However, ... input may not be directly
317 followed by output without an intervening call to a file
318 positioning function, unless the input operation encounters
321 fseek(rrd_file, 0, SEEK_END);
322 rrd_filesize = ftell(rrd_file);
323 fseek(rrd_file, rra_current, SEEK_SET);
325 fseek(rrd_file, 0, SEEK_CUR);
329 /* get exclusive lock to whole file.
330 * lock gets removed when we close the file.
332 if (LockRRD(rrd_file) != 0) {
333 rrd_set_error("could not lock RRD");
339 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340 rrd_set_error("allocating updvals pointer array");
346 if ((pdp_temp = malloc(sizeof(rrd_value_t)
347 *rrd.stat_head->ds_cnt))==NULL){
348 rrd_set_error("allocating pdp_temp ...");
355 if ((tmpl_idx = malloc(sizeof(unsigned long)
356 *(rrd.stat_head->ds_cnt+1)))==NULL){
357 rrd_set_error("allocating tmpl_idx ...");
364 /* initialize template redirector */
365 /* default config example (assume DS 1 is a CDEF DS)
366 tmpl_idx[0] -> 0; (time)
367 tmpl_idx[1] -> 1; (DS 0)
368 tmpl_idx[2] -> 3; (DS 2)
369 tmpl_idx[3] -> 4; (DS 3) */
370 tmpl_idx[0] = 0; /* time */
371 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
373 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
380 unsigned int tmpl_len;
382 tmpl_cnt = 1; /* the first entry is the time */
383 tmpl_len = strlen(template);
384 for(i=0;i<=tmpl_len ;i++) {
385 if (template[i] == ':' || template[i] == '\0') {
387 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388 rrd_set_error("Template contains more DS definitions than RRD");
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
393 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394 rrd_set_error("unknown DS name '%s'",dsname);
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
399 /* the first element is always the time */
400 tmpl_idx[tmpl_cnt-1]++;
401 /* go to the next entry on the template */
402 dsname = &template[i+1];
403 /* fix the damage we did before */
412 if ((pdp_new = malloc(sizeof(rrd_value_t)
413 *rrd.stat_head->ds_cnt))==NULL){
414 rrd_set_error("allocating pdp_new ...");
424 rrd_mmaped_file = mmap(0,
426 PROT_READ | PROT_WRITE,
430 if (rrd_mmaped_file == MAP_FAILED) {
431 rrd_set_error("error mmapping file %s", filename);
440 /* loop through the arguments. */
441 for(arg_i=0; arg_i<argc;arg_i++) {
442 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443 char *step_start = stepper;
445 char *parsetime_error = NULL;
446 enum {atstyle, normal} timesyntax;
447 struct rrd_time_value ds_tv;
448 if (stepper == NULL){
449 rrd_set_error("failed duplication argv entry");
455 munmap(rrd_mmaped_file, rrd_filesize);
460 /* initialize all ds input to unknown except the first one
461 which has always got to be set */
462 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463 strcpy(stepper,argv[arg_i]);
465 /* separate all ds elements; first must be examined separately
466 due to alternate time syntax */
467 if ((p=strchr(stepper,'@'))!=NULL) {
468 timesyntax = atstyle;
471 } else if ((p=strchr(stepper,':'))!=NULL) {
476 rrd_set_error("expected timestamp not found in data source from %s:...",
482 updvals[tmpl_idx[ii]] = stepper;
484 if (*stepper == ':') {
488 updvals[tmpl_idx[ii]] = stepper+1;
494 if (ii != tmpl_cnt-1) {
495 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496 tmpl_cnt-1, ii, argv[arg_i]);
501 /* get the time from the reading ... handle N */
502 if (timesyntax == atstyle) {
503 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
508 if (ds_tv.type == RELATIVE_TO_END_TIME ||
509 ds_tv.type == RELATIVE_TO_START_TIME) {
510 rrd_set_error("specifying time relative to the 'start' "
511 "or 'end' makes no sense here: %s",
517 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
520 } else if (strcmp(updvals[0],"N")==0){
521 gettimeofday(&tmp_time, 0);
522 normalize_time(&tmp_time);
523 current_time = tmp_time.tv_sec;
524 current_time_usec = tmp_time.tv_usec;
527 tmp = strtod(updvals[0], 0);
528 current_time = floor(tmp);
529 current_time_usec = (long)((tmp - current_time) * 1000000L);
531 /* don't do any correction for old version RRDs */
533 current_time_usec = 0;
535 if(current_time <= rrd.live_head->last_up){
536 rrd_set_error("illegal attempt to update using time %ld when "
537 "last update time is %ld (minimum one second step)",
538 current_time, rrd.live_head->last_up);
544 /* seek to the beginning of the rra's */
545 if (rra_current != rra_begin) {
547 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 rrd_set_error("seek error in rrd");
553 rra_current = rra_begin;
555 rra_start = rra_begin;
557 /* when was the current pdp started */
558 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561 /* when did the last pdp_st occur */
562 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 occu_pdp_st = current_time - occu_pdp_age;
564 /* interval = current_time - rrd.live_head->last_up; */
565 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
567 if (occu_pdp_st > proc_pdp_st){
568 /* OK we passed the pdp_st moment*/
569 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570 * occurred before the latest
572 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573 post_int = occu_pdp_age; /* how much after it */
574 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
588 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
589 occu_pdp_age, occu_pdp_st,
590 interval, pre_int, post_int);
593 /* process the data sources and update the pdp_prep
594 * area accordingly */
595 for(i=0;i<rrd.stat_head->ds_cnt;i++){
597 dst_idx= dst_conv(rrd.ds_def[i].dst);
599 /* make sure we do not build diffs with old last_ds values */
600 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
601 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
602 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
605 /* NOTE: DST_CDEF should never enter this if block, because
606 * updvals[i+1][0] is initialized to 'U'; unless the caller
607 * accidentally specified a value for the DST_CDEF. To handle
608 * this case, an extra check is required. */
610 if((updvals[i+1][0] != 'U') &&
611 (dst_idx != DST_CDEF) &&
612 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
614 /* the data source type defines how to process the data */
615 /* pdp_new contains rate * time ... eg the bytes
616 * transferred during the interval. Doing it this way saves
617 * a lot of math operations */
623 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
624 for(ii=0;updvals[i+1][ii] != '\0';ii++){
625 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
626 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
630 if (rrd_test_error()){
633 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
634 if(dst_idx == DST_COUNTER) {
635 /* simple overflow catcher suggested by Andres Kroonmaa */
636 /* this will fail terribly for non 32 or 64 bit counters ... */
637 /* are there any others in SNMP land ? */
638 if (pdp_new[i] < (double)0.0 )
639 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
643 rate = pdp_new[i] / interval;
651 pdp_new[i] = strtod(updvals[i+1],&endptr);
653 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656 if (endptr[0] != '\0'){
657 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660 rate = pdp_new[i] / interval;
664 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
666 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669 if (endptr[0] != '\0'){
670 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673 rate = pdp_new[i] / interval;
676 rrd_set_error("rrd contains unknown DS type : '%s'",
680 /* break out of this for loop if the error string is set */
681 if (rrd_test_error()){
684 /* make sure pdp_temp is neither too large nor too small
685 * if any of these occur it becomes unknown ...
687 if ( ! isnan(rate) &&
688 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
689 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
690 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
691 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
695 /* no news is news all the same */
699 /* make a copy of the command line argument for the next run */
707 rrd.pdp_prep[i].last_ds,
708 updvals[i+1], pdp_new[i]);
710 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
711 strncpy(rrd.pdp_prep[i].last_ds,
712 updvals[i+1],LAST_DS_LEN-1);
713 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
716 /* break out of the argument parsing loop if the error_string is set */
717 if (rrd_test_error()){
721 /* has a pdp_st moment occurred since the last run ? */
723 if (proc_pdp_st == occu_pdp_st){
724 /* no we have not passed a pdp_st moment. therefore update is simple */
726 for(i=0;i<rrd.stat_head->ds_cnt;i++){
727 if(isnan(pdp_new[i]))
728 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
730 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
742 /* a pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
745 * occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for(i=0;i<rrd.stat_head->ds_cnt;i++){
750 /* update pdp_prep to the current pdp_st */
751 if(isnan(pdp_new[i]))
752 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
754 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
755 pdp_new[i]/(double)interval*(double)pre_int;
757 /* if too much of the pdp_prep is unknown we dump it */
758 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
759 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
760 (occu_pdp_st-proc_pdp_st <=
761 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
764 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
765 / (double)( occu_pdp_st
767 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
770 /* process CDEF data sources; remember each CDEF DS can
771 * only reference other DS with a lower index number */
772 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
774 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
775 /* substitute data values for OP_VARIABLE nodes */
776 for (ii = 0; rpnp[ii].op != OP_END; ii++)
778 if (rpnp[ii].op == OP_VARIABLE) {
779 rpnp[ii].op = OP_NUMBER;
780 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
783 /* run the rpn calculator */
784 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
786 break; /* exits the data sources pdp_temp loop */
790 /* make pdp_prep ready for the next run */
791 if(isnan(pdp_new[i])){
792 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
793 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
795 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
796 rrd.pdp_prep[i].scratch[PDP_val].u_val =
797 pdp_new[i]/(double)interval*(double)post_int;
805 "new_unkn_sec %5lu\n",
807 rrd.pdp_prep[i].scratch[PDP_val].u_val,
808 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
812 /* if there were errors during the last loop, bail out here */
813 if (rrd_test_error()){
818 /* compute the number of elapsed pdp_st moments */
819 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
821 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
823 if (rra_step_cnt == NULL)
825 rra_step_cnt = (unsigned long *)
826 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
829 for(i = 0, rra_start = rra_begin;
830 i < rrd.stat_head->rra_cnt;
831 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
834 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
835 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
836 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
837 if (start_pdp_offset <= elapsed_pdp_st) {
838 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
839 rrd.rra_def[i].pdp_cnt + 1;
844 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
846 /* If this is a bulk update, we need to skip ahead in the seasonal
847 * arrays so that they will be correct for the next observed value;
848 * note that for the bulk update itself, no update will occur to
849 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
851 if (rra_step_cnt[i] > 2)
853 /* skip update by resetting rra_step_cnt[i],
854 * note that this is not data source specific; this is due
855 * to the bulk update, not a DNAN value for the specific data
858 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
859 &last_seasonal_coef);
860 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
864 /* periodically run a smoother for seasonal effects */
865 /* Need to use first cdp parameter buffer to track
866 * burnin (burnin requires a specific smoothing schedule).
867 * The CDP_init_seasonal parameter is really an RRA level,
868 * not a data source within RRA level parameter, but the rra_def
869 * is read only for rrd_update (not flushed to disk). */
870 iii = i*(rrd.stat_head -> ds_cnt);
871 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
874 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
875 > rrd.rra_def[i].row_cnt - 1) {
876 /* mark off one of the burnin cycles */
877 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
881 /* someone has no doubt invented a trick to deal with this
882 * wrap around, but at least this code is clear. */
883 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
884 rrd.rra_ptr[i].cur_row)
886 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
887 * mapping between PDP and CDP */
888 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
889 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
893 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
894 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
895 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
900 /* can't rely on negative numbers because we are working with
902 /* Don't need modulus here. If we've wrapped more than once, only
903 * one smooth is executed at the end. */
904 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
905 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
906 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
910 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
911 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
912 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
919 rra_current = ftell(rrd_file);
920 } /* if cf is DEVSEASONAL or SEASONAL */
922 if (rrd_test_error()) break;
924 /* update CDP_PREP areas */
925 /* loop over data sources within each RRA */
927 ii < rrd.stat_head->ds_cnt;
931 /* iii indexes the CDP prep area for this data source within the RRA */
932 iii=i*rrd.stat_head->ds_cnt+ii;
934 if (rrd.rra_def[i].pdp_cnt > 1) {
936 if (rra_step_cnt[i] > 0) {
937 /* If we are in this block, at least 1 CDP value will be written to
938 * disk, this is the CDP_primary_val entry. If more than 1 value needs
939 * to be written, then the "fill in" value is the CDP_secondary_val
941 if (isnan(pdp_temp[ii]))
943 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
944 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
946 /* CDP_secondary value is the RRA "fill in" value for intermediary
947 * CDP data entries. No matter the CF, the value is the same because
948 * the average, max, min, and last of a list of identical values is
949 * the same, namely, the value itself. */
950 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
953 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
954 > rrd.rra_def[i].pdp_cnt*
955 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
957 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
958 /* initialize carry over */
959 if (current_cf == CF_AVERAGE) {
960 if (isnan(pdp_temp[ii])) {
961 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
963 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
964 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
970 rrd_value_t cum_val, cur_val;
971 switch (current_cf) {
973 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
974 cur_val = IFDNAN(pdp_temp[ii],0.0);
975 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
976 (cum_val + cur_val * start_pdp_offset) /
977 (rrd.rra_def[i].pdp_cnt
978 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
979 /* initialize carry over value */
980 if (isnan(pdp_temp[ii])) {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
988 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
989 cur_val = IFDNAN(pdp_temp[ii],-DINF);
991 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
992 isnan(pdp_temp[ii])) {
994 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
999 if (cur_val > cum_val)
1000 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1002 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1003 /* initialize carry over value */
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1007 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1008 cur_val = IFDNAN(pdp_temp[ii],DINF);
1010 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1011 isnan(pdp_temp[ii])) {
1013 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1018 if (cur_val < cum_val)
1019 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1021 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1022 /* initialize carry over value */
1023 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1028 /* initialize carry over value */
1029 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1032 } /* endif meets xff value requirement for a valid value */
1033 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1034 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1035 if (isnan(pdp_temp[ii]))
1036 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1037 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1039 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1040 } else /* rra_step_cnt[i] == 0 */
1043 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1044 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1047 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1048 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1051 if (isnan(pdp_temp[ii])) {
1052 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1053 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1055 if (current_cf == CF_AVERAGE) {
1056 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1059 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1063 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066 switch (current_cf) {
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1072 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1076 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1087 if (elapsed_pdp_st > 2)
1089 switch (current_cf) {
1092 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1093 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1096 case CF_DEVSEASONAL:
1097 /* need to update cached seasonal values, so they are consistent
1098 * with the bulk update */
1099 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1100 * CDP_last_deviation are the same. */
1101 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1102 last_seasonal_coef[ii];
1103 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1107 /* need to update the null_count and last_null_count.
1108 * even do this for non-DNAN pdp_temp because the
1109 * algorithm is not learning from batch updates. */
1110 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1112 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1116 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1117 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1120 /* do not count missed bulk values as failures */
1121 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1122 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1123 /* need to reset violations buffer.
1124 * could do this more carefully, but for now, just
1125 * assume a bulk update wipes away all violations. */
1126 erase_violations(&rrd, iii, i);
1130 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1132 if (rrd_test_error()) break;
1134 } /* endif data sources loop */
1135 } /* end RRA Loop */
1137 /* this loop is only entered if elapsed_pdp_st < 3 */
1138 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1139 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1141 for(i = 0, rra_start = rra_begin;
1142 i < rrd.stat_head->rra_cnt;
1143 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1146 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1148 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1149 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1151 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1152 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1154 rra_current = ftell(rrd_file);
1156 if (rrd_test_error()) break;
1157 /* loop over data sources within each RRA */
1159 ii < rrd.stat_head->ds_cnt;
1162 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1163 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1164 scratch_idx, seasonal_coef);
1166 } /* end RRA Loop */
1167 if (rrd_test_error()) break;
1168 } /* end elapsed_pdp_st loop */
1170 if (rrd_test_error()) break;
1172 /* Ready to write to disk */
1173 /* Move sequentially through the file, writing one RRA at a time.
1174 * Note this architecture divorces the computation of CDP with
1175 * flushing updated RRA entries to disk. */
1176 for(i = 0, rra_start = rra_begin;
1177 i < rrd.stat_head->rra_cnt;
1178 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1180 /* is there anything to write for this RRA? If not, continue. */
1181 if (rra_step_cnt[i] == 0) continue;
1183 /* write the first row */
1185 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1187 rrd.rra_ptr[i].cur_row++;
1188 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1189 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1190 /* position on the first row */
1191 rra_pos_tmp = rra_start +
1192 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1193 if(rra_pos_tmp != rra_current) {
1195 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1196 rrd_set_error("seek error in rrd");
1200 rra_current = rra_pos_tmp;
1204 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1206 scratch_idx = CDP_primary_val;
1207 if (pcdp_summary != NULL)
1209 rra_time = (current_time - current_time
1210 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1214 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1215 pcdp_summary, &rra_time, rrd_mmaped_file);
1217 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1218 pcdp_summary, &rra_time);
1220 if (rrd_test_error()) break;
1222 /* write other rows of the bulk update, if any */
1223 scratch_idx = CDP_secondary_val;
1224 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1226 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1229 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1230 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1233 rrd.rra_ptr[i].cur_row = 0;
1234 /* seek back to beginning of current rra */
1235 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1237 rrd_set_error("seek error in rrd");
1241 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1243 rra_current = rra_start;
1245 if (pcdp_summary != NULL)
1247 rra_time = (current_time - current_time
1248 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1249 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1252 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1253 pcdp_summary, &rra_time, rrd_mmaped_file);
1255 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1256 pcdp_summary, &rra_time);
1260 if (rrd_test_error())
1264 /* break out of the argument parsing loop if error_string is set */
1265 if (rrd_test_error()){
1270 } /* endif a pdp_st has occurred */
1271 rrd.live_head->last_up = current_time;
1272 rrd.live_head->last_up_usec = current_time_usec;
1274 } /* function argument loop */
1276 if (seasonal_coef != NULL) free(seasonal_coef);
1277 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278 if (rra_step_cnt != NULL) free(rra_step_cnt);
1279 rpnstack_free(&rpnstack);
1282 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1283 rrd_set_error("error writing(unmapping) file: %s", filename);
1286 /* if we got here and if there is an error and if the file has not been
1287 * written to, then close things up and return. */
1288 if (rrd_test_error()) {
1298 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1299 * we just need to write back the live header portion now */
1301 if (fseek(rrd_file, (sizeof(stat_head_t)
1302 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1303 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1305 rrd_set_error("seek rrd for live header writeback");
1316 if(fwrite( rrd.live_head,
1317 sizeof(live_head_t), 1, rrd_file) != 1){
1318 rrd_set_error("fwrite live_head to rrd");
1329 if(fwrite( &rrd.live_head->last_up,
1330 sizeof(time_t), 1, rrd_file) != 1){
1331 rrd_set_error("fwrite live_head to rrd");
1343 if(fwrite( rrd.pdp_prep,
1345 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1346 rrd_set_error("ftwrite pdp_prep to rrd");
1356 if(fwrite( rrd.cdp_prep,
1358 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1359 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1361 rrd_set_error("ftwrite cdp_prep to rrd");
1371 if(fwrite( rrd.rra_ptr,
1373 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1374 rrd_set_error("fwrite rra_ptr to rrd");
1384 /* OK now close the files and free the memory */
1385 if(fclose(rrd_file) != 0){
1386 rrd_set_error("closing rrd");
1395 /* calling the smoothing code here guarantees at most
1396 * one smoothing operation per rrd_update call. Unfortunately,
1397 * it is possible with bulk updates, or a long-delayed update
1398 * for smoothing to occur off-schedule. This really isn't
1399 * critical except during the burning cycles. */
1400 if (schedule_smooth)
1402 rrd_file = fopen(filename,"rb+");
1403 rra_start = rra_begin;
1404 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1406 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1407 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1410 fprintf(stderr,"Running smoother for rra %ld\n",i);
1412 apply_smoother(&rrd,i,rra_start,rrd_file);
1413 if (rrd_test_error())
1416 rra_start += rrd.rra_def[i].row_cnt
1417 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1430 * get exclusive lock to whole file.
1431 * lock gets removed when we close the file
1433 * returns 0 on success
/* LockRRD: ask the operating system for a lock on the RRD file without
 * waiting for it.
 *
 * rrdfile - open stream of the RRD; only its file descriptor is used.
 *
 * The status of the platform locking call is collected in rcstat.
 * NOTE(review): the declarations of rcstat, st and lock, the #else/#endif
 * lines, the braces and the final return are not visible in this listing -
 * confirm them against the full file. */
1436 LockRRD(FILE *rrdfile)
1438 int rrd_fd; /* File descriptor for RRD */
1441 rrd_fd = fileno(rrdfile); /* the locking calls below take a descriptor, not a FILE stream */
1444 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/* native Win32: _locking() is given a byte count, so the size reported by
 * _fstat() is used as the length to lock; _LK_NBLCK is the mode that
 * fails instead of retrying when the bytes cannot be locked */
1447 if ( _fstat( rrd_fd, &st ) == 0 ) {
1448 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* presumably the non-Win32 branch (its #else line is not visible here):
 * describe a write lock starting at offset 0 and hand it to fcntl() */
1454 lock.l_type = F_WRLCK; /* exclusive write lock */
1455 lock.l_len = 0; /* whole file */
1456 lock.l_start = 0; /* start of file */
1457 lock.l_whence = SEEK_SET; /* l_start is measured from the start of the file */
1459 rcstat = fcntl(rrd_fd, F_SETLK, &lock); /* F_SETLK reports failure at once instead of waiting for the lock */
/* write_RRA_row: store one row of values for the RRA with index rra_idx.
 *
 * For every data source (ds_idx from 0 to stat_head->ds_cnt - 1) the value
 * cdp_prep[rra_idx * ds_cnt + ds_idx].scratch[CDP_scratch_idx].u_val is
 * stored, and *rra_current is then advanced by sizeof(rrd_value_t).
 *
 * Two signatures and two store statements appear below: one copies into
 * rrd_mmaped_file with memcpy, the other writes to rrd_file with fwrite.
 * NOTE(review): the preprocessor lines that select between them, the
 * declaration of iv, the remaining info_push arguments and the braces are
 * not visible in this listing - confirm against the full file.
 *
 * rrd             - supplies stat_head, cdp_prep, rra_def and ds_def.
 * rra_idx         - index of the RRA whose row is written.
 * rra_current     - in/out byte offset; used as the offset into
 *                   rrd_mmaped_file by the memcpy variant and incremented
 *                   after each value in both variants.
 * CDP_scratch_idx - index of the scratch slot whose u_val is stored.
 * rrd_file        - stream used by the fwrite variant and by the
 *                   diagnostic fprintf (through ftell).
 * pcdp_summary    - when not NULL, one entry per stored value is added with
 *                   info_push and the pointer is replaced by its result.
 * rra_time        - pointer to the time stamp placed in the entry key.
 * rrd_mmaped_file - base address for the memcpy variant (first signature
 *                   only).
 *
 * Returns pcdp_summary as it stands after the loop. When fwrite does not
 * report one item written, rrd_set_error("writing rrd") is called; the
 * statement that follows it is not visible here. */
1469 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1470 unsigned short CDP_scratch_idx,
1472 FILE UNUSED(*rrd_file),
1476 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
/* second signature: the same parameters without rrd_mmaped_file */
1479 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1480 unsigned short CDP_scratch_idx, FILE *rrd_file,
1481 info_t *pcdp_summary, time_t *rra_time)
1484 unsigned long ds_idx, cdp_idx;
1487 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1489 /* compute the cdp index */
1490 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx; /* cdp_prep holds ds_cnt consecutive entries per RRA */
/* diagnostic trace: value, current stream position and CF name */
1492 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1493 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1494 rrd -> rra_def[rra_idx].cf_nam);
1496 if (pcdp_summary != NULL)
1498 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1499 /* append info to the return hash */
/* key layout: [time]RRA[cf_nam][pdp_cnt]DS[ds_nam]
 * NOTE(review): %d is paired with *rra_time, which is a time_t; on
 * platforms where time_t is wider than int the conversion does not match
 * the argument - consider a cast with a matching conversion. */
1500 pcdp_summary = info_push(pcdp_summary,
1501 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1502 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1503 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* memcpy variant: copy the value to byte offset *rra_current of the mapping */
1507 memcpy((char *)rrd_mmaped_file + *rra_current,
1508 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1509 sizeof(rrd_value_t));
/* fwrite variant: write the value at the stream's current position */
1511 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1512 sizeof(rrd_value_t),1,rrd_file) != 1)
1514 rrd_set_error("writing rrd");
1518 *rra_current += sizeof(rrd_value_t); /* offset of the next value */
1520 return (pcdp_summary);