1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
14 /* private functions */
/* NOTE(review): this listing is gap-sampled -- the function-name lines and
 * the leading `rrd_t *rrd,` parameter of each prototype are missing here.
 * Judging by the parameter patterns and the definitions below, these are the
 * forward declarations for update_hwpredict, update_seasonal,
 * update_devpredict, update_devseasonal and update_failures -- confirm
 * against the complete rrd_hw.c. */
20 unsigned long cdp_idx,
21 unsigned long rra_idx,
23 unsigned short CDP_scratch_idx);
26 unsigned long cdp_idx,
27 unsigned long rra_idx,
29 unsigned short CDP_scratch_idx,
30 rrd_value_t *seasonal_coef);
31 int update_devpredict(
33 unsigned long cdp_idx,
34 unsigned long rra_idx,
36 unsigned short CDP_scratch_idx);
37 int update_devseasonal(
39 unsigned long cdp_idx,
40 unsigned long rra_idx,
42 unsigned short CDP_scratch_idx,
43 rrd_value_t *seasonal_dev);
46 unsigned long cdp_idx,
47 unsigned long rra_idx,
49 unsigned short CDP_scratch_idx);
/* update_hwpredict (header lines missing from this gap-sampled listing):
 * performs one Holt-Winters prediction/update step for a HWPREDICT RRA.
 * It saves the previous intercept/slope/null-count, fetches the seasonal
 * coefficient from the dependent SEASONAL RRA's cdp_prep area, computes
 *   prediction = intercept + slope * null_count + seasonal_coef
 * and then smooths intercept (alpha) and slope (beta) against the new
 * observation held in coefs[CDP_scratch_idx]. The prediction overwrites
 * the scratch slot so the caller can write it to disk. */
53 unsigned long cdp_idx,
54 unsigned long rra_idx,
56 unsigned short CDP_scratch_idx)
58 rrd_value_t prediction, seasonal_coef;
59 unsigned long dependent_rra_idx, seasonal_cdp_idx;
60 unival *coefs = rrd->cdp_prep[cdp_idx].scratch;
61 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
63 /* save coefficients from current prediction */
64 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_hw_intercept].u_val;
65 coefs[CDP_hw_last_slope].u_val = coefs[CDP_hw_slope].u_val;
66 coefs[CDP_last_null_count].u_cnt = coefs[CDP_null_count].u_cnt;
68 /* retrieve the current seasonal coef */
69 dependent_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
70 seasonal_cdp_idx = dependent_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
/* RRA order decides freshness: if the SEASONAL RRA precedes this one it has
 * already been updated this step, so read its *last* seasonal value;
 * otherwise read the current one. (assignment lines are partially elided) */
