From b46550e9b0bfa67824e0fa30e8d8cbc1efa5cedd Mon Sep 17 00:00:00 2001 From: Julien Ammous Date: Wed, 27 Oct 2010 21:30:33 +0200 Subject: [PATCH] first working version --- src/zeromq.c | 171 +++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 96 insertions(+), 75 deletions(-) diff --git a/src/zeromq.c b/src/zeromq.c index 070c2503..70fa3064 100644 --- a/src/zeromq.c +++ b/src/zeromq.c @@ -26,6 +26,8 @@ #include "collectd.h" #include "common.h" /* auxiliary functions */ #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */ +#include "utils_cache.h" +#include "network.h" #include #include @@ -34,20 +36,58 @@ static value_list_t send_buffer_vl = VALUE_LIST_STATIC; +static uint64_t stats_values_not_dispatched = 0; +static uint64_t stats_values_dispatched = 0; + +/* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-------+-----------------------+-------------------------------+ + * ! Ver. ! ! Length ! + * +-------+-----------------------+-------------------------------+ + */ +struct part_header_s +{ + uint16_t type; + uint16_t length; +}; +typedef struct part_header_s part_header_t; + // we do not want to crypt here #undef HAVE_LIBGCRYPT +static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */ +{ + uint64_t time_sent = 0; + int status; + + status = uc_meta_data_get_unsigned_int (vl, + "network:time_sent", &time_sent); + + /* This is a value we already sent. Don't allow it to be received again in + * order to avoid looping. */ + if ((status == 0) && (time_sent >= ((uint64_t) vl->time))) + return (false); + + return (true); +} /* }}} _Bool check_receive_okay */ + static int network_dispatch_values (value_list_t *vl, /* {{{ */ const char *username) { int status; + + // DEBUG("host: %s", vl->host); + // DEBUG("plugin: %s", vl->plugin); + // DEBUG("plugin_instance: %s", vl->plugin_instance); + // DEBUG("type: %s", vl->type); + // DEBUG("type_instance: %s", vl->type_instance); if ((vl->time <= 0) || (strlen (vl->host) <= 0) || (strlen (vl->plugin) <= 0) || (strlen (vl->type) <= 0)) return (-EINVAL); - + if (!check_receive_okay (vl)) { #if COLLECT_DEBUG @@ -90,7 +130,8 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */ return (status); } } - + + // DEBUG("dispatching %d values", vl->values_len); plugin_dispatch_values (vl); stats_values_dispatched++; @@ -390,64 +431,27 @@ static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */ const data_set_t *ds, const value_list_t *vl) { char *buffer_orig = buffer; - - if (strcmp (vl_def->host, vl->host) != 0) - { - if (write_part_string (&buffer, &buffer_size, TYPE_HOST, - vl->host, strlen (vl->host)) != 0) - return (-1); - sstrncpy (vl_def->host, vl->host, sizeof (vl_def->host)); - } - - if (vl_def->time != vl->time) - { - if (write_part_number (&buffer, &buffer_size, TYPE_TIME, - (uint64_t) vl->time)) - return (-1); - vl_def->time = vl->time; - } - - if (vl_def->interval != vl->interval) - { - if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, - (uint64_t) vl->interval)) - return (-1); - vl_def->interval = vl->interval; - } - - if (strcmp (vl_def->plugin, vl->plugin) != 0) - { - if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, - vl->plugin, strlen (vl->plugin)) != 0) - return (-1); - sstrncpy (vl_def->plugin, vl->plugin, sizeof (vl_def->plugin)); - } - - if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0) - { - if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, - vl->plugin_instance, - strlen (vl->plugin_instance)) != 0) - return (-1); - sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance)); - } - - if (strcmp (vl_def->type, vl->type) != 0) - { - if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, - vl->type, strlen (vl->type)) != 0) - return (-1); - sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type)); - } - - if (strcmp (vl_def->type_instance, vl->type_instance) != 0) - { - if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, - vl->type_instance, - strlen (vl->type_instance)) != 0) - return (-1); - sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance)); - } + + if (write_part_string (&buffer, &buffer_size, TYPE_HOST, vl->host, strlen (vl->host)) != 0) + return (-1); + + if (write_part_number (&buffer, &buffer_size, TYPE_TIME, (uint64_t) vl->time)) + return (-1); + + if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, (uint64_t) vl->interval)) + return (-1); + + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, vl->plugin, strlen (vl->plugin)) != 0) + return (-1); + + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0) + return (-1); + + if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0) + return (-1); + + if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0) + return (-1); if (write_part_values (&buffer, &buffer_size, ds, vl) != 0) return (-1); @@ -580,7 +584,7 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, } /* int parse_part_string */ -static int parse_packet (sockent_t *se, /* {{{ */ +static int parse_packet (void *se, /* {{{ */ void *buffer, size_t buffer_size, int flags, const char *username) { @@ -624,15 +628,15 @@ static int parse_packet (sockent_t *se, /* {{{ */ if (pkg_type == TYPE_ENCR_AES256) { - status = parse_part_encr_aes256 (se, - &buffer, &buffer_size, flags); - if (status != 0) - { + // status = parse_part_encr_aes256 (se, + // &buffer, &buffer_size, flags); + // if (status != 0) + // { ERROR ("network plugin: Decrypting AES256 " "part failed " "with status %i.", status); break; - } + // } } #if HAVE_LIBGCRYPT else if ((se->data.server.security_level == SECURITY_LEVEL_ENCRYPT) @@ -650,15 +654,15 @@ static int parse_packet (sockent_t *se, /* {{{ */ #endif /* HAVE_LIBGCRYPT */ else if (pkg_type == TYPE_SIGN_SHA256) { - status = parse_part_sign_sha256 (se, - &buffer, &buffer_size, flags); - if (status != 0) - { + // status = parse_part_sign_sha256 (se, + // &buffer, &buffer_size, flags); + // if (status != 0) + // { ERROR ("network plugin: Verifying HMAC-SHA-256 " "signature failed " "with status %i.", status); break; - } + // } } #if HAVE_LIBGCRYPT else if ((se->data.server.security_level == SECURITY_LEVEL_SIGN) @@ -849,7 +853,7 @@ static int my_config (const char *key, const char *value) zmq_listen_on = NULL; } - if( zmq_listen_on = strdup(value)) == NULL ) { + if( (zmq_listen_on = strdup(value)) == NULL ) { return 1; } } @@ -859,7 +863,7 @@ static int my_config (const char *key, const char *value) zmq_send_to = NULL; } - if( zmq_send_to = strdup(value)) == NULL ) { + if( (zmq_send_to = strdup(value)) == NULL ) { return 1; } } @@ -878,7 +882,7 @@ static void *receive_thread (void __attribute__((unused)) *arg) while( thread_running ) { status = zmq_msg_init(&msg); assert( status == 0 ); - if( zmq_recv(&pull_socket, &msg, 0) != 0 ) { + if( zmq_recv(pull_socket, &msg, 0) != 0 ) { WARNING("zmq_recv : %s", zmq_strerror(errno)); continue; } @@ -890,8 +894,15 @@ static void *receive_thread (void __attribute__((unused)) *arg) /* flags = */ 0, /* username = */ NULL); + DEBUG("ZeroMQ: received data, parse returned %d", status); + + if( zmq_msg_close(&msg) != 0 ) { + ERROR("zmq_msg_close : %s", zmq_strerror(errno)); + } } + + return NULL; } static int plugin_init (void) @@ -963,6 +974,7 @@ static int plugin_init (void) static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data) { + DEBUG("ZeroMQ: received notification, not implemented yet"); return 0; } @@ -980,6 +992,10 @@ static int write_value (const data_set_t *ds, const value_list_t *vl, char *send_buffer; int send_buffer_size = PACKET_SIZE, real_size; + // do nothing if no socket open + if( zmq_send_to == NULL ) + return 0; + send_buffer = malloc(PACKET_SIZE); if( send_buffer == NULL ) { ERROR("Unable to allocate memory for send_buffer, aborting write"); @@ -1008,6 +1024,8 @@ static int write_value (const data_set_t *ds, const value_list_t *vl, } } + DEBUG("ZeroMQ: data sent"); + return 0; } @@ -1016,12 +1034,15 @@ static int my_shutdown (void) if( zmq_context ) { thread_running = 0; - pthread_join(listen_thread_id, NULL); + + DEBUG("ZeroMQ: shutting down"); if( zmq_term(zmq_context) != 0 ) { ERROR("zmq_term : %s", zmq_strerror(errno)); return 1; } + + pthread_join(listen_thread_id, NULL); } return 0; -- 2.11.0