1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.11 2003/09/02 21:58:35 oetiker
9 * be pickier about what we accept in rrd_update. Complain if things do not work out
11 * Revision 1.10 2003/04/29 19:14:12 jake
12 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
13 * Also revert accidental addition of -I to aclocal MakeMakefile.
15 * Revision 1.9 2003/04/25 18:35:08 jake
16 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
18 * Revision 1.8 2003/03/31 21:22:12 oetiker
19 * enables RRDtool updates with microsecond or in case of windows millisecond
20 * precision. This is needed to reduce time measurement error when archive step
21 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
23 * Revision 1.7 2003/02/13 07:05:27 oetiker
24 * Find attached the patch I promised to send to you. Please note that there
25 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
26 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
27 * library is identical to librrd, but it contains support code for per-thread
28 * global variables currently used for error information only. This is similar
29 * to how errno per-thread variables are implemented. librrd_th must be linked
30 * alongside of libpthread
32 * There is also a new file "THREADS", holding some documentation.
34 * -- Peter Stamfest <peter@stamfest.at>
36 * Revision 1.6 2002/02/01 20:34:49 oetiker
37 * fixed version number and date/time
39 * Revision 1.5 2001/05/09 05:31:01 oetiker
40 * Bug fix: when update of multiple PDP/CDP RRAs coincided
41 * with interpolation of multiple PDPs an incorrect value was
42 * stored as the CDP. Especially evident for GAUGE data sources.
43 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
45 * Revision 1.4 2001/03/10 23:54:41 oetiker
46 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
47 * parser and calculator from rrd_graph and puts them in a new file,
48 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
49 * clean-up of aberrant behavior stuff, including a bug fix.
50 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
51 * -- Jake Brutlag <jakeb@corp.webtv.net>
53 * Revision 1.3 2001/03/04 13:01:55 oetiker
54 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
55 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
56 * This is backwards compatible! But new files using the Aberrant stuff are not readable
57 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
58 * -- Jake Brutlag <jakeb@corp.webtv.net>
60 * Revision 1.2 2001/03/04 11:14:25 oetiker
61 * added at-style-time@value:value syntax to rrd_update
62 * -- Dave Bodenstab <imdave@mcs.net>
64 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
67 *****************************************************************************/
70 #include <sys/types.h>
74 #include <sys/locking.h>
80 #include "rrd_rpncalc.h"
82 #include "rrd_is_thread_safe.h"
86 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
89 #include <sys/timeb.h>
92 time_t tv_sec; /* seconds */
93 long tv_usec; /* microseconds */
97 int tz_minuteswest; /* minutes W of Greenwich */
98 int tz_dsttime; /* type of dst correction */
101 static gettimeofday(struct timeval *t, struct __timezone *tz) {
103 struct timeb current_time;
105 _ftime(&current_time);
107 t->tv_sec = current_time.time;
108 t->tv_usec = current_time.millitm * 1000;
113 * normalize time as returned by gettimeofday. usec part must
116 static void normalize_time(struct timeval *t)
120 t->tv_usec += 1000000L;
124 /* Local prototypes */
125 int LockRRD(FILE *rrd_file);
126 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
127 unsigned long *rra_current,
128 unsigned short CDP_scratch_idx, FILE *rrd_file,
129 info_t *pcdp_summary, time_t *rra_time);
130 int rrd_update_r(char *filename, char *template, int argc, char **argv);
131 int _rrd_update(char *filename, char *template, int argc, char **argv,
/*
 * IFDNAN(X,Y): yields Y when X is NaN, otherwise yields X.
 *
 * X may be evaluated twice (once by isnan(), once as the result), so do
 * not pass an expression with side effects.
 *
 * The expansion intentionally carries no trailing semicolon: the caller
 * terminates the statement, which lets the macro be used inside larger
 * expressions and avoids a stray empty statement after every use.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
139 main(int argc, char **argv){
140 rrd_update(argc,argv);
141 if (rrd_test_error()) {
142 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
143 "Usage: rrdupdate filename\n"
144 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
145 "\t\t\ttime|N:value[:value...]\n\n"
146 "\t\t\tat-time@value[:value...]\n\n"
147 "\t\t\t[ time:value[:value...] ..]\n\n");
149 printf("ERROR: %s\n",rrd_get_error());
157 info_t *rrd_update_v(int argc, char **argv)
159 char *template = NULL;
160 info_t *result = NULL;
164 static struct option long_options[] =
166 {"template", required_argument, 0, 't'},
169 int option_index = 0;
171 opt = getopt_long(argc, argv, "t:",
172 long_options, &option_index);
183 rrd_set_error("unknown option '%s'",argv[optind-1]);
189 /* need at least 2 arguments: filename, data. */
190 if (argc-optind < 2) {
191 rrd_set_error("Not enough arguments");
195 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
196 rc.u_int = _rrd_update(argv[optind], template,
197 argc - optind - 1, argv + optind + 1, result);
198 result->value.u_int = rc.u_int;
204 rrd_update(int argc, char **argv)
206 char *template = NULL;
210 static struct option long_options[] =
212 {"template", required_argument, 0, 't'},
215 int option_index = 0;
217 opt = getopt_long(argc, argv, "t:",
218 long_options, &option_index);
229 rrd_set_error("unknown option '%s'",argv[optind-1]);
234 /* need at least 2 arguments: filename, data. */
235 if (argc-optind < 2) {
236 rrd_set_error("Not enough arguments");
241 rc = rrd_update_r(argv[optind], template,
242 argc - optind - 1, argv + optind + 1);
247 rrd_update_r(char *filename, char *template, int argc, char **argv)
249 return _rrd_update(filename, template, argc, argv, NULL);
253 _rrd_update(char *filename, char *template, int argc, char **argv,
254 info_t *pcdp_summary)
259 unsigned long i,ii,iii=1;
261 unsigned long rra_begin; /* byte pointer to the rra
262 * area in the rrd file. this
263 * pointer never changes value */
264 unsigned long rra_start; /* byte pointer to the rra
265 * area in the rrd file. this
266 * pointer changes as each rrd is
268 unsigned long rra_current; /* byte pointer to the current write
269 * spot in the rrd file. */
270 unsigned long rra_pos_tmp; /* temporary byte pointer. */
272 pre_int,post_int; /* interval between this and
274 unsigned long proc_pdp_st; /* which pdp_st was the last
276 unsigned long occu_pdp_st; /* when was the pdp_st
277 * before the last update
279 unsigned long proc_pdp_age; /* how old was the data in
280 * the pdp prep area when it
281 * was last updated */
282 unsigned long occu_pdp_age; /* how long ago was the last
284 rrd_value_t *pdp_new; /* prepare the incoming data
285 * to be added to the
287 rrd_value_t *pdp_temp; /* prepare the pdp values
288 * to be added to the
291 long *tmpl_idx; /* index representing the settings
292 transported by the template index */
293 unsigned long tmpl_cnt = 2; /* time and data */
298 time_t rra_time; /* time of update for a RRA */
299 unsigned long current_time_usec; /* microseconds part of current time */
300 struct timeval tmp_time; /* used for time conversion */
303 int schedule_smooth = 0;
304 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
305 /* a vector of future Holt-Winters seasonal coefs */
306 unsigned long elapsed_pdp_st;
307 /* number of elapsed PDP steps since last update */
308 unsigned long *rra_step_cnt = NULL;
309 /* number of rows to be updated in an RRA for a data
311 unsigned long start_pdp_offset;
312 /* number of PDP steps since the last update that
313 * are assigned to the first CDP to be generated
314 * since the last update. */
315 unsigned short scratch_idx;
316 /* index into the CDP scratch array */
317 enum cf_en current_cf;
318 /* numeric id of the current consolidation function */
319 rpnstack_t rpnstack; /* used for COMPUTE DS */
320 int version; /* rrd version */
321 char *endptr; /* used in the conversion */
323 rpnstack_init(&rpnstack);
325 /* need at least 1 argument: data. */
327 rrd_set_error("Not enough arguments");
333 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
336 /* initialize time */
337 version = atoi(rrd.stat_head->version);
338 gettimeofday(&tmp_time, 0);
339 normalize_time(&tmp_time);
340 current_time = tmp_time.tv_sec;
342 current_time_usec = tmp_time.tv_usec;
345 current_time_usec = 0;
348 rra_current = rra_start = rra_begin = ftell(rrd_file);
349 /* This is defined in the ANSI C standard, section 7.9.5.3:
351 When a file is opened with update mode ('+' as the second
352 or third character in the ... list of mode argument
353 variables), both input and output may be performed on the
354 associated stream. However, ... input may not be directly
355 followed by output without an intervening call to a file
356 positioning function, unless the input operation encounters
358 fseek(rrd_file, 0, SEEK_CUR);
361 /* get exclusive lock to whole file.
362 * lock gets removed when we close the file.
364 if (LockRRD(rrd_file) != 0) {
365 rrd_set_error("could not lock RRD");
371 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
372 rrd_set_error("allocating updvals pointer array");
378 if ((pdp_temp = malloc(sizeof(rrd_value_t)
379 *rrd.stat_head->ds_cnt))==NULL){
380 rrd_set_error("allocating pdp_temp ...");
387 if ((tmpl_idx = malloc(sizeof(unsigned long)
388 *(rrd.stat_head->ds_cnt+1)))==NULL){
389 rrd_set_error("allocating tmpl_idx ...");
396 /* initialize template redirector */
397 /* default config example (assume DS 1 is a CDEF DS)
398 tmpl_idx[0] -> 0; (time)
399 tmpl_idx[1] -> 1; (DS 0)
400 tmpl_idx[2] -> 3; (DS 2)
401 tmpl_idx[3] -> 4; (DS 3) */
402 tmpl_idx[0] = 0; /* time */
403 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
405 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
412 unsigned int tmpl_len;
414 tmpl_cnt = 1; /* the first entry is the time */
415 tmpl_len = strlen(template);
416 for(i=0;i<=tmpl_len ;i++) {
417 if (template[i] == ':' || template[i] == '\0') {
419 if (tmpl_cnt>rrd.stat_head->ds_cnt){
420 rrd_set_error("Template contains more DS definitions than RRD");
421 free(updvals); free(pdp_temp);
422 free(tmpl_idx); rrd_free(&rrd);
423 fclose(rrd_file); return(-1);
425 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
426 rrd_set_error("unknown DS name '%s'",dsname);
427 free(updvals); free(pdp_temp);
428 free(tmpl_idx); rrd_free(&rrd);
429 fclose(rrd_file); return(-1);
431 /* the first element is always the time */
432 tmpl_idx[tmpl_cnt-1]++;
433 /* go to the next entry on the template */
434 dsname = &template[i+1];
435 /* fix the damage we did before */
444 if ((pdp_new = malloc(sizeof(rrd_value_t)
445 *rrd.stat_head->ds_cnt))==NULL){
446 rrd_set_error("allocating pdp_new ...");
455 /* loop through the arguments. */
456 for(arg_i=0; arg_i<argc;arg_i++) {
457 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
458 char *step_start = stepper;
460 char *parsetime_error = NULL;
461 enum {atstyle, normal} timesyntax;
462 struct time_value ds_tv;
463 if (stepper == NULL){
464 rrd_set_error("failed duplication argv entry");
472 /* initialize all ds input to unknown except the first one
473 which has always got to be set */
474 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
475 strcpy(stepper,argv[arg_i]);
477 /* separate all ds elements; first must be examined separately
478 due to alternate time syntax */
479 if ((p=strchr(stepper,'@'))!=NULL) {
480 timesyntax = atstyle;
483 } else if ((p=strchr(stepper,':'))!=NULL) {
488 rrd_set_error("expected timestamp not found in data source from %s:...",
494 updvals[tmpl_idx[ii]] = stepper;
496 if (*stepper == ':') {
500 updvals[tmpl_idx[ii]] = stepper+1;
506 if (ii != tmpl_cnt-1) {
507 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
508 tmpl_cnt-1, ii, argv[arg_i]);
513 /* get the time from the reading ... handle N */
514 if (timesyntax == atstyle) {
515 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
516 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
520 if (ds_tv.type == RELATIVE_TO_END_TIME ||
521 ds_tv.type == RELATIVE_TO_START_TIME) {
522 rrd_set_error("specifying time relative to the 'start' "
523 "or 'end' makes no sense here: %s",
529 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
530 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
532 } else if (strcmp(updvals[0],"N")==0){
533 gettimeofday(&tmp_time, 0);
534 normalize_time(&tmp_time);
535 current_time = tmp_time.tv_sec;
536 current_time_usec = tmp_time.tv_usec;
539 tmp = strtod(updvals[0], 0);
540 current_time = floor(tmp);
541 current_time_usec = (long)((tmp - current_time) * 1000000L);
543 /* don't do any correction for old version RRDs */
545 current_time_usec = 0;
547 if(current_time <= rrd.live_head->last_up){
548 rrd_set_error("illegal attempt to update using time %ld when "
549 "last update time is %ld (minimum one second step)",
550 current_time, rrd.live_head->last_up);
556 /* seek to the beginning of the rra's */
557 if (rra_current != rra_begin) {
558 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
559 rrd_set_error("seek error in rrd");
563 rra_current = rra_begin;
565 rra_start = rra_begin;
567 /* when was the current pdp started */
568 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
571 /* when did the last pdp_st occur */
572 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573 occu_pdp_st = current_time - occu_pdp_age;
574 /* interval = current_time - rrd.live_head->last_up; */
575 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
577 if (occu_pdp_st > proc_pdp_st){
578 /* OK we passed the pdp_st moment*/
579 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
580 * occurred before the latest
582 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
583 post_int = occu_pdp_age; /* how much after it */
584 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
598 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
599 occu_pdp_age, occu_pdp_st,
600 interval, pre_int, post_int);
603 /* process the data sources and update the pdp_prep
604 * area accordingly */
605 for(i=0;i<rrd.stat_head->ds_cnt;i++){
607 dst_idx= dst_conv(rrd.ds_def[i].dst);
608 /* NOTE: DST_CDEF should never enter this if block, because
609 * updvals[i+1][0] is initialized to 'U'; unless the caller
610 * accidentally specified a value for the DST_CDEF. To handle
611 * this case, an extra check is required. */
612 if((updvals[i+1][0] != 'U') &&
613 (dst_idx != DST_CDEF) &&
614 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
616 /* the data source type defines how to process the data */
617 /* pdp_new contains rate * time ... eg the bytes
618 * transferred during the interval. Doing it this way saves
619 * a lot of math operations */
625 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
626 for(ii=0;updvals[i+1][ii] != '\0';ii++){
627 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii=0 && updvals[i+1][ii] == '-')){
628 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
632 if (rrd_test_error()){
635 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636 if(dst_idx == DST_COUNTER) {
637 /* simple overflow catcher suggested by andres kroonmaa */
638 /* this will fail terribly for non 32 or 64 bit counters ... */
639 /* are there any others in SNMP land ? */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
642 if (pdp_new[i] < (double)0.0 )
643 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
645 rate = pdp_new[i] / interval;
653 pdp_new[i] = strtod(updvals[i+1],&endptr);
655 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
658 if (endptr[0] != '\0'){
659 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
662 rate = pdp_new[i] / interval;
666 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
668 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
671 if (endptr[0] != '\0'){
672 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
675 rate = pdp_new[i] / interval;
678 rrd_set_error("rrd contains unknown DS type : '%s'",
682 /* break out of this for loop if the error string is set */
683 if (rrd_test_error()){
686 /* make sure pdp_temp is neither too large nor too small
687 * if any of these occur it becomes unknown ...
689 if ( ! isnan(rate) &&
690 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
692 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
697 /* no news is news all the same */
701 /* make a copy of the command line argument for the next run */
709 rrd.pdp_prep[i].last_ds,
710 updvals[i+1], pdp_new[i]);
712 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
713 strncpy(rrd.pdp_prep[i].last_ds,
714 updvals[i+1],LAST_DS_LEN-1);
715 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
718 /* break out of the argument parsing loop if the error_string is set */
719 if (rrd_test_error()){
723 /* has a pdp_st moment occurred since the last run ? */
725 if (proc_pdp_st == occu_pdp_st){
726 /* no we have not passed a pdp_st moment. therefore update is simple */
728 for(i=0;i<rrd.stat_head->ds_cnt;i++){
729 if(isnan(pdp_new[i]))
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
732 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
739 rrd.pdp_prep[i].scratch[PDP_val].u_val,
740 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
744 /* a pdp_st has occurred. */
746 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
747 * occurred up to the last run.
748 pdp_new[] contains rate*seconds from the latest run.
749 pdp_temp[] will contain the rate for cdp */
751 for(i=0;i<rrd.stat_head->ds_cnt;i++){
752 /* update pdp_prep to the current pdp_st */
753 if(isnan(pdp_new[i]))
754 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
756 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
757 pdp_new[i]/(double)interval*(double)pre_int;
759 /* if too much of the pdp_prep is unknown we dump it */
760 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
761 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
762 (occu_pdp_st-proc_pdp_st <=
763 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
766 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
767 / (double)( occu_pdp_st
769 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
772 /* process CDEF data sources; remember each CDEF DS can
773 * only reference other DS with a lower index number */
774 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
776 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
777 /* substitute data values for OP_VARIABLE nodes */
778 for (ii = 0; rpnp[ii].op != OP_END; ii++)
780 if (rpnp[ii].op == OP_VARIABLE) {
781 rpnp[ii].op = OP_NUMBER;
782 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
785 /* run the rpn calculator */
786 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
788 break; /* exits the data sources pdp_temp loop */
792 /* make pdp_prep ready for the next run */
793 if(isnan(pdp_new[i])){
794 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
795 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
797 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
798 rrd.pdp_prep[i].scratch[PDP_val].u_val =
799 pdp_new[i]/(double)interval*(double)post_int;
807 "new_unkn_sec %5lu\n",
809 rrd.pdp_prep[i].scratch[PDP_val].u_val,
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
814 /* if there were errors during the last loop, bail out here */
815 if (rrd_test_error()){
820 /* compute the number of elapsed pdp_st moments */
821 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
823 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
825 if (rra_step_cnt == NULL)
827 rra_step_cnt = (unsigned long *)
828 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
831 for(i = 0, rra_start = rra_begin;
832 i < rrd.stat_head->rra_cnt;
833 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
836 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
837 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
838 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
839 if (start_pdp_offset <= elapsed_pdp_st) {
840 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
841 rrd.rra_def[i].pdp_cnt + 1;
846 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
848 /* If this is a bulk update, we need to skip ahead in the seasonal
849 * arrays so that they will be correct for the next observed value;
850 * note that for the bulk update itself, no update will occur to
851 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
853 if (rra_step_cnt[i] > 2)
855 /* skip update by resetting rra_step_cnt[i],
856 * note that this is not data source specific; this is due
857 * to the bulk update, not a DNAN value for the specific data
860 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
861 &last_seasonal_coef);
862 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
866 /* periodically run a smoother for seasonal effects */
867 /* Need to use first cdp parameter buffer to track
868 * burnin (burnin requires a specific smoothing schedule).
869 * The CDP_init_seasonal parameter is really an RRA level,
870 * not a data source within RRA level parameter, but the rra_def
871 * is read only for rrd_update (not flushed to disk). */
872 iii = i*(rrd.stat_head -> ds_cnt);
873 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
876 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
877 > rrd.rra_def[i].row_cnt - 1) {
878 /* mark off one of the burnin cycles */
879 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
883 /* someone has no doubt invented a trick to deal with this
884 * wrap around, but at least this code is clear. */
885 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
886 rrd.rra_ptr[i].cur_row)
888 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
889 * mapping between PDP and CDP */
890 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
891 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
895 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
896 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
897 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
902 /* can't rely on negative numbers because we are working with
904 /* Don't need modulus here. If we've wrapped more than once, only
905 * one smooth is executed at the end. */
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
907 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
908 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
912 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
913 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
914 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
921 rra_current = ftell(rrd_file);
922 } /* if cf is DEVSEASONAL or SEASONAL */
924 if (rrd_test_error()) break;
926 /* update CDP_PREP areas */
927 /* loop over data sources within each RRA */
929 ii < rrd.stat_head->ds_cnt;
933 /* iii indexes the CDP prep area for this data source within the RRA */
934 iii=i*rrd.stat_head->ds_cnt+ii;
936 if (rrd.rra_def[i].pdp_cnt > 1) {
938 if (rra_step_cnt[i] > 0) {
939 /* If we are in this block, at least 1 CDP value will be written to
940 * disk, this is the CDP_primary_val entry. If more than 1 value needs
941 * to be written, then the "fill in" value is the CDP_secondary_val
943 if (isnan(pdp_temp[ii]))
945 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
946 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
948 /* CDP_secondary value is the RRA "fill in" value for intermediary
949 * CDP data entries. No matter the CF, the value is the same because
950 * the average, max, min, and last of a list of identical values is
951 * the same, namely, the value itself. */
952 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
955 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
956 > rrd.rra_def[i].pdp_cnt*
957 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
959 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
960 /* initialize carry over */
961 if (current_cf == CF_AVERAGE) {
962 if (isnan(pdp_temp[ii])) {
963 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
965 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
966 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
972 rrd_value_t cum_val, cur_val;
973 switch (current_cf) {
975 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
976 cur_val = IFDNAN(pdp_temp[ii],0.0);
977 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
978 (cum_val + cur_val * start_pdp_offset) /
979 (rrd.rra_def[i].pdp_cnt
980 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
981 /* initialize carry over value */
982 if (isnan(pdp_temp[ii])) {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
990 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
991 cur_val = IFDNAN(pdp_temp[ii],-DINF);
993 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
994 isnan(pdp_temp[ii])) {
996 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1001 if (cur_val > cum_val)
1002 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1004 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1005 /* initialize carry over value */
1006 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1009 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1010 cur_val = IFDNAN(pdp_temp[ii],DINF);
1012 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1013 isnan(pdp_temp[ii])) {
1015 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1020 if (cur_val < cum_val)
1021 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1024 /* initialize carry over value */
1025 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1029 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1030 /* initialize carry over value */
1031 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1034 } /* endif meets xff value requirement for a valid value */
1035 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1036 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1037 if (isnan(pdp_temp[ii]))
1038 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1039 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1041 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1042 } else /* rra_step_cnt[i] == 0 */
1045 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1046 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1049 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1050 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1053 if (isnan(pdp_temp[ii])) {
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1055 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1057 if (current_cf == CF_AVERAGE) {
1058 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1064 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1065 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1068 switch (current_cf) {
1070 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1074 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1075 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1088 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1089 if (elapsed_pdp_st > 2)
1091 switch (current_cf) {
1094 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1095 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1098 case CF_DEVSEASONAL:
1099 /* need to update cached seasonal values, so they are consistent
1100 * with the bulk update */
1101 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1102 * CDP_last_deviation are the same. */
1103 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1104 last_seasonal_coef[ii];
1105 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1109 /* need to update the null_count and last_null_count.
1110 * even do this for non-DNAN pdp_temp because the
1111 * algorithm is not learning from batch updates. */
1112 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1114 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1118 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1119 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1122 /* do not count missed bulk values as failures */
1123 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1124 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1125 /* need to reset violations buffer.
1126 * could do this more carefully, but for now, just
1127 * assume a bulk update wipes away all violations. */
1128 erase_violations(&rrd, iii, i);
1132 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1134 if (rrd_test_error()) break;
1136 } /* endif data sources loop */
1137 } /* end RRA Loop */
1139 /* this loop is only entered if elapsed_pdp_st < 3 */
1140 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1141 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1143 for(i = 0, rra_start = rra_begin;
1144 i < rrd.stat_head->rra_cnt;
1145 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1148 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1150 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1151 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1153 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1154 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1156 rra_current = ftell(rrd_file);
1158 if (rrd_test_error()) break;
1159 /* loop over data sources within each RRA */
1161 ii < rrd.stat_head->ds_cnt;
1164 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1165 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1166 scratch_idx, seasonal_coef);
1168 } /* end RRA Loop */
1169 if (rrd_test_error()) break;
1170 } /* end elapsed_pdp_st loop */
1172 if (rrd_test_error()) break;
1174 /* Ready to write to disk */
1175 /* Move sequentially through the file, writing one RRA at a time.
1176 * Note this architecture divorces the computation of CDP with
1177 * flushing updated RRA entries to disk. */
1178 for(i = 0, rra_start = rra_begin;
1179 i < rrd.stat_head->rra_cnt;
1180 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1182 /* is there anything to write for this RRA? If not, continue. */
1183 if (rra_step_cnt[i] == 0) continue;
1185 /* write the first row */
1187 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1189 rrd.rra_ptr[i].cur_row++;
1190 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1191 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1192 /* position on the first row */
1193 rra_pos_tmp = rra_start +
1194 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1195 if(rra_pos_tmp != rra_current) {
1196 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1197 rrd_set_error("seek error in rrd");
1200 rra_current = rra_pos_tmp;
1204 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1206 scratch_idx = CDP_primary_val;
1207 if (pcdp_summary != NULL)
1209 rra_time = (current_time - current_time
1210 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1213 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1214 pcdp_summary, &rra_time);
1215 if (rrd_test_error()) break;
1217 /* write other rows of the bulk update, if any */
1218 scratch_idx = CDP_secondary_val;
1219 for ( ; rra_step_cnt[i] > 1;
1220 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1222 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1225 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1226 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1229 rrd.rra_ptr[i].cur_row = 0;
1230 /* seek back to beginning of current rra */
1231 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1233 rrd_set_error("seek error in rrd");
1237 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1239 rra_current = rra_start;
1241 if (pcdp_summary != NULL)
1243 rra_time = (current_time - current_time
1244 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1245 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1247 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248 pcdp_summary, &rra_time);
1251 if (rrd_test_error())
1255 /* break out of the argument parsing loop if error_string is set */
1256 if (rrd_test_error()){
1261 } /* endif a pdp_st has occurred */
1262 rrd.live_head->last_up = current_time;
1263 rrd.live_head->last_up_usec = current_time_usec;
1265 } /* function argument loop */
1267 if (seasonal_coef != NULL) free(seasonal_coef);
1268 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1269 if (rra_step_cnt != NULL) free(rra_step_cnt);
1270 rpnstack_free(&rpnstack);
1272 /* if we got here and if there is an error and if the file has not been
1273 * written to, then close things up and return. */
1274 if (rrd_test_error()) {
1284 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1285 * we just need to write back the live header portion now */
1287 if (fseek(rrd_file, (sizeof(stat_head_t)
1288 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1289 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1291 rrd_set_error("seek rrd for live header writeback");
1302 if(fwrite( rrd.live_head,
1303 sizeof(live_head_t), 1, rrd_file) != 1){
1304 rrd_set_error("fwrite live_head to rrd");
1315 if(fwrite( &rrd.live_head->last_up,
1316 sizeof(time_t), 1, rrd_file) != 1){
1317 rrd_set_error("fwrite live_head to rrd");
1329 if(fwrite( rrd.pdp_prep,
1331 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332 rrd_set_error("ftwrite pdp_prep to rrd");
1342 if(fwrite( rrd.cdp_prep,
1344 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1345 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1347 rrd_set_error("ftwrite cdp_prep to rrd");
1357 if(fwrite( rrd.rra_ptr,
1359 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360 rrd_set_error("fwrite rra_ptr to rrd");
1370 /* OK now close the files and free the memory */
1371 if(fclose(rrd_file) != 0){
1372 rrd_set_error("closing rrd");
1381 /* calling the smoothing code here guarantees at most
1382 * one smoothing operation per rrd_update call. Unfortunately,
1383 * it is possible with bulk updates, or a long-delayed update
1384 * for smoothing to occur off-schedule. This really isn't
1385 * critical except during the burning cycles. */
1386 if (schedule_smooth)
1389 rrd_file = fopen(filename,"r+");
1391 rrd_file = fopen(filename,"rb+");
1393 rra_start = rra_begin;
1394 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1396 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1397 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1400 fprintf(stderr,"Running smoother for rra %ld\n",i);
1402 apply_smoother(&rrd,i,rra_start,rrd_file);
1403 if (rrd_test_error())
1406 rra_start += rrd.rra_def[i].row_cnt
1407 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1420 * get exclusive lock to whole file.
1421 * lock gets removed when we close the file
1423 * returns 0 on success
/* LockRRD: try to take an exclusive lock on the open RRD stream rrdfile.
 * Two locking calls appear below: fcntl(F_SETLK) and _locking(_LK_NBLCK).
 * NOTE(review): these look like alternative per-platform branches, but the
 * lines that select between them are not visible here -- confirm. */
1426 LockRRD(FILE *rrdfile)
1428 int rrd_fd; /* file descriptor underlying the rrdfile stream */
1431 rrd_fd = fileno(rrdfile);
/* describe a write lock that starts at offset 0 and has length 0 */
1436 lock.l_type = F_WRLCK; /* exclusive write lock */
1437 lock.l_len = 0; /* length 0: lock runs to the end of the file (whole file) */
1438 lock.l_start = 0; /* offset 0 ... */
1439 lock.l_whence = SEEK_SET; /* ... measured from the start of the file */
/* F_SETLK does not wait: the call fails at once if the lock is already held */
1441 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* _locking takes a byte count, so the current file size is fetched first */
1445 if ( _fstat( rrd_fd, &st ) == 0 ) {
1446 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* write_RRA_row: write one row of an RRA to rrd_file, one value per data
 * source, optionally recording each written value in an info list.
 *
 * rrd              in-memory RRD; its cdp_prep scratch values are the data written
 * rra_idx          index of the RRA (selects rra_def[] and the CDP block)
 * rra_current      caller's file-position counter, incremented by
 *                  sizeof(rrd_value_t) as values are written
 * CDP_scratch_idx  which scratch[] slot of each CDP is written
 * rrd_file         stream the values are written to with fwrite
 * pcdp_summary     when non-NULL, an entry per value is appended via info_push
 * rra_time         timestamp placed in the key of each appended entry
 *
 * Returns pcdp_summary (reassigned from info_push when entries were appended).
 * A failed fwrite is reported through rrd_set_error("writing rrd"). */
1458 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1459 unsigned short CDP_scratch_idx, FILE *rrd_file,
1460 info_t *pcdp_summary, time_t *rra_time)
1462 unsigned long ds_idx, cdp_idx;
/* one value is written per data source of this RRA */
1465 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1467 /* cdp_prep is indexed RRA-major: ds_cnt entries per RRA, then the DS offset */
1468 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* diagnostic trace: value about to be written, current file offset, CF name */
1470 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1471 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1472 rrd -> rra_def[rra_idx].cf_nam);
/* only build summary entries when the caller supplied a list */
1474 if (pcdp_summary != NULL)
1476 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1477 /* append an entry keyed "[time]RRA[cf_nam][pdp_cnt]DS[ds_nam]" to the return hash */
/* NOTE(review): the key format uses %d for *rra_time, which is a time_t;
 * the conversion may not match the argument type on every platform -- confirm. */
1478 pcdp_summary = info_push(pcdp_summary,
1479 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1480 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1481 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* write the single scratch value at the stream's current position */
1484 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1485 sizeof(rrd_value_t),1,rrd_file) != 1)
1487 rrd_set_error("writing rrd");
/* keep the caller's notion of the file position in step with the write */
1490 *rra_current += sizeof(rrd_value_t);
1492 return (pcdp_summary);