1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2004
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.15 2004/05/25 20:51:49 oetiker
9 * Update displayed copyright messages to be consistent. -- Mike Slifcak
11 * Revision 1.14 2003/11/11 19:46:21 oetiker
12 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
14 * Revision 1.13 2003/11/11 19:38:03 oetiker
15 * rrd files should NOT change size ever ... bulk update code wa buggy.
16 * -- David M. Grimes <dgrimes@navisite.com>
18 * Revision 1.12 2003/09/04 13:16:12 oetiker
19 * should not assigne but compare ... grrrrr
21 * Revision 1.11 2003/09/02 21:58:35 oetiker
22 * be pickier about what we accept in rrd_update. Complain if things do not work out
24 * Revision 1.10 2003/04/29 19:14:12 jake
25 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
26 * Also revert accidental addition of -I to aclocal MakeMakefile.
28 * Revision 1.9 2003/04/25 18:35:08 jake
29 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
31 * Revision 1.8 2003/03/31 21:22:12 oetiker
32 * enables RRDtool updates with microsecond or in case of windows millisecond
33 * precision. This is needed to reduce time measurement error when archive step
34 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
36 * Revision 1.7 2003/02/13 07:05:27 oetiker
37 * Find attached the patch I promised to send to you. Please note that there
38 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
39 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
40 * library is identical to librrd, but it contains support code for per-thread
41 * global variables currently used for error information only. This is similar
42 * to how errno per-thread variables are implemented. librrd_th must be linked
43 * alongside of libpthred
45 * There is also a new file "THREADS", holding some documentation.
47 * -- Peter Stamfest <peter@stamfest.at>
49 * Revision 1.6 2002/02/01 20:34:49 oetiker
50 * fixed version number and date/time
52 * Revision 1.5 2001/05/09 05:31:01 oetiker
53 * Bug fix: when update of multiple PDP/CDP RRAs coincided
54 * with interpolation of multiple PDPs an incorrect value was
55 * stored as the CDP. Especially evident for GAUGE data sources.
56 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
58 * Revision 1.4 2001/03/10 23:54:41 oetiker
59 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
60 * parser and calculator from rrd_graph and puts then in a new file,
61 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
62 * clean-up of aberrant behavior stuff, including a bug fix.
63 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
64 * -- Jake Brutlag <jakeb@corp.webtv.net>
66 * Revision 1.3 2001/03/04 13:01:55 oetiker
67 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
68 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
69 * This is backwards compatible! But new files using the Aberrant stuff are not readable
70 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
71 * -- Jake Brutlag <jakeb@corp.webtv.net>
73 * Revision 1.2 2001/03/04 11:14:25 oetiker
74 * added at-style-time@value:value syntax to rrd_update
75 * -- Dave Bodenstab <imdave@mcs.net>
77 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
80 *****************************************************************************/
83 #include <sys/types.h>
87 #include <sys/locking.h>
93 #include "rrd_rpncalc.h"
95 #include "rrd_is_thread_safe.h"
99 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
102 #include <sys/timeb.h>
105 time_t tv_sec; /* seconds */
106 long tv_usec; /* microseconds */
110 int tz_minuteswest; /* minutes W of Greenwich */
111 int tz_dsttime; /* type of dst correction */
114 static gettimeofday(struct timeval *t, struct __timezone *tz) {
116 struct timeb current_time;
118 _ftime(¤t_time);
120 t->tv_sec = current_time.time;
121 t->tv_usec = current_time.millitm * 1000;
126 * normalize time as returned by gettimeofday. usec part must
/* Adjusts *t in place; the visible statement adds 1000000 microseconds
 * (one second) to tv_usec.
 * NOTE(review): the guarding condition and any matching tv_sec adjustment
 * are not visible in this excerpt -- confirm against the full file. */
129 static void normalize_time(struct timeval *t)
133 t->tv_usec += 1000000L;
137 /* Local prototypes */
/* Locks the open RRD file; _rrd_update() treats a non-zero return as
 * "could not lock RRD". */
138 int LockRRD(FILE *rrd_file);
/* Called by _rrd_update() once per RRA row to flush; takes the CDP scratch
 * slot to use and returns the pcdp_summary list the caller keeps using. */
139 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
140 unsigned long *rra_current,
141 unsigned short CDP_scratch_idx, FILE *rrd_file,
142 info_t *pcdp_summary, time_t *rra_time);
/* Update entry point taking an already-parsed filename/template pair. */
143 int rrd_update_r(char *filename, char *template, int argc, char **argv);
/* Shared worker behind rrd_update_r() and rrd_update_v(). */
144 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* Yield Y when X is NaN, otherwise X.  X is expanded twice, so it must be
 * free of side effects.  The expansion deliberately carries no trailing
 * semicolon: it is a plain parenthesized expression, so the macro also
 * works inside larger expressions and not only as a full statement. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Program entry point: hands the raw argument vector to rrd_update().
 * When rrd_test_error() reports a recorded error, the usage text is
 * printed, followed by the error message from rrd_get_error(). */