71 if (dependent_rra_idx < rra_idx)
73 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
77 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
79 /* compute the prediction */
80 if (isnan(coefs[CDP_hw_intercept].u_val)
81 || isnan(coefs[CDP_hw_slope].u_val)
82 || isnan(seasonal_coef)) {
/* model not fully initialized: prediction stays NA this round
 * (the `prediction = DNAN;` line appears elided from the listing) */
85 /* bootstrap initialization of slope and intercept */
86 if (isnan(coefs[CDP_hw_intercept].u_val) &&
87 !isnan(coefs[CDP_scratch_idx].u_val)) {
89 fprintf(stderr, "Initialization of slope/intercept\n");
91 coefs[CDP_hw_intercept].u_val = coefs[CDP_scratch_idx].u_val;
92 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_scratch_idx].u_val;
93 /* initialize the slope to 0 */
94 coefs[CDP_hw_slope].u_val = 0.0;
95 coefs[CDP_hw_last_slope].u_val = 0.0;
96 /* initialize null count to 1 */
97 coefs[CDP_null_count].u_cnt = 1;
98 coefs[CDP_last_null_count].u_cnt = 1;
100 /* if seasonal coefficient is NA, then don't update intercept, slope */
/* null_count scales the slope term so a run of NA samples still advances
 * the linear trend by the elapsed number of steps */
102 prediction = coefs[CDP_hw_intercept].u_val +
103 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
106 fprintf(stderr, "computed prediction: %f\n", prediction);
108 if (isnan(coefs[CDP_scratch_idx].u_val)) {
109 /* NA value, no updates of intercept, slope;
110 * increment the null count */
111 (coefs[CDP_null_count].u_cnt)++;
114 fprintf(stderr, "Updating intercept, slope\n");
116 /* update the intercept */
/* Holt-Winters level update: alpha * deseasonalized observation
 * + (1 - alpha) * previous forecast (continuation lines elided) */
117 coefs[CDP_hw_intercept].u_val =
118 (current_rra->par[RRA_hw_alpha].u_val) *
119 (coefs[CDP_scratch_idx].u_val - seasonal_coef) + (1 -
124 (coefs[CDP_hw_intercept].u_val +
125 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
126 /* update the slope */
/* Holt-Winters trend update: beta * (new - old intercept)
 * + (1 - beta) * old slope (continuation lines elided) */
127 coefs[CDP_hw_slope].u_val =
128 (current_rra->par[RRA_hw_beta].u_val) *
129 (coefs[CDP_hw_intercept].u_val -
130 coefs[CDP_hw_last_intercept].u_val) + (1 -
134 (coefs[CDP_hw_slope].u_val);
135 /* reset the null count */
136 coefs[CDP_null_count].u_cnt = 1;
140 /* store the prediction for writing */
141 coefs[CDP_scratch_idx].u_val = prediction;
/* lookup_seasonal (header lines missing from this gap-sampled listing):
 * reads one full row (ds_cnt rrd_value_t values) of a SEASONAL/DEVSEASONAL
 * RRA from disk into *seasonal_coef, allocating the buffer on first use.
 * The row read is cur_row + offset, wrapped modulo row_cnt.
 * On failure it calls rrd_set_error; caller owns the *seasonal_coef buffer. */
147 unsigned long rra_idx,
148 unsigned long rra_start,
149 rrd_file_t *rrd_file,
150 unsigned long offset,
151 rrd_value_t **seasonal_coef)
153 unsigned long pos_tmp;
155 /* rra_ptr[].cur_row points to the rra row to be written; this function
156 * reads cur_row + offset */
157 unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
159 /* handle wrap around */
160 if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
161 row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
163 /* rra_start points to the appropriate rra block in the file */
164 /* compute the pointer to the appropriate location in the file */
/* NOTE(review): the `pos_tmp = rra_start +` portion of this assignment is
 * elided from the listing; only the row/column offset term is visible. */
167 (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
169 /* allocate memory if need be */
170 if (*seasonal_coef == NULL)
172 (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
173 sizeof(rrd_value_t));
174 if (*seasonal_coef == NULL) {
175 rrd_set_error("memory allocation failure: seasonal coef");
/* seek to the computed position, then read exactly one row; the
 * comparison against the expected ssize_t guards short reads */
179 if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
181 (rrd_file, *seasonal_coef,
182 sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
183 == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
185 /* we can safely ignore the rule requiring a seek operation between read
186 * and write, because this read moves the file pointer to somewhere
187 * in the file other than the next write location.
191 rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
195 rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
/* update_seasonal (header lines missing from this gap-sampled listing):
 * smooths the per-slot seasonal coefficient of a SEASONAL RRA with gamma.
 * Two symmetric branches handle RRA ordering: if the dependent HWPREDICT
 * RRA was already updated this step, the new intercept is read from its
 * cdp_prep; otherwise the intercept is recomputed locally from the raw
 * observation (which lives only in SEASONAL's scratch -- HWPREDICT's is
 * already DNAN). The updated coefficient replaces CDP_scratch_idx. */
204 unsigned long cdp_idx,
205 unsigned long rra_idx,
206 unsigned long ds_idx,
207 unsigned short CDP_scratch_idx,
208 rrd_value_t *seasonal_coef)
210 /* TODO: extract common if subblocks in the wake of I/O optimization */
211 rrd_value_t intercept, seasonal;
212 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
/* NOTE(review): the `rra_def_t *hw_rra =` declarator line is elided here;
 * only its initializer (the dependent HWPREDICT rra_def) is visible. */
214 &(rrd->rra_def[current_rra->par[RRA_dependent_rra_idx].u_cnt]);
215 /* obtain cdp_prep index for HWPREDICT */
216 unsigned long hw_cdp_idx = (current_rra->par[RRA_dependent_rra_idx].u_cnt)
217 * (rrd->stat_head->ds_cnt) + ds_idx;
218 unival *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;
220 /* update seasonal coefficient in cdp prep areas */
221 seasonal = rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val;
222 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = seasonal;
223 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val =
224 seasonal_coef[ds_idx];
226 /* update seasonal value for disk */
227 if (current_rra->par[RRA_dependent_rra_idx].u_cnt < rra_idx)
228 /* associated HWPREDICT has already been updated */
229 /* check for possible NA values */
230 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
231 /* no update, store the old value unchanged,
232 * doesn't matter if it is NA */
233 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
234 } else if (isnan(coefs[CDP_hw_last_intercept].u_val)
235 || isnan(coefs[CDP_hw_last_slope].u_val)) {
236 /* this should never happen, as HWPREDICT was already updated */
237 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
238 } else if (isnan(seasonal)) {
239 /* initialization: intercept is not currently being updated */
241 fprintf(stderr, "Initialization of seasonal coef %lu\n",
242 rrd->rra_ptr[rra_idx].cur_row);
/* first seasonal value = observation minus the baseline intercept */
244 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
245 -= coefs[CDP_hw_last_intercept].u_val;
247 intercept = coefs[CDP_hw_intercept].u_val;
250 "Updating seasonal, params: gamma %f, new intercept %f, old seasonal %f\n",
251 current_rra->par[RRA_seasonal_gamma].u_val,
252 intercept, seasonal);
/* gamma smoothing: gamma * (obs - intercept) + (1 - gamma) * old seasonal
 * (some continuation lines of the expression are elided) */
254 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
255 (current_rra->par[RRA_seasonal_gamma].u_val) *
256 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
258 current_rra->par[RRA_seasonal_gamma].u_val) *
261 /* SEASONAL array is updated first, which means the new intercept
262 * hasn't be computed; so we compute it here. */
264 /* check for possible NA values */
265 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
266 /* no update, simple store the old value unchanged,
267 * doesn't matter if it is NA */
268 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
269 } else if (isnan(coefs[CDP_hw_intercept].u_val)
270 || isnan(coefs[CDP_hw_slope].u_val)) {
271 /* Initialization of slope and intercept will occur.
272 * force seasonal coefficient to 0. */
273 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
274 } else if (isnan(seasonal)) {
275 /* initialization: intercept will not be updated
276 * CDP_hw_intercept = CDP_hw_last_intercept; just need to
277 * subtract this baseline value. */
279 fprintf(stderr, "Initialization of seasonal coef %lu\n",
280 rrd->rra_ptr[rra_idx].cur_row);
282 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -=
283 coefs[CDP_hw_intercept].u_val;
285 /* Note that we must get CDP_scratch_idx from SEASONAL array, as CDP_scratch_idx
286 * for HWPREDICT array will be DNAN. */
/* recompute this step's intercept locally with the HWPREDICT alpha,
 * since HWPREDICT itself has not run yet (continuation lines elided) */
287 intercept = (hw_rra->par[RRA_hw_alpha].u_val) *
288 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
291 hw_rra->par[RRA_hw_alpha].u_val) *
292 (coefs[CDP_hw_intercept].u_val +
293 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
294 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
295 (current_rra->par[RRA_seasonal_gamma].u_val) *
296 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
298 current_rra->par[RRA_seasonal_gamma].u_val) *
303 fprintf(stderr, "seasonal coefficient set= %f\n",
304 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
/* Copy the current seasonal deviation from the dependent DEVSEASONAL RRA's
 * cdp_prep into this DEVPREDICT RRA's scratch slot. No smoothing happens
 * here; DEVPREDICT merely archives deviations for longer than one cycle.
 * RRA order selects last vs. current deviation, as in the other helpers. */
309 int update_devpredict(
311 unsigned long cdp_idx,
312 unsigned long rra_idx,
313 unsigned long ds_idx,
314 unsigned short CDP_scratch_idx)
316 /* there really isn't any "update" here; the only reason this information
317 * is stored separately from DEVSEASONAL is to preserve deviation predictions
318 * for a longer duration than one seasonal cycle. */
319 unsigned long seasonal_cdp_idx =
320 (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
321 * (rrd->stat_head->ds_cnt) + ds_idx;
323 if (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx) {
324 /* associated DEVSEASONAL array already updated */
325 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
327 rrd->cdp_prep[seasonal_cdp_idx].
328 scratch[CDP_last_seasonal_deviation].u_val;
330 /* associated DEVSEASONAL not yet updated */
331 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
333 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_seasonal_deviation].
/* Update the seasonal deviation for a DEVSEASONAL RRA: compute the
 * Holt-Winters prediction (reading last vs. current HWPREDICT/SEASONAL
 * coefficients depending on RRA update order), take |observation -
 * prediction|, and exponentially smooth it with gamma into the scratch
 * slot. seasonal_dev supplies the per-DS deviation row read from disk. */
339 int update_devseasonal(
341 unsigned long cdp_idx,
342 unsigned long rra_idx,
343 unsigned long ds_idx,
344 unsigned short CDP_scratch_idx,
345 rrd_value_t *seasonal_dev)
347 rrd_value_t prediction = 0, seasonal_coef = DNAN;
348 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
350 /* obtain cdp_prep index for HWPREDICT */
351 unsigned long hw_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
352 unsigned long hw_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
353 unsigned long seasonal_cdp_idx;
354 unival *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;
/* rotate current deviation into the "last" slot, then load the next one */
356 rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val =
357 rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val;
358 /* retrieve the next seasonal deviation value, could be NA */
359 rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val =
360 seasonal_dev[ds_idx];
362 /* retrieve the current seasonal_coef (not to be confused with the
363 * current seasonal deviation). Could make this more readable by introducing
364 * some wrapper functions. */
/* NOTE(review): the `seasonal_cdp_idx =` line appears elided; the index is
 * the SEASONAL RRA (HWPREDICT's dependent) times ds_cnt plus ds_idx. */
366 (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt)
367 * (rrd->stat_head->ds_cnt) + ds_idx;
368 if (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
369 /* SEASONAL array already updated */
371 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
374 /* SEASONAL array not yet updated */
376 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
378 /* compute the abs value of the difference between the prediction and
380 if (hw_rra_idx < rra_idx) {
381 /* associated HWPREDICT has already been updated */
382 if (isnan(coefs[CDP_hw_last_intercept].u_val) ||
383 isnan(coefs[CDP_hw_last_slope].u_val) || isnan(seasonal_coef)) {
384 /* one of the prediction values is uinitialized */
385 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
388 prediction = coefs[CDP_hw_last_intercept].u_val +
389 (coefs[CDP_hw_last_slope].u_val) *
390 (coefs[CDP_last_null_count].u_cnt)
394 /* associated HWPREDICT has NOT been updated */
395 if (isnan(coefs[CDP_hw_intercept].u_val) ||
396 isnan(coefs[CDP_hw_slope].u_val) || isnan(seasonal_coef)) {
397 /* one of the prediction values is uinitialized */
398 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
401 prediction = coefs[CDP_hw_intercept].u_val +
402 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
407 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
408 /* no update, store existing value unchanged, doesn't
409 * matter if it is NA */
410 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
411 rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
/* first non-NA observation initializes the deviation outright
 * (the isnan() guard line for the last deviation is elided) */
414 (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
418 fprintf(stderr, "Initialization of seasonal deviation\n");
420 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
422 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
424 /* exponential smoothing update */
425 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
426 (rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
428 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)
430 rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
431 (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
437 /* Check for a failure based on a threshold # of violations within the specified
/* update_failures (header lines missing from this gap-sampled listing):
 * decides whether the observation violates the confidence band
 *   prediction +/- delta * deviation
 * built from the HWPREDICT, SEASONAL and DEVSEASONAL RRAs (chosen via the
 * dependent-RRA chain, last vs. current values by update order), shifts the
 * violation into a sliding window kept as a char array overlaid on the CDP
 * scratch area, and writes 1.0/0.0 (failure or not) into the scratch slot. */
441 unsigned long cdp_idx,
442 unsigned long rra_idx,
443 unsigned long ds_idx,
444 unsigned short CDP_scratch_idx)
446 /* detection of a violation depends on 3 RRAs:
447 * HWPREDICT, SEASONAL, and DEVSEASONAL */
448 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
449 unsigned long dev_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
450 rra_def_t *dev_rra = &(rrd->rra_def[dev_rra_idx]);
451 unsigned long hw_rra_idx = dev_rra->par[RRA_dependent_rra_idx].u_cnt;
452 rra_def_t *hw_rra = &(rrd->rra_def[hw_rra_idx]);
453 unsigned long seasonal_rra_idx = hw_rra->par[RRA_dependent_rra_idx].u_cnt;
454 unsigned long temp_cdp_idx;
455 rrd_value_t deviation = DNAN;
456 rrd_value_t seasonal_coef = DNAN;
457 rrd_value_t prediction = DNAN;
/* NOTE(review): the declaration of `violation` is elided from the listing;
 * it is used below as the per-step 0/1 violation flag. */
459 unsigned short violation_cnt = 0, i;
460 char *violations_array;
462 /* usual checks to determine the order of the RRAs */
463 temp_cdp_idx = dev_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
464 if (rra_idx < seasonal_rra_idx) {
465 /* DEVSEASONAL not yet updated */
467 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
469 /* DEVSEASONAL already updated */
471 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_seasonal_deviation].
474 if (!isnan(deviation)) {
476 temp_cdp_idx = seasonal_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
477 if (rra_idx < seasonal_rra_idx) {
478 /* SEASONAL not yet updated */
480 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_seasonal].u_val;
482 /* SEASONAL already updated */
484 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_seasonal].
487 /* in this code block, we know seasonal coef is not DNAN, because deviation is not
490 temp_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
491 if (rra_idx < hw_rra_idx) {
492 /* HWPREDICT not yet updated */
494 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_intercept].u_val +
495 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_slope].u_val)
496 * (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_null_count].u_cnt)
499 /* HWPREDICT already updated */
501 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_intercept].
503 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_slope].u_val)
505 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_null_count].
510 /* determine if the observed value is a violation */
511 if (!isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
512 if (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val >
/* upper band: prediction + delta_pos * deviation; lower band below
 * (the `prediction +`/`prediction -` terms are partially elided) */
514 (current_rra->par[RRA_delta_pos].u_val) * deviation
515 || rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val <
517 (current_rra->par[RRA_delta_neg].u_val) * deviation)
520 violation = 1; /* count DNAN values as violations */
525 /* determine if a failure has occurred and update the failure array */
526 violation_cnt = violation;
/* WARNING: the scratch area is reinterpreted as a char array so the
 * violation history window packs into the CDP prep block on disk */
527 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
528 for (i = current_rra->par[RRA_window_len].u_cnt; i > 1; i--) {
530 violations_array[i - 1] = violations_array[i - 2];
531 violation_cnt += violations_array[i - 1];
533 violations_array[0] = violation;
535 if (violation_cnt < current_rra->par[RRA_failure_threshold].u_cnt)
537 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
539 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 1.0;
541 return (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
544 /* For the specified CDP prep area and the FAILURES RRA,
545 * erase all history of past violations.
547 void erase_violations(
549 unsigned long cdp_idx,
550 unsigned long rra_idx)
553 char *violations_array;
555 /* check that rra_idx is a CF_FAILURES array */
556 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
558 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
559 rrd->rra_def[rra_idx].cf_nam);
/* debug dump of the scratch buffer before erasing (the surrounding
 * #ifdef DEBUG guards appear elided from this listing -- confirm) */
564 fprintf(stderr, "scratch buffer before erase:\n");
565 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
566 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
568 fprintf(stderr, "\n");
571 /* WARNING: an array of longs on disk is treated as an array of chars
573 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
574 /* erase everything in the part of the CDP scratch array that will be
575 * used to store violations for the current window */
576 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
577 violations_array[i - 1] = 0;
580 fprintf(stderr, "scratch buffer after erase:\n");
581 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
582 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
584 fprintf(stderr, "\n");
588 /* Smooth a periodic array with a moving average: equal weights and
589 * length = 5% of the period. */
/* apply_smoother (header lines missing from this gap-sampled listing):
 * reads the whole SEASONAL (or DEVSEASONAL) RRA into memory, applies a
 * centered moving average of width 2*offset+1 per data source using FIFO
 * queues, and for CF_SEASONAL re-centers the array to mean zero, folding
 * the removed baseline into the HWPREDICT intercept. The smoothed rows and
 * the updated cdp_prep area are then written back to disk. Returns early
 * (no smoothing) when the window would be empty or any value is still NA. */
592 unsigned long rra_idx,
593 unsigned long rra_start,
594 rrd_file_t *rrd_file)
596 unsigned long i, j, k;
597 unsigned long totalbytes;
598 rrd_value_t *rrd_values;
599 unsigned long row_length = rrd->stat_head->ds_cnt;
600 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
601 unsigned long offset;
/* NOTE(review): the `FIFOqueue **buffers;` declaration appears elided. */
603 rrd_value_t *working_average;
604 rrd_value_t *baseline;
/* half-window = 2.5% of the period either side, so the full window is ~5% */
606 offset = floor(0.025 * row_count);
608 return 0; /* no smoothing */
610 /* allocate memory */
611 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
612 rrd_values = (rrd_value_t *) malloc(totalbytes);
613 if (rrd_values == NULL) {
614 rrd_set_error("apply smoother: memory allocation failure");
618 /* rra_start is at the beginning of this rra */
619 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
620 rrd_set_error("seek to rra %d failed", rra_start);
625 /* could read all data in a single block, but we need to
626 * check for NA values */
627 for (i = 0; i < row_count; ++i) {
628 for (j = 0; j < row_length; ++j) {
630 (rrd_file, &(rrd_values[i * row_length + j]),
631 sizeof(rrd_value_t) * 1)
632 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
633 rrd_set_error("reading value failed: %s",
634 rrd_strerror(errno));
636 if (isnan(rrd_values[i * row_length + j])) {
637 /* can't apply smoothing, still uninitialized values */
640 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
649 /* allocate queues, one for each data source */
650 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
651 for (i = 0; i < row_length; ++i) {
652 queue_alloc(&(buffers[i]), 2 * offset + 1);
654 /* need working average initialized to 0 */
655 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
656 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
658 /* compute sums of the first 2*offset terms */
/* seed the window: indices are taken modulo row_count because the
 * seasonal array is periodic (MyMod handles the negative i - offset) */
659 for (i = 0; i < 2 * offset; ++i) {
660 k = MyMod(i - offset, row_count);
661 for (j = 0; j < row_length; ++j) {
662 queue_push(buffers[j], rrd_values[k * row_length + j]);
663 working_average[j] += rrd_values[k * row_length + j];
667 /* compute moving averages */
668 for (i = offset; i < row_count + offset; ++i) {
669 for (j = 0; j < row_length; ++j) {
670 k = MyMod(i, row_count);
671 /* add a term to the sum */
672 working_average[j] += rrd_values[k * row_length + j];
673 queue_push(buffers[j], rrd_values[k * row_length + j]);
675 /* reset k to be the center of the window */
676 k = MyMod(i - offset, row_count);
677 /* overwrite rdd_values entry, the old value is already
678 * saved in buffers */
679 rrd_values[k * row_length + j] =
680 working_average[j] / (2 * offset + 1);
681 baseline[j] += rrd_values[k * row_length + j];
683 /* remove a term from the sum */
684 working_average[j] -= queue_pop(buffers[j]);
688 for (i = 0; i < row_length; ++i) {
689 queue_dealloc(buffers[i]);
690 baseline[i] /= row_count;
693 free(working_average);
695 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
/* re-center: seasonal coefficients must average to zero, so subtract the
 * per-DS baseline and add it to the HWPREDICT intercept instead */
696 for (j = 0; j < row_length; ++j) {
697 for (i = 0; i < row_count; ++i) {
698 rrd_values[i * row_length + j] -= baseline[j];
700 /* update the baseline coefficient,
701 * first, compute the cdp_index. */
702 offset = (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
704 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
707 /* flush cdp to disk */
709 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
710 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
711 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
712 sizeof(live_head_t) +
713 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
714 rrd_set_error("apply_smoother: seek to cdp_prep failed");
718 if (rrd_write(rrd_file, rrd->cdp_prep,
720 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
721 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
722 (rrd->stat_head->ds_cnt))) {
723 rrd_set_error("apply_smoother: cdp_prep write failed");
729 /* endif CF_SEASONAL */
730 /* flush updated values to disk */
732 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
733 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
737 /* write as a single block */
739 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
740 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
741 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
752 /* Reset aberrant behavior model coefficients, including intercept, slope,
753 * seasonal, and seasonal deviation for the specified data source. */
754 void reset_aberrant_coefficients(
756 rrd_file_t *rrd_file,
757 unsigned long ds_idx)
759 unsigned long cdp_idx, rra_idx, i;
760 unsigned long cdp_start, rra_start;
761 rrd_value_t nan_buffer = DNAN;
763 /* compute the offset for the cdp area */
/* file layout: stat_head | ds_defs | rra_defs | live_head | pdp_preps |
 * cdp_preps | rra_ptrs | rra data -- the offsets below walk this layout */
764 cdp_start = sizeof(stat_head_t) +
765 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
766 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
767 sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
768 /* compute the offset for the first rra */
769 rra_start = cdp_start +
770 (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
771 sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
773 /* loop over the RRAs */
/* per-RRA reset, dispatched by consolidation function (switch case labels
 * such as CF_HWPREDICT / CF_SEASONAL / CF_FAILURES appear elided here) */
774 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
775 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
776 switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
778 init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
782 /* don't use init_seasonal because it will reset burn-in, which
783 * means different data sources will be calling for the smoother
784 * at different times. */
785 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
786 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
787 /* move to first entry of data source for this rra */
788 rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
790 /* entries for the same data source are not contiguous,
791 * temporal entries are contiguous */
792 for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
793 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
794 != sizeof(rrd_value_t) * 1) {
796 ("reset_aberrant_coefficients: write failed data source %lu rra %s",
797 ds_idx, rrd->rra_def[rra_idx].cf_nam);
/* skip over the other data sources to reach this DS's next row */
800 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
801 sizeof(rrd_value_t), SEEK_CUR);
805 erase_violations(rrd, cdp_idx, rra_idx);
810 /* move offset to the next rra */
811 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
/* finally rewrite the whole cdp_prep area so the in-memory resets persist */
814 rrd_seek(rrd_file, cdp_start, SEEK_SET);
815 if (rrd_write(rrd_file, rrd->cdp_prep,
817 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
818 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
819 (rrd->stat_head->ds_cnt))) {
820 rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
821 return; /*XXX: delme */
/* Initialize a HWPREDICT cdp_prep area: intercept and slope (current and
 * last) start as NA, null counts start at 1. The parameter line (a
 * cdp_prep_t pointer, judging by the `cdp->scratch` accesses) and the
 * braces are elided from this gap-sampled listing. */
825 void init_hwpredict_cdp(
828 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
829 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
830 cdp->scratch[CDP_hw_slope].u_val = DNAN;
831 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
832 cdp->scratch[CDP_null_count].u_cnt = 1;
833 cdp->scratch[CDP_last_null_count].u_cnt = 1;
/* Initialize a SEASONAL/DEVSEASONAL cdp_prep area: seasonal values start
 * as NA and the burn-in counter (CDP_init_seasonal) starts at 1. Parameter
 * line and braces are elided from this gap-sampled listing. */
836 void init_seasonal_cdp(
839 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
840 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
841 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
/* Dispatcher: store the new PDP value into the CDP scratch slot, then route
 * to the update helper matching the RRA's consolidation function
 * (HWPREDICT / DEVPREDICT / SEASONAL / DEVSEASONAL / FAILURES). The case
 * labels and the leading rrd_t*/pdp_val parameters are elided from this
 * gap-sampled listing; the routing is evident from the calls below. */
844 int update_aberrant_CF(
847 enum cf_en current_cf,
848 unsigned long cdp_idx,
849 unsigned long rra_idx,
850 unsigned long ds_idx,
851 unsigned short CDP_scratch_idx,
852 rrd_value_t *seasonal_coef)
854 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
855 switch (current_cf) {
860 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
863 return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
866 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx, CDP_scratch_idx,
869 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
870 CDP_scratch_idx, seasonal_coef);
872 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
/* MyMod (header lines missing from this gap-sampled listing): modulus that
 * yields a result in [0, mod) even for negative inputs -- used by
 * apply_smoother for the periodic index `i - offset`. The negative branch
 * takes |val| % mod and returns the complement (mod - new_val). */
882 unsigned long new_val;
885 new_val = ((unsigned long) abs(val)) % mod;
887 new_val = (val % mod);
890 return (mod - new_val);
895 /* a standard fixed-capacity FIF0 queue implementation
896 * No overflow checking is performed. */
/* queue_alloc (header elided): allocate a FIFOqueue and its value buffer;
 * head starts at `capacity` (the empty marker used by queue_isempty below),
 * while tail wraps modulo capacity in queue_push. The NULL-check early
 * returns between these lines are elided from this listing. */
901 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
904 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
905 if ((*q)->queue == NULL) {
909 (*q)->capacity = capacity;
910 (*q)->head = capacity;
/* queue_isempty fragment: empty when head (mod capacity) has caught tail */
918 return (q->head % q->capacity == q->tail);
/* queue_push fragment: store at tail, then wrap tail modulo capacity */
925 q->queue[(q->tail)++] = value;
926 q->tail = q->tail % q->capacity;
929 rrd_value_t queue_pop(
932 q->head = q->head % q->capacity;
933 return q->queue[(q->head)++];