static uint64_t stats_values_not_dispatched = 0;
static uint64_t stats_values_dispatched = 0;
+struct cmq_socket_s
+{
+ void *socket;
+ int type;
+};
+typedef struct cmq_socket_s cmq_socket_t;
+
+static int cmq_threads_num = 1;
+static void *cmq_context = NULL;
+
+static pthread_t *receive_thread_ids = NULL;
+static size_t receive_thread_num = 0;
+
/* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-------+-----------------------+-------------------------------+
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
// config data
-static char *zmq_listen_on = NULL;
static char *zmq_send_to = NULL;
// private data
static int thread_running = 1;
static pthread_t listen_thread_id;
-static void *zmq_context = NULL;
static void *push_socket = NULL;
-static void *pull_socket = NULL;
-// return values:
-// 0 => success
-// -1 => error
-//
+static void *receive_thread (void *cmq_socket) /* {{{ */
+{
+ int status;
+ char *data = NULL;
+ size_t data_size;
+
+ assert (cmq_socket != NULL);
+
+ while (thread_running)
+ {
+ zmq_msg_t msg;
+
+ (void) zmq_msg_init (&msg);
+
+ status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
+ if (status != 0)
+ {
+ if ((errno == EAGAIN) || (errno == EINTR))
+ continue;
+
+ ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
+ break;
+ }
+
+ data = zmq_msg_data (&msg);
+ data_size = zmq_msg_size (&msg);
+
+ status = parse_packet (NULL, data, data_size,
+ /* flags = */ 0,
+ /* username = */ NULL);
+ DEBUG("zeromq plugin: received data, parse returned %d", status);
+
+ (void) zmq_msg_close (&msg);
+ } /* while (thread_running) */
+
+ DEBUG ("zeromq plugin: Receive thread is terminating.");
+ (void) zmq_close (cmq_socket);
+
+ return (NULL);
+} /* }}} void *receive_thread */
static int my_config (const char *key, const char *value)
{
- if( strcasecmp(key, "ListenOn") == 0 ) {
- if( zmq_listen_on != NULL ) {
- free(zmq_listen_on);
- zmq_listen_on = NULL;
+ if (cmq_context == NULL)
+ {
+ cmq_context = zmq_init (cmq_threads_num);
+ if (cmq_context == NULL)
+ {
+ ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
+ zmq_strerror (errno));
+ return (-1);
}
-
- if( (zmq_listen_on = strdup(value)) == NULL ) {
- return 1;
+ }
+
+ if (strcasecmp(key, "ListenOn") == 0)
+ {
+ pthread_t *thread_ptr;
+ void *cmq_socket;
+ int status;
+
+ thread_ptr = realloc (receive_thread_ids,
+ sizeof (*receive_thread_ids) * (receive_thread_num + 1));
+ if (thread_ptr == NULL)
+ {
+ ERROR ("zeromq plugin: realloc failed.");
+ return (-1);
+ }
+ receive_thread_ids = thread_ptr;
+ thread_ptr = receive_thread_ids + receive_thread_num;
+
+ cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL);
+ if (cmq_socket == NULL)
+ {
+ ERROR ("zeromq plugin: zmq_socket failed: %s",
+ zmq_strerror (errno));
+ return (-1);
+ }
+
+ status = zmq_bind (cmq_socket, value);
+ if (status != 0)
+ {
+ ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
+ value, zmq_strerror (errno));
+ (void) zmq_close (cmq_socket);
+ return (-1);
}
+
+ status = pthread_create (thread_ptr,
+ /* attr = */ NULL,
+ /* func = */ receive_thread,
+ /* args = */ cmq_socket);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("zeromq plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ (void) zmq_close (cmq_socket);
+ return (-1);
+ }
+
+ receive_thread_num++;
+ return (0);
}
else if( strcasecmp(key, "SendDataTo") == 0 ) {
if( zmq_send_to != NULL ) {
return 0;
}
-static void *receive_thread (void __attribute__((unused)) *arg)
-{
- int status;
- zmq_msg_t msg;
- char *data = NULL;
- size_t data_size;
-
- while( thread_running ) {
- status = zmq_msg_init(&msg);
- assert( status == 0 );
- if( zmq_recv(pull_socket, &msg, 0) != 0 ) {
- WARNING("zmq_recv : %s", zmq_strerror(errno));
- continue;
- }
-
- data = zmq_msg_data(&msg);
- data_size = zmq_msg_size(&msg);
-
- parse_packet(NULL, data, data_size,
- /* flags = */ 0,
- /* username = */ NULL);
-
- DEBUG("ZeroMQ: received data, parse returned %d", status);
-
- if( zmq_msg_close(&msg) != 0 ) {
- ERROR("zmq_msg_close : %s", zmq_strerror(errno));
- }
-
- }
-
- return NULL;
-}
-
static int plugin_init (void)
{
- int major, minor, patch, status;
+ int major, minor, patch;
zmq_version (&major, &minor, &patch);
-
- // init zeromq (1 I/O thread)
- zmq_context = zmq_init(1);
- if( zmq_context == NULL ) {
+ /* init zeromq (1 I/O thread) */
+ if (cmq_context == NULL)
+ cmq_context = zmq_init(1);
+
+ if( cmq_context == NULL ) {
ERROR("zmq_init : %s", zmq_strerror(errno));
return 1;
}
- // start listening socket
- if( zmq_listen_on != NULL ) {
- pull_socket = zmq_socket(zmq_context, ZMQ_PULL);
- if( pull_socket == NULL ) {
- ERROR("zmq_socket : %s", zmq_strerror(errno));
- return 1;
- }
-
- // and bind it to the inbound queue
- if( zmq_bind(pull_socket, zmq_listen_on) != 0 ) {
- ERROR("zmq_bind : %s", zmq_strerror(errno));
- return 1;
- }
-
- // listenining thread
- status = pthread_create(&listen_thread_id,
- NULL /* no attributes */,
- receive_thread,
- NULL /* no argument */);
-
- if( status != 0 ) {
- ERROR("network: pthread_create failed: %s", strerror(errno));
- return 1;
- }
-
- INFO("ZeroMQ listening on %s", zmq_listen_on);
- }
-
// start send socket
if( zmq_send_to != NULL ) {
- push_socket = zmq_socket(zmq_context, ZMQ_PUSH);
+ push_socket = zmq_socket(cmq_context, ZMQ_PUSH);
if( push_socket == NULL ) {
ERROR("zmq_socket : %s", zmq_strerror(errno));
static int my_shutdown (void)
{
- if( zmq_context ) {
+ if( cmq_context ) {
thread_running = 0;
DEBUG("ZeroMQ: shutting down");
- if( zmq_term(zmq_context) != 0 ) {
+ if( zmq_term(cmq_context) != 0 ) {
ERROR("zmq_term : %s", zmq_strerror(errno));
return 1;
}