+ rate = pdp_new[ds_idx] / interval;
+ break;
+ default:
+ rrd_set_error("rrd contains unknown DS type : '%s'",
+ rrd->ds_def[ds_idx].dst);
+ return -1;
+ }
+ /* break out of this for loop if the error string is set */
+ if (rrd_test_error()) {
+ return -1;
+ }
+ /* make sure pdp_temp is neither too large or too small
+ * if any of these occur it becomes unknown ...
+ * sorry folks ... */
+ if (!isnan(rate) &&
+ ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
+ rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
+ (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
+ rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
+ pdp_new[ds_idx] = DNAN;
+ }
+ } else {
+ /* no news is news all the same */
+ pdp_new[ds_idx] = DNAN;
+ }
+
+
+ /* make a copy of the command line argument for the next run */
+#ifdef DEBUG
+ fprintf(stderr, "prep ds[%lu]\t"
+ "last_arg '%s'\t"
+ "this_arg '%s'\t"
+ "pdp_new %10.2f\n",
+ ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
+ pdp_new[ds_idx]);
+#endif
+ strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
+ LAST_DS_LEN - 1);
+ rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
+ }
+ return 0;
+}
+
/*
 * How many PDP steps have elapsed since the last update?  Returns the
 * answer, and stores the (usec-adjusted) time between the last update and
 * the most recent PDP boundary in *pre_int, the time between that boundary
 * and the current time in *post_int, and the number of complete PDP
 * intervals processed so far in *proc_pdp_cnt.
 */
static int calculate_elapsed_steps(
    rrd_t *rrd,
    unsigned long current_time,
    unsigned long current_time_usec,
    double interval,            /* seconds since the previous update */
    double *pre_int,            /* out: part of interval before the PDP boundary */
    double *post_int,           /* out: part of interval after the PDP boundary */
    unsigned long *proc_pdp_cnt)    /* out: # of PDP intervals already processed */
{
    unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
    unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
                                 * time */
    unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
                                 * when it was last updated */
    unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */

    /* when was the current pdp started: round last_up down to a
     * pdp_step boundary */
    proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
    proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;

    /* when did the last pdp_st occur: round current_time down likewise */
    occu_pdp_age = current_time % rrd->stat_head->pdp_step;
    occu_pdp_st = current_time - occu_pdp_age;

    if (occu_pdp_st > proc_pdp_st) {
        /* OK we passed the pdp_st moment */
        *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
                                                                     * occurred before the latest
                                                                     * pdp_st moment*/
        *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
        *post_int = occu_pdp_age;   /* how much after it */
        *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
    } else {
        /* no PDP boundary crossed: the whole interval lies before the
         * (future) boundary and nothing after it */
        *pre_int = interval;
        *post_int = 0;
    }

    *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;

#ifdef DEBUG
    printf("proc_pdp_age %lu\t"
           "proc_pdp_st %lu\t"
           "occu_pfp_age %lu\t"
           "occu_pdp_st %lu\t"
           "int %lf\t"
           "pre_int %lf\t"
           "post_int %lf\n", proc_pdp_age, proc_pdp_st,
           occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
#endif

    /* compute the number of elapsed pdp_st moments */
    return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
}
+
+/*
+ * Increment the PDP values by the values in pdp_new, or else initialize them.
+ */
+static void simple_update(
+ rrd_t *rrd,
+ double interval,
+ rrd_value_t *pdp_new)
+{
+ int i;
+
+ for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
+ if (isnan(pdp_new[i])) {
+ /* this is not really accurate if we use subsecond data arrival time
+ should have thought of it when going subsecond resolution ...
+ sorry next format change we will have it! */
+ rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
+ floor(interval);
+ } else {
+ if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
+ rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
+ } else {
+ rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
+ }
+ }
+#ifdef DEBUG
+ fprintf(stderr,
+ "NO PDP ds[%i]\t"
+ "value %10.2f\t"
+ "unkn_sec %5lu\n",
+ i,
+ rrd->pdp_prep[i].scratch[PDP_val].u_val,
+ rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
+#endif
+ }
+}
+
+/*
+ * Call process_pdp_st for each DS.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int process_all_pdp_st(
+ rrd_t *rrd,
+ double interval,
+ double pre_int,
+ double post_int,
+ unsigned long elapsed_pdp_st,
+ rrd_value_t *pdp_new,
+ rrd_value_t *pdp_temp)
+{
+ unsigned long ds_idx;
+
+ /* in pdp_prep[].scratch[PDP_val].u_val we have collected
+ rate*seconds which occurred up to the last run.
+ pdp_new[] contains rate*seconds from the latest run.
+ pdp_temp[] will contain the rate for cdp */
+
+ for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
+ if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
+ elapsed_pdp_st * rrd->stat_head->pdp_step,
+ pdp_new, pdp_temp) == -1) {
+ return -1;
+ }
+#ifdef DEBUG
+ fprintf(stderr, "PDP UPD ds[%lu]\t"
+ "pdp_temp %10.2f\t"
+ "new_prep %10.2f\t"
+ "new_unkn_sec %5lu\n",
+ ds_idx, pdp_temp[ds_idx],
+ rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
+ rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
+#endif
+ }
+ return 0;
+}
+
+/*
+ * Process an update that occurs after one of the PDP moments.
+ * Increments the PDP value, sets NAN if time greater than the
+ * heartbeats have elapsed, processes CDEFs.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int process_pdp_st(
+ rrd_t *rrd,
+ unsigned long ds_idx,
+ double interval,
+ double pre_int,
+ double post_int,
+ long diff_pdp_st,
+ rrd_value_t *pdp_new,
+ rrd_value_t *pdp_temp)
+{
+ int i;
+
+ /* update pdp_prep to the current pdp_st. */
+ double pre_unknown = 0.0;
+ unival *scratch = rrd->pdp_prep[ds_idx].scratch;
+ unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
+
+ rpnstack_t rpnstack; /* used for COMPUTE DS */
+
+ rpnstack_init(&rpnstack);
+
+
+ if (isnan(pdp_new[ds_idx])) {
+ /* a final bit of unknown to be added bevore calculation
+ we use a temporary variable for this so that we
+ don't have to turn integer lines before using the value */
+ pre_unknown = pre_int;
+ } else {
+ if (isnan(scratch[PDP_val].u_val)) {
+ scratch[PDP_val].u_val = 0;
+ }
+ scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
+ }
+
+ /* if too much of the pdp_prep is unknown we dump it */
+ /* if the interval is larger thatn mrhb we get NAN */
+ if ((interval > mrhb) ||
+ (diff_pdp_st <= (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
+ pdp_temp[ds_idx] = DNAN;
+ } else {
+ pdp_temp[ds_idx] = scratch[PDP_val].u_val /
+ ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
+ pre_unknown);
+ }
+
+ /* process CDEF data sources; remember each CDEF DS can
+ * only reference other DS with a lower index number */
+ if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
+ rpnp_t *rpnp;
+
+ rpnp =
+ rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
+ /* substitute data values for OP_VARIABLE nodes */
+ for (i = 0; rpnp[i].op != OP_END; i++) {
+ if (rpnp[i].op == OP_VARIABLE) {
+ rpnp[i].op = OP_NUMBER;
+ rpnp[i].val = pdp_temp[rpnp[i].ptr];
+ }
+ }
+ /* run the rpn calculator */
+ if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
+ free(rpnp);
+ rpnstack_free(&rpnstack);
+ return -1;
+ }
+ }
+
+ /* make pdp_prep ready for the next run */
+ if (isnan(pdp_new[ds_idx])) {
+ /* this is not realy accurate if we use subsecond data arival time
+ should have thought of it when going subsecond resolution ...
+ sorry next format change we will have it! */
+ scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
+ scratch[PDP_val].u_val = DNAN;
+ } else {
+ scratch[PDP_unkn_sec_cnt].u_cnt = 0;
+ scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
+ }
+ rpnstack_free(&rpnstack);
+ return 0;
+}
+
/*
 * Iterate over all the RRAs for a given DS and:
 * 1. Decide whether to schedule a smooth later
 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
 * 3. Update the CDP
 *
 * Returns 0 on success, -1 on error
 */
