1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
11 #include "rrd_hw_math.h"
12 #include "rrd_hw_update.h"
/* Index of the RRA that the RRA at rra_idx depends on, read from the
 * RRA_dependent_rra_idx slot of that RRA's parameter array.
 * Both arguments and the whole expansion are parenthesized so that an
 * expression argument (for example `p + 1`) or an operator next to the
 * macro call cannot change how the expansion parses.  The result is still
 * an lvalue, exactly as before. */
#define hw_dep_idx(rrd, rra_idx) \
    ((rrd)->rra_def[(rra_idx)].par[RRA_dependent_rra_idx].u_cnt)
18 /* private functions */
19 static unsigned long MyMod(
25 unsigned long rra_idx,
26 unsigned long rra_start,
29 rrd_value_t **seasonal_coef)
/* lookup_seasonal (name as used in the error messages further down):
 * reads one row of the RRA selected by rra_idx, i.e. ds_cnt values of type
 * rrd_value_t, from rrd_file into the buffer *seasonal_coef.
 *
 *   rra_idx        index into rrd->rra_ptr and rrd->rra_def
 *   rra_start      file position of this RRA's first row (see the comment
 *                  on pos_tmp below)
 *   seasonal_coef  in/out buffer pointer; when it points to NULL a buffer of
 *                  ds_cnt values is allocated with malloc
 *
 * NOTE(review): nothing in the visible code frees *seasonal_coef, so the
 * caller presumably owns the buffer -- confirm against the callers. */
