1 /*****************************************************************************
2 * RRDtool 1.4.2 Copyright by Tobi Oetiker, 1997-2009
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
13 #include "rrd_hw_math.h"
14 #include "rrd_hw_update.h"
/* Fetch the index of the RRA that RRA `rra_idx` depends on, stored in the
 * per-RRA parameter area. Used to walk the HWPREDICT -> SEASONAL ->
 * DEVSEASONAL -> FAILURES dependency chain. Evaluates `rrd` twice; pass a
 * side-effect-free expression. */
16 #define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt
20 /* private functions */
/* Forward declaration of MyMod (definition near the bottom of the file);
 * NOTE(review): the remaining declarator lines are elided in this extract
 * (the left-gutter original line numbers skip). */
21 static unsigned long MyMod(
/* NOTE(review): lines are elided here; the lines below are the parameter
 * tail and body of lookup_seasonal(), whose signature line is not visible.
 * From the body: it locates the on-disk row of seasonal coefficients at
 * cur_row + offset within the RRA, (re)allocates *seasonal_coef if needed,
 * seeks to that row, and reads one rrd_value_t per data source into it. */
27 unsigned long rra_idx,
28 unsigned long rra_start,
31 rrd_value_t **seasonal_coef)
33 unsigned long pos_tmp;
35 /* rra_ptr[].cur_row points to the rra row to be written; this function
36 * reads cur_row + offset */
37 unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
39 /* handle wrap around */
40 if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
41 row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
43 /* rra_start points to the appropriate rra block in the file */
44 /* compute the pointer to the appropriate location in the file */
47 (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
49 /* allocate memory if need be */
50 if (*seasonal_coef == NULL)
52 (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
54 if (*seasonal_coef == NULL) {
55 rrd_set_error("memory allocation failure: seasonal coef");
/* NOTE(review): rrd_seek appears to return 0 on success here (hence the
 * negation) — confirm against rrd_open.c before relying on this. */
59 if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
/* Read one coefficient per data source; success requires the full
 * ds_cnt * sizeof(rrd_value_t) byte count. */
61 (rrd_file, *seasonal_coef,
62 sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
63 == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
65 /* we can safely ignore the rule requiring a seek operation between read
66 * and write, because this read moves the file pointer to somewhere
67 * in the file other than the next write location.
71 rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
75 rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
82 /* For the specified CDP prep area and the FAILURES RRA,
83 * erase all history of past violations.
/* NOTE(review): lines are elided in this extract (gutter numbers skip);
 * the signature's first parameter (presumably rrd_t *rrd), the opening
 * brace, and several closing braces are not visible. The #ifdef DEBUG
 * guards around the scratch-buffer dumps are also elided. */
85 void erase_violations(
87 unsigned long cdp_idx,
88 unsigned long rra_idx)
91 char *violations_array;
93 /* check that rra_idx is a CF_FAILURES array */
94 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
96 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
97 rrd->rra_def[rra_idx].cf_nam);
/* Debug dump of the scratch area before erasing. */
102 fprintf(stderr, "scratch buffer before erase:\n");
103 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
104 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
106 fprintf(stderr, "\n");
109 /* WARNING: an array of longs on disk is treated as an array of chars
111 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
112 /* erase everything in the part of the CDP scratch array that will be
113 * used to store violations for the current window */
114 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
115 violations_array[i - 1] = 0;
/* Debug dump of the scratch area after erasing. */
118 fprintf(stderr, "scratch buffer after erase:\n");
119 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
120 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
122 fprintf(stderr, "\n");
126 /* Smooth a periodic array with a moving average: equal weights and
127 * length = 5% of the period. */
/* NOTE(review): this extract is elided (gutter line numbers skip); the
 * function's name/return-type line, the FIFOqueue **buffers declaration,
 * several else/closing-brace lines, and some error-cleanup paths are not
 * visible. The visible body: reads the whole seasonal RRA into memory,
 * bails out if any value is still NA, applies a centered moving average of
 * half-width `offset` per data source, and writes the result back. */
130 unsigned long rra_idx,
131 unsigned long rra_start,
132 rrd_file_t *rrd_file)
134 unsigned long i, j, k;
135 unsigned long totalbytes;
136 rrd_value_t *rrd_values;
137 unsigned long row_length = rrd->stat_head->ds_cnt;
138 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
139 unsigned long offset;
141 rrd_value_t *working_average;
142 rrd_value_t *baseline;
/* RRD format >= 4 stores a per-RRA smoothing window fraction; older files
 * use the fixed 5% default. Half-width = window_fraction/2 * period. */
144 if (atoi(rrd->stat_head->version) >= 4) {
145 offset = floor(rrd->rra_def[rra_idx].
146 par[RRA_seasonal_smoothing_window].
147 u_val / 2 * row_count);
149 offset = floor(0.05 / 2 * row_count);
153 return 0; /* no smoothing */
155 /* allocate memory */
156 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
157 rrd_values = (rrd_value_t *) malloc(totalbytes);
158 if (rrd_values == NULL) {
159 rrd_set_error("apply smoother: memory allocation failure");
163 /* rra_start is at the beginning of this rra */
164 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
165 rrd_set_error("seek to rra %d failed", rra_start);
170 /* could read all data in a single block, but we need to
171 * check for NA values */
172 for (i = 0; i < row_count; ++i) {
173 for (j = 0; j < row_length; ++j) {
175 (rrd_file, &(rrd_values[i * row_length + j]),
176 sizeof(rrd_value_t) * 1)
177 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
178 rrd_set_error("reading value failed: %s",
179 rrd_strerror(errno));
181 if (isnan(rrd_values[i * row_length + j])) {
182 /* can't apply smoothing, still uninitialized values */
185 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
194 /* allocate queues, one for each data source */
195 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
196 for (i = 0; i < row_length; ++i) {
197 queue_alloc(&(buffers[i]), 2 * offset + 1);
199 /* need working average initialized to 0 */
200 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
201 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
203 /* compute sums of the first 2*offset terms */
/* Seed each queue/sum with the window centered on row 0; MyMod handles the
 * negative wrap of i - offset around the period. */
204 for (i = 0; i < 2 * offset; ++i) {
205 k = MyMod(i - offset, row_count);
206 for (j = 0; j < row_length; ++j) {
207 queue_push(buffers[j], rrd_values[k * row_length + j]);
208 working_average[j] += rrd_values[k * row_length + j];
212 /* compute moving averages */
/* Slide the window once around the period; each iteration pushes the
 * leading term, writes the average into the window's center row, and pops
 * the trailing term. */
213 for (i = offset; i < row_count + offset; ++i) {
214 for (j = 0; j < row_length; ++j) {
215 k = MyMod(i, row_count);
216 /* add a term to the sum */
217 working_average[j] += rrd_values[k * row_length + j];
218 queue_push(buffers[j], rrd_values[k * row_length + j]);
220 /* reset k to be the center of the window */
221 k = MyMod(i - offset, row_count);
222 /* overwrite rrd_values entry, the old value is already
223 * saved in buffers */
224 rrd_values[k * row_length + j] =
225 working_average[j] / (2 * offset + 1);
226 baseline[j] += rrd_values[k * row_length + j];
228 /* remove a term from the sum */
229 working_average[j] -= queue_pop(buffers[j]);
/* Per data source: release the queue and finish the mean of the smoothed
 * series (the baseline to fold into the HW intercept below). */
233 for (i = 0; i < row_length; ++i) {
234 queue_dealloc(buffers[i]);
235 baseline[i] /= row_count;
238 free(working_average);
/* For SEASONAL RRAs only: re-center the seasonal coefficients around the
 * identity value and shift the removed baseline into the dependent
 * HWPREDICT intercept, then persist the cdp_prep area. */
240 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
243 rrd_value_t seasonal_coef,
244 rrd_value_t intercept);
246 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
248 init_seasonality = hw_additive_init_seasonality;
251 init_seasonality = hw_multiplicative_init_seasonality;
254 rrd_set_error("apply smoother: SEASONAL rra doesn't have "
255 "valid dependency: %s",
256 rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
260 for (j = 0; j < row_length; ++j) {
261 for (i = 0; i < row_count; ++i) {
262 rrd_values[i * row_length + j] =
263 init_seasonality(rrd_values[i * row_length + j],
266 /* update the baseline coefficient,
267 * first, compute the cdp_index. */
/* NOTE(review): `offset` is reused here as a cdp index — it no longer
 * holds the window half-width after this point. */
268 offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
269 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
272 /* flush cdp to disk */
/* File layout: stat_head, ds_defs, rra_defs, live_head, pdp_preps,
 * then the cdp_prep area being written here. */
273 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
274 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
275 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
276 sizeof(live_head_t) +
277 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
278 rrd_set_error("apply_smoother: seek to cdp_prep failed");
282 if (rrd_write(rrd_file, rrd->cdp_prep,
284 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
285 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
286 (rrd->stat_head->ds_cnt))) {
287 rrd_set_error("apply_smoother: cdp_prep write failed");
293 /* endif CF_SEASONAL */
294 /* flush updated values to disk */
295 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
296 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
300 /* write as a single block */
302 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
303 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
304 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
314 /* Reset aberrant behavior model coefficients, including intercept, slope,
315 * seasonal, and seasonal deviation for the specified data source. */
/* NOTE(review): extract is elided (gutter numbers skip) — the rrd_t *rrd
 * parameter line, opening brace, the case labels of the switch, and
 * several closing braces are not visible. Visible behavior: for each RRA,
 * reset the matching cdp_prep entry for data source ds_idx (and, for the
 * seasonal-style cases, overwrite that DS's column in the RRA with DNAN on
 * disk), then persist the whole cdp_prep area. */
316 void reset_aberrant_coefficients(
318 rrd_file_t *rrd_file,
319 unsigned long ds_idx)
321 unsigned long cdp_idx, rra_idx, i;
322 unsigned long cdp_start, rra_start;
323 rrd_value_t nan_buffer = DNAN;
325 /* compute the offset for the cdp area */
326 cdp_start = sizeof(stat_head_t) +
327 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
328 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
329 sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
330 /* compute the offset for the first rra */
331 rra_start = cdp_start +
332 (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
333 sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
335 /* loop over the RRAs */
336 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
/* cdp_prep is laid out RRA-major: one entry per (rra, ds) pair. */
337 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
338 switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
341 init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
345 /* don't use init_seasonal because it will reset burn-in, which
346 * means different data sources will be calling for the smoother
347 * at different times. */
348 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
349 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
350 /* move to first entry of data source for this rra */
351 rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
353 /* entries for the same data source are not contiguous,
354 * temporal entries are contiguous */
/* Write DNAN row by row for this DS, skipping over the other data
 * sources' columns between writes. */
355 for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
356 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
357 != sizeof(rrd_value_t) * 1) {
359 ("reset_aberrant_coefficients: write failed data source %lu rra %s",
360 ds_idx, rrd->rra_def[rra_idx].cf_nam);
363 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
364 sizeof(rrd_value_t), SEEK_CUR);
368 erase_violations(rrd, cdp_idx, rra_idx);
373 /* move offset to the next rra */
374 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
/* Persist the whole (modified) cdp_prep area in one write. */
377 rrd_seek(rrd_file, cdp_start, SEEK_SET);
378 if (rrd_write(rrd_file, rrd->cdp_prep,
380 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
381 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
382 (rrd->stat_head->ds_cnt))) {
383 rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
/* Reset a HWPREDICT cdp_prep entry: intercept and slope (current and
 * last) become unknown (DNAN) and the null-value counters restart at 1.
 * NOTE(review): the parameter line (presumably cdp_prep_t *cdp) and the
 * braces are elided in this extract. */
387 void init_hwpredict_cdp(
390 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
391 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
392 cdp->scratch[CDP_hw_slope].u_val = DNAN;
393 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
394 cdp->scratch[CDP_null_count].u_cnt = 1;
395 cdp->scratch[CDP_last_null_count].u_cnt = 1;
/* Reset a SEASONAL/DEVSEASONAL cdp_prep entry: seasonal coefficients
 * become unknown (DNAN) and the burn-in flag/counter is restarted.
 * NOTE(review): the parameter line and braces are elided in this
 * extract. */
398 void init_seasonal_cdp(
401 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
402 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
403 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
/* Dispatch one PDP value to the appropriate Holt-Winters update routine
 * (update_hwpredict / update_seasonal / update_devpredict /
 * update_devseasonal / update_failures) based on the RRA's consolidation
 * function, selecting the additive or multiplicative function table.
 * NOTE(review): extract is elided (gutter numbers skip) — the first two
 * parameter lines (presumably rrd_t *rrd, rrd_value_t pdp_val), the case
 * labels of the switches, the default return, and closing braces are not
 * visible. */
406 int update_aberrant_CF(
409 enum cf_en current_cf,
410 unsigned long cdp_idx,
411 unsigned long rra_idx,
412 unsigned long ds_idx,
413 unsigned short CDP_scratch_idx,
414 rrd_value_t *seasonal_coef)
/* Function table for the multiplicative Holt-Winters variant; its
 * identity element for the seasonal component is 1.0. */
416 static hw_functions_t hw_multiplicative_functions = {
417 hw_multiplicative_calculate_prediction,
418 hw_multiplicative_calculate_intercept,
420 hw_multiplicative_calculate_seasonality,
421 hw_multiplicative_init_seasonality,
422 hw_calculate_seasonal_deviation,
423 hw_init_seasonal_deviation,
424 1.0 /* identity value */
/* Function table for the additive variant; seasonal identity is 0.0. */
427 static hw_functions_t hw_additive_functions = {
428 hw_additive_calculate_prediction,
429 hw_additive_calculate_intercept,
431 hw_additive_calculate_seasonality,
432 hw_additive_init_seasonality,
433 hw_calculate_seasonal_deviation,
434 hw_init_seasonal_deviation,
435 0.0 /* identity value */
/* Stash the incoming PDP value where the update routines expect it. */
438 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
439 switch (current_cf) {
441 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
442 CDP_scratch_idx, &hw_additive_functions);
444 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
446 &hw_multiplicative_functions);
448 return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
/* SEASONAL/DEVSEASONAL pick the variant from the CF of the RRA they
 * depend on (via hw_dep_idx). */
451 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
453 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
454 CDP_scratch_idx, seasonal_coef,
455 &hw_additive_functions);
457 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
458 CDP_scratch_idx, seasonal_coef,
459 &hw_multiplicative_functions);
464 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
466 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
467 CDP_scratch_idx, seasonal_coef,
468 &hw_additive_functions);
470 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
471 CDP_scratch_idx, seasonal_coef,
472 &hw_multiplicative_functions);
/* FAILURES is two dependency hops away from the HWPREDICT RRA, hence
 * the nested hw_dep_idx. */
478 (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
481 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
482 CDP_scratch_idx, &hw_additive_functions);
484 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
486 &hw_multiplicative_functions);
/* Mathematical modulus that maps (possibly negative) values into
 * [0, mod): used to wrap window indices around the seasonal period.
 * NOTE(review): the parameter lines and the if/else branch structure are
 * elided in this extract; from the visible lines, the negative branch
 * computes |val| % mod and returns mod - that value. */
497 static unsigned long MyMod(
501 unsigned long new_val;
504 new_val = ((unsigned long) abs(val)) % mod;
506 new_val = (val % mod);
509 return (mod - new_val);
514 /* a standard fixed-capacity FIFO queue implementation
515 * No overflow checking is performed. */
/* Allocate a FIFOqueue and its backing array of `capacity` values.
 * NOTE(review): the signature line, NULL-check branches, and tail
 * initialization are elided in this extract. head starting at `capacity`
 * pairs with the modulo wrap in queue_pop/queue_isempty below. */
520 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
523 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
524 if ((*q)->queue == NULL) {
528 (*q)->capacity = capacity;
529 (*q)->head = capacity;
/* Body of queue_isempty (signature elided): the queue is empty when the
 * wrapped head index has caught up with the tail. */
537 return (q->head % q->capacity == q->tail);
/* Body of queue_push (signature elided): store at tail, then wrap tail
 * around the fixed capacity. Per the header comment above, no overflow
 * check is performed. */
544 q->queue[(q->tail)++] = value;
545 q->tail = q->tail % q->capacity;
/* Pop the oldest value: wrap head into range first (it starts at
 * `capacity` in queue_alloc), then return the element and advance.
 * NOTE(review): the parameter line and braces are elided in this
 * extract. */
548 rrd_value_t queue_pop(
551 q->head = q->head % q->capacity;
552 return q->queue[(q->head)++];