1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.9 2003/04/25 18:35:08 jake
9 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
11 * Revision 1.8 2003/03/31 21:22:12 oetiker
12 * enables RRDtool updates with microsecond or in case of windows millisecond
13 * precision. This is needed to reduce time measurement error when archive step
14 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
16 * Revision 1.7 2003/02/13 07:05:27 oetiker
17 * Find attached the patch I promised to send to you. Please note that there
18 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
19 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
20 * library is identical to librrd, but it contains support code for per-thread
21 * global variables currently used for error information only. This is similar
22 * to how errno per-thread variables are implemented. librrd_th must be linked
23 * alongside of libpthread
25 * There is also a new file "THREADS", holding some documentation.
27 * -- Peter Stamfest <peter@stamfest.at>
29 * Revision 1.6 2002/02/01 20:34:49 oetiker
30 * fixed version number and date/time
32 * Revision 1.5 2001/05/09 05:31:01 oetiker
33 * Bug fix: when update of multiple PDP/CDP RRAs coincided
34 * with interpolation of multiple PDPs an incorrect value was
35 * stored as the CDP. Especially evident for GAUGE data sources.
36 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
38 * Revision 1.4 2001/03/10 23:54:41 oetiker
39 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
40 * parser and calculator from rrd_graph and puts them in a new file,
41 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
42 * clean-up of aberrant behavior stuff, including a bug fix.
43 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
44 * -- Jake Brutlag <jakeb@corp.webtv.net>
46 * Revision 1.3 2001/03/04 13:01:55 oetiker
47 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
48 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
49 * This is backwards compatible! But new files using the Aberrant stuff are not readable
50 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
51 * -- Jake Brutlag <jakeb@corp.webtv.net>
53 * Revision 1.2 2001/03/04 11:14:25 oetiker
54 * added at-style-time@value:value syntax to rrd_update
55 * -- Dave Bodenstab <imdave@mcs.net>
57 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
60 *****************************************************************************/
63 #include <sys/types.h>
67 #include <sys/locking.h>
73 #include "rrd_rpncalc.h"
75 #include "rrd_is_thread_safe.h"
79 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
82 #include <sys/timeb.h>
85 time_t tv_sec; /* seconds */
86 long tv_usec; /* microseconds */
90 int tz_minuteswest; /* minutes W of Greenwich */
91 int tz_dsttime; /* type of dst correction */
/* WIN32 stand-in for gettimeofday(): reads the current time via _ftime() and
 * stores it in *t. The millisecond field (millitm) is scaled by 1000 to give
 * microseconds, so the effective resolution is milliseconds. The tz argument
 * is not referenced in the statements shown here.
 * NOTE(review): no return type is declared (implicit int), and the _ftime()
 * argument below looks mis-encoded; presumably it is the address of
 * current_time -- confirm. */
94 static gettimeofday(struct timeval *t, struct __timezone *tz) {
96 struct timeb current_time;
98 _ftime(¤t_time);
100 t->tv_sec = current_time.time;
101 t->tv_usec = current_time.millitm * 1000;
106 * normalize time as returned by gettimeofday. usec part must
/* Adjusts *t in place; the statement shown adds 1000000 (one second expressed
 * in microseconds) to tv_usec.
 * NOTE(review): presumably this happens only when tv_usec is negative, paired
 * with a one-second decrement of tv_sec -- confirm against the full body. */
109 static void normalize_time(struct timeval *t)
113 t->tv_usec += 1000000L;
117 /* Local prototypes */
/* Locks the open RRD stream; the caller in this file treats any non-zero
 * return as "could not lock RRD". */
118 int LockRRD(FILE *rrd_file);
/* Writes one row of RRA rra_idx from the CDP scratch slot CDP_scratch_idx.
 * Callers pass their current file position and info list and store the
 * returned info_t pointer back into pcdp_summary. */
119 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
120 unsigned long *rra_current,
121 unsigned short CDP_scratch_idx, FILE *rrd_file,
122 info_t *pcdp_summary, time_t *rra_time);
/* Update entry point taking an explicit filename and template instead of
 * parsing command-line options. */