static int update_all_cdp_prep(
    rrd_t *rrd,
    unsigned long *rra_step_cnt,    /* out: rows to be written per RRA */
    unsigned long rra_begin,        /* file offset where the first RRA starts */
    rrd_file_t *rrd_file,
    unsigned long elapsed_pdp_st,   /* # of PDP steps elapsed since last update */
    unsigned long proc_pdp_cnt,     /* # of PDP intervals already processed */
    rrd_value_t **last_seasonal_coef,
    rrd_value_t **seasonal_coef,
    rrd_value_t *pdp_temp,          /* per-DS rates for this update */
    unsigned long *rra_current,     /* out: current position in rrd_file */
    unsigned long *skip_update,     /* out: per-RRA flag, 1 = don't write rows */
    int *schedule_smooth)           /* out: set to 1 when a smooth is due */
{
    unsigned long rra_idx;

    /* index into the CDP scratch array */
    enum cf_en current_cf;
    unsigned long rra_start;

    /* number of rows to be updated in an RRA for a data value. */
    unsigned long start_pdp_offset;

    rra_start = rra_begin;
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
        /* PDP steps remaining until this RRA's next CDP boundary */
        start_pdp_offset =
            rrd->rra_def[rra_idx].pdp_cnt -
            proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
        skip_update[rra_idx] = 0;
        if (start_pdp_offset <= elapsed_pdp_st) {
            /* at least one CDP boundary was crossed: one row for the first
             * boundary plus one per whole pdp_cnt span after it */
            rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
                rrd->rra_def[rra_idx].pdp_cnt + 1;
        } else {
            rra_step_cnt[rra_idx] = 0;
        }

        if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
            /* If this is a bulk update, we need to skip ahead in the seasonal arrays
             * so that they will be correct for the next observed value; note that for
             * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
             * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
            if (rra_step_cnt[rra_idx] > 1) {
                skip_update[rra_idx] = 1;
                lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                elapsed_pdp_st, last_seasonal_coef);
                lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                elapsed_pdp_st + 1, seasonal_coef);
            }
            /* periodically run a smoother for seasonal effects */
            if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
#ifdef DEBUG
                fprintf(stderr,
                        "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
                        rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
                        rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
                        u_cnt);
#endif
                *schedule_smooth = 1;
            }
            /* lookup_seasonal moved the file position; remember it */
            *rra_current = rrd_tell(rrd_file);
        }
        if (rrd_test_error())
            return -1;

        if (update_cdp_prep
            (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
             pdp_temp, *last_seasonal_coef, *seasonal_coef,
             current_cf) == -1) {
            return -1;
        }
        /* advance to the file offset of the next RRA */
        rra_start +=
            rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    return 0;
}
+
+/*
+ * Are we due for a smooth? Also increments our position in the burn-in cycle.
+ */
+static int do_schedule_smooth(
+ rrd_t *rrd,
+ unsigned long rra_idx,
+ unsigned long elapsed_pdp_st)
+{
+ unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
+ unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
+ unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
+ unsigned long seasonal_smooth_idx =
+ rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
+ unsigned long *init_seasonal =
+ &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
+
+ /* Need to use first cdp parameter buffer to track burnin (burnin requires
+ * a specific smoothing schedule). The CDP_init_seasonal parameter is
+ * really an RRA level, not a data source within RRA level parameter, but
+ * the rra_def is read only for rrd_update (not flushed to disk). */
+ if (*init_seasonal > BURNIN_CYCLES) {
+ /* someone has no doubt invented a trick to deal with this wrap around,
+ * but at least this code is clear. */
+ if (seasonal_smooth_idx > cur_row) {
+ /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
+ * between PDP and CDP */
+ return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
+ }
+ /* can't rely on negative numbers because we are working with
+ * unsigned values */
+ return (cur_row + elapsed_pdp_st >= row_cnt
+ && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
+ }
+ /* mark off one of the burn-in cycles */
+ return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
+}
+
+/*
+ * For a given RRA, iterate over the data sources and call the appropriate
+ * consolidation function.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int update_cdp_prep(
+ rrd_t *rrd,
+ unsigned long elapsed_pdp_st,
+ unsigned long start_pdp_offset,
+ unsigned long *rra_step_cnt,
+ int rra_idx,
+ rrd_value_t *pdp_temp,
+ rrd_value_t *last_seasonal_coef,
+ rrd_value_t *seasonal_coef,
+ int current_cf)
+{
+ unsigned long ds_idx, cdp_idx;
+
+ /* update CDP_PREP areas */
+ /* loop over data soures within each RRA */
+ for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
+
+ cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
+
+ if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
+ update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
+ pdp_temp[ds_idx], rra_step_cnt[rra_idx],
+ elapsed_pdp_st, start_pdp_offset,
+ rrd->rra_def[rra_idx].pdp_cnt,
+ rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
+ rra_idx, ds_idx);
+ } else {
+ /* Nothing to consolidate if there's one PDP per CDP. However, if
+ * we've missed some PDPs, let's update null counters etc. */
+ if (elapsed_pdp_st > 2) {
+ reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
+ seasonal_coef, rra_idx, ds_idx, cdp_idx,
+ current_cf);
+ }
+ }
+
+ if (rrd_test_error())
+ return -1;
+ } /* endif data sources loop */
+ return 0;
+}
+
+/*
+ * Given the new reading (pdp_temp_val), update or initialize the CDP value,
+ * primary value, secondary value, and # of unknowns.
+ */
+static void update_cdp(
+ unival *scratch,
+ int current_cf,
+ rrd_value_t pdp_temp_val,
+ unsigned long rra_step_cnt,
+ unsigned long elapsed_pdp_st,
+ unsigned long start_pdp_offset,
+ unsigned long pdp_cnt,
+ rrd_value_t xff,
+ int i,
+ int ii)
+{
+ /* shorthand variables */
+ rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
+ rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
+ rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
+ unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
+
+ if (rra_step_cnt) {
+ /* If we are in this block, as least 1 CDP value will be written to
+ * disk, this is the CDP_primary_val entry. If more than 1 value needs
+ * to be written, then the "fill in" value is the CDP_secondary_val
+ * entry. */
+ if (isnan(pdp_temp_val)) {
+ *cdp_unkn_pdp_cnt += start_pdp_offset;
+ *cdp_secondary_val = DNAN;
+ } else {
+ /* CDP_secondary value is the RRA "fill in" value for intermediary
+ * CDP data entries. No matter the CF, the value is the same because
+ * the average, max, min, and last of a list of identical values is
+ * the same, namely, the value itself. */
+ *cdp_secondary_val = pdp_temp_val;
+ }
+
+ if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
+ *cdp_primary_val = DNAN;
+ if (current_cf == CF_AVERAGE) {
+ *cdp_val =
+ initialize_average_carry_over(pdp_temp_val,
+ elapsed_pdp_st,
+ start_pdp_offset, pdp_cnt);