1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.12 2003/09/04 13:16:12 oetiker
9 * should not assigne but compare ... grrrrr
11 * Revision 1.11 2003/09/02 21:58:35 oetiker
12 * be pickier about what we accept in rrd_update. Complain if things do not work out
14 * Revision 1.10 2003/04/29 19:14:12 jake
15 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
16 * Also revert accidental addition of -I to aclocal MakeMakefile.
18 * Revision 1.9 2003/04/25 18:35:08 jake
19 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
21 * Revision 1.8 2003/03/31 21:22:12 oetiker
22 * enables RRDtool updates with microsecond or in case of windows millisecond
23 * precision. This is needed to reduce time measurement error when archive step
24 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
26 * Revision 1.7 2003/02/13 07:05:27 oetiker
27 * Find attached the patch I promised to send to you. Please note that there
28 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
29 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
30 * library is identical to librrd, but it contains support code for per-thread
31 * global variables currently used for error information only. This is similar
32 * to how errno per-thread variables are implemented. librrd_th must be linked
33 * alongside of libpthred
35 * There is also a new file "THREADS", holding some documentation.
37 * -- Peter Stamfest <peter@stamfest.at>
39 * Revision 1.6 2002/02/01 20:34:49 oetiker
40 * fixed version number and date/time
42 * Revision 1.5 2001/05/09 05:31:01 oetiker
43 * Bug fix: when update of multiple PDP/CDP RRAs coincided
44 * with interpolation of multiple PDPs an incorrect value was
45 * stored as the CDP. Especially evident for GAUGE data sources.
46 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
48 * Revision 1.4 2001/03/10 23:54:41 oetiker
49 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
50 * parser and calculator from rrd_graph and puts then in a new file,
51 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
52 * clean-up of aberrant behavior stuff, including a bug fix.
53 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
54 * -- Jake Brutlag <jakeb@corp.webtv.net>
56 * Revision 1.3 2001/03/04 13:01:55 oetiker
57 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
58 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
59 * This is backwards compatible! But new files using the Aberrant stuff are not readable
60 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
61 * -- Jake Brutlag <jakeb@corp.webtv.net>
63 * Revision 1.2 2001/03/04 11:14:25 oetiker
64 * added at-style-time@value:value syntax to rrd_update
65 * -- Dave Bodenstab <imdave@mcs.net>
67 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
70 *****************************************************************************/
73 #include <sys/types.h>
77 #include <sys/locking.h>
83 #include "rrd_rpncalc.h"
85 #include "rrd_is_thread_safe.h"
89 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
92 #include <sys/timeb.h>
95 time_t tv_sec; /* seconds */
96 long tv_usec; /* microseconds */
100 int tz_minuteswest; /* minutes W of Greenwich */
101 int tz_dsttime; /* type of dst correction */
104 static gettimeofday(struct timeval *t, struct __timezone *tz) {
106 struct timeb current_time;
108 _ftime(¤t_time);
110 t->tv_sec = current_time.time;
111 t->tv_usec = current_time.millitm * 1000;
116 * normilize time as returned by gettimeofday. usec part must
119 static void normalize_time(struct timeval *t)
123 t->tv_usec += 1000000L;
127 /* Local prototypes */
128 int LockRRD(FILE *rrd_file);
129 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
130 unsigned long *rra_current,
131 unsigned short CDP_scratch_idx, FILE *rrd_file,
132 info_t *pcdp_summary, time_t *rra_time);
133 int rrd_update_r(char *filename, char *template, int argc, char **argv);
134 int _rrd_update(char *filename, char *template, int argc, char **argv,
137 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
142 main(int argc, char **argv){
143 rrd_update(argc,argv);
144 if (rrd_test_error()) {
145 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
146 "Usage: rrdupdate filename\n"
147 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
148 "\t\t\ttime|N:value[:value...]\n\n"
149 "\t\t\tat-time@value[:value...]\n\n"
150 "\t\t\t[ time:value[:value...] ..]\n\n");
152 printf("ERROR: %s\n",rrd_get_error());
160 info_t *rrd_update_v(int argc, char **argv)
162 char *template = NULL;
163 info_t *result = NULL;
167 static struct option long_options[] =
169 {"template", required_argument, 0, 't'},
172 int option_index = 0;
174 opt = getopt_long(argc, argv, "t:",
175 long_options, &option_index);
186 rrd_set_error("unknown option '%s'",argv[optind-1]);
192 /* need at least 2 arguments: filename, data. */
193 if (argc-optind < 2) {
194 rrd_set_error("Not enough arguments");
198 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
199 rc.u_int = _rrd_update(argv[optind], template,
200 argc - optind - 1, argv + optind + 1, result);
201 result->value.u_int = rc.u_int;
207 rrd_update(int argc, char **argv)
209 char *template = NULL;
213 static struct option long_options[] =
215 {"template", required_argument, 0, 't'},
218 int option_index = 0;
220 opt = getopt_long(argc, argv, "t:",
221 long_options, &option_index);
232 rrd_set_error("unknown option '%s'",argv[optind-1]);
237 /* need at least 2 arguments: filename, data. */
238 if (argc-optind < 2) {
239 rrd_set_error("Not enough arguments");
244 rc = rrd_update_r(argv[optind], template,
245 argc - optind - 1, argv + optind + 1);
250 rrd_update_r(char *filename, char *template, int argc, char **argv)
252 return _rrd_update(filename, template, argc, argv, NULL);
256 _rrd_update(char *filename, char *template, int argc, char **argv,
257 info_t *pcdp_summary)
262 unsigned long i,ii,iii=1;
264 unsigned long rra_begin; /* byte pointer to the rra
265 * area in the rrd file. this
266 * pointer never changes value */
267 unsigned long rra_start; /* byte pointer to the rra
268 * area in the rrd file. this
269 * pointer changes as each rrd is
271 unsigned long rra_current; /* byte pointer to the current write
272 * spot in the rrd file. */
273 unsigned long rra_pos_tmp; /* temporary byte pointer. */
275 pre_int,post_int; /* interval between this and
277 unsigned long proc_pdp_st; /* which pdp_st was the last
279 unsigned long occu_pdp_st; /* when was the pdp_st
280 * before the last update
282 unsigned long proc_pdp_age; /* how old was the data in
283 * the pdp prep area when it
284 * was last updated */
285 unsigned long occu_pdp_age; /* how long ago was the last
287 rrd_value_t *pdp_new; /* prepare the incoming data
288 * to be added the the
290 rrd_value_t *pdp_temp; /* prepare the pdp values
291 * to be added the the
294 long *tmpl_idx; /* index representing the settings
295 transported by the template index */
296 unsigned long tmpl_cnt = 2; /* time and data */
301 time_t rra_time; /* time of update for a RRA */
302 unsigned long current_time_usec; /* microseconds part of current time */
303 struct timeval tmp_time; /* used for time conversion */
306 int schedule_smooth = 0;
307 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
308 /* a vector of future Holt-Winters seasonal coefs */
309 unsigned long elapsed_pdp_st;
310 /* number of elapsed PDP steps since last update */
311 unsigned long *rra_step_cnt = NULL;
312 /* number of rows to be updated in an RRA for a data
314 unsigned long start_pdp_offset;
315 /* number of PDP steps since the last update that
316 * are assigned to the first CDP to be generated
317 * since the last update. */
318 unsigned short scratch_idx;
319 /* index into the CDP scratch array */
320 enum cf_en current_cf;
321 /* numeric id of the current consolidation function */
322 rpnstack_t rpnstack; /* used for COMPUTE DS */
323 int version; /* rrd version */
324 char *endptr; /* used in the conversion */
326 rpnstack_init(&rpnstack);
328 /* need at least 1 arguments: data. */
330 rrd_set_error("Not enough arguments");
336 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
339 /* initialize time */
340 version = atoi(rrd.stat_head->version);
341 gettimeofday(&tmp_time, 0);
342 normalize_time(&tmp_time);
343 current_time = tmp_time.tv_sec;
345 current_time_usec = tmp_time.tv_usec;
348 current_time_usec = 0;
351 rra_current = rra_start = rra_begin = ftell(rrd_file);
352 /* This is defined in the ANSI C standard, section 7.9.5.3:
354 When a file is opened with udpate mode ('+' as the second
355 or third character in the ... list of mode argument
356 variables), both input and ouptut may be performed on the
357 associated stream. However, ... input may not be directly
358 followed by output without an intervening call to a file
359 positioning function, unless the input oepration encounters
361 fseek(rrd_file, 0, SEEK_CUR);
364 /* get exclusive lock to whole file.
365 * lock gets removed when we close the file.
367 if (LockRRD(rrd_file) != 0) {
368 rrd_set_error("could not lock RRD");
374 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
375 rrd_set_error("allocating updvals pointer array");
381 if ((pdp_temp = malloc(sizeof(rrd_value_t)
382 *rrd.stat_head->ds_cnt))==NULL){
383 rrd_set_error("allocating pdp_temp ...");
390 if ((tmpl_idx = malloc(sizeof(unsigned long)
391 *(rrd.stat_head->ds_cnt+1)))==NULL){
392 rrd_set_error("allocating tmpl_idx ...");
399 /* initialize template redirector */
400 /* default config example (assume DS 1 is a CDEF DS)
401 tmpl_idx[0] -> 0; (time)
402 tmpl_idx[1] -> 1; (DS 0)
403 tmpl_idx[2] -> 3; (DS 2)
404 tmpl_idx[3] -> 4; (DS 3) */
405 tmpl_idx[0] = 0; /* time */
406 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
408 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
415 unsigned int tmpl_len;
417 tmpl_cnt = 1; /* the first entry is the time */
418 tmpl_len = strlen(template);
419 for(i=0;i<=tmpl_len ;i++) {
420 if (template[i] == ':' || template[i] == '\0') {
422 if (tmpl_cnt>rrd.stat_head->ds_cnt){
423 rrd_set_error("Template contains more DS definitions than RRD");
424 free(updvals); free(pdp_temp);
425 free(tmpl_idx); rrd_free(&rrd);
426 fclose(rrd_file); return(-1);
428 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
429 rrd_set_error("unknown DS name '%s'",dsname);
430 free(updvals); free(pdp_temp);
431 free(tmpl_idx); rrd_free(&rrd);
432 fclose(rrd_file); return(-1);
434 /* the first element is always the time */
435 tmpl_idx[tmpl_cnt-1]++;
436 /* go to the next entry on the template */
437 dsname = &template[i+1];
438 /* fix the damage we did before */
447 if ((pdp_new = malloc(sizeof(rrd_value_t)
448 *rrd.stat_head->ds_cnt))==NULL){
449 rrd_set_error("allocating pdp_new ...");
458 /* loop through the arguments. */
459 for(arg_i=0; arg_i<argc;arg_i++) {
460 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
461 char *step_start = stepper;
463 char *parsetime_error = NULL;
464 enum {atstyle, normal} timesyntax;
465 struct time_value ds_tv;
466 if (stepper == NULL){
467 rrd_set_error("failed duplication argv entry");
475 /* initialize all ds input to unknown except the first one
476 which has always got to be set */
477 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
478 strcpy(stepper,argv[arg_i]);
480 /* separate all ds elements; first must be examined separately
481 due to alternate time syntax */
482 if ((p=strchr(stepper,'@'))!=NULL) {
483 timesyntax = atstyle;
486 } else if ((p=strchr(stepper,':'))!=NULL) {
491 rrd_set_error("expected timestamp not found in data source from %s:...",
497 updvals[tmpl_idx[ii]] = stepper;
499 if (*stepper == ':') {
503 updvals[tmpl_idx[ii]] = stepper+1;
509 if (ii != tmpl_cnt-1) {
510 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
511 tmpl_cnt-1, ii, argv[arg_i]);
516 /* get the time from the reading ... handle N */
517 if (timesyntax == atstyle) {
518 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
519 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
523 if (ds_tv.type == RELATIVE_TO_END_TIME ||
524 ds_tv.type == RELATIVE_TO_START_TIME) {
525 rrd_set_error("specifying time relative to the 'start' "
526 "or 'end' makes no sense here: %s",
532 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
533 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
535 } else if (strcmp(updvals[0],"N")==0){
536 gettimeofday(&tmp_time, 0);
537 normalize_time(&tmp_time);
538 current_time = tmp_time.tv_sec;
539 current_time_usec = tmp_time.tv_usec;
542 tmp = strtod(updvals[0], 0);
543 current_time = floor(tmp);
544 current_time_usec = (long)((tmp - current_time) * 1000000L);
546 /* dont do any correction for old version RRDs */
548 current_time_usec = 0;
550 if(current_time <= rrd.live_head->last_up){
551 rrd_set_error("illegal attempt to update using time %ld when "
552 "last update time is %ld (minimum one second step)",
553 current_time, rrd.live_head->last_up);
559 /* seek to the beginning of the rra's */
560 if (rra_current != rra_begin) {
561 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
562 rrd_set_error("seek error in rrd");
566 rra_current = rra_begin;
568 rra_start = rra_begin;
570 /* when was the current pdp started */
571 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
572 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
574 /* when did the last pdp_st occur */
575 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
576 occu_pdp_st = current_time - occu_pdp_age;
577 /* interval = current_time - rrd.live_head->last_up; */
578 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
580 if (occu_pdp_st > proc_pdp_st){
581 /* OK we passed the pdp_st moment*/
582 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
583 * occurred before the latest
585 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
586 post_int = occu_pdp_age; /* how much after it */
587 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
601 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
602 occu_pdp_age, occu_pdp_st,
603 interval, pre_int, post_int);
606 /* process the data sources and update the pdp_prep
607 * area accordingly */
608 for(i=0;i<rrd.stat_head->ds_cnt;i++){
610 dst_idx= dst_conv(rrd.ds_def[i].dst);
611 /* NOTE: DST_CDEF should never enter this if block, because
612 * updvals[i+1][0] is initialized to 'U'; unless the caller
613 * accidently specified a value for the DST_CDEF. To handle
614 * this case, an extra check is required. */
615 if((updvals[i+1][0] != 'U') &&
616 (dst_idx != DST_CDEF) &&
617 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
619 /* the data source type defines how to process the data */
620 /* pdp_new contains rate * time ... eg the bytes
621 * transferred during the interval. Doing it this way saves
622 * a lot of math operations */
628 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
629 for(ii=0;updvals[i+1][ii] != '\0';ii++){
630 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
631 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
635 if (rrd_test_error()){
638 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
639 if(dst_idx == DST_COUNTER) {
640 /* simple overflow catcher sugestet by andres kroonmaa */
641 /* this will fail terribly for non 32 or 64 bit counters ... */
642 /* are there any others in SNMP land ? */
643 if (pdp_new[i] < (double)0.0 )
644 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
645 if (pdp_new[i] < (double)0.0 )
646 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
648 rate = pdp_new[i] / interval;
656 pdp_new[i] = strtod(updvals[i+1],&endptr);
658 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661 if (endptr[0] != '\0'){
662 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665 rate = pdp_new[i] / interval;
669 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
671 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
674 if (endptr[0] != '\0'){
675 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
678 rate = pdp_new[i] / interval;
681 rrd_set_error("rrd contains unknown DS type : '%s'",
685 /* break out of this for loop if the error string is set */
686 if (rrd_test_error()){
689 /* make sure pdp_temp is neither too large or too small
690 * if any of these occur it becomes unknown ...
692 if ( ! isnan(rate) &&
693 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
694 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
695 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
696 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
700 /* no news is news all the same */
704 /* make a copy of the command line argument for the next run */
712 rrd.pdp_prep[i].last_ds,
713 updvals[i+1], pdp_new[i]);
715 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
716 strncpy(rrd.pdp_prep[i].last_ds,
717 updvals[i+1],LAST_DS_LEN-1);
718 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
721 /* break out of the argument parsing loop if the error_string is set */
722 if (rrd_test_error()){
726 /* has a pdp_st moment occurred since the last run ? */
728 if (proc_pdp_st == occu_pdp_st){
729 /* no we have not passed a pdp_st moment. therefore update is simple */
731 for(i=0;i<rrd.stat_head->ds_cnt;i++){
732 if(isnan(pdp_new[i]))
733 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
735 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
742 rrd.pdp_prep[i].scratch[PDP_val].u_val,
743 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 /* an pdp_st has occurred. */
749 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
750 * occurred up to the last run.
751 pdp_new[] contains rate*seconds from the latest run.
752 pdp_temp[] will contain the rate for cdp */
754 for(i=0;i<rrd.stat_head->ds_cnt;i++){
755 /* update pdp_prep to the current pdp_st */
756 if(isnan(pdp_new[i]))
757 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
759 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
760 pdp_new[i]/(double)interval*(double)pre_int;
762 /* if too much of the pdp_prep is unknown we dump it */
763 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
764 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
765 (occu_pdp_st-proc_pdp_st <=
766 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
769 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
770 / (double)( occu_pdp_st
772 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
775 /* process CDEF data sources; remember each CDEF DS can
776 * only reference other DS with a lower index number */
777 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
779 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
780 /* substitue data values for OP_VARIABLE nodes */
781 for (ii = 0; rpnp[ii].op != OP_END; ii++)
783 if (rpnp[ii].op == OP_VARIABLE) {
784 rpnp[ii].op = OP_NUMBER;
785 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
788 /* run the rpn calculator */
789 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
791 break; /* exits the data sources pdp_temp loop */
795 /* make pdp_prep ready for the next run */
796 if(isnan(pdp_new[i])){
797 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
798 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
800 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
801 rrd.pdp_prep[i].scratch[PDP_val].u_val =
802 pdp_new[i]/(double)interval*(double)post_int;
810 "new_unkn_sec %5lu\n",
812 rrd.pdp_prep[i].scratch[PDP_val].u_val,
813 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
817 /* if there were errors during the last loop, bail out here */
818 if (rrd_test_error()){
823 /* compute the number of elapsed pdp_st moments */
824 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
826 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
828 if (rra_step_cnt == NULL)
830 rra_step_cnt = (unsigned long *)
831 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
834 for(i = 0, rra_start = rra_begin;
835 i < rrd.stat_head->rra_cnt;
836 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
839 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
840 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
841 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
842 if (start_pdp_offset <= elapsed_pdp_st) {
843 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
844 rrd.rra_def[i].pdp_cnt + 1;
849 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
851 /* If this is a bulk update, we need to skip ahead in the seasonal
852 * arrays so that they will be correct for the next observed value;
853 * note that for the bulk update itself, no update will occur to
854 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
856 if (rra_step_cnt[i] > 2)
858 /* skip update by resetting rra_step_cnt[i],
859 * note that this is not data source specific; this is due
860 * to the bulk update, not a DNAN value for the specific data
863 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
864 &last_seasonal_coef);
865 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
869 /* periodically run a smoother for seasonal effects */
870 /* Need to use first cdp parameter buffer to track
871 * burnin (burnin requires a specific smoothing schedule).
872 * The CDP_init_seasonal parameter is really an RRA level,
873 * not a data source within RRA level parameter, but the rra_def
874 * is read only for rrd_update (not flushed to disk). */
875 iii = i*(rrd.stat_head -> ds_cnt);
876 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
879 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
880 > rrd.rra_def[i].row_cnt - 1) {
881 /* mark off one of the burnin cycles */
882 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
886 /* someone has no doubt invented a trick to deal with this
887 * wrap around, but at least this code is clear. */
888 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
889 rrd.rra_ptr[i].cur_row)
891 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
892 * mapping between PDP and CDP */
893 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
894 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
898 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
899 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
900 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 /* can't rely on negative numbers because we are working with
907 /* Don't need modulus here. If we've wrapped more than once, only
908 * one smooth is executed at the end. */
909 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
910 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
911 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
916 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
917 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
924 rra_current = ftell(rrd_file);
925 } /* if cf is DEVSEASONAL or SEASONAL */
927 if (rrd_test_error()) break;
929 /* update CDP_PREP areas */
930 /* loop over data soures within each RRA */
932 ii < rrd.stat_head->ds_cnt;
936 /* iii indexes the CDP prep area for this data source within the RRA */
937 iii=i*rrd.stat_head->ds_cnt+ii;
939 if (rrd.rra_def[i].pdp_cnt > 1) {
941 if (rra_step_cnt[i] > 0) {
942 /* If we are in this block, as least 1 CDP value will be written to
943 * disk, this is the CDP_primary_val entry. If more than 1 value needs
944 * to be written, then the "fill in" value is the CDP_secondary_val
946 if (isnan(pdp_temp[ii]))
948 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
949 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
951 /* CDP_secondary value is the RRA "fill in" value for intermediary
952 * CDP data entries. No matter the CF, the value is the same because
953 * the average, max, min, and last of a list of identical values is
954 * the same, namely, the value itself. */
955 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
958 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
959 > rrd.rra_def[i].pdp_cnt*
960 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
962 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
963 /* initialize carry over */
964 if (current_cf == CF_AVERAGE) {
965 if (isnan(pdp_temp[ii])) {
966 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
968 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
969 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
972 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
975 rrd_value_t cum_val, cur_val;
976 switch (current_cf) {
978 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
979 cur_val = IFDNAN(pdp_temp[ii],0.0);
980 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
981 (cum_val + cur_val * start_pdp_offset) /
982 (rrd.rra_def[i].pdp_cnt
983 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
984 /* initialize carry over value */
985 if (isnan(pdp_temp[ii])) {
986 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
989 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
993 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
994 cur_val = IFDNAN(pdp_temp[ii],-DINF);
996 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
997 isnan(pdp_temp[ii])) {
999 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1004 if (cur_val > cum_val)
1005 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1007 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1008 /* initialize carry over value */
1009 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1012 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1013 cur_val = IFDNAN(pdp_temp[ii],DINF);
1015 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1016 isnan(pdp_temp[ii])) {
1018 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1023 if (cur_val < cum_val)
1024 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1026 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1027 /* initialize carry over value */
1028 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1032 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1033 /* initialize carry over value */
1034 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1037 } /* endif meets xff value requirement for a valid value */
1038 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1039 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1040 if (isnan(pdp_temp[ii]))
1041 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1042 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1044 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1045 } else /* rra_step_cnt[i] == 0 */
1048 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1049 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1052 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1053 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1056 if (isnan(pdp_temp[ii])) {
1057 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1058 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1060 if (current_cf == CF_AVERAGE) {
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1064 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1067 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1068 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1071 switch (current_cf) {
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1077 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1078 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1081 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1082 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1092 if (elapsed_pdp_st > 2)
1094 switch (current_cf) {
1097 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1098 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1101 case CF_DEVSEASONAL:
1102 /* need to update cached seasonal values, so they are consistent
1103 * with the bulk update */
1104 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1105 * CDP_last_deviation are the same. */
1106 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1107 last_seasonal_coef[ii];
1108 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1112 /* need to update the null_count and last_null_count.
1113 * even do this for non-DNAN pdp_temp because the
1114 * algorithm is not learning from batch updates. */
1115 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1117 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1121 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1122 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1125 /* do not count missed bulk values as failures */
1126 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1127 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1128 /* need to reset violations buffer.
1129 * could do this more carefully, but for now, just
1130 * assume a bulk update wipes away all violations. */
1131 erase_violations(&rrd, iii, i);
1135 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1137 if (rrd_test_error()) break;
1139 } /* endif data sources loop */
1140 } /* end RRA Loop */
1142 /* this loop is only entered if elapsed_pdp_st < 3 */
1143 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1144 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1146 for(i = 0, rra_start = rra_begin;
1147 i < rrd.stat_head->rra_cnt;
1148 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1151 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1153 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1154 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1156 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1157 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1159 rra_current = ftell(rrd_file);
1161 if (rrd_test_error()) break;
1162 /* loop over data soures within each RRA */
1164 ii < rrd.stat_head->ds_cnt;
1167 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1168 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1169 scratch_idx, seasonal_coef);
1171 } /* end RRA Loop */
1172 if (rrd_test_error()) break;
1173 } /* end elapsed_pdp_st loop */
1175 if (rrd_test_error()) break;
1177 /* Ready to write to disk */
1178 /* Move sequentially through the file, writing one RRA at a time.
1179 * Note this architecture divorces the computation of CDP with
1180 * flushing updated RRA entries to disk. */
1181 for(i = 0, rra_start = rra_begin;
1182 i < rrd.stat_head->rra_cnt;
1183 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1185 /* is there anything to write for this RRA? If not, continue. */
1186 if (rra_step_cnt[i] == 0) continue;
1188 /* write the first row */
1190 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1192 rrd.rra_ptr[i].cur_row++;
1193 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1194 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1195 /* positition on the first row */
1196 rra_pos_tmp = rra_start +
1197 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1198 if(rra_pos_tmp != rra_current) {
1199 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1200 rrd_set_error("seek error in rrd");
1203 rra_current = rra_pos_tmp;
1207 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1209 scratch_idx = CDP_primary_val;
1210 if (pcdp_summary != NULL)
1212 rra_time = (current_time - current_time
1213 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1214 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1216 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1217 pcdp_summary, &rra_time);
1218 if (rrd_test_error()) break;
1220 /* write other rows of the bulk update, if any */
1221 scratch_idx = CDP_secondary_val;
1222 for ( ; rra_step_cnt[i] > 1;
1223 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1225 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1228 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1229 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1232 rrd.rra_ptr[i].cur_row = 0;
1233 /* seek back to beginning of current rra */
1234 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1236 rrd_set_error("seek error in rrd");
1240 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1242 rra_current = rra_start;
1244 if (pcdp_summary != NULL)
1246 rra_time = (current_time - current_time
1247 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1248 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1250 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1251 pcdp_summary, &rra_time);
1254 if (rrd_test_error())
1258 /* break out of the argument parsing loop if error_string is set */
1259 if (rrd_test_error()){
1264 } /* endif a pdp_st has occurred */
1265 rrd.live_head->last_up = current_time;
1266 rrd.live_head->last_up_usec = current_time_usec;
1268 } /* function argument loop */
1270 if (seasonal_coef != NULL) free(seasonal_coef);
1271 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1272 if (rra_step_cnt != NULL) free(rra_step_cnt);
1273 rpnstack_free(&rpnstack);
1275 /* if we got here and if there is an error and if the file has not been
1276 * written to, then close things up and return. */
1277 if (rrd_test_error()) {
1287 /* aargh ... that was tough ... so many loops ... anyway, its done.
1288 * we just need to write back the live header portion now*/
1290 if (fseek(rrd_file, (sizeof(stat_head_t)
1291 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1292 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1294 rrd_set_error("seek rrd for live header writeback");
1305 if(fwrite( rrd.live_head,
1306 sizeof(live_head_t), 1, rrd_file) != 1){
1307 rrd_set_error("fwrite live_head to rrd");
1318 if(fwrite( &rrd.live_head->last_up,
1319 sizeof(time_t), 1, rrd_file) != 1){
1320 rrd_set_error("fwrite live_head to rrd");
1332 if(fwrite( rrd.pdp_prep,
1334 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1335 rrd_set_error("ftwrite pdp_prep to rrd");
1345 if(fwrite( rrd.cdp_prep,
1347 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1348 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1350 rrd_set_error("ftwrite cdp_prep to rrd");
1360 if(fwrite( rrd.rra_ptr,
1362 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1363 rrd_set_error("fwrite rra_ptr to rrd");
1373 /* OK now close the files and free the memory */
1374 if(fclose(rrd_file) != 0){
1375 rrd_set_error("closing rrd");
1384 /* calling the smoothing code here guarantees at most
1385 * one smoothing operation per rrd_update call. Unfortunately,
1386 * it is possible with bulk updates, or a long-delayed update
1387 * for smoothing to occur off-schedule. This really isn't
1388 * critical except during the burning cycles. */
1389 if (schedule_smooth)
1392 rrd_file = fopen(filename,"r+");
1394 rrd_file = fopen(filename,"rb+");
1396 rra_start = rra_begin;
1397 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1399 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1400 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1403 fprintf(stderr,"Running smoother for rra %ld\n",i);
1405 apply_smoother(&rrd,i,rra_start,rrd_file);
1406 if (rrd_test_error())
1409 rra_start += rrd.rra_def[i].row_cnt
1410 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1423 * get exclusive lock to whole file.
1424 * lock gets removed when we close the file
1426 * returns 0 on success
1429 LockRRD(FILE *rrdfile)
1431 int rrd_fd; /* File descriptor for RRD */
1434 rrd_fd = fileno(rrdfile);
1439 lock.l_type = F_WRLCK; /* exclusive write lock */
1440 lock.l_len = 0; /* whole file */
1441 lock.l_start = 0; /* start of file */
1442 lock.l_whence = SEEK_SET; /* end of file */
1444 stat = fcntl(rrd_fd, F_SETLK, &lock);
1448 if ( _fstat( rrd_fd, &st ) == 0 ) {
1449 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1461 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1462 unsigned short CDP_scratch_idx, FILE *rrd_file,
1463 info_t *pcdp_summary, time_t *rra_time)
1465 unsigned long ds_idx, cdp_idx;
1468 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1470 /* compute the cdp index */
1471 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1473 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1474 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1475 rrd -> rra_def[rra_idx].cf_nam);
1477 if (pcdp_summary != NULL)
1479 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1480 /* append info to the return hash */
1481 pcdp_summary = info_push(pcdp_summary,
1482 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1483 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1484 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1487 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1488 sizeof(rrd_value_t),1,rrd_file) != 1)
1490 rrd_set_error("writing rrd");
1493 *rra_current += sizeof(rrd_value_t);
1495 return (pcdp_summary);