152 main(int argc, char **argv){
153 rrd_update(argc,argv);
154 if (rrd_test_error()) {
155 printf("RRDtool 1.1.x Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
156 "Usage: rrdupdate filename\n"
157 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
158 "\t\t\ttime|N:value[:value...]\n\n"
159 "\t\t\tat-time@value[:value...]\n\n"
160 "\t\t\t[ time:value[:value...] ..]\n\n");
162 printf("ERROR: %s\n",rrd_get_error());
/* Verbose update entry point ("updatev").
 *
 * Parses the optional --template / -t option with getopt_long(), then
 * treats argv[optind] as the RRD filename and everything after it as
 * update data.  An info_t list is created with a "return_value" head
 * entry and handed to _rrd_update() as its pcdp_summary argument; the
 * head entry is then overwritten with _rrd_update()'s return code.
 *
 * Errors (unknown option, fewer than two remaining arguments) are
 * reported through rrd_set_error(). */
170 info_t *rrd_update_v(int argc, char **argv)
172 char *template = NULL;
173 info_t *result = NULL;
177 static struct option long_options[] =
179 {"template", required_argument, 0, 't'},
182 int option_index = 0;
184 opt = getopt_long(argc, argv, "t:",
185 long_options, &option_index);
196 rrd_set_error("unknown option '%s'",argv[optind-1]);
202 /* need at least 2 arguments: filename, data. */
203 if (argc-optind < 2) {
204 rrd_set_error("Not enough arguments");
/* seed the result list; the value stored here is patched after the update */
208 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
/* skip the filename: pass only the data arguments */
209 rc.u_int = _rrd_update(argv[optind], template,
210 argc - optind - 1, argv + optind + 1, result);
/* record the actual outcome in the head entry */
211 result->value.u_int = rc.u_int;
/* Command-line style update entry point.
 *
 * Parses the optional --template / -t option with getopt_long(), then
 * treats argv[optind] as the RRD filename and forwards the remaining
 * arguments to rrd_update_r(), storing its return code in rc.
 *
 * Errors (unknown option, fewer than two remaining arguments) are
 * reported through rrd_set_error(). */
217 rrd_update(int argc, char **argv)
219 char *template = NULL;
223 static struct option long_options[] =
225 {"template", required_argument, 0, 't'},
228 int option_index = 0;
230 opt = getopt_long(argc, argv, "t:",
231 long_options, &option_index);
242 rrd_set_error("unknown option '%s'",argv[optind-1]);
247 /* need at least 2 arguments: filename, data. */
248 if (argc-optind < 2) {
249 rrd_set_error("Not enough arguments");
/* skip the filename: pass only the data arguments */
254 rc = rrd_update_r(argv[optind], template,
255 argc - optind - 1, argv + optind + 1);
/* Thin wrapper: runs _rrd_update() with a NULL pcdp_summary, i.e. without
 * collecting the per-CDP summary list, and returns its result unchanged. */
260 rrd_update_r(char *filename, char *template, int argc, char **argv)
262 return _rrd_update(filename, template, argc, argv, NULL);
266 _rrd_update(char *filename, char *template, int argc, char **argv,
267 info_t *pcdp_summary)
272 unsigned long i,ii,iii=1;
274 unsigned long rra_begin; /* byte pointer to the rra
275 * area in the rrd file. this
276 * pointer never changes value */
277 unsigned long rra_start; /* byte pointer to the rra
278 * area in the rrd file. this
279 * pointer changes as each rrd is
281 unsigned long rra_current; /* byte pointer to the current write
282 * spot in the rrd file. */
283 unsigned long rra_pos_tmp; /* temporary byte pointer. */
285 pre_int,post_int; /* interval between this and
287 unsigned long proc_pdp_st; /* which pdp_st was the last
289 unsigned long occu_pdp_st; /* when was the pdp_st
290 * before the last update
292 unsigned long proc_pdp_age; /* how old was the data in
293 * the pdp prep area when it
294 * was last updated */
295 unsigned long occu_pdp_age; /* how long ago was the last
297 rrd_value_t *pdp_new; /* prepare the incoming data
298 * to be added to the
300 rrd_value_t *pdp_temp; /* prepare the pdp values
301 * to be added to the
304 long *tmpl_idx; /* index representing the settings
305 transported by the template index */
306 unsigned long tmpl_cnt = 2; /* time and data */
311 time_t rra_time; /* time of update for a RRA */
312 unsigned long current_time_usec; /* microseconds part of current time */
313 struct timeval tmp_time; /* used for time conversion */
316 int schedule_smooth = 0;
317 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
318 /* a vector of future Holt-Winters seasonal coefs */
319 unsigned long elapsed_pdp_st;
320 /* number of elapsed PDP steps since last update */
321 unsigned long *rra_step_cnt = NULL;
322 /* number of rows to be updated in an RRA for a data
324 unsigned long start_pdp_offset;
325 /* number of PDP steps since the last update that
326 * are assigned to the first CDP to be generated
327 * since the last update. */
328 unsigned short scratch_idx;
329 /* index into the CDP scratch array */
330 enum cf_en current_cf;
331 /* numeric id of the current consolidation function */
332 rpnstack_t rpnstack; /* used for COMPUTE DS */
333 int version; /* rrd version */
334 char *endptr; /* used in the conversion */
336 rpnstack_init(&rpnstack);
338 /* need at least 1 arguments: data. */
340 rrd_set_error("Not enough arguments");
346 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
349 /* initialize time */
350 version = atoi(rrd.stat_head->version);
351 gettimeofday(&tmp_time, 0);
352 normalize_time(&tmp_time);
353 current_time = tmp_time.tv_sec;
355 current_time_usec = tmp_time.tv_usec;
358 current_time_usec = 0;
361 rra_current = rra_start = rra_begin = ftell(rrd_file);
362 /* This is defined in the ANSI C standard, section 7.9.5.3:
364 When a file is opened with update mode ('+' as the second
365 or third character in the ... list of mode argument
366 variables), both input and output may be performed on the
367 associated stream. However, ... input may not be directly
368 followed by output without an intervening call to a file
369 positioning function, unless the input operation encounters
371 fseek(rrd_file, 0, SEEK_CUR);
374 /* get exclusive lock to whole file.
375 * lock gets removed when we close the file.
377 if (LockRRD(rrd_file) != 0) {
378 rrd_set_error("could not lock RRD");
384 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
385 rrd_set_error("allocating updvals pointer array");
391 if ((pdp_temp = malloc(sizeof(rrd_value_t)
392 *rrd.stat_head->ds_cnt))==NULL){
393 rrd_set_error("allocating pdp_temp ...");
400 if ((tmpl_idx = malloc(sizeof(unsigned long)
401 *(rrd.stat_head->ds_cnt+1)))==NULL){
402 rrd_set_error("allocating tmpl_idx ...");
409 /* initialize template redirector */
410 /* default config example (assume DS 1 is a CDEF DS)
411 tmpl_idx[0] -> 0; (time)
412 tmpl_idx[1] -> 1; (DS 0)
413 tmpl_idx[2] -> 3; (DS 2)
414 tmpl_idx[3] -> 4; (DS 3) */
415 tmpl_idx[0] = 0; /* time */
416 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
418 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
425 unsigned int tmpl_len;
427 tmpl_cnt = 1; /* the first entry is the time */
428 tmpl_len = strlen(template);
429 for(i=0;i<=tmpl_len ;i++) {
430 if (template[i] == ':' || template[i] == '\0') {
432 if (tmpl_cnt>rrd.stat_head->ds_cnt){
433 rrd_set_error("Template contains more DS definitions than RRD");
434 free(updvals); free(pdp_temp);
435 free(tmpl_idx); rrd_free(&rrd);
436 fclose(rrd_file); return(-1);
438 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
439 rrd_set_error("unknown DS name '%s'",dsname);
440 free(updvals); free(pdp_temp);
441 free(tmpl_idx); rrd_free(&rrd);
442 fclose(rrd_file); return(-1);
444 /* the first element is always the time */
445 tmpl_idx[tmpl_cnt-1]++;
446 /* go to the next entry on the template */
447 dsname = &template[i+1];
448 /* fix the damage we did before */
457 if ((pdp_new = malloc(sizeof(rrd_value_t)
458 *rrd.stat_head->ds_cnt))==NULL){
459 rrd_set_error("allocating pdp_new ...");
468 /* loop through the arguments. */
469 for(arg_i=0; arg_i<argc;arg_i++) {
470 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
471 char *step_start = stepper;
473 char *parsetime_error = NULL;
474 enum {atstyle, normal} timesyntax;
475 struct rrd_time_value ds_tv;
476 if (stepper == NULL){
477 rrd_set_error("failed duplication argv entry");
485 /* initialize all ds input to unknown except the first one
486 which has always got to be set */
487 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
488 strcpy(stepper,argv[arg_i]);
490 /* separate all ds elements; first must be examined separately
491 due to alternate time syntax */
492 if ((p=strchr(stepper,'@'))!=NULL) {
493 timesyntax = atstyle;
496 } else if ((p=strchr(stepper,':'))!=NULL) {
501 rrd_set_error("expected timestamp not found in data source from %s:...",
507 updvals[tmpl_idx[ii]] = stepper;
509 if (*stepper == ':') {
513 updvals[tmpl_idx[ii]] = stepper+1;
519 if (ii != tmpl_cnt-1) {
520 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
521 tmpl_cnt-1, ii, argv[arg_i]);
526 /* get the time from the reading ... handle N */
527 if (timesyntax == atstyle) {
528 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
529 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
533 if (ds_tv.type == RELATIVE_TO_END_TIME ||
534 ds_tv.type == RELATIVE_TO_START_TIME) {
535 rrd_set_error("specifying time relative to the 'start' "
536 "or 'end' makes no sense here: %s",
542 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
543 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
545 } else if (strcmp(updvals[0],"N")==0){
546 gettimeofday(&tmp_time, 0);
547 normalize_time(&tmp_time);
548 current_time = tmp_time.tv_sec;
549 current_time_usec = tmp_time.tv_usec;
552 tmp = strtod(updvals[0], 0);
553 current_time = floor(tmp);
554 current_time_usec = (long)((tmp - current_time) * 1000000L);
556 /* dont do any correction for old version RRDs */
558 current_time_usec = 0;
560 if(current_time <= rrd.live_head->last_up){
561 rrd_set_error("illegal attempt to update using time %ld when "
562 "last update time is %ld (minimum one second step)",
563 current_time, rrd.live_head->last_up);
569 /* seek to the beginning of the rra's */
570 if (rra_current != rra_begin) {
571 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
572 rrd_set_error("seek error in rrd");
576 rra_current = rra_begin;
578 rra_start = rra_begin;
580 /* when was the current pdp started */
581 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
582 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
584 /* when did the last pdp_st occur */
585 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
586 occu_pdp_st = current_time - occu_pdp_age;
587 /* interval = current_time - rrd.live_head->last_up; */
588 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
590 if (occu_pdp_st > proc_pdp_st){
591 /* OK we passed the pdp_st moment*/
592 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
593 * occurred before the latest
595 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
596 post_int = occu_pdp_age; /* how much after it */
597 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
611 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
612 occu_pdp_age, occu_pdp_st,
613 interval, pre_int, post_int);
616 /* process the data sources and update the pdp_prep
617 * area accordingly */
618 for(i=0;i<rrd.stat_head->ds_cnt;i++){
620 dst_idx= dst_conv(rrd.ds_def[i].dst);
621 /* NOTE: DST_CDEF should never enter this if block, because
622 * updvals[i+1][0] is initialized to 'U'; unless the caller
623 * accidentally specified a value for the DST_CDEF. To handle
624 * this case, an extra check is required. */
625 if((updvals[i+1][0] != 'U') &&
626 (dst_idx != DST_CDEF) &&
627 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
629 /* the data source type defines how to process the data */
630 /* pdp_new contains rate * time ... eg the bytes
631 * transferred during the interval. Doing it this way saves
632 * a lot of math operations */
638 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
639 for(ii=0;updvals[i+1][ii] != '\0';ii++){
640 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
641 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
645 if (rrd_test_error()){
648 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
649 if(dst_idx == DST_COUNTER) {
650 /* simple overflow catcher suggested by Andres Kroonmaa */
651 /* this will fail terribly for non 32 or 64 bit counters ... */
652 /* are there any others in SNMP land ? */
653 if (pdp_new[i] < (double)0.0 )
654 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
655 if (pdp_new[i] < (double)0.0 )
656 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
658 rate = pdp_new[i] / interval;
666 pdp_new[i] = strtod(updvals[i+1],&endptr);
668 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
671 if (endptr[0] != '\0'){
672 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
675 rate = pdp_new[i] / interval;
679 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
681 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
684 if (endptr[0] != '\0'){
685 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
688 rate = pdp_new[i] / interval;
691 rrd_set_error("rrd contains unknown DS type : '%s'",
695 /* break out of this for loop if the error string is set */
696 if (rrd_test_error()){
699 /* make sure pdp_temp is neither too large nor too small
700 * if any of these occur it becomes unknown ...
702 if ( ! isnan(rate) &&
703 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
704 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
705 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
706 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
710 /* no news is news all the same */
714 /* make a copy of the command line argument for the next run */
722 rrd.pdp_prep[i].last_ds,
723 updvals[i+1], pdp_new[i]);
725 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
726 strncpy(rrd.pdp_prep[i].last_ds,
727 updvals[i+1],LAST_DS_LEN-1);
728 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
731 /* break out of the argument parsing loop if the error_string is set */
732 if (rrd_test_error()){
736 /* has a pdp_st moment occurred since the last run ? */
738 if (proc_pdp_st == occu_pdp_st){
739 /* no we have not passed a pdp_st moment. therefore update is simple */
741 for(i=0;i<rrd.stat_head->ds_cnt;i++){
742 if(isnan(pdp_new[i]))
743 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
745 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
752 rrd.pdp_prep[i].scratch[PDP_val].u_val,
753 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
757 /* a pdp_st has occurred. */
759 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
760 * occurred up to the last run.
761 pdp_new[] contains rate*seconds from the latest run.
762 pdp_temp[] will contain the rate for cdp */
764 for(i=0;i<rrd.stat_head->ds_cnt;i++){
765 /* update pdp_prep to the current pdp_st */
766 if(isnan(pdp_new[i]))
767 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
769 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
770 pdp_new[i]/(double)interval*(double)pre_int;
772 /* if too much of the pdp_prep is unknown we dump it */
773 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775 (occu_pdp_st-proc_pdp_st <=
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780 / (double)( occu_pdp_st
782 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
785 /* process CDEF data sources; remember each CDEF DS can
786 * only reference other DS with a lower index number */
787 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
789 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790 /* substitute data values for OP_VARIABLE nodes */
791 for (ii = 0; rpnp[ii].op != OP_END; ii++)
793 if (rpnp[ii].op == OP_VARIABLE) {
794 rpnp[ii].op = OP_NUMBER;
795 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
798 /* run the rpn calculator */
799 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
801 break; /* exits the data sources pdp_temp loop */
805 /* make pdp_prep ready for the next run */
806 if(isnan(pdp_new[i])){
807 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
808 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
811 rrd.pdp_prep[i].scratch[PDP_val].u_val =
812 pdp_new[i]/(double)interval*(double)post_int;
820 "new_unkn_sec %5lu\n",
822 rrd.pdp_prep[i].scratch[PDP_val].u_val,
823 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 /* if there were errors during the last loop, bail out here */
828 if (rrd_test_error()){
833 /* compute the number of elapsed pdp_st moments */
834 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
836 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
838 if (rra_step_cnt == NULL)
840 rra_step_cnt = (unsigned long *)
841 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
844 for(i = 0, rra_start = rra_begin;
845 i < rrd.stat_head->rra_cnt;
846 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
849 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
850 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
851 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
852 if (start_pdp_offset <= elapsed_pdp_st) {
853 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
854 rrd.rra_def[i].pdp_cnt + 1;
859 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
861 /* If this is a bulk update, we need to skip ahead in the seasonal
862 * arrays so that they will be correct for the next observed value;
863 * note that for the bulk update itself, no update will occur to
864 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
866 if (rra_step_cnt[i] > 2)
868 /* skip update by resetting rra_step_cnt[i],
869 * note that this is not data source specific; this is due
870 * to the bulk update, not a DNAN value for the specific data
873 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
874 &last_seasonal_coef);
875 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879 /* periodically run a smoother for seasonal effects */
880 /* Need to use first cdp parameter buffer to track
881 * burnin (burnin requires a specific smoothing schedule).
882 * The CDP_init_seasonal parameter is really an RRA level,
883 * not a data source within RRA level parameter, but the rra_def
884 * is read only for rrd_update (not flushed to disk). */
885 iii = i*(rrd.stat_head -> ds_cnt);
886 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
889 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
890 > rrd.rra_def[i].row_cnt - 1) {
891 /* mark off one of the burnin cycles */
892 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896 /* someone has no doubt invented a trick to deal with this
897 * wrap around, but at least this code is clear. */
898 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
899 rrd.rra_ptr[i].cur_row)
901 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
902 * mapping between PDP and CDP */
903 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
904 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
909 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
910 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
915 /* can't rely on negative numbers because we are working with
917 /* Don't need modulus here. If we've wrapped more than once, only
918 * one smooth is executed at the end. */
919 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
920 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
921 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
926 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
927 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
934 rra_current = ftell(rrd_file);
935 } /* if cf is DEVSEASONAL or SEASONAL */
937 if (rrd_test_error()) break;
939 /* update CDP_PREP areas */
940 /* loop over data sources within each RRA */
942 ii < rrd.stat_head->ds_cnt;
946 /* iii indexes the CDP prep area for this data source within the RRA */
947 iii=i*rrd.stat_head->ds_cnt+ii;
949 if (rrd.rra_def[i].pdp_cnt > 1) {
951 if (rra_step_cnt[i] > 0) {
952 /* If we are in this block, at least 1 CDP value will be written to
953 * disk, this is the CDP_primary_val entry. If more than 1 value needs
954 * to be written, then the "fill in" value is the CDP_secondary_val
956 if (isnan(pdp_temp[ii]))
958 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
959 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
961 /* CDP_secondary value is the RRA "fill in" value for intermediary
962 * CDP data entries. No matter the CF, the value is the same because
963 * the average, max, min, and last of a list of identical values is
964 * the same, namely, the value itself. */
965 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
968 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
969 > rrd.rra_def[i].pdp_cnt*
970 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
972 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
973 /* initialize carry over */
974 if (current_cf == CF_AVERAGE) {
975 if (isnan(pdp_temp[ii])) {
976 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
978 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
979 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
982 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
985 rrd_value_t cum_val, cur_val;
986 switch (current_cf) {
988 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
989 cur_val = IFDNAN(pdp_temp[ii],0.0);
990 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
991 (cum_val + cur_val * start_pdp_offset) /
992 (rrd.rra_def[i].pdp_cnt
993 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
994 /* initialize carry over value */
995 if (isnan(pdp_temp[ii])) {
996 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
998 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
999 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1004 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1006 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1007 isnan(pdp_temp[ii])) {
1009 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1014 if (cur_val > cum_val)
1015 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1017 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1018 /* initialize carry over value */
1019 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1022 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1023 cur_val = IFDNAN(pdp_temp[ii],DINF);
1025 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1026 isnan(pdp_temp[ii])) {
1028 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1033 if (cur_val < cum_val)
1034 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1036 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1037 /* initialize carry over value */
1038 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1043 /* initialize carry over value */
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047 } /* endif meets xff value requirement for a valid value */
1048 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1049 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1050 if (isnan(pdp_temp[ii]))
1051 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1052 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1055 } else /* rra_step_cnt[i] == 0 */
1058 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1059 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1062 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1063 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066 if (isnan(pdp_temp[ii])) {
1067 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1068 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1070 if (current_cf == CF_AVERAGE) {
1071 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1074 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1077 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1078 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1081 switch (current_cf) {
1083 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1088 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1092 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1101 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1102 if (elapsed_pdp_st > 2)
1104 switch (current_cf) {
1107 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1108 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1111 case CF_DEVSEASONAL:
1112 /* need to update cached seasonal values, so they are consistent
1113 * with the bulk update */
1114 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1115 * CDP_last_deviation are the same. */
1116 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1117 last_seasonal_coef[ii];
1118 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122 /* need to update the null_count and last_null_count.
1123 * even do this for non-DNAN pdp_temp because the
1124 * algorithm is not learning from batch updates. */
1125 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1127 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1131 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1132 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1135 /* do not count missed bulk values as failures */
1136 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1137 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1138 /* need to reset violations buffer.
1139 * could do this more carefully, but for now, just
1140 * assume a bulk update wipes away all violations. */
1141 erase_violations(&rrd, iii, i);
1145 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1147 if (rrd_test_error()) break;
1149 } /* endif data sources loop */
1150 } /* end RRA Loop */
1152 /* this loop is only entered if elapsed_pdp_st < 3 */
1153 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1154 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1156 for(i = 0, rra_start = rra_begin;
1157 i < rrd.stat_head->rra_cnt;
1158 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1161 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1163 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1164 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1166 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1167 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1169 rra_current = ftell(rrd_file);
1171 if (rrd_test_error()) break;
1172 /* loop over data sources within each RRA */
1174 ii < rrd.stat_head->ds_cnt;
1177 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1178 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1179 scratch_idx, seasonal_coef);
1181 } /* end RRA Loop */
1182 if (rrd_test_error()) break;
1183 } /* end elapsed_pdp_st loop */
1185 if (rrd_test_error()) break;
1187 /* Ready to write to disk */
1188 /* Move sequentially through the file, writing one RRA at a time.
1189 * Note this architecture divorces the computation of CDP with
1190 * flushing updated RRA entries to disk. */
1191 for(i = 0, rra_start = rra_begin;
1192 i < rrd.stat_head->rra_cnt;
1193 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1195 /* is there anything to write for this RRA? If not, continue. */
1196 if (rra_step_cnt[i] == 0) continue;
1198 /* write the first row */
1200 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1202 rrd.rra_ptr[i].cur_row++;
1203 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1204 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1205 /* position on the first row */
1206 rra_pos_tmp = rra_start +
1207 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1208 if(rra_pos_tmp != rra_current) {
1209 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1210 rrd_set_error("seek error in rrd");
1213 rra_current = rra_pos_tmp;
1217 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1219 scratch_idx = CDP_primary_val;
1220 if (pcdp_summary != NULL)
1222 rra_time = (current_time - current_time
1223 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1224 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1226 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1227 pcdp_summary, &rra_time);
1228 if (rrd_test_error()) break;
1230 /* write other rows of the bulk update, if any */
1231 scratch_idx = CDP_secondary_val;
1232 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1234 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1237 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1238 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1241 rrd.rra_ptr[i].cur_row = 0;
1242 /* seek back to beginning of current rra */
1243 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1245 rrd_set_error("seek error in rrd");
1249 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1251 rra_current = rra_start;
1253 if (pcdp_summary != NULL)
1255 rra_time = (current_time - current_time
1256 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1257 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1259 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1260 pcdp_summary, &rra_time);
1263 if (rrd_test_error())
1267 /* break out of the argument parsing loop if error_string is set */
1268 if (rrd_test_error()){
1273 } /* endif a pdp_st has occurred */
1274 rrd.live_head->last_up = current_time;
1275 rrd.live_head->last_up_usec = current_time_usec;
1277 } /* function argument loop */
1279 if (seasonal_coef != NULL) free(seasonal_coef);
1280 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1281 if (rra_step_cnt != NULL) free(rra_step_cnt);
1282 rpnstack_free(&rpnstack);
1284 /* if we got here and if there is an error and if the file has not been
1285 * written to, then close things up and return. */
1286 if (rrd_test_error()) {
1296 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1297 * we just need to write back the live header portion now*/
1299 if (fseek(rrd_file, (sizeof(stat_head_t)
1300 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1301 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1303 rrd_set_error("seek rrd for live header writeback");
1314 if(fwrite( rrd.live_head,
1315 sizeof(live_head_t), 1, rrd_file) != 1){
1316 rrd_set_error("fwrite live_head to rrd");
1327 if(fwrite( &rrd.live_head->last_up,
1328 sizeof(time_t), 1, rrd_file) != 1){
1329 rrd_set_error("fwrite live_head to rrd");
1341 if(fwrite( rrd.pdp_prep,
1343 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1344 rrd_set_error("ftwrite pdp_prep to rrd");
1354 if(fwrite( rrd.cdp_prep,
1356 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1357 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1359 rrd_set_error("ftwrite cdp_prep to rrd");
1369 if(fwrite( rrd.rra_ptr,
1371 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1372 rrd_set_error("fwrite rra_ptr to rrd");
1382 /* OK now close the files and free the memory */
1383 if(fclose(rrd_file) != 0){
1384 rrd_set_error("closing rrd");
1393 /* calling the smoothing code here guarantees at most
1394 * one smoothing operation per rrd_update call. Unfortunately,
1395 * it is possible with bulk updates, or a long-delayed update
1396 * for smoothing to occur off-schedule. This really isn't
1397 * critical except during the burning cycles. */
1398 if (schedule_smooth)
1401 rrd_file = fopen(filename,"r+");
1403 rrd_file = fopen(filename,"rb+");
1405 rra_start = rra_begin;
1406 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1408 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1409 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1412 fprintf(stderr,"Running smoother for rra %ld\n",i);
1414 apply_smoother(&rrd,i,rra_start,rrd_file);
1415 if (rrd_test_error())
1418 rra_start += rrd.rra_def[i].row_cnt
1419 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1432 * get exclusive lock to whole file.
1433 * lock gets removed when we close the file
1435 * returns 0 on success
/* LockRRD: request an exclusive lock covering the whole RRD file without
 * waiting for it; the visible calls are fcntl(F_SETLK) and _locking(_LK_NBLCK).
 * NOTE(review): the return statement and the preprocessor guards that select
 * between the two locking paths are not visible in this excerpt. */
1438 LockRRD(FILE *rrdfile)
1440 int rrd_fd; /* file descriptor underlying the rrdfile stream */
/* both locking calls below take a descriptor, not a FILE* */
1443 rrd_fd = fileno(rrdfile);
1448 lock.l_type = F_WRLCK; /* exclusive write lock */
1449 lock.l_len = 0; /* length 0: the lock runs through end of file (whole file) */
1450 lock.l_start = 0; /* begin at offset 0 */
1451 lock.l_whence = SEEK_SET; /* l_start is relative to the start of the file */
/* F_SETLK fails immediately instead of blocking when the lock is held elsewhere */
1453 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* alternative path (presumably the Windows build -- confirm against the guards):
 * look up the file size, then lock that many bytes without blocking */
1457 if ( _fstat( rrd_fd, &st ) == 0 ) {
1458 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* write_RRA_row: emit one row of archive rra_idx to rrd_file.
 *
 * For every data source, the value held in scratch[CDP_scratch_idx] of the
 * matching cdp_prep entry is written with fwrite() and *rra_current is moved
 * forward by sizeof(rrd_value_t).
 *
 * rrd             - in-memory RRD whose cdp_prep/rra_def/ds_def are read
 * rra_idx         - index of the RRA being written
 * rra_current     - in/out: running file offset kept in step with the writes
 * CDP_scratch_idx - which scratch slot of each CDP holds the value to write
 * rrd_file        - stream to write to, at its current position
 * pcdp_summary    - optional info list; when non-NULL one entry per value is
 *                   appended through info_push()
 * rra_time        - timestamp used in the key of each summary entry
 *
 * Returns pcdp_summary as updated by info_push(). A failed fwrite() records
 * the error "writing rrd" through rrd_set_error(). */
1470 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1471 unsigned short CDP_scratch_idx, FILE *rrd_file,
1472 info_t *pcdp_summary, time_t *rra_time)
1474 unsigned long ds_idx, cdp_idx;
/* one value per data source makes up a row */
1477 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1479 /* cdp_prep is indexed RRA-major: rra_idx * ds_cnt + ds_idx */
1480 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* trace output: value about to be written, current file offset, CF name */
1482 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1483 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1484 rrd -> rra_def[rra_idx].cf_nam);
/* callers that do not want a summary pass NULL and skip this bookkeeping */
1486 if (pcdp_summary != NULL)
/* iv presumably carries the value into info_push(); its trailing arguments
 * are not visible in this excerpt -- confirm */
1488 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1489 /* append an entry keyed "[time]RRA[cf][pdp_cnt]DS[name]" to the return hash */
/* NOTE(review): the key format uses %d for *rra_time, which is a time_t;
 * confirm this matches the width of time_t on every supported platform */
1490 pcdp_summary = info_push(pcdp_summary,
1491 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1492 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1493 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
/* write exactly one rrd_value_t; anything other than 1 item is a failure */
1496 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1497 sizeof(rrd_value_t),1,rrd_file) != 1)
1499 rrd_set_error("writing rrd");
/* keep the caller's notion of the file offset in step with the write */
1502 *rra_current += sizeof(rrd_value_t);
1504 return (pcdp_summary);