From 34f363d3919fb2ba27678da7231095c2e9802dd0 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Tue, 27 Mar 2007 18:58:56 +0200 Subject: [PATCH] network plugin: Implemented duplicate detection and a `Forward' option. The plugin will now only send values received via the network, if the `Forward' option is set to `true'. Also, duplicates are detected and discarded, preventing loops, duplicate entries and errors from RRDTool. --- src/collectd.conf.in | 1 + src/collectd.conf.pod | 9 ++++ src/network.c | 119 +++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 119 insertions(+), 10 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index d8432c54..2b349468 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -126,6 +126,7 @@ # Listen "ff18::efc0:4a42" "25826" # Listen "239.192.74.66" "25826" # TimeToLive "128" +# Forward false # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 6d42f07c..02e6def1 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -370,6 +370,15 @@ multicast, and IPv4 and IPv6 packets. The default is to not change this value. That means that multicast packets will be sent with a TTL of C<1> (one) on most operating systems. +=item B I + +If set to I, write packets that were received via the network plugin to +the sending sockets. This should only be activated when the B- and +B-statements differ. Otherwise packets may be send multiple times to +the same multicast group. While this results in more network traffic than +neccessary it's not a huge problem since the plugin has a duplicate detection, +so the values will not loop. + =back =head2 Plugin C diff --git a/src/network.c b/src/network.c index 35ffb393..3c258f4b 100644 --- a/src/network.c +++ b/src/network.c @@ -23,6 +23,7 @@ #include "plugin.h" #include "common.h" #include "configfile.h" +#include "utils_avltree.h" #include "network.h" @@ -143,11 +144,12 @@ static const char *config_keys[] = "Listen", "Server", "TimeToLive", - NULL + "Forward" }; -static int config_keys_num = 3; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); static int network_config_ttl = 0; +static int network_config_forward = 0; static sockent_t *sending_sockets = NULL; @@ -163,9 +165,68 @@ static value_list_t send_buffer_vl = VALUE_LIST_INIT; static char send_buffer_type[DATA_MAX_NAME_LEN]; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; +static avl_tree_t *cache_tree = NULL; +static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; + /* * Private functions */ +static int cache_check (const char *type, const value_list_t *vl) +{ + char key[1024]; + time_t *value = NULL; + int retval = -1; + + if (cache_tree == NULL) + return (-1); + + if (format_name (key, sizeof (key), vl->host, vl->plugin, + vl->plugin_instance, type, vl->type_instance)) + return (-1); + + pthread_mutex_lock (&cache_lock); + + if (avl_get (cache_tree, key, (void *) &value) == 0) + { + if (*value < vl->time) + { + *value = vl->time; + retval = 0; + } + else + { + DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i", + (int) *value, (int) vl->time); + retval = 1; + } + } + else + { + char *key_copy = strdup (key); + value = malloc (sizeof (time_t)); + if ((key_copy != NULL) && (value != NULL)) + { + *value = vl->time; + avl_insert (cache_tree, key_copy, value); + retval = 0; + } + else + { + sfree (key_copy); + sfree (value); + } + } + + /* TODO: Flush cache */ + + pthread_mutex_unlock (&cache_lock); + + DEBUG ("network plugin: cache_check: key = %s; time = %i; retval = %i", + key, (int) vl->time, retval); + + return (retval); +} /* int cache_check */ + static int write_part_values (char **ret_buffer, int *ret_buffer_len, const data_set_t *ds, const value_list_t *vl) { @@ -413,7 +474,8 @@ static int parse_packet (void *buffer, int buffer_len) if ((vl.time > 0) && (strlen (vl.host) > 0) && (strlen (vl.plugin) > 0) - && (strlen (type) > 0)) + && (strlen (type) > 0) + && (cache_check (type, &vl) == 0)) { DEBUG ("dispatching values"); plugin_dispatch_values (type, &vl); @@ -458,7 +520,7 @@ static int parse_packet (void *buffer, int buffer_len) { status = parse_part_string (&buffer, &buffer_len, vl.type_instance, sizeof (vl.type_instance)); - DEBUG ("network type: parse_packet: vl.type_instance = %s", vl.type_instance); + DEBUG ("network plugin: parse_packet: vl.type_instance = %s", vl.type_instance); } else { @@ -956,7 +1018,7 @@ static int add_to_buffer (char *buffer, int buffer_size, vl->host, strlen (vl->host)) != 0) return (-1); strcpy (vl_def->host, vl->host); - DEBUG ("host = %s", vl->host); + DEBUG ("network plugin: add_to_buffer: host = %s", vl->host); } if (vl_def->time != vl->time) @@ -965,7 +1027,8 @@ static int add_to_buffer (char *buffer, int buffer_size, (uint64_t) vl->time)) return (-1); vl_def->time = vl->time; - DEBUG ("time = %u", (unsigned int) vl->time); + DEBUG ("network plugin: add_to_buffer: time = %u", + (unsigned int) vl->time); } if (strcmp (vl_def->plugin, vl->plugin) != 0) @@ -974,7 +1037,8 @@ static int add_to_buffer (char *buffer, int buffer_size, vl->plugin, strlen (vl->plugin)) != 0) return (-1); strcpy (vl_def->plugin, vl->plugin); - DEBUG ("plugin = %s", vl->plugin); + DEBUG ("network plugin: add_to_buffer: plugin = %s", + vl->plugin); } if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0) @@ -984,7 +1048,8 @@ static int add_to_buffer (char *buffer, int buffer_size, strlen (vl->plugin_instance)) != 0) return (-1); strcpy (vl_def->plugin_instance, vl->plugin_instance); - DEBUG ("plugin_instance = %s", vl->plugin_instance); + DEBUG ("network plugin: add_to_buffer: plugin_instance = %s", + vl->plugin_instance); } if (strcmp (type_def, ds->type) != 0) @@ -993,7 +1058,7 @@ static int add_to_buffer (char *buffer, int buffer_size, ds->type, strlen (ds->type)) != 0) return (-1); strcpy (type_def, ds->type); - DEBUG ("type = %s", ds->type); + DEBUG ("network plugin: add_to_buffer: type = %s", ds->type); } if (strcmp (vl_def->type_instance, vl->type_instance) != 0) @@ -1003,7 +1068,8 @@ static int add_to_buffer (char *buffer, int buffer_size, strlen (vl->type_instance)) != 0) return (-1); strcpy (vl_def->type_instance, vl->type_instance); - DEBUG ("type_instance = %s", vl->type_instance); + DEBUG ("network plugin: add_to_buffer: type_instance = %s", + vl->type_instance); } if (write_part_values (&buffer, &buffer_size, ds, vl) != 0) @@ -1025,6 +1091,14 @@ static int network_write (const data_set_t *ds, const value_list_t *vl) { int status; + /* If the value is already in the cache, we have received it via the + * network. We write it again if forwarding is activated. It's then in + * the cache and should we receive it again we will ignore it. */ + status = cache_check (ds->type, vl); + if ((network_config_forward == 0) + && (status != 0)) + return (0); + pthread_mutex_lock (&send_buffer_lock); status = add_to_buffer (send_buffer_ptr, @@ -1105,6 +1179,15 @@ static int network_config (const char *key, const char *val) else return (1); } + else if (strcasecmp ("Forward", key) == 0) + { + if ((strcasecmp ("true", value) == 0) + || (strcasecmp ("yes", value) == 0) + || (strcasecmp ("on", value) == 0)) + network_config_forward = 1; + else + network_config_forward = 0; + } else { return (-1); @@ -1127,6 +1210,20 @@ static int network_shutdown (void) listen_thread = 0; + if (cache_tree != NULL) + { + void *key; + void *value; + + while (avl_pick (cache_tree, &key, &value) == 0) + { + sfree (key); + sfree (value); + } + avl_destroy (cache_tree); + cache_tree = NULL; + } + /* TODO: Close `sending_sockets' */ plugin_unregister_config ("network"); @@ -1146,6 +1243,8 @@ static int network_init (void) memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); memset (send_buffer_type, '\0', sizeof (send_buffer_type)); + cache_tree = avl_create ((int (*) (const void *, const void *)) strcmp); + /* setup socket(s) and so on */ if (sending_sockets != NULL) plugin_register_write ("network", network_write); -- 2.11.0