From 3da0db98030fa64111e830d46384bc589f853b46 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 6 Nov 2010 17:39:25 +0100 Subject: [PATCH] zeromq plugin: Make it possible to configure multiple receive threads. --- src/zeromq.c | 202 +++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 119 insertions(+), 83 deletions(-) diff --git a/src/zeromq.c b/src/zeromq.c index aaef23e9..6bb06acc 100644 --- a/src/zeromq.c +++ b/src/zeromq.c @@ -43,6 +43,19 @@ static value_list_t send_buffer_vl = VALUE_LIST_STATIC; static uint64_t stats_values_not_dispatched = 0; static uint64_t stats_values_dispatched = 0; +struct cmq_socket_s +{ + void *socket; + int type; +}; +typedef struct cmq_socket_s cmq_socket_t; + +static int cmq_threads_num = 1; +static void *cmq_context = NULL; + +static pthread_t *receive_thread_ids = NULL; +static size_t receive_thread_num = 0; + /* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 * +-------+-----------------------+-------------------------------+ @@ -834,32 +847,115 @@ static const char *config_keys[] = static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); // config data -static char *zmq_listen_on = NULL; static char *zmq_send_to = NULL; // private data static int thread_running = 1; static pthread_t listen_thread_id; -static void *zmq_context = NULL; static void *push_socket = NULL; -static void *pull_socket = NULL; -// return values: -// 0 => success -// -1 => error -// +static void *receive_thread (void *cmq_socket) /* {{{ */ +{ + int status; + char *data = NULL; + size_t data_size; + + assert (cmq_socket != NULL); + + while (thread_running) + { + zmq_msg_t msg; + + (void) zmq_msg_init (&msg); + + status = zmq_recv (cmq_socket, &msg, /* flags = */ 0); + if (status != 0) + { + if ((errno == EAGAIN) || (errno == EINTR)) + continue; + + ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno)); + break; + } + + data = zmq_msg_data (&msg); + data_size = zmq_msg_size (&msg); + + status = parse_packet (NULL, data, data_size, + /* flags = */ 0, + /* username = */ NULL); + DEBUG("zeromq plugin: received data, parse returned %d", status); + + (void) zmq_msg_close (&msg); + } /* while (thread_running) */ + + DEBUG ("zeromq plugin: Receive thread is terminating."); + (void) zmq_close (cmq_socket); + + return (NULL); +} /* }}} void *receive_thread */ static int my_config (const char *key, const char *value) { - if( strcasecmp(key, "ListenOn") == 0 ) { - if( zmq_listen_on != NULL ) { - free(zmq_listen_on); - zmq_listen_on = NULL; + if (cmq_context == NULL) + { + cmq_context = zmq_init (cmq_threads_num); + if (cmq_context == NULL) + { + ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s", + zmq_strerror (errno)); + return (-1); } - - if( (zmq_listen_on = strdup(value)) == NULL ) { - return 1; + } + + if (strcasecmp(key, "ListenOn") == 0) + { + pthread_t *thread_ptr; + void *cmq_socket; + int status; + + thread_ptr = realloc (receive_thread_ids, + sizeof (*receive_thread_ids) * (receive_thread_num + 1)); + if (thread_ptr == NULL) + { + ERROR ("zeromq plugin: realloc failed."); + return (-1); + } + receive_thread_ids = thread_ptr; + thread_ptr = receive_thread_ids + receive_thread_num; + + cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL); + if (cmq_socket == NULL) + { + ERROR ("zeromq plugin: zmq_socket failed: %s", + zmq_strerror (errno)); + return (-1); + } + + status = zmq_bind (cmq_socket, value); + if (status != 0) + { + ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s", + value, zmq_strerror (errno)); + (void) zmq_close (cmq_socket); + return (-1); } + + status = pthread_create (thread_ptr, + /* attr = */ NULL, + /* func = */ receive_thread, + /* args = */ cmq_socket); + if (status != 0) + { + char errbuf[1024]; + ERROR ("zeromq plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + (void) zmq_close (cmq_socket); + return (-1); + } + + receive_thread_num++; + return (0); } else if( strcasecmp(key, "SendDataTo") == 0 ) { if( zmq_send_to != NULL ) { @@ -876,83 +972,23 @@ static int my_config (const char *key, const char *value) return 0; } -static void *receive_thread (void __attribute__((unused)) *arg) -{ - int status; - zmq_msg_t msg; - char *data = NULL; - size_t data_size; - - while( thread_running ) { - status = zmq_msg_init(&msg); - assert( status == 0 ); - if( zmq_recv(pull_socket, &msg, 0) != 0 ) { - WARNING("zmq_recv : %s", zmq_strerror(errno)); - continue; - } - - data = zmq_msg_data(&msg); - data_size = zmq_msg_size(&msg); - - parse_packet(NULL, data, data_size, - /* flags = */ 0, - /* username = */ NULL); - - DEBUG("ZeroMQ: received data, parse returned %d", status); - - if( zmq_msg_close(&msg) != 0 ) { - ERROR("zmq_msg_close : %s", zmq_strerror(errno)); - } - - } - - return NULL; -} - static int plugin_init (void) { - int major, minor, patch, status; + int major, minor, patch; zmq_version (&major, &minor, &patch); - - // init zeromq (1 I/O thread) - zmq_context = zmq_init(1); - if( zmq_context == NULL ) { + /* init zeromq (1 I/O thread) */ + if (cmq_context == NULL) + cmq_context = zmq_init(1); + + if( cmq_context == NULL ) { ERROR("zmq_init : %s", zmq_strerror(errno)); return 1; } - // start listening socket - if( zmq_listen_on != NULL ) { - pull_socket = zmq_socket(zmq_context, ZMQ_PULL); - if( pull_socket == NULL ) { - ERROR("zmq_socket : %s", zmq_strerror(errno)); - return 1; - } - - // and bind it to the inbound queue - if( zmq_bind(pull_socket, zmq_listen_on) != 0 ) { - ERROR("zmq_bind : %s", zmq_strerror(errno)); - return 1; - } - - // listenining thread - status = pthread_create(&listen_thread_id, - NULL /* no attributes */, - receive_thread, - NULL /* no argument */); - - if( status != 0 ) { - ERROR("network: pthread_create failed: %s", strerror(errno)); - return 1; - } - - INFO("ZeroMQ listening on %s", zmq_listen_on); - } - // start send socket if( zmq_send_to != NULL ) { - push_socket = zmq_socket(zmq_context, ZMQ_PUSH); + push_socket = zmq_socket(cmq_context, ZMQ_PUSH); if( push_socket == NULL ) { ERROR("zmq_socket : %s", zmq_strerror(errno)); @@ -1035,13 +1071,13 @@ static int write_value (const data_set_t *ds, const value_list_t *vl, static int my_shutdown (void) { - if( zmq_context ) { + if( cmq_context ) { thread_running = 0; DEBUG("ZeroMQ: shutting down"); - if( zmq_term(zmq_context) != 0 ) { + if( zmq_term(cmq_context) != 0 ) { ERROR("zmq_term : %s", zmq_strerror(errno)); return 1; } -- 2.11.0