1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.14 2003/11/11 19:46:21 oetiker
9 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
11 * Revision 1.13 2003/11/11 19:38:03 oetiker
12 * rrd files should NOT change size ever ... bulk update code was buggy.
13 * -- David M. Grimes <dgrimes@navisite.com>
15 * Revision 1.12 2003/09/04 13:16:12 oetiker
16 * should not assign but compare ... grrrrr
18 * Revision 1.11 2003/09/02 21:58:35 oetiker
19 * be pickier about what we accept in rrd_update. Complain if things do not work out
21 * Revision 1.10 2003/04/29 19:14:12 jake
22 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
23 * Also revert accidental addition of -I to aclocal MakeMakefile.
25 * Revision 1.9 2003/04/25 18:35:08 jake
26 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
28 * Revision 1.8 2003/03/31 21:22:12 oetiker
29 * enables RRDtool updates with microsecond or in case of windows millisecond
30 * precision. This is needed to reduce time measurement error when archive step
31 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
33 * Revision 1.7 2003/02/13 07:05:27 oetiker
34 * Find attached the patch I promised to send to you. Please note that there
35 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
36 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
37 * library is identical to librrd, but it contains support code for per-thread
38 * global variables currently used for error information only. This is similar
39 * to how errno per-thread variables are implemented. librrd_th must be linked
40 * alongside of libpthread
42 * There is also a new file "THREADS", holding some documentation.
44 * -- Peter Stamfest <peter@stamfest.at>
46 * Revision 1.6 2002/02/01 20:34:49 oetiker
47 * fixed version number and date/time
49 * Revision 1.5 2001/05/09 05:31:01 oetiker
50 * Bug fix: when update of multiple PDP/CDP RRAs coincided
51 * with interpolation of multiple PDPs an incorrect value was
52 * stored as the CDP. Especially evident for GAUGE data sources.
53 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
55 * Revision 1.4 2001/03/10 23:54:41 oetiker
56 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
57 * parser and calculator from rrd_graph and puts them in a new file,
58 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
59 * clean-up of aberrant behavior stuff, including a bug fix.
60 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
61 * -- Jake Brutlag <jakeb@corp.webtv.net>
63 * Revision 1.3 2001/03/04 13:01:55 oetiker
64 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
65 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
66 * This is backwards compatible! But new files using the Aberrant stuff are not readable
67 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
68 * -- Jake Brutlag <jakeb@corp.webtv.net>
70 * Revision 1.2 2001/03/04 11:14:25 oetiker
71 * added at-style-time@value:value syntax to rrd_update
72 * -- Dave Bodenstab <imdave@mcs.net>
74 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
77 *****************************************************************************/
80 #include <sys/types.h>
84 #include <sys/locking.h>
90 #include "rrd_rpncalc.h"
92 #include "rrd_is_thread_safe.h"
96 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
99 #include <sys/timeb.h>
102 time_t tv_sec; /* seconds */
103 long tv_usec; /* microseconds */
107 int tz_minuteswest; /* minutes W of Greenwich */
108 int tz_dsttime; /* type of dst correction */
111 static gettimeofday(struct timeval *t, struct __timezone *tz) {
113 struct timeb current_time;
115 _ftime(¤t_time);
117 t->tv_sec = current_time.time;
118 t->tv_usec = current_time.millitm * 1000;
123 * normalize time as returned by gettimeofday. usec part must
126 static void normalize_time(struct timeval *t)
130 t->tv_usec += 1000000L;
134 /* Local prototypes */
135 int LockRRD(FILE *rrd_file);
136 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
137 unsigned long *rra_current,
138 unsigned short CDP_scratch_idx, FILE *rrd_file,
139 info_t *pcdp_summary, time_t *rra_time);
140 int rrd_update_r(char *filename, char *template, int argc, char **argv);
141 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * The expansion is a bare parenthesized expression (no trailing
 * semicolon), so it works inside larger expressions as well as in
 * plain `v = IFDNAN(a, b);` statements.
 * X is evaluated twice; do not pass an argument with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
