1 /*****************************************************************************
2 * RRDtool 1.2.3 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
32 #include <sys/timeb.h>
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
44 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46 struct timeb current_time;
48 _ftime(¤t_time);
50 t->tv_sec = current_time.time;
51 t->tv_usec = current_time.millitm * 1000;
56 * normilize time as returned by gettimeofday. usec part must
59 static void normalize_time(struct timeval *t)
63 t->tv_usec += 1000000L;
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
71 unsigned long *rra_current,
72 unsigned short CDP_scratch_idx, FILE *rrd_file,
73 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx, FILE *rrd_file,
78 info_t *pcdp_summary, time_t *rra_time);
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv,
84 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
89 main(int argc, char **argv){
90 rrd_update(argc,argv);
91 if (rrd_test_error()) {
92 printf("RRDtool 1.2.3 Copyright by Tobi Oetiker, 1997-2005\n\n"
93 "Usage: rrdupdate filename\n"
94 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
95 "\t\t\ttime|N:value[:value...]\n\n"
96 "\t\t\tat-time@value[:value...]\n\n"
97 "\t\t\t[ time:value[:value...] ..]\n\n");
99 printf("ERROR: %s\n",rrd_get_error());
107 info_t *rrd_update_v(int argc, char **argv)
109 char *template = NULL;
110 info_t *result = NULL;
112 optind = 0; opterr = 0; /* initialize getopt */
115 static struct option long_options[] =
117 {"template", required_argument, 0, 't'},
120 int option_index = 0;
122 opt = getopt_long(argc, argv, "t:",
123 long_options, &option_index);
134 rrd_set_error("unknown option '%s'",argv[optind-1]);
140 /* need at least 2 arguments: filename, data. */
141 if (argc-optind < 2) {
142 rrd_set_error("Not enough arguments");
146 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
147 rc.u_int = _rrd_update(argv[optind], template,
148 argc - optind - 1, argv + optind + 1, result);
149 result->value.u_int = rc.u_int;
155 rrd_update(int argc, char **argv)
157 char *template = NULL;
159 optind = 0; opterr = 0; /* initialize getopt */
162 static struct option long_options[] =
164 {"template", required_argument, 0, 't'},
167 int option_index = 0;
169 opt = getopt_long(argc, argv, "t:",
170 long_options, &option_index);
181 rrd_set_error("unknown option '%s'",argv[optind-1]);
186 /* need at least 2 arguments: filename, data. */
187 if (argc-optind < 2) {
188 rrd_set_error("Not enough arguments");
193 rc = rrd_update_r(argv[optind], template,
194 argc - optind - 1, argv + optind + 1);
199 rrd_update_r(char *filename, char *template, int argc, char **argv)
201 return _rrd_update(filename, template, argc, argv, NULL);
205 _rrd_update(char *filename, char *template, int argc, char **argv,
206 info_t *pcdp_summary)
211 unsigned long i,ii,iii=1;
213 unsigned long rra_begin; /* byte pointer to the rra
214 * area in the rrd file. this
215 * pointer never changes value */
216 unsigned long rra_start; /* byte pointer to the rra
217 * area in the rrd file. this
218 * pointer changes as each rrd is
220 unsigned long rra_current; /* byte pointer to the current write
221 * spot in the rrd file. */
222 unsigned long rra_pos_tmp; /* temporary byte pointer. */
224 pre_int,post_int; /* interval between this and
226 unsigned long proc_pdp_st; /* which pdp_st was the last
228 unsigned long occu_pdp_st; /* when was the pdp_st
229 * before the last update
231 unsigned long proc_pdp_age; /* how old was the data in
232 * the pdp prep area when it
233 * was last updated */
234 unsigned long occu_pdp_age; /* how long ago was the last
236 rrd_value_t *pdp_new; /* prepare the incoming data
237 * to be added the the
239 rrd_value_t *pdp_temp; /* prepare the pdp values
240 * to be added the the
243 long *tmpl_idx; /* index representing the settings
244 transported by the template index */
245 unsigned long tmpl_cnt = 2; /* time and data */
250 time_t rra_time; /* time of update for a RRA */
251 unsigned long current_time_usec; /* microseconds part of current time */
252 struct timeval tmp_time; /* used for time conversion */
255 int schedule_smooth = 0;
256 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
257 /* a vector of future Holt-Winters seasonal coefs */
258 unsigned long elapsed_pdp_st;
259 /* number of elapsed PDP steps since last update */
260 unsigned long *rra_step_cnt = NULL;
261 /* number of rows to be updated in an RRA for a data
263 unsigned long start_pdp_offset;
264 /* number of PDP steps since the last update that
265 * are assigned to the first CDP to be generated
266 * since the last update. */
267 unsigned short scratch_idx;
268 /* index into the CDP scratch array */
269 enum cf_en current_cf;
270 /* numeric id of the current consolidation function */
271 rpnstack_t rpnstack; /* used for COMPUTE DS */
272 int version; /* rrd version */
273 char *endptr; /* used in the conversion */
275 void *rrd_mmaped_file;
276 unsigned long rrd_filesize;
279 rpnstack_init(&rpnstack);
281 /* need at least 1 arguments: data. */
283 rrd_set_error("Not enough arguments");
289 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
292 /* initialize time */
293 version = atoi(rrd.stat_head->version);
294 gettimeofday(&tmp_time, 0);
295 normalize_time(&tmp_time);
296 current_time = tmp_time.tv_sec;
298 current_time_usec = tmp_time.tv_usec;
301 current_time_usec = 0;
304 rra_current = rra_start = rra_begin = ftell(rrd_file);
305 /* This is defined in the ANSI C standard, section 7.9.5.3:
307 When a file is opened with udpate mode ('+' as the second
308 or third character in the ... list of mode argument
309 variables), both input and ouptut may be performed on the
310 associated stream. However, ... input may not be directly
311 followed by output without an intervening call to a file
312 positioning function, unless the input oepration encounters
315 fseek(rrd_file, 0, SEEK_END);
316 rrd_filesize = ftell(rrd_file);
317 fseek(rrd_file, rra_current, SEEK_SET);
319 fseek(rrd_file, 0, SEEK_CUR);
323 /* get exclusive lock to whole file.
324 * lock gets removed when we close the file.
326 if (LockRRD(rrd_file) != 0) {
327 rrd_set_error("could not lock RRD");
333 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
334 rrd_set_error("allocating updvals pointer array");
340 if ((pdp_temp = malloc(sizeof(rrd_value_t)
341 *rrd.stat_head->ds_cnt))==NULL){
342 rrd_set_error("allocating pdp_temp ...");
349 if ((tmpl_idx = malloc(sizeof(unsigned long)
350 *(rrd.stat_head->ds_cnt+1)))==NULL){
351 rrd_set_error("allocating tmpl_idx ...");
358 /* initialize template redirector */
359 /* default config example (assume DS 1 is a CDEF DS)
360 tmpl_idx[0] -> 0; (time)
361 tmpl_idx[1] -> 1; (DS 0)
362 tmpl_idx[2] -> 3; (DS 2)
363 tmpl_idx[3] -> 4; (DS 3) */
364 tmpl_idx[0] = 0; /* time */
365 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
367 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374 unsigned int tmpl_len;
376 tmpl_cnt = 1; /* the first entry is the time */
377 tmpl_len = strlen(template);
378 for(i=0;i<=tmpl_len ;i++) {
379 if (template[i] == ':' || template[i] == '\0') {
381 if (tmpl_cnt>rrd.stat_head->ds_cnt){
382 rrd_set_error("Template contains more DS definitions than RRD");
383 free(updvals); free(pdp_temp);
384 free(tmpl_idx); rrd_free(&rrd);
385 fclose(rrd_file); return(-1);
387 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
388 rrd_set_error("unknown DS name '%s'",dsname);
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
393 /* the first element is always the time */
394 tmpl_idx[tmpl_cnt-1]++;
395 /* go to the next entry on the template */
396 dsname = &template[i+1];
397 /* fix the damage we did before */
406 if ((pdp_new = malloc(sizeof(rrd_value_t)
407 *rrd.stat_head->ds_cnt))==NULL){
408 rrd_set_error("allocating pdp_new ...");
418 rrd_mmaped_file = mmap(0,
420 PROT_READ | PROT_WRITE,
424 if (rrd_mmaped_file == MAP_FAILED) {
425 rrd_set_error("error mmapping file %s", filename);
434 /* loop through the arguments. */
435 for(arg_i=0; arg_i<argc;arg_i++) {
436 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
437 char *step_start = stepper;
439 char *parsetime_error = NULL;
440 enum {atstyle, normal} timesyntax;
441 struct rrd_time_value ds_tv;
442 if (stepper == NULL){
443 rrd_set_error("failed duplication argv entry");
449 munmap(rrd_mmaped_file, rrd_filesize);
454 /* initialize all ds input to unknown except the first one
455 which has always got to be set */
456 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
457 strcpy(stepper,argv[arg_i]);
459 /* separate all ds elements; first must be examined separately
460 due to alternate time syntax */
461 if ((p=strchr(stepper,'@'))!=NULL) {
462 timesyntax = atstyle;
465 } else if ((p=strchr(stepper,':'))!=NULL) {
470 rrd_set_error("expected timestamp not found in data source from %s:...",
476 updvals[tmpl_idx[ii]] = stepper;
478 if (*stepper == ':') {
482 updvals[tmpl_idx[ii]] = stepper+1;
488 if (ii != tmpl_cnt-1) {
489 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
490 tmpl_cnt-1, ii, argv[arg_i]);
495 /* get the time from the reading ... handle N */
496 if (timesyntax == atstyle) {
497 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
498 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
502 if (ds_tv.type == RELATIVE_TO_END_TIME ||
503 ds_tv.type == RELATIVE_TO_START_TIME) {
504 rrd_set_error("specifying time relative to the 'start' "
505 "or 'end' makes no sense here: %s",
511 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
512 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
514 } else if (strcmp(updvals[0],"N")==0){
515 gettimeofday(&tmp_time, 0);
516 normalize_time(&tmp_time);
517 current_time = tmp_time.tv_sec;
518 current_time_usec = tmp_time.tv_usec;
521 tmp = strtod(updvals[0], 0);
522 current_time = floor(tmp);
523 current_time_usec = (long)((tmp - current_time) * 1000000L);
525 /* dont do any correction for old version RRDs */
527 current_time_usec = 0;
529 if(current_time <= rrd.live_head->last_up){
530 rrd_set_error("illegal attempt to update using time %ld when "
531 "last update time is %ld (minimum one second step)",
532 current_time, rrd.live_head->last_up);
538 /* seek to the beginning of the rra's */
539 if (rra_current != rra_begin) {
541 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
542 rrd_set_error("seek error in rrd");
547 rra_current = rra_begin;
549 rra_start = rra_begin;
551 /* when was the current pdp started */
552 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
553 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
555 /* when did the last pdp_st occur */
556 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
557 occu_pdp_st = current_time - occu_pdp_age;
558 /* interval = current_time - rrd.live_head->last_up; */
559 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
561 if (occu_pdp_st > proc_pdp_st){
562 /* OK we passed the pdp_st moment*/
563 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
564 * occurred before the latest
566 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
567 post_int = occu_pdp_age; /* how much after it */
568 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
582 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
583 occu_pdp_age, occu_pdp_st,
584 interval, pre_int, post_int);
587 /* process the data sources and update the pdp_prep
588 * area accordingly */
589 for(i=0;i<rrd.stat_head->ds_cnt;i++){
591 dst_idx= dst_conv(rrd.ds_def[i].dst);
592 /* NOTE: DST_CDEF should never enter this if block, because
593 * updvals[i+1][0] is initialized to 'U'; unless the caller
594 * accidently specified a value for the DST_CDEF. To handle
595 * this case, an extra check is required. */
596 if((updvals[i+1][0] != 'U') &&
597 (dst_idx != DST_CDEF) &&
598 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
600 /* the data source type defines how to process the data */
601 /* pdp_new contains rate * time ... eg the bytes
602 * transferred during the interval. Doing it this way saves
603 * a lot of math operations */
609 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
610 for(ii=0;updvals[i+1][ii] != '\0';ii++){
611 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
612 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
616 if (rrd_test_error()){
619 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
620 if(dst_idx == DST_COUNTER) {
621 /* simple overflow catcher suggested by Andres Kroonmaa */
622 /* this will fail terribly for non 32 or 64 bit counters ... */
623 /* are there any others in SNMP land ? */
624 if (pdp_new[i] < (double)0.0 )
625 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
626 if (pdp_new[i] < (double)0.0 )
627 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
629 rate = pdp_new[i] / interval;
637 pdp_new[i] = strtod(updvals[i+1],&endptr);
639 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
642 if (endptr[0] != '\0'){
643 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
646 rate = pdp_new[i] / interval;
650 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
652 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
655 if (endptr[0] != '\0'){
656 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
659 rate = pdp_new[i] / interval;
662 rrd_set_error("rrd contains unknown DS type : '%s'",
666 /* break out of this for loop if the error string is set */
667 if (rrd_test_error()){
670 /* make sure pdp_temp is neither too large or too small
671 * if any of these occur it becomes unknown ...
673 if ( ! isnan(rate) &&
674 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
675 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
676 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
677 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
681 /* no news is news all the same */
685 /* make a copy of the command line argument for the next run */
693 rrd.pdp_prep[i].last_ds,
694 updvals[i+1], pdp_new[i]);
696 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
697 strncpy(rrd.pdp_prep[i].last_ds,
698 updvals[i+1],LAST_DS_LEN-1);
699 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
702 /* break out of the argument parsing loop if the error_string is set */
703 if (rrd_test_error()){
707 /* has a pdp_st moment occurred since the last run ? */
709 if (proc_pdp_st == occu_pdp_st){
710 /* no we have not passed a pdp_st moment. therefore update is simple */
712 for(i=0;i<rrd.stat_head->ds_cnt;i++){
713 if(isnan(pdp_new[i]))
714 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
716 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
723 rrd.pdp_prep[i].scratch[PDP_val].u_val,
724 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
728 /* an pdp_st has occurred. */
730 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
731 * occurred up to the last run.
732 pdp_new[] contains rate*seconds from the latest run.
733 pdp_temp[] will contain the rate for cdp */
735 for(i=0;i<rrd.stat_head->ds_cnt;i++){
736 /* update pdp_prep to the current pdp_st */
737 if(isnan(pdp_new[i]))
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
740 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
741 pdp_new[i]/(double)interval*(double)pre_int;
743 /* if too much of the pdp_prep is unknown we dump it */
744 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
745 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
746 (occu_pdp_st-proc_pdp_st <=
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
750 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
751 / (double)( occu_pdp_st
753 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
756 /* process CDEF data sources; remember each CDEF DS can
757 * only reference other DS with a lower index number */
758 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
760 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
761 /* substitue data values for OP_VARIABLE nodes */
762 for (ii = 0; rpnp[ii].op != OP_END; ii++)
764 if (rpnp[ii].op == OP_VARIABLE) {
765 rpnp[ii].op = OP_NUMBER;
766 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
769 /* run the rpn calculator */
770 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
772 break; /* exits the data sources pdp_temp loop */
776 /* make pdp_prep ready for the next run */
777 if(isnan(pdp_new[i])){
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
779 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
781 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
782 rrd.pdp_prep[i].scratch[PDP_val].u_val =
783 pdp_new[i]/(double)interval*(double)post_int;
791 "new_unkn_sec %5lu\n",
793 rrd.pdp_prep[i].scratch[PDP_val].u_val,
794 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
798 /* if there were errors during the last loop, bail out here */
799 if (rrd_test_error()){
804 /* compute the number of elapsed pdp_st moments */
805 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
807 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
809 if (rra_step_cnt == NULL)
811 rra_step_cnt = (unsigned long *)
812 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
815 for(i = 0, rra_start = rra_begin;
816 i < rrd.stat_head->rra_cnt;
817 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
820 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
821 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
822 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
823 if (start_pdp_offset <= elapsed_pdp_st) {
824 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
825 rrd.rra_def[i].pdp_cnt + 1;
830 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
832 /* If this is a bulk update, we need to skip ahead in the seasonal
833 * arrays so that they will be correct for the next observed value;
834 * note that for the bulk update itself, no update will occur to
835 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
837 if (rra_step_cnt[i] > 2)
839 /* skip update by resetting rra_step_cnt[i],
840 * note that this is not data source specific; this is due
841 * to the bulk update, not a DNAN value for the specific data
844 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
845 &last_seasonal_coef);
846 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
850 /* periodically run a smoother for seasonal effects */
851 /* Need to use first cdp parameter buffer to track
852 * burnin (burnin requires a specific smoothing schedule).
853 * The CDP_init_seasonal parameter is really an RRA level,
854 * not a data source within RRA level parameter, but the rra_def
855 * is read only for rrd_update (not flushed to disk). */
856 iii = i*(rrd.stat_head -> ds_cnt);
857 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
860 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
861 > rrd.rra_def[i].row_cnt - 1) {
862 /* mark off one of the burnin cycles */
863 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
867 /* someone has no doubt invented a trick to deal with this
868 * wrap around, but at least this code is clear. */
869 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
870 rrd.rra_ptr[i].cur_row)
872 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
873 * mapping between PDP and CDP */
874 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
875 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
879 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
880 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
881 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
886 /* can't rely on negative numbers because we are working with
888 /* Don't need modulus here. If we've wrapped more than once, only
889 * one smooth is executed at the end. */
890 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
891 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
892 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
896 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
897 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
898 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 rra_current = ftell(rrd_file);
906 } /* if cf is DEVSEASONAL or SEASONAL */
908 if (rrd_test_error()) break;
910 /* update CDP_PREP areas */
911 /* loop over data soures within each RRA */
913 ii < rrd.stat_head->ds_cnt;
917 /* iii indexes the CDP prep area for this data source within the RRA */
918 iii=i*rrd.stat_head->ds_cnt+ii;
920 if (rrd.rra_def[i].pdp_cnt > 1) {
922 if (rra_step_cnt[i] > 0) {
923 /* If we are in this block, as least 1 CDP value will be written to
924 * disk, this is the CDP_primary_val entry. If more than 1 value needs
925 * to be written, then the "fill in" value is the CDP_secondary_val
927 if (isnan(pdp_temp[ii]))
929 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
930 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
932 /* CDP_secondary value is the RRA "fill in" value for intermediary
933 * CDP data entries. No matter the CF, the value is the same because
934 * the average, max, min, and last of a list of identical values is
935 * the same, namely, the value itself. */
936 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
939 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
940 > rrd.rra_def[i].pdp_cnt*
941 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
943 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
944 /* initialize carry over */
945 if (current_cf == CF_AVERAGE) {
946 if (isnan(pdp_temp[ii])) {
947 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
949 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
950 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
953 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
956 rrd_value_t cum_val, cur_val;
957 switch (current_cf) {
959 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
960 cur_val = IFDNAN(pdp_temp[ii],0.0);
961 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
962 (cum_val + cur_val * start_pdp_offset) /
963 (rrd.rra_def[i].pdp_cnt
964 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
965 /* initialize carry over value */
966 if (isnan(pdp_temp[ii])) {
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
970 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
974 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
975 cur_val = IFDNAN(pdp_temp[ii],-DINF);
977 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978 isnan(pdp_temp[ii])) {
980 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
985 if (cur_val > cum_val)
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989 /* initialize carry over value */
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
993 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
994 cur_val = IFDNAN(pdp_temp[ii],DINF);
996 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
997 isnan(pdp_temp[ii])) {
999 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1004 if (cur_val < cum_val)
1005 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1007 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1008 /* initialize carry over value */
1009 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1013 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1014 /* initialize carry over value */
1015 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1018 } /* endif meets xff value requirement for a valid value */
1019 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1020 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1021 if (isnan(pdp_temp[ii]))
1022 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1023 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1025 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1026 } else /* rra_step_cnt[i] == 0 */
1029 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1030 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1033 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1034 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1037 if (isnan(pdp_temp[ii])) {
1038 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1039 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1041 if (current_cf == CF_AVERAGE) {
1042 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1045 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1049 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1052 switch (current_cf) {
1054 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1058 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1059 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1063 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1072 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1073 if (elapsed_pdp_st > 2)
1075 switch (current_cf) {
1078 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1079 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1082 case CF_DEVSEASONAL:
1083 /* need to update cached seasonal values, so they are consistent
1084 * with the bulk update */
1085 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1086 * CDP_last_deviation are the same. */
1087 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1088 last_seasonal_coef[ii];
1089 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1093 /* need to update the null_count and last_null_count.
1094 * even do this for non-DNAN pdp_temp because the
1095 * algorithm is not learning from batch updates. */
1096 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1098 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1102 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1103 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1106 /* do not count missed bulk values as failures */
1107 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1108 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1109 /* need to reset violations buffer.
1110 * could do this more carefully, but for now, just
1111 * assume a bulk update wipes away all violations. */
1112 erase_violations(&rrd, iii, i);
1116 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1118 if (rrd_test_error()) break;
1120 } /* endif data sources loop */
1121 } /* end RRA Loop */
1123 /* this loop is only entered if elapsed_pdp_st < 3 */
1124 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1125 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1127 for(i = 0, rra_start = rra_begin;
1128 i < rrd.stat_head->rra_cnt;
1129 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1132 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1134 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1135 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1137 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1138 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1140 rra_current = ftell(rrd_file);
1142 if (rrd_test_error()) break;
1143 /* loop over data soures within each RRA */
1145 ii < rrd.stat_head->ds_cnt;
1148 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1149 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1150 scratch_idx, seasonal_coef);
1152 } /* end RRA Loop */
1153 if (rrd_test_error()) break;
1154 } /* end elapsed_pdp_st loop */
1156 if (rrd_test_error()) break;
1158 /* Ready to write to disk */
1159 /* Move sequentially through the file, writing one RRA at a time.
1160 * Note this architecture divorces the computation of CDP with
1161 * flushing updated RRA entries to disk. */
1162 for(i = 0, rra_start = rra_begin;
1163 i < rrd.stat_head->rra_cnt;
1164 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1166 /* is there anything to write for this RRA? If not, continue. */
1167 if (rra_step_cnt[i] == 0) continue;
1169 /* write the first row */
1171 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1173 rrd.rra_ptr[i].cur_row++;
1174 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1175 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1176 /* positition on the first row */
1177 rra_pos_tmp = rra_start +
1178 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1179 if(rra_pos_tmp != rra_current) {
1181 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1182 rrd_set_error("seek error in rrd");
1186 rra_current = rra_pos_tmp;
1190 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1192 scratch_idx = CDP_primary_val;
1193 if (pcdp_summary != NULL)
1195 rra_time = (current_time - current_time
1196 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1197 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1200 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1201 pcdp_summary, &rra_time, rrd_mmaped_file);
1203 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1204 pcdp_summary, &rra_time);
1206 if (rrd_test_error()) break;
1208 /* write other rows of the bulk update, if any */
1209 scratch_idx = CDP_secondary_val;
1210 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1212 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1215 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1216 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1219 rrd.rra_ptr[i].cur_row = 0;
1220 /* seek back to beginning of current rra */
1221 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1223 rrd_set_error("seek error in rrd");
1227 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1229 rra_current = rra_start;
1231 if (pcdp_summary != NULL)
1233 rra_time = (current_time - current_time
1234 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1235 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1238 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1239 pcdp_summary, &rra_time, rrd_mmaped_file);
1241 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1242 pcdp_summary, &rra_time);
1246 if (rrd_test_error())
1250 /* break out of the argument parsing loop if error_string is set */
1251 if (rrd_test_error()){
1256 } /* endif a pdp_st has occurred */
1257 rrd.live_head->last_up = current_time;
1258 rrd.live_head->last_up_usec = current_time_usec;
1260 } /* function argument loop */
1262 if (seasonal_coef != NULL) free(seasonal_coef);
1263 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1264 if (rra_step_cnt != NULL) free(rra_step_cnt);
1265 rpnstack_free(&rpnstack);
1268 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1269 rrd_set_error("error writing(unmapping) file: %s", filename);
1272 /* if we got here and if there is an error and if the file has not been
1273 * written to, then close things up and return. */
1274 if (rrd_test_error()) {
1284 /* aargh ... that was tough ... so many loops ... anyway, its done.
1285 * we just need to write back the live header portion now*/
1287 if (fseek(rrd_file, (sizeof(stat_head_t)
1288 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1289 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1291 rrd_set_error("seek rrd for live header writeback");
1302 if(fwrite( rrd.live_head,
1303 sizeof(live_head_t), 1, rrd_file) != 1){
1304 rrd_set_error("fwrite live_head to rrd");
1315 if(fwrite( &rrd.live_head->last_up,
1316 sizeof(time_t), 1, rrd_file) != 1){
1317 rrd_set_error("fwrite live_head to rrd");
1329 if(fwrite( rrd.pdp_prep,
1331 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332 rrd_set_error("ftwrite pdp_prep to rrd");
1342 if(fwrite( rrd.cdp_prep,
1344 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1345 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1347 rrd_set_error("ftwrite cdp_prep to rrd");
1357 if(fwrite( rrd.rra_ptr,
1359 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360 rrd_set_error("fwrite rra_ptr to rrd");
1370 /* OK now close the files and free the memory */
1371 if(fclose(rrd_file) != 0){
1372 rrd_set_error("closing rrd");
1381 /* calling the smoothing code here guarantees at most
1382 * one smoothing operation per rrd_update call. Unfortunately,
1383 * it is possible with bulk updates, or a long-delayed update
1384 * for smoothing to occur off-schedule. This really isn't
1385 * critical except during the burning cycles. */
1386 if (schedule_smooth)
1388 rrd_file = fopen(filename,"rb+");
1389 rra_start = rra_begin;
1390 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1392 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1393 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1396 fprintf(stderr,"Running smoother for rra %ld\n",i);
1398 apply_smoother(&rrd,i,rra_start,rrd_file);
1399 if (rrd_test_error())
1402 rra_start += rrd.rra_def[i].row_cnt
1403 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1416 * get exclusive lock to whole file.
1417 * lock gets removed when we close the file
1419 * returns 0 on success
1422 LockRRD(FILE *rrdfile)
1424 int rrd_fd; /* File descriptor for RRD */
1427 rrd_fd = fileno(rrdfile);
1430 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1433 if ( _fstat( rrd_fd, &st ) == 0 ) {
1434 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1440 lock.l_type = F_WRLCK; /* exclusive write lock */
1441 lock.l_len = 0; /* whole file */
1442 lock.l_start = 0; /* start of file */
1443 lock.l_whence = SEEK_SET; /* end of file */
1445 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1455 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1456 unsigned short CDP_scratch_idx, FILE *rrd_file,
1457 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1460 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1461 unsigned short CDP_scratch_idx, FILE *rrd_file,
1462 info_t *pcdp_summary, time_t *rra_time)
1465 unsigned long ds_idx, cdp_idx;
1468 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1470 /* compute the cdp index */
1471 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1473 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1474 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1475 rrd -> rra_def[rra_idx].cf_nam);
1477 if (pcdp_summary != NULL)
1479 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1480 /* append info to the return hash */
1481 pcdp_summary = info_push(pcdp_summary,
1482 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1483 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1484 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1488 memcpy((char *)rrd_mmaped_file + *rra_current,
1489 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1490 sizeof(rrd_value_t));
1492 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1493 sizeof(rrd_value_t),1,rrd_file) != 1)
1495 rrd_set_error("writing rrd");
1499 *rra_current += sizeof(rrd_value_t);
1501 return (pcdp_summary);