1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.10 2003/04/29 19:14:12 jake
9 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
10 * Also revert accidental addition of -I to aclocal MakeMakefile.
12 * Revision 1.9 2003/04/25 18:35:08 jake
13 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
15 * Revision 1.8 2003/03/31 21:22:12 oetiker
16 * enables RRDtool updates with microsecond or in case of windows millisecond
17 * precision. This is needed to reduce time measurement error when archive step
18 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
20 * Revision 1.7 2003/02/13 07:05:27 oetiker
21 * Find attached the patch I promised to send to you. Please note that there
22 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
23 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
24 * library is identical to librrd, but it contains support code for per-thread
25 * global variables currently used for error information only. This is similar
26 * to how errno per-thread variables are implemented. librrd_th must be linked
27 * alongside of libpthread
29 * There is also a new file "THREADS", holding some documentation.
31 * -- Peter Stamfest <peter@stamfest.at>
33 * Revision 1.6 2002/02/01 20:34:49 oetiker
34 * fixed version number and date/time
36 * Revision 1.5 2001/05/09 05:31:01 oetiker
37 * Bug fix: when update of multiple PDP/CDP RRAs coincided
38 * with interpolation of multiple PDPs an incorrect value was
39 * stored as the CDP. Especially evident for GAUGE data sources.
40 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
42 * Revision 1.4 2001/03/10 23:54:41 oetiker
43 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
44 * parser and calculator from rrd_graph and puts them in a new file,
45 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
46 * clean-up of aberrant behavior stuff, including a bug fix.
47 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
48 * -- Jake Brutlag <jakeb@corp.webtv.net>
50 * Revision 1.3 2001/03/04 13:01:55 oetiker
51 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
52 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
53 * This is backwards compatible! But new files using the Aberrant stuff are not readable
54 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
55 * -- Jake Brutlag <jakeb@corp.webtv.net>
57 * Revision 1.2 2001/03/04 11:14:25 oetiker
58 * added at-style-time@value:value syntax to rrd_update
59 * -- Dave Bodenstab <imdave@mcs.net>
61 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
64 *****************************************************************************/
67 #include <sys/types.h>
71 #include <sys/locking.h>
77 #include "rrd_rpncalc.h"
79 #include "rrd_is_thread_safe.h"
83 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
86 #include <sys/timeb.h>
89 time_t tv_sec; /* seconds */
90 long tv_usec; /* microseconds */
94 int tz_minuteswest; /* minutes W of Greenwich */
95 int tz_dsttime; /* type of dst correction */
98 static gettimeofday(struct timeval *t, struct __timezone *tz) {
100 struct timeb current_time;
102 _ftime(&current_time);
104 t->tv_sec = current_time.time;
105 t->tv_usec = current_time.millitm * 1000;
110 * normalize time as returned by gettimeofday. usec part must
113 static void normalize_time(struct timeval *t)
117 t->tv_usec += 1000000L;
121 /* Local prototypes */
122 int LockRRD(FILE *rrd_file);
123 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
124 unsigned long *rra_current,
125 unsigned short CDP_scratch_idx, FILE *rrd_file,
126 info_t *pcdp_summary, time_t *rra_time);
127 int rrd_update_r(char *filename, char *template, int argc, char **argv);
128 int _rrd_update(char *filename, char *template, int argc, char **argv,
131 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
136 main(int argc, char **argv){
137 rrd_update(argc,argv);
138 if (rrd_test_error()) {
139 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
140 "Usage: rrdupdate filename\n"
141 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
142 "\t\t\ttime|N:value[:value...]\n\n"
143 "\t\t\tat-time@value[:value...]\n\n"
144 "\t\t\t[ time:value[:value...] ..]\n\n");
146 printf("ERROR: %s\n",rrd_get_error());
154 info_t *rrd_update_v(int argc, char **argv)
156 char *template = NULL;
157 info_t *result = NULL;
161 static struct option long_options[] =
163 {"template", required_argument, 0, 't'},
166 int option_index = 0;
168 opt = getopt_long(argc, argv, "t:",
169 long_options, &option_index);
180 rrd_set_error("unknown option '%s'",argv[optind-1]);
186 /* need at least 2 arguments: filename, data. */
187 if (argc-optind < 2) {
188 rrd_set_error("Not enough arguments");
192 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
193 rc.u_int = _rrd_update(argv[optind], template,
194 argc - optind - 1, argv + optind + 1, result);
195 result->value.u_int = rc.u_int;
201 rrd_update(int argc, char **argv)
203 char *template = NULL;
207 static struct option long_options[] =
209 {"template", required_argument, 0, 't'},
212 int option_index = 0;
214 opt = getopt_long(argc, argv, "t:",
215 long_options, &option_index);
226 rrd_set_error("unknown option '%s'",argv[optind-1]);
231 /* need at least 2 arguments: filename, data. */
232 if (argc-optind < 2) {
233 rrd_set_error("Not enough arguments");
238 rc = rrd_update_r(argv[optind], template,
239 argc - optind - 1, argv + optind + 1);
244 rrd_update_r(char *filename, char *template, int argc, char **argv)
246 return _rrd_update(filename, template, argc, argv, NULL);
250 _rrd_update(char *filename, char *template, int argc, char **argv,
251 info_t *pcdp_summary)
256 unsigned long i,ii,iii=1;
258 unsigned long rra_begin; /* byte pointer to the rra
259 * area in the rrd file. this
260 * pointer never changes value */
261 unsigned long rra_start; /* byte pointer to the rra
262 * area in the rrd file. this
263 * pointer changes as each rrd is
265 unsigned long rra_current; /* byte pointer to the current write
266 * spot in the rrd file. */
267 unsigned long rra_pos_tmp; /* temporary byte pointer. */
269 pre_int,post_int; /* interval between this and
271 unsigned long proc_pdp_st; /* which pdp_st was the last
273 unsigned long occu_pdp_st; /* when was the pdp_st
274 * before the last update
276 unsigned long proc_pdp_age; /* how old was the data in
277 * the pdp prep area when it
278 * was last updated */
279 unsigned long occu_pdp_age; /* how long ago was the last
281 rrd_value_t *pdp_new; /* prepare the incoming data
282 * to be added to the
284 rrd_value_t *pdp_temp; /* prepare the pdp values
285 * to be added to the
288 long *tmpl_idx; /* index representing the settings
289 transported by the template index */
290 unsigned long tmpl_cnt = 2; /* time and data */
295 time_t rra_time; /* time of update for a RRA */
296 unsigned long current_time_usec; /* microseconds part of current time */
297 struct timeval tmp_time; /* used for time conversion */
300 int schedule_smooth = 0;
301 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
302 /* a vector of future Holt-Winters seasonal coefs */
303 unsigned long elapsed_pdp_st;
304 /* number of elapsed PDP steps since last update */
305 unsigned long *rra_step_cnt = NULL;
306 /* number of rows to be updated in an RRA for a data
308 unsigned long start_pdp_offset;
309 /* number of PDP steps since the last update that
310 * are assigned to the first CDP to be generated
311 * since the last update. */
312 unsigned short scratch_idx;
313 /* index into the CDP scratch array */
314 enum cf_en current_cf;
315 /* numeric id of the current consolidation function */
316 rpnstack_t rpnstack; /* used for COMPUTE DS */
317 int version; /* rrd version */
319 rpnstack_init(&rpnstack);
321 /* need at least 1 argument: data. */
323 rrd_set_error("Not enough arguments");
329 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
332 /* initialize time */
333 version = atoi(rrd.stat_head->version);
334 gettimeofday(&tmp_time, 0);
335 normalize_time(&tmp_time);
336 current_time = tmp_time.tv_sec;
338 current_time_usec = tmp_time.tv_usec;
341 current_time_usec = 0;
344 rra_current = rra_start = rra_begin = ftell(rrd_file);
345 /* This is defined in the ANSI C standard, section 7.9.5.3:
347 When a file is opened with update mode ('+' as the second
348 or third character in the ... list of mode argument
349 variables), both input and output may be performed on the
350 associated stream. However, ... input may not be directly
351 followed by output without an intervening call to a file
352 positioning function, unless the input operation encounters
354 fseek(rrd_file, 0, SEEK_CUR);
357 /* get exclusive lock to whole file.
358 * lock gets removed when we close the file.
360 if (LockRRD(rrd_file) != 0) {
361 rrd_set_error("could not lock RRD");
367 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
368 rrd_set_error("allocating updvals pointer array");
374 if ((pdp_temp = malloc(sizeof(rrd_value_t)
375 *rrd.stat_head->ds_cnt))==NULL){
376 rrd_set_error("allocating pdp_temp ...");
383 if ((tmpl_idx = malloc(sizeof(unsigned long)
384 *(rrd.stat_head->ds_cnt+1)))==NULL){
385 rrd_set_error("allocating tmpl_idx ...");
392 /* initialize template redirector */
393 /* default config example (assume DS 1 is a CDEF DS)
394 tmpl_idx[0] -> 0; (time)
395 tmpl_idx[1] -> 1; (DS 0)
396 tmpl_idx[2] -> 3; (DS 2)
397 tmpl_idx[3] -> 4; (DS 3) */
398 tmpl_idx[0] = 0; /* time */
399 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
401 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
408 unsigned int tmpl_len;
410 tmpl_cnt = 1; /* the first entry is the time */
411 tmpl_len = strlen(template);
412 for(i=0;i<=tmpl_len ;i++) {
413 if (template[i] == ':' || template[i] == '\0') {
415 if (tmpl_cnt>rrd.stat_head->ds_cnt){
416 rrd_set_error("Template contains more DS definitions than RRD");
417 free(updvals); free(pdp_temp);
418 free(tmpl_idx); rrd_free(&rrd);
419 fclose(rrd_file); return(-1);
421 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
422 rrd_set_error("unknown DS name '%s'",dsname);
423 free(updvals); free(pdp_temp);
424 free(tmpl_idx); rrd_free(&rrd);
425 fclose(rrd_file); return(-1);
427 /* the first element is always the time */
428 tmpl_idx[tmpl_cnt-1]++;
429 /* go to the next entry on the template */
430 dsname = &template[i+1];
431 /* fix the damage we did before */
440 if ((pdp_new = malloc(sizeof(rrd_value_t)
441 *rrd.stat_head->ds_cnt))==NULL){
442 rrd_set_error("allocating pdp_new ...");
451 /* loop through the arguments. */
452 for(arg_i=0; arg_i<argc;arg_i++) {
453 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
454 char *step_start = stepper;
456 char *parsetime_error = NULL;
457 enum {atstyle, normal} timesyntax;
458 struct time_value ds_tv;
459 if (stepper == NULL){
460 rrd_set_error("failed duplication argv entry");
468 /* initialize all ds input to unknown except the first one
469 which has always got to be set */
470 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471 strcpy(stepper,argv[arg_i]);
473 /* separate all ds elements; first must be examined separately
474 due to alternate time syntax */
475 if ((p=strchr(stepper,'@'))!=NULL) {
476 timesyntax = atstyle;
479 } else if ((p=strchr(stepper,':'))!=NULL) {
484 rrd_set_error("expected timestamp not found in data source from %s:...",
490 updvals[tmpl_idx[ii]] = stepper;
492 if (*stepper == ':') {
496 updvals[tmpl_idx[ii]] = stepper+1;
502 if (ii != tmpl_cnt-1) {
503 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504 tmpl_cnt-1, ii, argv[arg_i]);
509 /* get the time from the reading ... handle N */
510 if (timesyntax == atstyle) {
511 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
516 if (ds_tv.type == RELATIVE_TO_END_TIME ||
517 ds_tv.type == RELATIVE_TO_START_TIME) {
518 rrd_set_error("specifying time relative to the 'start' "
519 "or 'end' makes no sense here: %s",
525 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
528 } else if (strcmp(updvals[0],"N")==0){
529 gettimeofday(&tmp_time, 0);
530 normalize_time(&tmp_time);
531 current_time = tmp_time.tv_sec;
532 current_time_usec = tmp_time.tv_usec;
535 tmp = strtod(updvals[0], 0);
536 current_time = floor(tmp);
537 current_time_usec = (long)((tmp - current_time) * 1000000L);
539 /* don't do any correction for old version RRDs */
541 current_time_usec = 0;
543 if(current_time <= rrd.live_head->last_up){
544 rrd_set_error("illegal attempt to update using time %ld when "
545 "last update time is %ld (minimum one second step)",
546 current_time, rrd.live_head->last_up);
552 /* seek to the beginning of the rra's */
553 if (rra_current != rra_begin) {
554 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
555 rrd_set_error("seek error in rrd");
559 rra_current = rra_begin;
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
570 /* interval = current_time - rrd.live_head->last_up; */
571 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
573 if (occu_pdp_st > proc_pdp_st){
574 /* OK we passed the pdp_st moment*/
575 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
576 * occurred before the latest
578 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
579 post_int = occu_pdp_age; /* how much after it */
580 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
594 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
595 occu_pdp_age, occu_pdp_st,
596 interval, pre_int, post_int);
599 /* process the data sources and update the pdp_prep
600 * area accordingly */
601 for(i=0;i<rrd.stat_head->ds_cnt;i++){
603 dst_idx= dst_conv(rrd.ds_def[i].dst);
604 /* NOTE: DST_CDEF should never enter this if block, because
605 * updvals[i+1][0] is initialized to 'U'; unless the caller
606 * accidentally specified a value for the DST_CDEF. To handle
607 * this case, an extra check is required. */
608 if((updvals[i+1][0] != 'U') &&
609 (dst_idx != DST_CDEF) &&
610 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
612 /* the data source type defines how to process the data */
613 /* pdp_new contains rate * time ... eg the bytes
614 * transferred during the interval. Doing it this way saves
615 * a lot of math operations */
621 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
622 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
623 if(dst_idx == DST_COUNTER) {
624 /* simple overflow catcher suggested by andres kroonmaa */
625 /* this will fail terribly for non 32 or 64 bit counters ... */
626 /* are there any others in SNMP land ? */
627 if (pdp_new[i] < (double)0.0 )
628 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
629 if (pdp_new[i] < (double)0.0 )
630 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
632 rate = pdp_new[i] / interval;
639 pdp_new[i]= atof(updvals[i+1]);
640 rate = pdp_new[i] / interval;
643 pdp_new[i] = atof(updvals[i+1]) * interval;
644 rate = pdp_new[i] / interval;
647 rrd_set_error("rrd contains unknown DS type : '%s'",
651 /* break out of this for loop if the error string is set */
652 if (rrd_test_error()){
655 /* make sure pdp_temp is neither too large nor too small
656 * if any of these occur it becomes unknown ...
658 if ( ! isnan(rate) &&
659 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
660 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
661 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
662 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
666 /* no news is news all the same */
670 /* make a copy of the command line argument for the next run */
678 rrd.pdp_prep[i].last_ds,
679 updvals[i+1], pdp_new[i]);
681 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
682 strncpy(rrd.pdp_prep[i].last_ds,
683 updvals[i+1],LAST_DS_LEN-1);
684 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
687 /* break out of the argument parsing loop if the error_string is set */
688 if (rrd_test_error()){
692 /* has a pdp_st moment occurred since the last run ? */
694 if (proc_pdp_st == occu_pdp_st){
695 /* no we have not passed a pdp_st moment. therefore update is simple */
697 for(i=0;i<rrd.stat_head->ds_cnt;i++){
698 if(isnan(pdp_new[i]))
699 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
701 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
708 rrd.pdp_prep[i].scratch[PDP_val].u_val,
709 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
713 /* a pdp_st has occurred. */
715 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
716 * occurred up to the last run.
717 pdp_new[] contains rate*seconds from the latest run.
718 pdp_temp[] will contain the rate for cdp */
720 for(i=0;i<rrd.stat_head->ds_cnt;i++){
721 /* update pdp_prep to the current pdp_st */
722 if(isnan(pdp_new[i]))
723 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
725 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
726 pdp_new[i]/(double)interval*(double)pre_int;
728 /* if too much of the pdp_prep is unknown we dump it */
729 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
730 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
731 (occu_pdp_st-proc_pdp_st <=
732 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
735 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
736 / (double)( occu_pdp_st
738 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
741 /* process CDEF data sources; remember each CDEF DS can
742 * only reference other DS with a lower index number */
743 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
745 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
746 /* substitute data values for OP_VARIABLE nodes */
747 for (ii = 0; rpnp[ii].op != OP_END; ii++)
749 if (rpnp[ii].op == OP_VARIABLE) {
750 rpnp[ii].op = OP_NUMBER;
751 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
754 /* run the rpn calculator */
755 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
757 break; /* exits the data sources pdp_temp loop */
761 /* make pdp_prep ready for the next run */
762 if(isnan(pdp_new[i])){
763 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
764 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
766 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
767 rrd.pdp_prep[i].scratch[PDP_val].u_val =
768 pdp_new[i]/(double)interval*(double)post_int;
776 "new_unkn_sec %5lu\n",
778 rrd.pdp_prep[i].scratch[PDP_val].u_val,
779 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
783 /* if there were errors during the last loop, bail out here */
784 if (rrd_test_error()){
789 /* compute the number of elapsed pdp_st moments */
790 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
792 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
794 if (rra_step_cnt == NULL)
796 rra_step_cnt = (unsigned long *)
797 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
800 for(i = 0, rra_start = rra_begin;
801 i < rrd.stat_head->rra_cnt;
802 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
805 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
806 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
807 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
808 if (start_pdp_offset <= elapsed_pdp_st) {
809 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
810 rrd.rra_def[i].pdp_cnt + 1;
815 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
817 /* If this is a bulk update, we need to skip ahead in the seasonal
818 * arrays so that they will be correct for the next observed value;
819 * note that for the bulk update itself, no update will occur to
820 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
822 if (rra_step_cnt[i] > 2)
824 /* skip update by resetting rra_step_cnt[i],
825 * note that this is not data source specific; this is due
826 * to the bulk update, not a DNAN value for the specific data
829 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
830 &last_seasonal_coef);
831 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
835 /* periodically run a smoother for seasonal effects */
836 /* Need to use first cdp parameter buffer to track
837 * burnin (burnin requires a specific smoothing schedule).
838 * The CDP_init_seasonal parameter is really an RRA level,
839 * not a data source within RRA level parameter, but the rra_def
840 * is read only for rrd_update (not flushed to disk). */
841 iii = i*(rrd.stat_head -> ds_cnt);
842 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
845 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
846 > rrd.rra_def[i].row_cnt - 1) {
847 /* mark off one of the burnin cycles */
848 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
852 /* someone has no doubt invented a trick to deal with this
853 * wrap around, but at least this code is clear. */
854 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
855 rrd.rra_ptr[i].cur_row)
857 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
858 * mapping between PDP and CDP */
859 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
860 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
864 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
865 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
866 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
871 /* can't rely on negative numbers because we are working with
873 /* Don't need modulus here. If we've wrapped more than once, only
874 * one smooth is executed at the end. */
875 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
876 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
877 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
881 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
882 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
883 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
890 rra_current = ftell(rrd_file);
891 } /* if cf is DEVSEASONAL or SEASONAL */
893 if (rrd_test_error()) break;
895 /* update CDP_PREP areas */
896 /* loop over data sources within each RRA */
898 ii < rrd.stat_head->ds_cnt;
902 /* iii indexes the CDP prep area for this data source within the RRA */
903 iii=i*rrd.stat_head->ds_cnt+ii;
905 if (rrd.rra_def[i].pdp_cnt > 1) {
907 if (rra_step_cnt[i] > 0) {
908 /* If we are in this block, at least 1 CDP value will be written to
909 * disk, this is the CDP_primary_val entry. If more than 1 value needs
910 * to be written, then the "fill in" value is the CDP_secondary_val
912 if (isnan(pdp_temp[ii]))
914 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
915 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
917 /* CDP_secondary value is the RRA "fill in" value for intermediary
918 * CDP data entries. No matter the CF, the value is the same because
919 * the average, max, min, and last of a list of identical values is
920 * the same, namely, the value itself. */
921 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
924 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
925 > rrd.rra_def[i].pdp_cnt*
926 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
928 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
929 /* initialize carry over */
930 if (current_cf == CF_AVERAGE) {
931 if (isnan(pdp_temp[ii])) {
932 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
934 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
935 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
938 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
941 rrd_value_t cum_val, cur_val;
942 switch (current_cf) {
944 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
945 cur_val = IFDNAN(pdp_temp[ii],0.0);
946 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
947 (cum_val + cur_val * start_pdp_offset) /
948 (rrd.rra_def[i].pdp_cnt
949 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
950 /* initialize carry over value */
951 if (isnan(pdp_temp[ii])) {
952 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
954 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
955 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
959 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
960 cur_val = IFDNAN(pdp_temp[ii],-DINF);
962 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
963 isnan(pdp_temp[ii])) {
965 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
970 if (cur_val > cum_val)
971 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
973 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
974 /* initialize carry over value */
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
978 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
979 cur_val = IFDNAN(pdp_temp[ii],DINF);
981 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
982 isnan(pdp_temp[ii])) {
984 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
989 if (cur_val < cum_val)
990 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
992 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
993 /* initialize carry over value */
994 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
998 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
999 /* initialize carry over value */
1000 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1003 } /* endif meets xff value requirement for a valid value */
1004 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1005 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1006 if (isnan(pdp_temp[ii]))
1007 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1008 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1010 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1011 } else /* rra_step_cnt[i] == 0 */
1014 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1015 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1018 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1019 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1022 if (isnan(pdp_temp[ii])) {
1023 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1024 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1026 if (current_cf == CF_AVERAGE) {
1027 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1030 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1033 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1034 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1037 switch (current_cf) {
1039 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1043 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1057 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1058 if (elapsed_pdp_st > 2)
1060 switch (current_cf) {
1063 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1064 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1067 case CF_DEVSEASONAL:
1068 /* need to update cached seasonal values, so they are consistent
1069 * with the bulk update */
1070 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1071 * CDP_last_deviation are the same. */
1072 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1073 last_seasonal_coef[ii];
1074 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1078 /* need to update the null_count and last_null_count.
1079 * even do this for non-DNAN pdp_temp because the
1080 * algorithm is not learning from batch updates. */
1081 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1083 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1087 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1088 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1091 /* do not count missed bulk values as failures */
1092 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1093 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1094 /* need to reset violations buffer.
1095 * could do this more carefully, but for now, just
1096 * assume a bulk update wipes away all violations. */
1097 erase_violations(&rrd, iii, i);
1101 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1103 if (rrd_test_error()) break;
1105 } /* endif data sources loop */
1106 } /* end RRA Loop */
1108 /* this loop is only entered if elapsed_pdp_st < 3 */
1109 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1110 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1112 for(i = 0, rra_start = rra_begin;
1113 i < rrd.stat_head->rra_cnt;
1114 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1117 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1119 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1120 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1122 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1123 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1125 rra_current = ftell(rrd_file);
1127 if (rrd_test_error()) break;
1128 /* loop over data sources within each RRA */
1130 ii < rrd.stat_head->ds_cnt;
1133 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1134 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1135 scratch_idx, seasonal_coef);
1137 } /* end RRA Loop */
1138 if (rrd_test_error()) break;
1139 } /* end elapsed_pdp_st loop */
1141 if (rrd_test_error()) break;
1143 /* Ready to write to disk */
1144 /* Move sequentially through the file, writing one RRA at a time.
1145 * Note this architecture divorces the computation of CDP with
1146 * flushing updated RRA entries to disk. */
1147 for(i = 0, rra_start = rra_begin;
1148 i < rrd.stat_head->rra_cnt;
1149 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1151 /* is there anything to write for this RRA? If not, continue. */
1152 if (rra_step_cnt[i] == 0) continue;
1154 /* write the first row */
1156 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1158 rrd.rra_ptr[i].cur_row++;
1159 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1160 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1161 /* position on the first row */
1162 rra_pos_tmp = rra_start +
1163 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1164 if(rra_pos_tmp != rra_current) {
1165 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1166 rrd_set_error("seek error in rrd");
1169 rra_current = rra_pos_tmp;
1173 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1175 scratch_idx = CDP_primary_val;
1176 if (pcdp_summary != NULL)
1178 rra_time = (current_time - current_time
1179 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1180 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1182 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1183 pcdp_summary, &rra_time);
1184 if (rrd_test_error()) break;
1186 /* write other rows of the bulk update, if any */
1187 scratch_idx = CDP_secondary_val;
1188 for ( ; rra_step_cnt[i] > 1;
1189 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1191 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1194 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1195 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1198 rrd.rra_ptr[i].cur_row = 0;
1199 /* seek back to beginning of current rra */
1200 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1202 rrd_set_error("seek error in rrd");
1206 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1208 rra_current = rra_start;
1210 if (pcdp_summary != NULL)
1212 rra_time = (current_time - current_time
1213 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1214 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1216 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1217 pcdp_summary, &rra_time);
1220 if (rrd_test_error())
1224 /* break out of the argument parsing loop if error_string is set */
1225 if (rrd_test_error()){
1230 } /* endif a pdp_st has occurred */
1231 rrd.live_head->last_up = current_time;
1232 rrd.live_head->last_up_usec = current_time_usec;
1234 } /* function argument loop */
1236 if (seasonal_coef != NULL) free(seasonal_coef);
1237 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1238 if (rra_step_cnt != NULL) free(rra_step_cnt);
1239 rpnstack_free(&rpnstack);
1241 /* if we got here and if there is an error and if the file has not been
1242 * written to, then close things up and return. */
1243 if (rrd_test_error()) {
1253 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1254 * we just need to write back the live header portion now*/
1256 if (fseek(rrd_file, (sizeof(stat_head_t)
1257 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1258 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1260 rrd_set_error("seek rrd for live header writeback");
1271 if(fwrite( rrd.live_head,
1272 sizeof(live_head_t), 1, rrd_file) != 1){
1273 rrd_set_error("fwrite live_head to rrd");
1284 if(fwrite( &rrd.live_head->last_up,
1285 sizeof(time_t), 1, rrd_file) != 1){
1286 rrd_set_error("fwrite live_head to rrd");
1298 if(fwrite( rrd.pdp_prep,
1300 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1301 rrd_set_error("ftwrite pdp_prep to rrd");
1311 if(fwrite( rrd.cdp_prep,
1313 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1314 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1316 rrd_set_error("ftwrite cdp_prep to rrd");
1326 if(fwrite( rrd.rra_ptr,
1328 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1329 rrd_set_error("fwrite rra_ptr to rrd");
1339 /* OK now close the files and free the memory */
1340 if(fclose(rrd_file) != 0){
1341 rrd_set_error("closing rrd");
1350 /* calling the smoothing code here guarantees at most
1351 * one smoothing operation per rrd_update call. Unfortunately,
1352 * it is possible with bulk updates, or a long-delayed update
1353 * for smoothing to occur off-schedule. This really isn't
1354 * critical except during the burning cycles. */
1355 if (schedule_smooth)
1358 rrd_file = fopen(filename,"r+");
1360 rrd_file = fopen(filename,"rb+");
1362 rra_start = rra_begin;
1363 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1365 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1366 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1369 fprintf(stderr,"Running smoother for rra %ld\n",i);
1371 apply_smoother(&rrd,i,rra_start,rrd_file);
1372 if (rrd_test_error())
1375 rra_start += rrd.rra_def[i].row_cnt
1376 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1389 * get exclusive lock to whole file.
1390 * lock gets removed when we close the file
1392 * returns 0 on success
1395 LockRRD(FILE *rrdfile)
/* Requests a write lock on the file open on rrdfile.  Two locking paths
 * appear below: fcntl() with F_SETLK, and _fstat() followed by _locking().
 * NOTE(review): presumably the two paths are selected per platform by
 * preprocessor conditionals that are not visible here -- confirm.  The
 * result of the locking call is kept in stat. */
1397 int rrd_fd; /* File descriptor for RRD */
1400 rrd_fd = fileno(rrdfile); /* both locking calls take a descriptor, not a FILE* */
1405 lock.l_type = F_WRLCK; /* exclusive write lock */
1406 lock.l_len = 0; /* length 0 = through end of file, i.e. whole file */
1407 lock.l_start = 0; /* offset 0 ... */
1408 lock.l_whence = SEEK_SET; /* ... measured from the start of the file */
/* F_SETLK does not wait: if another process holds a conflicting lock,
 * fcntl() fails immediately instead of blocking. */
1410 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* _locking() takes an explicit byte count, so the current file size is
 * looked up first; the lock is only attempted if _fstat() succeeds. */
1414 if ( _fstat( rrd_fd, &st ) == 0 ) {
1415 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size ); /* _LK_NBLCK: fail rather than wait */
1427 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1428 unsigned short CDP_scratch_idx, FILE *rrd_file,
1429 info_t *pcdp_summary, time_t *rra_time)
1431 unsigned long ds_idx, cdp_idx;
1434 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1436 /* compute the cdp index */
1437 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1439 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1440 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1441 rrd -> rra_def[rra_idx].cf_nam);
1443 if (pcdp_summary != NULL)
1445 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1446 /* append info to the return hash */
1447 pcdp_summary = info_push(pcdp_summary,
1448 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1449 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1450 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1453 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1454 sizeof(rrd_value_t),1,rrd_file) != 1)
1456 rrd_set_error("writing rrd");
1459 *rra_current += sizeof(rrd_value_t);
1461 return (pcdp_summary);