31 unsigned long pos_tmp;
33 /* rra_ptr[].cur_row points to the rra row to be written; this function
34 * reads cur_row + offset */
35 unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
37 /* handle wrap around */
38 if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
39 row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
41 /* rra_start points to the appropriate rra block in the file */
42 /* compute the pointer to the appropriate location in the file */
45 (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
/* the product above is the byte offset of row row_idx inside the RRA:
 * each row holds ds_cnt values of sizeof(rrd_value_t) bytes */
47 /* allocate memory if need be */
48 if (*seasonal_coef == NULL)
50 (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
52 if (*seasonal_coef == NULL) {
53 rrd_set_error("memory allocation failure: seasonal coef");
/* a zero return from rrd_seek is treated as success; only then is the row
 * read, and the read counts as good only when the full ds_cnt values
 * (compared as ssize_t) were transferred */
57 if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
59 (rrd_file, *seasonal_coef,
60 sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
61 == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
63 /* we can safely ignore the rule requiring a seek operation between read
64 * and write, because this read moves the file pointer to somewhere
65 * in the file other than the next write location.
69 rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
73 rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
80 /* For the specified CDP prep area and the FAILURES RRA,
81 * erase all history of past violations.
 *
 * cdp_idx selects the entry of rrd->cdp_prep whose scratch array is cleared.
 * rra_idx selects the RRA definition; its consolidation function is
 * checked against CF_FAILURES and its RRA_window_len parameter gives the
 * number of leading bytes of the scratch array that are set to zero.
 * The scratch array is addressed byte-wise through a char pointer for this.
 *
 * NOTE(review): no check is visible that RRA_window_len fits inside the
 * scratch array (MAX_CDP_PAR_EN entries) -- confirm it is validated
 * where the RRA is created.
83 void erase_violations(
85 unsigned long cdp_idx,
86 unsigned long rra_idx)
89 char *violations_array;
91 /* check that rra_idx is a CF_FAILURES array */
92 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
/* diagnostic on stderr naming the consolidation function actually found */
94 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
95 rrd->rra_def[rra_idx].cf_nam);
/* dump every scratch entry as an unsigned long before clearing */
100 fprintf(stderr, "scratch buffer before erase:\n");
101 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
102 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
104 fprintf(stderr, "\n");
107 /* WARNING: an array of longs on disk is treated as an array of chars
109 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
110 /* erase everything in the part of the CDP scratch array that will be
111 * used to store violations for the current window */
/* i counts down from window_len to 1, so bytes window_len-1 .. 0 are
 * zeroed; with window_len == 0 the loop body never runs */
112 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
113 violations_array[i - 1] = 0;
/* same dump as above, after clearing */
116 fprintf(stderr, "scratch buffer after erase:\n");
117 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
118 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
120 fprintf(stderr, "\n");
124 /* Smooth a periodic array with a moving average: equal weights and
125 * length = 5% of the period.
 *
 * apply_smoother (name as used in the error messages below) works on the
 * RRA selected by rra_idx, whose rows start at file position rra_start in
 * rrd_file.  Outline of the visible steps:
 *   1. read all row_count * row_length values into memory, one value at a
 *      time so that NaN entries can be detected;
 *   2. replace every value by the mean of a centred window of
 *      2 * offset + 1 rows, wrapping around the ends of the array, while
 *      accumulating a per-data-source baseline (mean of the smoothed
 *      values);
 *   3. for a CF_SEASONAL RRA, pass every value through the additive or
 *      multiplicative init_seasonality function chosen from the dependent
 *      RRA, adjust that RRA's CDP_hw_intercept and write the whole
 *      cdp_prep area back to the file;
 *   4. write the smoothed values back to rra_start as one block.
 * Returns 0 when no smoothing is done (see below).
 * NOTE(review): the remaining return paths and the clean-up of rrd_values
 * and baseline are not visible here -- check them before changing the
 * error handling. */
128 unsigned long rra_idx,
129 unsigned long rra_start,
130 rrd_file_t *rrd_file)
132 unsigned long i, j, k;
133 unsigned long totalbytes;
134 rrd_value_t *rrd_values;
/* row_length = values per row (one per data source), row_count = rows */
135 unsigned long row_length = rrd->stat_head->ds_cnt;
136 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
137 unsigned long offset;
139 rrd_value_t *working_average;
140 rrd_value_t *baseline;
/* offset is the half-width of the averaging window: 2.5% of the rows on
 * each side of the centre, rounded down */
142 offset = floor(0.025 * row_count);
144 return 0; /* no smoothing */
146 /* allocate memory */
147 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
148 rrd_values = (rrd_value_t *) malloc(totalbytes);
149 if (rrd_values == NULL) {
150 rrd_set_error("apply smoother: memory allocation failure");
154 /* rra_start is at the beginning of this rra */
155 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
156 rrd_set_error("seek to rra %d failed", rra_start);
/* NOTE(review): rra_start is an unsigned long but is formatted with %d
 * above; the final write error in this function uses %lu for it. */
161 /* could read all data in a single block, but we need to
162 * check for NA values */
163 for (i = 0; i < row_count; ++i) {
164 for (j = 0; j < row_length; ++j) {
166 (rrd_file, &(rrd_values[i * row_length + j]),
167 sizeof(rrd_value_t) * 1)
168 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
169 rrd_set_error("reading value failed: %s",
170 rrd_strerror(errno));
172 if (isnan(rrd_values[i * row_length + j])) {
173 /* can't apply smoothing, still uninitialized values */
176 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
185 /* allocate queues, one for each data source */
186 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
/* NOTE(review): buffers is used by the loop directly below without a
 * NULL check; each queue holds one full window (2 * offset + 1 values) */
187 for (i = 0; i < row_length; ++i) {
188 queue_alloc(&(buffers[i]), 2 * offset + 1);
190 /* need working average initialized to 0 */
191 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
192 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
/* NOTE(review): i and offset are both unsigned long, so i - offset wraps
 * around whenever i < offset; the calls below presumably rely on MyMod
 * taking a signed first argument to turn that back into a negative
 * row offset -- confirm against MyMod's prototype. */
194 /* compute sums of the first 2*offset terms */
195 for (i = 0; i < 2 * offset; ++i) {
196 k = MyMod(i - offset, row_count);
197 for (j = 0; j < row_length; ++j) {
198 queue_push(buffers[j], rrd_values[k * row_length + j]);
199 working_average[j] += rrd_values[k * row_length + j];
203 /* compute moving averages */
204 for (i = offset; i < row_count + offset; ++i) {
205 for (j = 0; j < row_length; ++j) {
206 k = MyMod(i, row_count);
207 /* add a term to the sum */
208 working_average[j] += rrd_values[k * row_length + j];
209 queue_push(buffers[j], rrd_values[k * row_length + j]);
211 /* reset k to be the center of the window */
212 k = MyMod(i - offset, row_count);
213 /* overwrite rrd_values entry, the old value is already
214 * saved in buffers */
215 rrd_values[k * row_length + j] =
216 working_average[j] / (2 * offset + 1);
217 baseline[j] += rrd_values[k * row_length + j];
219 /* remove a term from the sum */
220 working_average[j] -= queue_pop(buffers[j]);
/* release the queues and turn each baseline sum into a mean over all rows */
224 for (i = 0; i < row_length; ++i) {
225 queue_dealloc(buffers[i]);
226 baseline[i] /= row_count;
229 free(working_average);
231 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
234 rrd_value_t seasonal_coef,
235 rrd_value_t intercept);
/* choose the seasonality initializer from the consolidation function of
 * the RRA this one depends on */
237 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
239 init_seasonality = hw_additive_init_seasonality;
242 init_seasonality = hw_multiplicative_init_seasonality;
245 rrd_set_error("apply smoother: SEASONAL rra doesn't have "
246 "valid dependency: %s",
247 rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
251 for (j = 0; j < row_length; ++j) {
252 for (i = 0; i < row_count; ++i) {
253 rrd_values[i * row_length + j] =
254 init_seasonality(rrd_values[i * row_length + j],
257 /* update the baseline coefficient,
258 * first, compute the cdp_index. */
/* note: offset is reused here as an index into rrd->cdp_prep; it no
 * longer holds the window half-width from this point on */
259 offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
260 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
263 /* flush cdp to disk */
/* the seek target is the sum of the sizes of everything stored before the
 * cdp_prep area: static header, ds definitions, rra definitions, live
 * header and pdp_prep entries */
265 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
266 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
267 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
268 sizeof(live_head_t) +
269 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
270 rrd_set_error("apply_smoother: seek to cdp_prep failed");
274 if (rrd_write(rrd_file, rrd->cdp_prep,
276 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
277 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
278 (rrd->stat_head->ds_cnt))) {
279 rrd_set_error("apply_smoother: cdp_prep write failed");
285 /* endif CF_SEASONAL */
286 /* flush updated values to disk */
288 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
289 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
/* NOTE(review): same %d / unsigned long mismatch as in the first seek
 * error message of this function. */
293 /* write as a single block */
295 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
296 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
297 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
308 /* Reset aberrant behavior model coefficients, including intercept, slope,
309 * seasonal, and seasonal deviation for the specified data source.
 *
 * rrd_file is the open file that the reset values are written to and
 * ds_idx is the data source to reset.  For every RRA the function updates
 * the in-memory cdp_prep entry belonging to (RRA, ds_idx) according to the
 * RRA's consolidation function; where the visible code rewrites stored
 * rows it overwrites this data source's value in each row with DNAN.
 * Finally the complete cdp_prep area is written back at cdp_start.
 *
 * NOTE(review): the rrd_seek calls in this function are plain statements
 * whose results are not tested, unlike the seeks in apply_smoother. */
310 void reset_aberrant_coefficients(
312 rrd_file_t *rrd_file,
313 unsigned long ds_idx)
315 unsigned long cdp_idx, rra_idx, i;
316 unsigned long cdp_start, rra_start;
317 rrd_value_t nan_buffer = DNAN;
319 /* compute the offset for the cdp area */
320 cdp_start = sizeof(stat_head_t) +
321 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
322 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
323 sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
324 /* compute the offset for the first rra */
/* i.e. skip the cdp_prep area (ds_cnt * rra_cnt entries) and the rra_ptr
 * array (rra_cnt entries) that follow cdp_start */
325 rra_start = cdp_start +
326 (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
327 sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
329 /* loop over the RRAs */
330 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
/* cdp_prep is indexed as rra_idx * ds_cnt + ds_idx */
331 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
332 switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
335 init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
339 /* don't use init_seasonal because it will reset burn-in, which
340 * means different data sources will be calling for the smoother
341 * at different times. */
342 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
343 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
344 /* move to first entry of data source for this rra */
345 rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
347 /* entries for the same data source are not contiguous,
348 * temporal entries are contiguous */
/* per row: write one DNAN over this data source's value, then skip the
 * other ds_cnt - 1 values to reach the same data source in the next row */
349 for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
350 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
351 != sizeof(rrd_value_t) * 1) {
/* NOTE(review): this comparison has no (ssize_t) cast, unlike the
 * cdp_prep write check at the end of the function */
353 ("reset_aberrant_coefficients: write failed data source %lu rra %s",
354 ds_idx, rrd->rra_def[rra_idx].cf_nam);
357 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
358 sizeof(rrd_value_t), SEEK_CUR);
362 erase_violations(rrd, cdp_idx, rra_idx);
367 /* move offset to the next rra */
368 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
/* write the whole in-memory cdp_prep area (rra_cnt * ds_cnt entries) back
 * to its place in the file */
371 rrd_seek(rrd_file, cdp_start, SEEK_SET);
372 if (rrd_write(rrd_file, rrd->cdp_prep,
374 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
375 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
376 (rrd->stat_head->ds_cnt))) {
377 rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
/* Put the prediction state kept in cdp->scratch into its initial condition:
 * the intercept and slope entries, together with their "last" copies, are
 * set to DNAN and both null counters are set to 1. */
381 void init_hwpredict_cdp(
384 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
385 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
386 cdp->scratch[CDP_hw_slope].u_val = DNAN;
387 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
/* the counters live in the integer member (u_cnt) of the scratch entries */
388 cdp->scratch[CDP_null_count].u_cnt = 1;
389 cdp->scratch[CDP_last_null_count].u_cnt = 1;
/* Put the seasonal state kept in cdp->scratch into its initial condition:
 * the seasonal coefficient and its "last" copy are set to DNAN and the
 * CDP_init_seasonal counter is set to 1.
 * reset_aberrant_coefficients deliberately does not call this function;
 * its own comment explains that doing so would reset burn-in. */
392 void init_seasonal_cdp(
395 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
396 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
397 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
/* Dispatcher for the aberrant-behavior consolidation functions.
 * It stores pdp_val in rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx] and
 * then, depending on current_cf, returns the result of one of
 * update_hwpredict, update_devpredict, update_seasonal, update_devseasonal
 * or update_failures.
 *
 * Each of those is handed either the additive or the multiplicative
 * function table defined below.  For the seasonal and seasonal-deviation
 * updates the table is chosen from the consolidation function of the RRA
 * that rra_idx depends on (hw_dep_idx); for the failures update the
 * dependency is followed twice.
 *
 *   current_cf       consolidation function of the RRA being updated
 *   cdp_idx          index into rrd->cdp_prep
 *   rra_idx, ds_idx  RRA and data source being updated
 *   CDP_scratch_idx  scratch slot that receives pdp_val
 *   seasonal_coef    passed through to the seasonal and
 *                    seasonal-deviation updates only */
400 int update_aberrant_CF(
403 enum cf_en current_cf,
404 unsigned long cdp_idx,
405 unsigned long rra_idx,
406 unsigned long ds_idx,
407 unsigned short CDP_scratch_idx,
408 rrd_value_t *seasonal_coef)
/* function table for the multiplicative model; its last member is 1.0 */
410 static hw_functions_t hw_multiplicative_functions = {
411 hw_multiplicative_calculate_prediction,
412 hw_multiplicative_calculate_intercept,
414 hw_multiplicative_calculate_seasonality,
415 hw_multiplicative_init_seasonality,
416 hw_calculate_seasonal_deviation,
417 hw_init_seasonal_deviation,
418 1.0 // identity value
/* function table for the additive model; it shares the two deviation
 * functions with the multiplicative table and its last member is 0.0 */
421 static hw_functions_t hw_additive_functions = {
422 hw_additive_calculate_prediction,
423 hw_additive_calculate_intercept,
425 hw_additive_calculate_seasonality,
426 hw_additive_init_seasonality,
427 hw_calculate_seasonal_deviation,
428 hw_init_seasonal_deviation,
429 0.0 // identity value
/* record the incoming value before dispatching */
432 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
433 switch (current_cf) {
435 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
436 CDP_scratch_idx, &hw_additive_functions);
438 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
440 &hw_multiplicative_functions);
442 return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
/* seasonal update: model taken from the dependent RRA */
445 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
447 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
448 CDP_scratch_idx, seasonal_coef,
449 &hw_additive_functions);
451 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
452 CDP_scratch_idx, seasonal_coef,
453 &hw_multiplicative_functions);
/* seasonal-deviation update: model taken from the dependent RRA */
458 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
460 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
461 CDP_scratch_idx, seasonal_coef,
462 &hw_additive_functions);
464 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
465 CDP_scratch_idx, seasonal_coef,
466 &hw_multiplicative_functions);
472 (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
475 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
476 CDP_scratch_idx, &hw_additive_functions);
478 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
480 &hw_multiplicative_functions);
/* Modulo helper with an unsigned result.  apply_smoother calls it as
 * MyMod(i - offset, row_count) and uses the result as a row index.
 * One path takes the remainder of abs(val), the other the remainder of val
 * itself; one return path yields mod - new_val.
 *
 * NOTE(review): abs() from <stdlib.h> takes an int; if val is wider than
 * int the argument is converted first (labs would match a long) --
 * confirm the parameter type.
 * NOTE(review): on the mod - new_val path a remainder of 0 produces mod
 * itself, which is outside 0 .. mod-1 -- confirm that callers cannot pass
 * a negative multiple of mod. */
491 static unsigned long MyMod(
495 unsigned long new_val;
498 new_val = ((unsigned long) abs(val)) % mod;
500 new_val = (val % mod);
503 return (mod - new_val);
508 /* a standard fixed-capacity FIFO queue implementation
509 * No overflow checking is performed. */
/* queue_alloc (called from apply_smoother as queue_alloc(&buffers[i], n)):
 * allocates the FIFOqueue control structure and a value array of
 * `capacity` entries with malloc, and stores the new queue in *q. */
514 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
517 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
518 if ((*q)->queue == NULL) {
522 (*q)->capacity = capacity;
/* head starts at capacity, which equals index 0 once reduced modulo
 * capacity; every visible reader of head applies that reduction first */
523 (*q)->head = capacity;
/* emptiness test: non-zero when head, reduced modulo capacity, coincides
 * with tail.
 * NOTE(review): a queue filled to exactly `capacity` entries presumably
 * satisfies the same condition -- confirm callers never rely on this test
 * for a full queue. */
531 return (q->head % q->capacity == q->tail);
/* queue_push: store value in the slot at tail, then advance tail and wrap
 * it back into 0 .. capacity-1.  Nothing here checks whether the slot
 * still holds an unread entry. */
538 q->queue[(q->tail)++] = value;
539 q->tail = q->tail % q->capacity;
/* Remove and return the oldest entry: head is first wrapped into
 * 0 .. capacity-1, the value at that index is returned, and head is then
 * advanced by one (it may equal capacity until the next wrap).
 * No emptiness check is made in the visible code. */
542 rrd_value_t queue_pop(
545 q->head = q->head % q->capacity;
546 return q->queue[(q->head)++];