123 int rrd_update_r(char *filename, char *template, int argc, char **argv);
124 int _rrd_update(char *filename, char *template, int argc, char **argv,
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * Expands to a single parenthesized expression with no trailing semicolon, so
 * it can be used inside larger expressions as well as in plain assignments.
 * Note that X is evaluated twice; do not pass arguments with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* Program entry point: passes argc/argv straight to rrd_update(). When
 * rrd_test_error() reports a pending error, the usage text for rrdupdate is
 * printed, and an "ERROR: ..." line shows the message from rrd_get_error(). */
132 main(int argc, char **argv){
133 rrd_update(argc,argv);
134 if (rrd_test_error()) {
135 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
136 "Usage: rrdupdate filename\n"
137 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
138 "\t\t\ttime|N:value[:value...]\n\n"
139 "\t\t\tat-time@value[:value...]\n\n"
140 "\t\t\t[ time:value[:value...] ..]\n\n");
142 printf("ERROR: %s\n",rrd_get_error());
/* Command-line style update that reports its outcome through an info_t list.
 * Options are parsed with getopt_long(); the only option declared here is
 * --template / -t, which takes an argument. At least two non-option arguments
 * (filename, data) are required, otherwise "Not enough arguments" is set.
 * The list is started with a "return_value" entry via info_push(); that same
 * list is handed to _rrd_update() as its pcdp_summary argument, and afterwards
 * the entry's u_int is overwritten with _rrd_update()'s result. */
150 info_t *rrd_update_v(int argc, char **argv)
152 char *template = NULL;
153 info_t *result = NULL;
157 static struct option long_options[] =
159 {"template", required_argument, 0, 't'},
162 int option_index = 0;
164 opt = getopt_long(argc, argv, "t:",
165 long_options, &option_index);
176 rrd_set_error("unknown option '%s'",argv[optind-1]);
182 /* need at least 2 arguments: filename, data. */
183 if (argc-optind < 2) {
184 rrd_set_error("Not enough arguments");
/* create the head entry first so _rrd_update() can receive the list */
188 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
189 rc.u_int = _rrd_update(argv[optind], template,
190 argc - optind - 1, argv + optind + 1, result);
191 result->value.u_int = rc.u_int;
/* Command-line style update. Options are parsed with getopt_long(); the only
 * option declared here is --template / -t, which takes an argument. At least
 * two non-option arguments (filename, data) are required, otherwise
 * "Not enough arguments" is set. The filename, the template and the remaining
 * data arguments are then forwarded to rrd_update_r(), whose result is kept
 * in rc. */
197 rrd_update(int argc, char **argv)
199 char *template = NULL;
203 static struct option long_options[] =
205 {"template", required_argument, 0, 't'},
208 int option_index = 0;
210 opt = getopt_long(argc, argv, "t:",
211 long_options, &option_index);
222 rrd_set_error("unknown option '%s'",argv[optind-1]);
227 /* need at least 2 arguments: filename, data. */
228 if (argc-optind < 2) {
229 rrd_set_error("Not enough arguments");
/* argv[optind] is the filename; everything after it is update data */
234 rc = rrd_update_r(argv[optind], template,
235 argc - optind - 1, argv + optind + 1);
/* Thin wrapper around _rrd_update(): passes filename, template, argc and argv
 * through unchanged with a NULL pcdp_summary, and returns its result. */
240 rrd_update_r(char *filename, char *template, int argc, char **argv)
242 return _rrd_update(filename, template, argc, argv, NULL);
246 _rrd_update(char *filename, char *template, int argc, char **argv,
247 info_t *pcdp_summary)
252 unsigned long i,ii,iii=1;
254 unsigned long rra_begin; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer never changes value */
257 unsigned long rra_start; /* byte pointer to the rra
258 * area in the rrd file. this
259 * pointer changes as each rrd is
261 unsigned long rra_current; /* byte pointer to the current write
262 * spot in the rrd file. */
263 unsigned long rra_pos_tmp; /* temporary byte pointer. */
265 pre_int,post_int; /* interval between this and
267 unsigned long proc_pdp_st; /* which pdp_st was the last
269 unsigned long occu_pdp_st; /* when was the pdp_st
270 * before the last update
272 unsigned long proc_pdp_age; /* how old was the data in
273 * the pdp prep area when it
274 * was last updated */
275 unsigned long occu_pdp_age; /* how long ago was the last
277 rrd_value_t *pdp_new; /* prepare the incoming data
278 * to be added to the
280 rrd_value_t *pdp_temp; /* prepare the pdp values
281 * to be added to the
284 long *tmpl_idx; /* index representing the settings
285 transported by the template index */
286 unsigned long tmpl_cnt = 2; /* time and data */
291 time_t rra_time; /* time of update for a RRA */
292 unsigned long current_time_usec; /* microseconds part of current time */
293 struct timeval tmp_time; /* used for time conversion */
296 int schedule_smooth = 0;
297 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
298 /* a vector of future Holt-Winters seasonal coefs */
299 unsigned long elapsed_pdp_st;
300 /* number of elapsed PDP steps since last update */
301 unsigned long *rra_step_cnt = NULL;
302 /* number of rows to be updated in an RRA for a data
304 unsigned long start_pdp_offset;
305 /* number of PDP steps since the last update that
306 * are assigned to the first CDP to be generated
307 * since the last update. */
308 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
311 /* numeric id of the current consolidation function */
312 rpnstack_t rpnstack; /* used for COMPUTE DS */
313 int version; /* rrd version */
315 rpnstack_init(&rpnstack);
317 /* need at least 1 arguments: data. */
319 rrd_set_error("Not enough arguments");
325 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
328 /* initialize time */
329 version = atoi(rrd.stat_head->version);
330 gettimeofday(&tmp_time, 0);
331 normalize_time(&tmp_time);
332 current_time = tmp_time.tv_sec;
334 current_time_usec = tmp_time.tv_usec;
337 current_time_usec = 0;
340 rra_current = rra_start = rra_begin = ftell(rrd_file);
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with update mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
350 fseek(rrd_file, 0, SEEK_CUR);
353 /* get exclusive lock to whole file.
354 * lock gets removed when we close the file.
356 if (LockRRD(rrd_file) != 0) {
357 rrd_set_error("could not lock RRD");
363 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
364 rrd_set_error("allocating updvals pointer array");
370 if ((pdp_temp = malloc(sizeof(rrd_value_t)
371 *rrd.stat_head->ds_cnt))==NULL){
372 rrd_set_error("allocating pdp_temp ...");
379 if ((tmpl_idx = malloc(sizeof(unsigned long)
380 *(rrd.stat_head->ds_cnt+1)))==NULL){
381 rrd_set_error("allocating tmpl_idx ...");
388 /* initialize template redirector */
389 /* default config example (assume DS 1 is a CDEF DS)
390 tmpl_idx[0] -> 0; (time)
391 tmpl_idx[1] -> 1; (DS 0)
392 tmpl_idx[2] -> 3; (DS 2)
393 tmpl_idx[3] -> 4; (DS 3) */
394 tmpl_idx[0] = 0; /* time */
395 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
397 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
404 unsigned int tmpl_len;
406 tmpl_cnt = 1; /* the first entry is the time */
407 tmpl_len = strlen(template);
408 for(i=0;i<=tmpl_len ;i++) {
409 if (template[i] == ':' || template[i] == '\0') {
411 if (tmpl_cnt>rrd.stat_head->ds_cnt){
412 rrd_set_error("Template contains more DS definitions than RRD");
413 free(updvals); free(pdp_temp);
414 free(tmpl_idx); rrd_free(&rrd);
415 fclose(rrd_file); return(-1);
417 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
418 rrd_set_error("unknown DS name '%s'",dsname);
419 free(updvals); free(pdp_temp);
420 free(tmpl_idx); rrd_free(&rrd);
421 fclose(rrd_file); return(-1);
423 /* the first element is always the time */
424 tmpl_idx[tmpl_cnt-1]++;
425 /* go to the next entry on the template */
426 dsname = &template[i+1];
427 /* fix the damage we did before */
436 if ((pdp_new = malloc(sizeof(rrd_value_t)
437 *rrd.stat_head->ds_cnt))==NULL){
438 rrd_set_error("allocating pdp_new ...");
447 /* loop through the arguments. */
448 for(arg_i=0; arg_i<argc;arg_i++) {
449 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
450 char *step_start = stepper;
452 char *parsetime_error = NULL;
453 enum {atstyle, normal} timesyntax;
454 struct time_value ds_tv;
455 if (stepper == NULL){
456 rrd_set_error("failed duplication argv entry");
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
486 updvals[tmpl_idx[ii]] = stepper;
488 if (*stepper == ':') {
492 updvals[tmpl_idx[ii]] = stepper+1;
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp - current_time) * 1000000L);
535 /* dont do any correction for old version RRDs */
537 current_time_usec = 0;
539 if(current_time <= rrd.live_head->last_up){
540 rrd_set_error("illegal attempt to update using time %ld when "
541 "last update time is %ld (minimum one second step)",
542 current_time, rrd.live_head->last_up);
548 /* seek to the beginning of the rra's */
549 if (rra_current != rra_begin) {
550 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
551 rrd_set_error("seek error in rrd");
555 rra_current = rra_begin;
557 rra_start = rra_begin;
559 /* when was the current pdp started */
560 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
561 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
563 /* when did the last pdp_st occur */
564 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
565 occu_pdp_st = current_time - occu_pdp_age;
566 /* interval = current_time - rrd.live_head->last_up; */
567 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
569 if (occu_pdp_st > proc_pdp_st){
570 /* OK we passed the pdp_st moment*/
571 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572 * occurred before the latest
574 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575 post_int = occu_pdp_age; /* how much after it */
576 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
590 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
591 occu_pdp_age, occu_pdp_st,
592 interval, pre_int, post_int);
595 /* process the data sources and update the pdp_prep
596 * area accordingly */
597 for(i=0;i<rrd.stat_head->ds_cnt;i++){
599 dst_idx= dst_conv(rrd.ds_def[i].dst);
600 /* NOTE: DST_CDEF should never enter this if block, because
601 * updvals[i+1][0] is initialized to 'U'; unless the caller
602 * accidentally specified a value for the DST_CDEF. To handle
603 * this case, an extra check is required. */
604 if((updvals[i+1][0] != 'U') &&
605 (dst_idx != DST_CDEF) &&
606 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
608 /* the data source type defines how to process the data */
609 /* pdp_new contains rate * time ... eg the bytes
610 * transferred during the interval. Doing it this way saves
611 * a lot of math operations */
617 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
619 if(dst_idx == DST_COUNTER) {
620 /* simple overflow catcher suggested by Andres Kroonmaa */
621 /* this will fail terribly for non 32 or 64 bit counters ... */
622 /* are there any others in SNMP land ? */
623 if (pdp_new[i] < (double)0.0 )
624 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
625 if (pdp_new[i] < (double)0.0 )
626 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
628 rate = pdp_new[i] / interval;
635 pdp_new[i]= atof(updvals[i+1]);
636 rate = pdp_new[i] / interval;
639 pdp_new[i] = atof(updvals[i+1]) * interval;
640 rate = pdp_new[i] / interval;
643 rrd_set_error("rrd contains unknown DS type : '%s'",
647 /* break out of this for loop if the error string is set */
648 if (rrd_test_error()){
651 /* make sure pdp_temp is neither too large or too small
652 * if any of these occur it becomes unknown ...
654 if ( ! isnan(rate) &&
655 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
656 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
657 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
658 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
662 /* no news is news all the same */
666 /* make a copy of the command line argument for the next run */
674 rrd.pdp_prep[i].last_ds,
675 updvals[i+1], pdp_new[i]);
677 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
678 strncpy(rrd.pdp_prep[i].last_ds,
679 updvals[i+1],LAST_DS_LEN-1);
680 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
683 /* break out of the argument parsing loop if the error_string is set */
684 if (rrd_test_error()){
688 /* has a pdp_st moment occurred since the last run ? */
690 if (proc_pdp_st == occu_pdp_st){
691 /* no we have not passed a pdp_st moment. therefore update is simple */
693 for(i=0;i<rrd.stat_head->ds_cnt;i++){
694 if(isnan(pdp_new[i]))
695 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
697 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
704 rrd.pdp_prep[i].scratch[PDP_val].u_val,
705 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
709 /* a pdp_st has occurred. */
711 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
712 * occurred up to the last run.
713 pdp_new[] contains rate*seconds from the latest run.
714 pdp_temp[] will contain the rate for cdp */
716 for(i=0;i<rrd.stat_head->ds_cnt;i++){
717 /* update pdp_prep to the current pdp_st */
718 if(isnan(pdp_new[i]))
719 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
721 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
722 pdp_new[i]/(double)interval*(double)pre_int;
724 /* if too much of the pdp_prep is unknown we dump it */
725 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
726 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
727 (occu_pdp_st-proc_pdp_st <=
728 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
731 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
732 / (double)( occu_pdp_st
734 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
737 /* process CDEF data sources; remember each CDEF DS can
738 * only reference other DS with a lower index number */
739 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
741 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
742 /* substitute data values for OP_VARIABLE nodes */
743 for (ii = 0; rpnp[ii].op != OP_END; ii++)
745 if (rpnp[ii].op == OP_VARIABLE) {
746 rpnp[ii].op = OP_NUMBER;
747 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
750 /* run the rpn calculator */
751 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
753 break; /* exits the data sources pdp_temp loop */
757 /* make pdp_prep ready for the next run */
758 if(isnan(pdp_new[i])){
759 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
760 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
762 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
763 rrd.pdp_prep[i].scratch[PDP_val].u_val =
764 pdp_new[i]/(double)interval*(double)post_int;
772 "new_unkn_sec %5lu\n",
774 rrd.pdp_prep[i].scratch[PDP_val].u_val,
775 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
779 /* if there were errors during the last loop, bail out here */
780 if (rrd_test_error()){
785 /* compute the number of elapsed pdp_st moments */
786 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
788 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
790 if (rra_step_cnt == NULL)
792 rra_step_cnt = (unsigned long *)
793 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
796 for(i = 0, rra_start = rra_begin;
797 i < rrd.stat_head->rra_cnt;
798 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
801 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
802 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
803 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
804 if (start_pdp_offset <= elapsed_pdp_st) {
805 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
806 rrd.rra_def[i].pdp_cnt + 1;
811 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
813 /* If this is a bulk update, we need to skip ahead in the seasonal
814 * arrays so that they will be correct for the next observed value;
815 * note that for the bulk update itself, no update will occur to
816 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
818 if (rra_step_cnt[i] > 2)
820 /* skip update by resetting rra_step_cnt[i],
821 * note that this is not data source specific; this is due
822 * to the bulk update, not a DNAN value for the specific data
825 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
826 &last_seasonal_coef);
827 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
831 /* periodically run a smoother for seasonal effects */
832 /* Need to use first cdp parameter buffer to track
833 * burnin (burnin requires a specific smoothing schedule).
834 * The CDP_init_seasonal parameter is really an RRA level,
835 * not a data source within RRA level parameter, but the rra_def
836 * is read only for rrd_update (not flushed to disk). */
837 iii = i*(rrd.stat_head -> ds_cnt);
838 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
841 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
842 > rrd.rra_def[i].row_cnt - 1) {
843 /* mark off one of the burnin cycles */
844 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
848 /* someone has no doubt invented a trick to deal with this
849 * wrap around, but at least this code is clear. */
850 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
851 rrd.rra_ptr[i].cur_row)
853 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
854 * mapping between PDP and CDP */
855 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
856 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
860 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
861 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
862 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
867 /* can't rely on negative numbers because we are working with
869 /* Don't need modulus here. If we've wrapped more than once, only
870 * one smooth is executed at the end. */
871 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
872 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
873 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
877 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
879 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
886 rra_current = ftell(rrd_file);
887 } /* if cf is DEVSEASONAL or SEASONAL */
889 if (rrd_test_error()) break;
891 /* update CDP_PREP areas */
892 /* loop over data sources within each RRA */
894 ii < rrd.stat_head->ds_cnt;
898 /* iii indexes the CDP prep area for this data source within the RRA */
899 iii=i*rrd.stat_head->ds_cnt+ii;
901 if (rrd.rra_def[i].pdp_cnt > 1) {
903 if (rra_step_cnt[i] > 0) {
904 /* If we are in this block, at least 1 CDP value will be written to
905 * disk, this is the CDP_primary_val entry. If more than 1 value needs
906 * to be written, then the "fill in" value is the CDP_secondary_val
908 if (isnan(pdp_temp[ii]))
910 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
911 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
913 /* CDP_secondary value is the RRA "fill in" value for intermediary
914 * CDP data entries. No matter the CF, the value is the same because
915 * the average, max, min, and last of a list of identical values is
916 * the same, namely, the value itself. */
917 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
920 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
921 > rrd.rra_def[i].pdp_cnt*
922 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
924 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
925 /* initialize carry over */
926 if (current_cf == CF_AVERAGE) {
927 if (isnan(pdp_temp[ii])) {
928 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
930 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
931 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
934 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
937 rrd_value_t cum_val, cur_val;
938 switch (current_cf) {
940 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
941 cur_val = IFDNAN(pdp_temp[ii],0.0);
942 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
943 (cum_val + cur_val * start_pdp_offset) /
944 (rrd.rra_def[i].pdp_cnt
945 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
946 /* initialize carry over value */
947 if (isnan(pdp_temp[ii])) {
948 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
950 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
951 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
955 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
956 cur_val = IFDNAN(pdp_temp[ii],-DINF);
958 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
959 isnan(pdp_temp[ii])) {
961 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
966 if (cur_val > cum_val)
967 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
969 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
970 /* initialize carry over value */
971 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
974 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
975 cur_val = IFDNAN(pdp_temp[ii],DINF);
977 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978 isnan(pdp_temp[ii])) {
980 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
985 if (cur_val < cum_val)
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989 /* initialize carry over value */
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
994 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
995 /* initialize carry over value */
996 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
999 } /* endif meets xff value requirement for a valid value */
1000 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1001 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1002 if (isnan(pdp_temp[ii]))
1003 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1004 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1006 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1007 } else /* rra_step_cnt[i] == 0 */
1010 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1011 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1014 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1015 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1018 if (isnan(pdp_temp[ii])) {
1019 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1020 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1022 if (current_cf == CF_AVERAGE) {
1023 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1026 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1029 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1030 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1033 switch (current_cf) {
1035 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1039 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1040 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1043 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1053 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1054 if (elapsed_pdp_st > 2)
1056 switch (current_cf) {
1059 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1060 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1063 case CF_DEVSEASONAL:
1064 /* need to update cached seasonal values, so they are consistent
1065 * with the bulk update */
1066 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1067 * CDP_last_deviation are the same. */
1068 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1069 last_seasonal_coef[ii];
1070 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1074 /* need to update the null_count and last_null_count.
1075 * even do this for non-DNAN pdp_temp because the
1076 * algorithm is not learning from batch updates. */
1077 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1079 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1083 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1084 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1087 /* do not count missed bulk values as failures */
1088 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1089 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1090 /* need to reset violations buffer.
1091 * could do this more carefully, but for now, just
1092 * assume a bulk update wipes away all violations. */
1093 erase_violations(&rrd, iii, i);
1097 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1099 if (rrd_test_error()) break;
1101 } /* endif data sources loop */
1102 } /* end RRA Loop */
1104 /* this loop is only entered if elapsed_pdp_st < 3 */
1105 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1106 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1108 for(i = 0, rra_start = rra_begin;
1109 i < rrd.stat_head->rra_cnt;
1110 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1113 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1115 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1116 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1118 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1119 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1121 rra_current = ftell(rrd_file);
1123 if (rrd_test_error()) break;
1124 /* loop over data sources within each RRA */
1126 ii < rrd.stat_head->ds_cnt;
1129 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1130 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1131 scratch_idx, seasonal_coef);
1133 } /* end RRA Loop */
1134 if (rrd_test_error()) break;
1135 } /* end elapsed_pdp_st loop */
1137 if (rrd_test_error()) break;
1139 /* Ready to write to disk */
1140 /* Move sequentially through the file, writing one RRA at a time.
1141 * Note this architecture divorces the computation of CDP with
1142 * flushing updated RRA entries to disk. */
1143 for(i = 0, rra_start = rra_begin;
1144 i < rrd.stat_head->rra_cnt;
1145 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1147 /* is there anything to write for this RRA? If not, continue. */
1148 if (rra_step_cnt[i] == 0) continue;
1150 /* write the first row */
1152 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1154 rrd.rra_ptr[i].cur_row++;
1155 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1156 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1157 /* position on the first row */
1158 rra_pos_tmp = rra_start +
1159 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1160 if(rra_pos_tmp != rra_current) {
1161 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1162 rrd_set_error("seek error in rrd");
1165 rra_current = rra_pos_tmp;
1169 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1171 scratch_idx = CDP_primary_val;
1172 if (pcdp_summary != NULL)
1174 rra_time = (current_time - current_time
1175 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1176 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1178 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1179 pcdp_summary, &rra_time);
1180 if (rrd_test_error()) break;
1182 /* write other rows of the bulk update, if any */
1183 scratch_idx = CDP_secondary_val;
1184 for ( ; rra_step_cnt[i] > 1;
1185 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1187 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1190 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1191 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1194 rrd.rra_ptr[i].cur_row = 0;
1195 /* seek back to beginning of current rra */
1196 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1198 rrd_set_error("seek error in rrd");
1202 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1204 rra_current = rra_start;
1206 if (pcdp_summary != NULL)
1208 rra_time = (current_time - current_time
1209 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1210 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1212 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1213 pcdp_summary, &rra_time);
1216 if (rrd_test_error())
1220 /* break out of the argument parsing loop if error_string is set */
1221 if (rrd_test_error()){
1226 } /* endif a pdp_st has occurred */
1227 rrd.live_head->last_up = current_time;
1228 rrd.live_head->last_up_usec = current_time_usec;
1230 } /* function argument loop */
1232 if (seasonal_coef != NULL) free(seasonal_coef);
1233 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1234 if (rra_step_cnt != NULL) free(rra_step_cnt);
1235 rpnstack_free(&rpnstack);
1237 /* if we got here and if there is an error and if the file has not been
1238 * written to, then close things up and return. */
1239 if (rrd_test_error()) {
1249 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1250 * we just need to write back the live header portion now*/
1252 if (fseek(rrd_file, (sizeof(stat_head_t)
1253 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1254 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1256 rrd_set_error("seek rrd for live header writeback");
1267 if(fwrite( rrd.live_head,
1268 sizeof(live_head_t), 1, rrd_file) != 1){
1269 rrd_set_error("fwrite live_head to rrd");
1280 if(fwrite( &rrd.live_head->last_up,
1281 sizeof(time_t), 1, rrd_file) != 1){
1282 rrd_set_error("fwrite live_head to rrd");
1294 if(fwrite( rrd.pdp_prep,
1296 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1297 rrd_set_error("ftwrite pdp_prep to rrd");
1307 if(fwrite( rrd.cdp_prep,
1309 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1310 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1312 rrd_set_error("ftwrite cdp_prep to rrd");
1322 if(fwrite( rrd.rra_ptr,
1324 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1325 rrd_set_error("fwrite rra_ptr to rrd");
1335 /* OK now close the files and free the memory */
1336 if(fclose(rrd_file) != 0){
1337 rrd_set_error("closing rrd");
1346 /* calling the smoothing code here guarantees at most
1347 * one smoothing operation per rrd_update call. Unfortunately,
1348 * it is possible with bulk updates, or a long-delayed update
1349 * for smoothing to occur off-schedule. This really isn't
1350 * critical except during the burning cycles. */
1351 if (schedule_smooth)
1354 rrd_file = fopen(filename,"r+");
1356 rrd_file = fopen(filename,"rb+");
1358 rra_start = rra_begin;
1359 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1361 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1362 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1365 fprintf(stderr,"Running smoother for rra %ld\n",i);
1367 apply_smoother(&rrd,i,rra_start,rrd_file);
1368 if (rrd_test_error())
1371 rra_start += rrd.rra_def[i].row_cnt
1372 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1385 * get exclusive lock to whole file.
1386 * lock gets removed when we close the file
1388 * returns 0 on success
1391 LockRRD(FILE *rrdfile)
/* Two locking variants appear below: an fcntl() record lock and an
 * _fstat()/_locking() pair.
 * NOTE(review): presumably a preprocessor conditional that is not visible
 * here selects exactly one of them per platform -- confirm. */
1393 int rrd_fd; /* File descriptor for RRD */
/* the lock calls below work on the descriptor behind the stdio stream */
1396 rrd_fd = fileno(rrdfile);
/* fcntl() variant: describe a write lock that starts at offset 0 */
1401 lock.l_type = F_WRLCK; /* exclusive write lock */
1402 lock.l_len = 0; /* whole file */
1403 lock.l_start = 0; /* start of file */
1404 lock.l_whence = SEEK_SET; /* l_start is measured from the start of the file */
/* F_SETLK does not wait: fcntl() fails right away if the lock cannot be
 * obtained, and that result is stored in stat */
1406 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* _locking() variant: the byte count handed to _locking() is the current
 * file size reported by _fstat(); the lock is only attempted when _fstat()
 * succeeds */
1410 if ( _fstat( rrd_fd, &st ) == 0 ) {
/* _LK_NBLCK requests the lock without waiting for it */
1411 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
/* Write one row of the RRA numbered rra_idx to rrd_file.
 *
 * For every data source the value stored in scratch[CDP_scratch_idx] of the
 * matching cdp_prep entry is written with fwrite(), and *rra_current is
 * advanced by sizeof(rrd_value_t) for it.
 * When pcdp_summary is not NULL, each value is also appended to it through
 * info_push(), under a key built from *rra_time, rra_idx and the DS name.
 *
 * Returns the pcdp_summary pointer as updated by info_push() (or unchanged
 * when it was NULL). A failed fwrite() is reported via
 * rrd_set_error("writing rrd"); what is returned in that case is not
 * visible in this listing. */
1423 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1424 unsigned short CDP_scratch_idx, FILE *rrd_file,
1425 info_t *pcdp_summary, time_t *rra_time)
1427 unsigned long ds_idx, cdp_idx;
/* one value per data source makes up a row */
1430 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1432 /* compute the cdp index */
/* cdp_prep is indexed RRA-major: ds_cnt consecutive entries per RRA */
1433 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
/* diagnostic trace of the value and the current file offset
 * NOTE(review): presumably compiled only in debug builds -- the guarding
 * directive is not visible here */
1435 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1436 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1437 rrd -> rra_def[rra_idx].cf_nam);
/* the summary is optional; callers that do not want it pass NULL */
1439 if (pcdp_summary != NULL)
1441 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1442 /* append info to the return hash */
/* NOTE(review): the key format below uses "%d" for *rra_time, which is a
 * time_t. If sprintf_alloc() is printf-style (TODO confirm), this is a
 * format/argument mismatch wherever time_t is wider than int. */
1443 pcdp_summary = info_push(pcdp_summary,
1444 sprintf_alloc("[%d]RRA[%lu]DS[%s]",
1445 *rra_time, rra_idx, rrd->ds_def[ds_idx].ds_nam),
/* write exactly one rrd_value_t; anything else counts as a failure */
1448 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1449 sizeof(rrd_value_t),1,rrd_file) != 1)
1451 rrd_set_error("writing rrd");
/* keep the caller's file-position bookkeeping in step with the write */
1454 *rra_current += sizeof(rrd_value_t);
1456 return (pcdp_summary);