=cut
use Getopt::Long ('GetOptions');
-use Data::Dumper ();
our $InFile;
our $InDS = [];
our $OutFile;
our $OutDS = [];
+our $NewDSes = [];
our $NewRRAs = [];
our $Step = 0;
+our $Scale = 1.0;
+our $Shift = 0.0;
+
+our $Debug = 0;
+
=head1 OPTIONS
The following options can be passed on the command line:
been adjusted, take that into account when specifying I<steps> and I<rows>. For
an explanation of the format please see L<rrdcreate(1)>.
+=item B<--scale> I<factor>
+
+Scales the values by the factor I<factor>, i.E<nbsp>e. all values are
+multiplied by I<factor>.
+
+=item B<--shift> I<offset>
+
+Shifts all values by I<offset>, i.E<nbsp>e. I<offset> is added to all values.
+
=back
=cut
push (@$OutDS, $out_ds);
},
'step|s=i' => \$Step,
+ 'ds|d=s' => sub
+ {
+ #DS:ds-name:GAUGE | COUNTER | DERIVE | ABSOLUTE:heartbeat:min:max
+ my ($ds, $name, $type, $hb, $min, $max) = split (':', $_[1]);
+ if (($ds ne 'DS') || !defined ($max))
+ {
+ print STDERR "Please use the standard RRDTool syntax when adding DSes. I. e. DS:<name>:<type>:<heartbeat>:<min>:<max>.\n";
+ exit (1);
+ }
+ push (@$NewDSes, {name => $name, type => $type, heartbeat => $hb, min => $min, max => $max});
+ },
'rra|a=s' => sub
{
my ($rra, $cf, $xff, $steps, $rows) = split (':', $_[1]);
exit (1);
}
push (@$NewRRAs, {cf => $cf, xff => $xff, steps => $steps, rows => $rows});
- }
+ },
+ 'scale=f' => \$Scale,
+ 'shift=f' => \$Shift
) or exit (1);
if (!$InFile || !$OutFile)
print STDERR "You need the same amount of in- and out-DSes\n";
exit (1);
}
-
main ($InFile, $OutFile);
exit (0);
$ds_index = [];
for (my $i = 0; $i < @$InDS; $i++)
{
+ print STDOUT "DS map $i: $InDS->[$i] -> $OutDS->[$i]\n" if ($Debug);
$ds_index->[$i] = -1;
}
}
$out_cache->[$current_index] .= $line;
}
+ elsif ($line =~ m#<last_ds>\s*([^\s>]+)\s*</last_ds>#i)
+ {
+ $out_cache->[$current_index] .= "\t\t<last_ds> NaN </last_ds>\n";
+ }
+ elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
+ {
+ $out_cache->[$current_index] .= "\t\t<value> NaN </value>\n";
+ }
elsif ($line =~ m#</ds>#)
{
$out_cache->[$current_index] .= $line;
$current_index++;
$out_cache->[$current_index] .= $line;
}
+ elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
+ {
+ $out_cache->[$current_index] .= "\t\t\t<value> NaN </value>\n";
+ }
elsif ($line =~ m#</cdp_prep>#)
{
# Print out all the DS definitions we need
return;
}
+ if ($Debug && !defined ($step_factor_up))
+ {
+ print STDOUT "New step: $Step\n";
+ }
+
$step_factor_up ||= 0;
$step_factor_down ||= 0;
}} # handle_line_step
#
+# The _add DS_ handler
+#
+{
+my $add_ds_done;
+sub handle_line_add_ds
+{
+ my $line = shift;
+ my $index = shift;
+
+ my $post = sub { for (@_) { post_line ($_, $index + 1); } };
+
+ if (!@$NewDSes)
+ {
+ $post->($line);
+ return;
+ }
+
+ if (!$add_ds_done && ($line =~ m#<rra>#i))
+ {
+ for (my $i = 0; $i < @$NewDSes; $i++)
+ {
+ my $ds = $NewDSes->[$i];
+ my $temp;
+
+ my $min;
+ my $max;
+
+ if ($Debug)
+ {
+ print STDOUT "Adding DS: name = $ds->{'name'}, type = $ds->{'type'}, heartbeat = $ds->{'heartbeat'}, min = $ds->{'min'}, max = $ds->{'max'}\n";
+ }
+
+ $min = 'NaN';
+ if (defined ($ds->{'min'}) && ($ds->{'min'} ne 'U'))
+ {
+ $min = sprintf ('%.10e', $ds->{'min'});
+ }
+
+ $max = 'NaN';
+ if (defined ($ds->{'max'}) && ($ds->{'max'} ne 'U'))
+ {
+ $max = sprintf ('%.10e', $ds->{'max'});
+ }
+
+
+ $post->("\t<ds>\n",
+ "\t\t<name> $ds->{'name'} </name>\n",
+ "\t\t<type> $ds->{'type'} </type>\n",
+ "\t\t<minimal_heartbeat> $ds->{'heartbeat'} </minimal_heartbeat>\n",
+ "\t\t<min> $min </min>\n",
+ "\t\t<max> $max </max>\n",
+ "\n",
+ "\t\t<!-- PDP Status -->\n",
+ "\t\t<last_ds> UNKN </last_ds>\n",
+ "\t\t<value> NaN </value>\n",
+ "\t\t<unknown_sec> 0 </unknown_sec>\n",
+ "\t</ds>\n",
+ "\n");
+ }
+
+ $add_ds_done = 1;
+ }
+ elsif ($add_ds_done && ($line =~ m#</ds>#i)) # inside a cdp_prep block
+ {
+ $post->("\t\t\t</ds>\n",
+ "\t\t\t<ds>\n",
+ "\t\t\t<primary_value> NaN </primary_value>\n",
+ "\t\t\t<secondary_value> NaN </secondary_value>\n",
+ "\t\t\t<value> NaN </value>\n",
+ "\t\t\t<unknown_datapoints> 0 </unknown_datapoints>\n");
+ }
+ elsif ($line =~ m#<row>#i)
+ {
+ my $insert = '<v> NaN </v>' x (0 + @$NewDSes);
+ $line =~ s#</row>#$insert</row>#i;
+ }
+
+ $post->($line);
+}} # handle_line_add_ds
+
+#
# The _add RRA_ handler
#
{
{
my $rra = $NewRRAs->[$i];
my $temp;
+
+ if ($Debug)
+ {
+ print STDOUT "Adding RRA: CF = $rra->{'cf'}, xff = $rra->{'xff'}, steps = $rra->{'steps'}, rows = $rra->{'rows'}, num_ds = $num_ds\n";
+ }
+
$post->("\t<rra>\n",
"\t\t<cf> $rra->{'cf'} </cf>\n",
"\t\t<pdp_per_row> $rra->{'steps'} </pdp_per_row>\n",
{
$post->($temp);
}
- $post->("\t\t</database>\n");
+ $post->("\t\t</database>\n", "\t</rra>\n");
}
+
+ $add_rra_done = 1;
}
$post->($line);
}} # handle_line_add_rra
#
+# The _scale/shift_ handler
+#
+sub calculate_scale_shift
+{
+ my $value = shift;
+ my $tag = shift;
+ my $scale = shift;
+ my $shift = shift;
+
+ if (lc ("$value") eq 'nan')
+ {
+ $value = 'NaN';
+ return ("<$tag> NaN </$tag>");
+ }
+
+ $value = ($scale * (0.0 + $value)) + $shift;
+ return (sprintf ("<%s> %1.10e </%s>", $tag, $value, $tag));
+}
+
+sub handle_line_scale_shift
+{
+ my $line = shift;
+ my $index = shift;
+
+ if (($Scale != 1.0) || ($Shift != 0.0))
+ {
+ $line =~ s#<(min|max|last_ds|value|primary_value|secondary_value|v)>\s*([^\s<]+)\s*</[^>]+>#calculate_scale_shift ($2, $1, $Scale, $Shift)#eg;
+ }
+
+ post_line ($line, $index + 1);
+}
+
+#
# The _output_ handler
#
+# This filter is unfinished!
+#
{
my $fh;
sub set_output
$fh = shift;
}
+{
+my $previous_values;
+my $previous_differences;
+my $pdp_per_row;
+sub handle_line_peak_detect
+{
+ my $line = shift;
+ my $index = shift;
+
+ if (!$previous_values)
+ {
+ $previous_values = [];
+ $previous_differences = [];
+ }
+
+ if ($line =~ m#</database>#i)
+ {
+ $previous_values = [];
+ $previous_differences = [];
+ print STDERR "==============================================================================\n";
+ }
+ elsif ($line =~ m#<pdp_per_row>\s*([1-9][0-9]*)\s*</pdp_per_row>#)
+ {
+ $pdp_per_row = int ($1);
+ print STDERR "pdp_per_row = $pdp_per_row;\n";
+ }
+ elsif ($line =~ m#<row>#)
+ {
+ my @values = ();
+ while ($line =~ m#<v>\s*([^\s>]+)\s*</v>#ig)
+ {
+ if ($1 eq 'NaN')
+ {
+ push (@values, undef);
+ }
+ else
+ {
+ push (@values, 0.0 + $1);
+ }
+ }
+
+ for (my $i = 0; $i < @values; $i++)
+ {
+ if (!defined ($values[$i]))
+ {
+ $previous_values->[$i] = undef;
+ }
+ elsif (!defined ($previous_values->[$i]))
+ {
+ $previous_values->[$i] = $values[$i];
+ }
+ elsif (!defined ($previous_differences->[$i]))
+ {
+ $previous_differences->[$i] = abs ($previous_values->[$i] - $values[$i]);
+ }
+ else
+ {
+ my $divisor = ($previous_differences->[$i] < 1.0) ? 1.0 : $previous_differences->[$i];
+ my $difference = abs ($previous_values->[$i] - $values[$i]);
+ my $change = $pdp_per_row * $difference / $divisor;
+ if (($divisor > 10.0) && ($change > 10e5))
+ {
+ print STDERR "i = $i; average difference = " . $previous_differences->[$i]. "; current difference = " . $difference. "; change = $change;\n";
+ }
+ $previous_values->[$i] = $values[$i];
+ $previous_differences->[$i] = (0.95 * $previous_differences->[$i]) + (0.05 * $difference);
+ }
+ }
+ }
+
+ post_line ($line, $index + 1);
+}} # handle_line_peak_detect
+
sub handle_line_output
{
my $line = shift;
sub handle_fh
{
- my $in_fh = shift;
- my $out_fh = shift;
+ my $in_fh = shift;
+ my $out_fh = shift;
- set_output ($out_fh);
+ set_output ($out_fh);
- if (@$InDS)
- {
- add_handler (\&handle_line_dsmap);
- }
+ if (@$InDS)
+ {
+ add_handler (\&handle_line_dsmap);
+ }
- if ($Step)
- {
- add_handler (\&handle_line_step);
- }
+ if ($Step)
+ {
+ add_handler (\&handle_line_step);
+ }
- if (@$NewRRAs)
- {
- add_handler (\&handle_line_add_rra);
- }
+ if (($Scale != 1.0) || ($Shift != 0.0))
+ {
+ add_handler (\&handle_line_scale_shift);
+ }
- add_handler (\&handle_line_output);
+ #add_handler (\&handle_line_peak_detect);
- while (my $line = <$in_fh>)
- {
- post_line ($line, 0);
- }
+ if (@$NewDSes)
+ {
+ add_handler (\&handle_line_add_ds);
+ }
+
+ if (@$NewRRAs)
+ {
+ add_handler (\&handle_line_add_rra);
+ }
+
+ add_handler (\&handle_line_output);
+
+ while (my $line = <$in_fh>)
+ {
+ post_line ($line, 0);
+ }
} # handle_fh
sub main
return (0);
} /* int uc_send_notification */
+static int uc_insert (const data_set_t *ds, const value_list_t *vl,
+ const char *key)
+{
+ int i;
+ char *key_copy;
+ cache_entry_t *ce;
+
+ /* `cache_lock' has been locked by `uc_update' */
+
+ key_copy = strdup (key);
+ if (key_copy == NULL)
+ {
+ ERROR ("uc_insert: strdup failed.");
+ return (-1);
+ }
+
+ ce = cache_alloc (ds->ds_num);
+ if (ce == NULL)
+ {
+ ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
+ return (-1);
+ }
+
+ sstrncpy (ce->name, key, sizeof (ce->name));
+
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ if (ds->ds[i].type == DS_TYPE_COUNTER)
+ {
+ ce->values_gauge[i] = NAN;
+ ce->values_counter[i] = vl->values[i].counter;
+ }
+ else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
+ {
+ ce->values_gauge[i] = vl->values[i].gauge;
+ }
+ } /* for (i) */
+
+ ce->last_time = vl->time;
+ ce->last_update = time (NULL);
+ ce->interval = vl->interval;
+ ce->state = STATE_OKAY;
+
+ if (c_avl_insert (cache_tree, key_copy, ce) != 0)
+ {
+ sfree (key_copy);
+ ERROR ("uc_insert: c_avl_insert failed.");
+ return (-1);
+ }
+
+ DEBUG ("uc_insert: Added %s to the cache.", key);
+ return (0);
+} /* int uc_insert */
+
int uc_init (void)
{
if (cache_tree == NULL)
else if (status == 0) /* ``service'' is uninteresting */
{
ce = NULL;
- DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''", keys[i]);
- status = c_avl_remove (cache_tree, keys[i], (void *) &key, (void *) &ce);
+ DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
+ keys[i]);
+ status = c_avl_remove (cache_tree, keys[i],
+ (void *) &key, (void *) &ce);
if (status != 0)
{
ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
sfree (keys[i]);
cache_free (ce);
}
- else /* (status > 0); ``service'' is interesting */
+ else if (status == 1) /* persist */
{
- /*
- * `keys[i]' is not freed and set to NULL, so that the for-loop below
- * will send out notifications. There's nothing else to do here.
- */
- DEBUG ("uc_check_timeout: %s is missing and ``interesting''", keys[i]);
+ DEBUG ("uc_check_timeout: %s is missing, sending notification.",
+ keys[i]);
ce->state = STATE_ERROR;
}
+ else if (status == 2) /* do not persist */
+ {
+ if (ce->state == STATE_ERROR)
+ {
+ DEBUG ("uc_check_timeout: %s is missing but "
+ "notification has already been sent.",
+ keys[i]);
+ sfree (keys[i]);
+ }
+ else /* (ce->state != STATE_ERROR) */
+ {
+ DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
+ keys[i]);
+ ce->state = STATE_ERROR;
+ }
+ }
+ else
+ {
+ WARNING ("uc_check_timeout: ut_check_interesting (%s) returned ",
+ "invalid status %i.",
+ keys[i], status);
+ }
} /* for (keys[i]) */
c_avl_iterator_destroy (iter);
char name[6 * DATA_MAX_NAME_LEN];
cache_entry_t *ce = NULL;
int send_okay_notification = 0;
+ time_t update_delay = 0;
+ notification_t n;
+ int status;
+ int i;
if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
{
- ERROR ("uc_insert: FORMAT_VL failed.");
+ ERROR ("uc_update: FORMAT_VL failed.");
return (-1);
}
pthread_mutex_lock (&cache_lock);
- if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
+ status = c_avl_get (cache_tree, name, (void *) &ce);
+ if (status != 0) /* entry does not yet exist */
{
- int i;
-
- assert (ce != NULL);
- assert (ce->values_num == ds->ds_num);
-
- if (ce->last_time >= vl->time)
- {
- pthread_mutex_unlock (&cache_lock);
- NOTICE ("uc_insert: Value too old: name = %s; value time = %u; "
- "last cache update = %u;",
- name, (unsigned int) vl->time, (unsigned int) ce->last_time);
- return (-1);
- }
-
- if ((ce->last_time + ce->interval) < vl->time)
- {
- send_okay_notification = vl->time - ce->last_time;
- ce->state = STATE_OKAY;
- }
+ status = uc_insert (ds, vl, name);
+ pthread_mutex_unlock (&cache_lock);
+ return (status);
+ }
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
- {
- counter_t diff;
-
- /* check if the counter has wrapped around */
- if (vl->values[i].counter < ce->values_counter[i])
- {
- if (ce->values_counter[i] <= 4294967295U)
- diff = (4294967295U - ce->values_counter[i])
- + vl->values[i].counter;
- else
- diff = (18446744073709551615ULL - ce->values_counter[i])
- + vl->values[i].counter;
- }
- else /* counter has NOT wrapped around */
- {
- diff = vl->values[i].counter - ce->values_counter[i];
- }
-
- ce->values_gauge[i] = ((double) diff)
- / ((double) (vl->time - ce->last_time));
- ce->values_counter[i] = vl->values[i].counter;
- }
- else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
- {
- ce->values_gauge[i] = vl->values[i].gauge;
- }
- DEBUG ("uc_insert: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
- } /* for (i) */
+ assert (ce != NULL);
+ assert (ce->values_num == ds->ds_num);
- ce->last_time = vl->time;
- ce->last_update = time (NULL);
- ce->interval = vl->interval;
+ if (ce->last_time >= vl->time)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
+ "last cache update = %u;",
+ name, (unsigned int) vl->time, (unsigned int) ce->last_time);
+ return (-1);
}
- else /* key is not found */
+
+ /* Send a notification (after the lock has been released) if we switch the
+ * state from something else to `okay'. */
+ if (ce->state != STATE_OKAY)
{
- int i;
- char *key;
-
- key = strdup (name);
- if (key == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: strdup failed.");
- return (-1);
- }
+ send_okay_notification = 1;
+ ce->state = STATE_OKAY;
+ update_delay = time (NULL) - ce->last_update;
+ }
- ce = cache_alloc (ds->ds_num);
- if (ce == NULL)
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ if (ds->ds[i].type == DS_TYPE_COUNTER)
{
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
- return (-1);
- }
-
- sstrncpy (ce->name, name, sizeof (ce->name));
+ counter_t diff;
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
+ /* check if the counter has wrapped around */
+ if (vl->values[i].counter < ce->values_counter[i])
{
- ce->values_gauge[i] = NAN;
- ce->values_counter[i] = vl->values[i].counter;
+ if (ce->values_counter[i] <= 4294967295U)
+ diff = (4294967295U - ce->values_counter[i])
+ + vl->values[i].counter;
+ else
+ diff = (18446744073709551615ULL - ce->values_counter[i])
+ + vl->values[i].counter;
}
- else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
+ else /* counter has NOT wrapped around */
{
- ce->values_gauge[i] = vl->values[i].gauge;
+ diff = vl->values[i].counter - ce->values_counter[i];
}
- } /* for (i) */
- ce->last_time = vl->time;
- ce->last_update = time (NULL);
- ce->interval = vl->interval;
- ce->state = STATE_OKAY;
-
- if (c_avl_insert (cache_tree, key, ce) != 0)
+ ce->values_gauge[i] = ((double) diff)
+ / ((double) (vl->time - ce->last_time));
+ ce->values_counter[i] = vl->values[i].counter;
+ }
+ else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
{
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: c_avl_insert failed.");
- return (-1);
+ ce->values_gauge[i] = vl->values[i].gauge;
}
+ DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
+ } /* for (i) */
- DEBUG ("uc_insert: Added %s to the cache.", name);
- } /* if (key is not found) */
+ ce->last_time = vl->time;
+ ce->last_update = time (NULL);
+ ce->interval = vl->interval;
pthread_mutex_unlock (&cache_lock);
+ if (send_okay_notification == 0)
+ return (0);
+
/* Do not send okay notifications for uninteresting values, i. e. values for
* which no threshold is configured. */
- if (send_okay_notification > 0)
- {
- int status;
-
- status = ut_check_interesting (name);
- if (status <= 0)
- send_okay_notification = 0;
- }
-
- if (send_okay_notification > 0)
- {
- notification_t n;
- memset (&n, '\0', sizeof (n));
+ status = ut_check_interesting (name);
+ if (status <= 0)
+ return (0);
- /* Copy the associative members */
- NOTIFICATION_INIT_VL (&n, vl, ds);
+ /* Initialize the notification */
+ memset (&n, '\0', sizeof (n));
+ NOTIFICATION_INIT_VL (&n, vl, ds);
- n.severity = NOTIF_OKAY;
- n.time = vl->time;
+ n.severity = NOTIF_OKAY;
+ n.time = vl->time;
- snprintf (n.message, sizeof (n.message),
- "Received a value for %s. It was missing for %i seconds.",
- name, send_okay_notification);
- n.message[sizeof (n.message) - 1] = '\0';
+ snprintf (n.message, sizeof (n.message),
+ "Received a value for %s. It was missing for %u seconds.",
+ name, (unsigned int) update_delay);
+ n.message[sizeof (n.message) - 1] = '\0';
- plugin_dispatch_notification (&n);
- }
+ plugin_dispatch_notification (&n);
return (0);
-} /* int uc_insert */
+} /* int uc_update */
gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
{
if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
{
- ERROR ("uc_insert: FORMAT_VL failed.");
+ ERROR ("uc_get_rate: FORMAT_VL failed.");
return (NULL);
}