149 main(int argc, char **argv){
150 rrd_update(argc,argv);
151 if (rrd_test_error()) {
152 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
153 "Usage: rrdupdate filename\n"
154 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
155 "\t\t\ttime|N:value[:value...]\n\n"
156 "\t\t\tat-time@value[:value...]\n\n"
157 "\t\t\t[ time:value[:value...] ..]\n\n");
159 printf("ERROR: %s\n",rrd_get_error());
167 info_t *rrd_update_v(int argc, char **argv)
169 char *template = NULL;
170 info_t *result = NULL;
174 static struct option long_options[] =
176 {"template", required_argument, 0, 't'},
179 int option_index = 0;
181 opt = getopt_long(argc, argv, "t:",
182 long_options, &option_index);
193 rrd_set_error("unknown option '%s'",argv[optind-1]);
199 /* need at least 2 arguments: filename, data. */
200 if (argc-optind < 2) {
201 rrd_set_error("Not enough arguments");
205 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
206 rc.u_int = _rrd_update(argv[optind], template,
207 argc - optind - 1, argv + optind + 1, result);
208 result->value.u_int = rc.u_int;
214 rrd_update(int argc, char **argv)
216 char *template = NULL;
220 static struct option long_options[] =
222 {"template", required_argument, 0, 't'},
225 int option_index = 0;
227 opt = getopt_long(argc, argv, "t:",
228 long_options, &option_index);
239 rrd_set_error("unknown option '%s'",argv[optind-1]);
244 /* need at least 2 arguments: filename, data. */
245 if (argc-optind < 2) {
246 rrd_set_error("Not enough arguments");
251 rc = rrd_update_r(argv[optind], template,
252 argc - optind - 1, argv + optind + 1);
/*
 * Forwards its arguments unchanged to _rrd_update() with a NULL
 * pcdp_summary and returns that call's result.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    return _rrd_update(filename, template, argc, argv, NULL);
}
263 _rrd_update(char *filename, char *template, int argc, char **argv,
264 info_t *pcdp_summary)
269 unsigned long i,ii,iii=1;
271 unsigned long rra_begin; /* byte pointer to the rra
272 * area in the rrd file. this
273 * pointer never changes value */
274 unsigned long rra_start; /* byte pointer to the rra
275 * area in the rrd file. this
276 * pointer changes as each rrd is
278 unsigned long rra_current; /* byte pointer to the current write
279 * spot in the rrd file. */
280 unsigned long rra_pos_tmp; /* temporary byte pointer. */
282 pre_int,post_int; /* interval between this and
284 unsigned long proc_pdp_st; /* which pdp_st was the last
286 unsigned long occu_pdp_st; /* when was the pdp_st
287 * before the last update
289 unsigned long proc_pdp_age; /* how old was the data in
290 * the pdp prep area when it
291 * was last updated */
292 unsigned long occu_pdp_age; /* how long ago was the last
294 rrd_value_t *pdp_new; /* prepare the incoming data
295 * to be added to the
297 rrd_value_t *pdp_temp; /* prepare the pdp values
298 * to be added to the
301 long *tmpl_idx; /* index representing the settings
302 transported by the template index */
303 unsigned long tmpl_cnt = 2; /* time and data */
308 time_t rra_time; /* time of update for a RRA */
309 unsigned long current_time_usec; /* microseconds part of current time */
310 struct timeval tmp_time; /* used for time conversion */
313 int schedule_smooth = 0;
314 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
315 /* a vector of future Holt-Winters seasonal coefs */
316 unsigned long elapsed_pdp_st;
317 /* number of elapsed PDP steps since last update */
318 unsigned long *rra_step_cnt = NULL;
319 /* number of rows to be updated in an RRA for a data
321 unsigned long start_pdp_offset;
322 /* number of PDP steps since the last update that
323 * are assigned to the first CDP to be generated
324 * since the last update. */
325 unsigned short scratch_idx;
326 /* index into the CDP scratch array */
327 enum cf_en current_cf;
328 /* numeric id of the current consolidation function */
329 rpnstack_t rpnstack; /* used for COMPUTE DS */
330 int version; /* rrd version */
331 char *endptr; /* used in the conversion */
333 rpnstack_init(&rpnstack);
335 /* need at least 1 argument: data. */
337 rrd_set_error("Not enough arguments");
343 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
346 /* initialize time */
347 version = atoi(rrd.stat_head->version);
348 gettimeofday(&tmp_time, 0);
349 normalize_time(&tmp_time);
350 current_time = tmp_time.tv_sec;
352 current_time_usec = tmp_time.tv_usec;
355 current_time_usec = 0;
358 rra_current = rra_start = rra_begin = ftell(rrd_file);
359 /* This is defined in the ANSI C standard, section 7.9.5.3:
361 When a file is opened with update mode ('+' as the second
362 or third character in the ... list of mode argument
363 variables), both input and output may be performed on the
364 associated stream. However, ... input may not be directly
365 followed by output without an intervening call to a file
366 positioning function, unless the input operation encounters
368 fseek(rrd_file, 0, SEEK_CUR);
371 /* get exclusive lock to whole file.
372 * lock gets removed when we close the file.
374 if (LockRRD(rrd_file) != 0) {
375 rrd_set_error("could not lock RRD");
381 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
382 rrd_set_error("allocating updvals pointer array");
388 if ((pdp_temp = malloc(sizeof(rrd_value_t)
389 *rrd.stat_head->ds_cnt))==NULL){
390 rrd_set_error("allocating pdp_temp ...");
397 if ((tmpl_idx = malloc(sizeof(unsigned long)
398 *(rrd.stat_head->ds_cnt+1)))==NULL){
399 rrd_set_error("allocating tmpl_idx ...");
406 /* initialize template redirector */
407 /* default config example (assume DS 1 is a CDEF DS)
408 tmpl_idx[0] -> 0; (time)
409 tmpl_idx[1] -> 1; (DS 0)
410 tmpl_idx[2] -> 3; (DS 2)
411 tmpl_idx[3] -> 4; (DS 3) */
412 tmpl_idx[0] = 0; /* time */
413 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
415 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
422 unsigned int tmpl_len;
424 tmpl_cnt = 1; /* the first entry is the time */
425 tmpl_len = strlen(template);
426 for(i=0;i<=tmpl_len ;i++) {
427 if (template[i] == ':' || template[i] == '\0') {
429 if (tmpl_cnt>rrd.stat_head->ds_cnt){
430 rrd_set_error("Template contains more DS definitions than RRD");
431 free(updvals); free(pdp_temp);
432 free(tmpl_idx); rrd_free(&rrd);
433 fclose(rrd_file); return(-1);
435 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
436 rrd_set_error("unknown DS name '%s'",dsname);
437 free(updvals); free(pdp_temp);
438 free(tmpl_idx); rrd_free(&rrd);
439 fclose(rrd_file); return(-1);
441 /* the first element is always the time */
442 tmpl_idx[tmpl_cnt-1]++;
443 /* go to the next entry on the template */
444 dsname = &template[i+1];
445 /* fix the damage we did before */
454 if ((pdp_new = malloc(sizeof(rrd_value_t)
455 *rrd.stat_head->ds_cnt))==NULL){
456 rrd_set_error("allocating pdp_new ...");
465 /* loop through the arguments. */
466 for(arg_i=0; arg_i<argc;arg_i++) {
467 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
468 char *step_start = stepper;
470 char *parsetime_error = NULL;
471 enum {atstyle, normal} timesyntax;
472 struct rrd_time_value ds_tv;
473 if (stepper == NULL){
474 rrd_set_error("failed duplication argv entry");
482 /* initialize all ds input to unknown except the first one
483 which has always got to be set */
484 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
485 strcpy(stepper,argv[arg_i]);
487 /* separate all ds elements; first must be examined separately
488 due to alternate time syntax */
489 if ((p=strchr(stepper,'@'))!=NULL) {
490 timesyntax = atstyle;
493 } else if ((p=strchr(stepper,':'))!=NULL) {
498 rrd_set_error("expected timestamp not found in data source from %s:...",
504 updvals[tmpl_idx[ii]] = stepper;
506 if (*stepper == ':') {
510 updvals[tmpl_idx[ii]] = stepper+1;
516 if (ii != tmpl_cnt-1) {
517 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
518 tmpl_cnt-1, ii, argv[arg_i]);
523 /* get the time from the reading ... handle N */
524 if (timesyntax == atstyle) {
525 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
526 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
530 if (ds_tv.type == RELATIVE_TO_END_TIME ||
531 ds_tv.type == RELATIVE_TO_START_TIME) {
532 rrd_set_error("specifying time relative to the 'start' "
533 "or 'end' makes no sense here: %s",
539 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
540 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
542 } else if (strcmp(updvals[0],"N")==0){
543 gettimeofday(&tmp_time, 0);
544 normalize_time(&tmp_time);
545 current_time = tmp_time.tv_sec;
546 current_time_usec = tmp_time.tv_usec;
549 tmp = strtod(updvals[0], 0);
550 current_time = floor(tmp);
551 current_time_usec = (long)((tmp - current_time) * 1000000L);
553 /* don't do any correction for old version RRDs */
555 current_time_usec = 0;
557 if(current_time <= rrd.live_head->last_up){
558 rrd_set_error("illegal attempt to update using time %ld when "
559 "last update time is %ld (minimum one second step)",
560 current_time, rrd.live_head->last_up);
566 /* seek to the beginning of the rra's */
567 if (rra_current != rra_begin) {
568 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
569 rrd_set_error("seek error in rrd");
573 rra_current = rra_begin;
575 rra_start = rra_begin;
577 /* when was the current pdp started */
578 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
579 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
581 /* when did the last pdp_st occur */
582 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
583 occu_pdp_st = current_time - occu_pdp_age;
584 /* interval = current_time - rrd.live_head->last_up; */
585 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
587 if (occu_pdp_st > proc_pdp_st){
588 /* OK we passed the pdp_st moment*/
589 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
590 * occurred before the latest
592 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
593 post_int = occu_pdp_age; /* how much after it */
594 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
608 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
609 occu_pdp_age, occu_pdp_st,
610 interval, pre_int, post_int);
613 /* process the data sources and update the pdp_prep
614 * area accordingly */
615 for(i=0;i<rrd.stat_head->ds_cnt;i++){
617 dst_idx= dst_conv(rrd.ds_def[i].dst);
618 /* NOTE: DST_CDEF should never enter this if block, because
619 * updvals[i+1][0] is initialized to 'U'; unless the caller
620 * accidentally specified a value for the DST_CDEF. To handle
621 * this case, an extra check is required. */
622 if((updvals[i+1][0] != 'U') &&
623 (dst_idx != DST_CDEF) &&
624 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
626 /* the data source type defines how to process the data */
627 /* pdp_new contains rate * time ... eg the bytes
628 * transferred during the interval. Doing it this way saves
629 * a lot of math operations */
635 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636 for(ii=0;updvals[i+1][ii] != '\0';ii++){
637 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
642 if (rrd_test_error()){
645 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646 if(dst_idx == DST_COUNTER) {
647 /* simple overflow catcher suggested by Andres Kroonmaa */
648 /* this will fail terribly for non 32 or 64 bit counters ... */
649 /* are there any others in SNMP land ? */
650 if (pdp_new[i] < (double)0.0 )
651 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
652 if (pdp_new[i] < (double)0.0 )
653 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
655 rate = pdp_new[i] / interval;
663 pdp_new[i] = strtod(updvals[i+1],&endptr);
665 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
668 if (endptr[0] != '\0'){
669 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
672 rate = pdp_new[i] / interval;
676 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
678 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
681 if (endptr[0] != '\0'){
682 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
685 rate = pdp_new[i] / interval;
688 rrd_set_error("rrd contains unknown DS type : '%s'",
692 /* break out of this for loop if the error string is set */
693 if (rrd_test_error()){
696 /* make sure pdp_temp is neither too large nor too small
697 * if any of these occur it becomes unknown ...
699 if ( ! isnan(rate) &&
700 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
702 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
707 /* no news is news all the same */
711 /* make a copy of the command line argument for the next run */
719 rrd.pdp_prep[i].last_ds,
720 updvals[i+1], pdp_new[i]);
722 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723 strncpy(rrd.pdp_prep[i].last_ds,
724 updvals[i+1],LAST_DS_LEN-1);
725 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
728 /* break out of the argument parsing loop if the error_string is set */
729 if (rrd_test_error()){
733 /* has a pdp_st moment occurred since the last run ? */
735 if (proc_pdp_st == occu_pdp_st){
736 /* no we have not passed a pdp_st moment. therefore update is simple */
738 for(i=0;i<rrd.stat_head->ds_cnt;i++){
739 if(isnan(pdp_new[i]))
740 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
742 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
749 rrd.pdp_prep[i].scratch[PDP_val].u_val,
750 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
754 /* a pdp_st has occurred. */
756 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
757 * occurred up to the last run.
758 pdp_new[] contains rate*seconds from the latest run.
759 pdp_temp[] will contain the rate for cdp */
761 for(i=0;i<rrd.stat_head->ds_cnt;i++){
762 /* update pdp_prep to the current pdp_st */
763 if(isnan(pdp_new[i]))
764 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
766 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
767 pdp_new[i]/(double)interval*(double)pre_int;
769 /* if too much of the pdp_prep is unknown we dump it */
770 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
771 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
772 (occu_pdp_st-proc_pdp_st <=
773 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
776 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
777 / (double)( occu_pdp_st
779 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
782 /* process CDEF data sources; remember each CDEF DS can
783 * only reference other DS with a lower index number */
784 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
786 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
787 /* substitute data values for OP_VARIABLE nodes */
788 for (ii = 0; rpnp[ii].op != OP_END; ii++)
790 if (rpnp[ii].op == OP_VARIABLE) {
791 rpnp[ii].op = OP_NUMBER;
792 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
795 /* run the rpn calculator */
796 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
798 break; /* exits the data sources pdp_temp loop */
802 /* make pdp_prep ready for the next run */
803 if(isnan(pdp_new[i])){
804 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
805 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
807 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
808 rrd.pdp_prep[i].scratch[PDP_val].u_val =
809 pdp_new[i]/(double)interval*(double)post_int;
817 "new_unkn_sec %5lu\n",
819 rrd.pdp_prep[i].scratch[PDP_val].u_val,
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
824 /* if there were errors during the last loop, bail out here */
825 if (rrd_test_error()){
830 /* compute the number of elapsed pdp_st moments */
831 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
833 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
835 if (rra_step_cnt == NULL)
837 rra_step_cnt = (unsigned long *)
838 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
841 for(i = 0, rra_start = rra_begin;
842 i < rrd.stat_head->rra_cnt;
843 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
846 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
847 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
848 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
849 if (start_pdp_offset <= elapsed_pdp_st) {
850 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
851 rrd.rra_def[i].pdp_cnt + 1;
856 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
858 /* If this is a bulk update, we need to skip ahead in the seasonal
859 * arrays so that they will be correct for the next observed value;
860 * note that for the bulk update itself, no update will occur to
861 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
863 if (rra_step_cnt[i] > 2)
865 /* skip update by resetting rra_step_cnt[i],
866 * note that this is not data source specific; this is due
867 * to the bulk update, not a DNAN value for the specific data
870 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
871 &last_seasonal_coef);
872 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
876 /* periodically run a smoother for seasonal effects */
877 /* Need to use first cdp parameter buffer to track
878 * burnin (burnin requires a specific smoothing schedule).
879 * The CDP_init_seasonal parameter is really an RRA level,
880 * not a data source within RRA level parameter, but the rra_def
881 * is read only for rrd_update (not flushed to disk). */
882 iii = i*(rrd.stat_head -> ds_cnt);
883 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
886 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
887 > rrd.rra_def[i].row_cnt - 1) {
888 /* mark off one of the burnin cycles */
889 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
893 /* someone has no doubt invented a trick to deal with this
894 * wrap around, but at least this code is clear. */
895 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
896 rrd.rra_ptr[i].cur_row)
898 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
899 * mapping between PDP and CDP */
900 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
901 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
905 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
906 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
907 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
912 /* can't rely on negative numbers because we are working with
914 /* Don't need modulus here. If we've wrapped more than once, only
915 * one smooth is executed at the end. */
916 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
917 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
918 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
922 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
923 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
924 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 rra_current = ftell(rrd_file);
932 } /* if cf is DEVSEASONAL or SEASONAL */
934 if (rrd_test_error()) break;
936 /* update CDP_PREP areas */
937 /* loop over data sources within each RRA */
939 ii < rrd.stat_head->ds_cnt;
943 /* iii indexes the CDP prep area for this data source within the RRA */
944 iii=i*rrd.stat_head->ds_cnt+ii;
946 if (rrd.rra_def[i].pdp_cnt > 1) {
948 if (rra_step_cnt[i] > 0) {
949 /* If we are in this block, at least 1 CDP value will be written to
950 * disk, this is the CDP_primary_val entry. If more than 1 value needs
951 * to be written, then the "fill in" value is the CDP_secondary_val
953 if (isnan(pdp_temp[ii]))
955 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
956 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
958 /* CDP_secondary value is the RRA "fill in" value for intermediary
959 * CDP data entries. No matter the CF, the value is the same because
960 * the average, max, min, and last of a list of identical values is
961 * the same, namely, the value itself. */
962 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
965 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
966 > rrd.rra_def[i].pdp_cnt*
967 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
969 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
970 /* initialize carry over */
971 if (current_cf == CF_AVERAGE) {
972 if (isnan(pdp_temp[ii])) {
973 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
982 rrd_value_t cum_val, cur_val;
983 switch (current_cf) {
985 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
986 cur_val = IFDNAN(pdp_temp[ii],0.0);
987 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
988 (cum_val + cur_val * start_pdp_offset) /
989 (rrd.rra_def[i].pdp_cnt
990 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
991 /* initialize carry over value */
992 if (isnan(pdp_temp[ii])) {
993 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
995 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
996 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1000 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1001 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1003 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1004 isnan(pdp_temp[ii])) {
1006 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1011 if (cur_val > cum_val)
1012 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1014 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1015 /* initialize carry over value */
1016 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1019 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1020 cur_val = IFDNAN(pdp_temp[ii],DINF);
1022 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1023 isnan(pdp_temp[ii])) {
1025 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1030 if (cur_val < cum_val)
1031 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1033 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1034 /* initialize carry over value */
1035 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1040 /* initialize carry over value */
1041 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1044 } /* endif meets xff value requirement for a valid value */
1045 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1046 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1047 if (isnan(pdp_temp[ii]))
1048 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1049 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1051 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1052 } else /* rra_step_cnt[i] == 0 */
1055 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1056 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1059 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1060 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1063 if (isnan(pdp_temp[ii])) {
1064 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1065 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1067 if (current_cf == CF_AVERAGE) {
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1071 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1074 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1075 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1078 switch (current_cf) {
1080 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1084 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1088 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1099 if (elapsed_pdp_st > 2)
1101 switch (current_cf) {
1104 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1105 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1108 case CF_DEVSEASONAL:
1109 /* need to update cached seasonal values, so they are consistent
1110 * with the bulk update */
1111 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1112 * CDP_last_deviation are the same. */
1113 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1114 last_seasonal_coef[ii];
1115 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1119 /* need to update the null_count and last_null_count.
1120 * even do this for non-DNAN pdp_temp because the
1121 * algorithm is not learning from batch updates. */
1122 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1124 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1128 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1129 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1132 /* do not count missed bulk values as failures */
1133 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1134 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1135 /* need to reset violations buffer.
1136 * could do this more carefully, but for now, just
1137 * assume a bulk update wipes away all violations. */
1138 erase_violations(&rrd, iii, i);
1142 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1144 if (rrd_test_error()) break;
1146 } /* endif data sources loop */
1147 } /* end RRA Loop */
1149 /* this loop is only entered if elapsed_pdp_st < 3 */
1150 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1151 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1153 for(i = 0, rra_start = rra_begin;
1154 i < rrd.stat_head->rra_cnt;
1155 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1158 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1160 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1161 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1163 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1164 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1166 rra_current = ftell(rrd_file);
1168 if (rrd_test_error()) break;
1169 /* loop over data sources within each RRA */
1171 ii < rrd.stat_head->ds_cnt;
1174 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1175 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1176 scratch_idx, seasonal_coef);
1178 } /* end RRA Loop */
1179 if (rrd_test_error()) break;
1180 } /* end elapsed_pdp_st loop */
1182 if (rrd_test_error()) break;
1184 /* Ready to write to disk */
1185 /* Move sequentially through the file, writing one RRA at a time.
1186 * Note this architecture divorces the computation of CDP with
1187 * flushing updated RRA entries to disk. */
1188 for(i = 0, rra_start = rra_begin;
1189 i < rrd.stat_head->rra_cnt;
1190 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1192 /* is there anything to write for this RRA? If not, continue. */
1193 if (rra_step_cnt[i] == 0) continue;
1195 /* write the first row */
1197 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1199 rrd.rra_ptr[i].cur_row++;
1200 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1201 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1202 /* position on the first row */
1203 rra_pos_tmp = rra_start +
1204 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1205 if(rra_pos_tmp != rra_current) {
1206 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1207 rrd_set_error("seek error in rrd");
1210 rra_current = rra_pos_tmp;
1214 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1216 scratch_idx = CDP_primary_val;
1217 if (pcdp_summary != NULL)
1219 rra_time = (current_time - current_time
1220 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1221 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1223 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1224 pcdp_summary, &rra_time);
1225 if (rrd_test_error()) break;
1227 /* write other rows of the bulk update, if any */
1228 scratch_idx = CDP_secondary_val;
1229 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1231 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1234 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1235 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1238 rrd.rra_ptr[i].cur_row = 0;
1239 /* seek back to beginning of current rra */
1240 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1242 rrd_set_error("seek error in rrd");
1246 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1248 rra_current = rra_start;
1250 if (pcdp_summary != NULL)
1252 rra_time = (current_time - current_time
1253 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1254 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1256 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1257 pcdp_summary, &rra_time);
1260 if (rrd_test_error())
1264 /* break out of the argument parsing loop if error_string is set */
1265 if (rrd_test_error()){
1270 } /* endif a pdp_st has occurred */
1271 rrd.live_head->last_up = current_time;
1272 rrd.live_head->last_up_usec = current_time_usec;
1274 } /* function argument loop */
1276 if (seasonal_coef != NULL) free(seasonal_coef);
1277 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278 if (rra_step_cnt != NULL) free(rra_step_cnt);
1279 rpnstack_free(&rpnstack);
1281 /* if we got here and if there is an error and if the file has not been
1282 * written to, then close things up and return. */
1283 if (rrd_test_error()) {
1293 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1294 * we just need to write back the live header portion now */
1296 if (fseek(rrd_file, (sizeof(stat_head_t)
1297 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1298 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1300 rrd_set_error("seek rrd for live header writeback");
1311 if(fwrite( rrd.live_head,
1312 sizeof(live_head_t), 1, rrd_file) != 1){
1313 rrd_set_error("fwrite live_head to rrd");
1324 if(fwrite( &rrd.live_head->last_up,
1325 sizeof(time_t), 1, rrd_file) != 1){
1326 rrd_set_error("fwrite live_head to rrd");
1338 if(fwrite( rrd.pdp_prep,
1340 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1341 rrd_set_error("ftwrite pdp_prep to rrd");
1351 if(fwrite( rrd.cdp_prep,
1353 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1354 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1356 rrd_set_error("ftwrite cdp_prep to rrd");
1366 if(fwrite( rrd.rra_ptr,
1368 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1369 rrd_set_error("fwrite rra_ptr to rrd");
1379 /* OK now close the files and free the memory */
1380 if(fclose(rrd_file) != 0){
1381 rrd_set_error("closing rrd");
1390 /* calling the smoothing code here guarantees at most
1391 * one smoothing operation per rrd_update call. Unfortunately,
1392 * it is possible with bulk updates, or a long-delayed update,
1393 * for smoothing to occur off-schedule. This really isn't
1394 * critical except during the burning cycles. */
1395 if (schedule_smooth)
1398 rrd_file = fopen(filename,"r+");
1400 rrd_file = fopen(filename,"rb+");
1402 rra_start = rra_begin;
1403 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1405 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1406 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1409 fprintf(stderr,"Running smoother for rra %ld\n",i);
1411 apply_smoother(&rrd,i,rra_start,rrd_file);
1412 if (rrd_test_error())
1415 rra_start += rrd.rra_def[i].row_cnt
1416 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1429 * get exclusive lock to whole file.
1430 * lock gets removed when we close the file
1432 * returns 0 on success
1435 LockRRD(FILE *rrdfile)
1437 int rrd_fd; /* File descriptor for RRD */
1440 rrd_fd = fileno(rrdfile);
1445 lock.l_type = F_WRLCK; /* exclusive write lock */
1446 lock.l_len = 0; /* whole file */
1447 lock.l_start = 0; /* start of file */
1448 lock.l_whence = SEEK_SET; /* l_start is relative to start of file */
1450 stat = fcntl(rrd_fd, F_SETLK, &lock);
1454 if ( _fstat( rrd_fd, &st ) == 0 ) {
1455 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1467 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1468 unsigned short CDP_scratch_idx, FILE *rrd_file,
1469 info_t *pcdp_summary, time_t *rra_time)
1471 unsigned long ds_idx, cdp_idx;
1474 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1476 /* compute the cdp index */
1477 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1479 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1480 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1481 rrd -> rra_def[rra_idx].cf_nam);
1483 if (pcdp_summary != NULL)
1485 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1486 /* append info to the return hash */
1487 pcdp_summary = info_push(pcdp_summary,
1488 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1489 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1490 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1493 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1494 sizeof(rrd_value_t),1,rrd_file) != 1)
1496 rrd_set_error("writing rrd");
1499 *rra_current += sizeof(rrd_value_t);
1501 return (pcdp_summary);