zeromq plugin: Make it possible to configure multiple receive threads.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 6 Nov 2010 16:39:25 +0000 (17:39 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 6 Nov 2010 16:39:25 +0000 (17:39 +0100)
src/zeromq.c

index aaef23e..6bb06ac 100644 (file)
@@ -43,6 +43,19 @@ static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
 static uint64_t stats_values_not_dispatched = 0;
 static uint64_t stats_values_dispatched = 0;
 
+struct cmq_socket_s
+{
+       void *socket;
+       int type;
+};
+typedef struct cmq_socket_s cmq_socket_t;
+
+static int cmq_threads_num = 1;
+static void *cmq_context = NULL;
+
+static pthread_t *receive_thread_ids = NULL;
+static size_t     receive_thread_num = 0;
+
 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  * +-------+-----------------------+-------------------------------+
@@ -834,32 +847,115 @@ static const char *config_keys[] =
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
 // config data
-static char *zmq_listen_on = NULL;
 static char *zmq_send_to = NULL;
 
 // private data
 static int thread_running = 1;
 static pthread_t listen_thread_id;
-static void *zmq_context = NULL;
 static void *push_socket = NULL;
-static void *pull_socket = NULL;
 
-// return values:
-// 0  => success
-// -1 => error
-//
+static void *receive_thread (void *cmq_socket) /* {{{ */
+{
+  int status;
+  char *data = NULL;
+  size_t data_size;
+
+  assert (cmq_socket != NULL);
+
+  while (thread_running)
+  {
+    zmq_msg_t msg;
+
+    (void) zmq_msg_init (&msg);
+
+    status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
+    if (status != 0)
+    {
+      if ((errno == EAGAIN) || (errno == EINTR))
+        continue;
+
+      ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
+      break;
+    }
+
+    data = zmq_msg_data (&msg);
+    data_size = zmq_msg_size (&msg);
+
+    status = parse_packet (NULL, data, data_size,
+        /* flags = */ 0,
+        /* username = */ NULL);
+    DEBUG("zeromq plugin: received data, parse returned %d", status);
+
+    (void) zmq_msg_close (&msg);
+  } /* while (thread_running) */
+
+  DEBUG ("zeromq plugin: Receive thread is terminating.");
+  (void) zmq_close (cmq_socket);
+  
+  return (NULL);
+} /* }}} void *receive_thread */
 
 static int my_config (const char *key, const char *value)
 {
-  if( strcasecmp(key, "ListenOn") == 0 ) {
-    if( zmq_listen_on != NULL ) {
-      free(zmq_listen_on);
-      zmq_listen_on = NULL;
+  if (cmq_context == NULL)
+  {
+    cmq_context = zmq_init (cmq_threads_num);
+    if (cmq_context == NULL)
+    {
+      ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
+          zmq_strerror (errno));
+      return (-1);
     }
-    
-    if( (zmq_listen_on = strdup(value)) == NULL ) {
-      return 1;
+  }
+
+  if (strcasecmp(key, "ListenOn") == 0)
+  {
+    pthread_t *thread_ptr;
+    void *cmq_socket;
+    int status;
+
+    thread_ptr = realloc (receive_thread_ids,
+        sizeof (*receive_thread_ids) * (receive_thread_num + 1));
+    if (thread_ptr == NULL)
+    {
+      ERROR ("zeromq plugin: realloc failed.");
+      return (-1);
+    }
+    receive_thread_ids = thread_ptr;
+    thread_ptr = receive_thread_ids + receive_thread_num;
+
+    cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL);
+    if (cmq_socket == NULL)
+    {
+      ERROR ("zeromq plugin: zmq_socket failed: %s",
+          zmq_strerror (errno));
+      return (-1);
+    }
+
+    status = zmq_bind (cmq_socket, value);
+    if (status != 0)
+    {
+      ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
+          value, zmq_strerror (errno));
+      (void) zmq_close (cmq_socket);
+      return (-1);
     }
+
+    status = pthread_create (thread_ptr,
+        /* attr = */ NULL,
+        /* func = */ receive_thread,
+        /* args = */ cmq_socket);
+    if (status != 0)
+    {
+      char errbuf[1024];
+      ERROR ("zeromq plugin: pthread_create failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      (void) zmq_close (cmq_socket);
+      return (-1);
+    }
+
+    receive_thread_num++;
+    return (0);
   }
   else if( strcasecmp(key, "SendDataTo") == 0 ) {
     if( zmq_send_to != NULL ) {
@@ -876,83 +972,23 @@ static int my_config (const char *key, const char *value)
   return 0;
 }
 
-static void *receive_thread (void __attribute__((unused)) *arg)
-{
-  int status;
-  zmq_msg_t msg;
-  char *data = NULL;
-  size_t data_size;
-  
-  while( thread_running ) {
-    status = zmq_msg_init(&msg);
-    assert( status == 0 );
-    if( zmq_recv(pull_socket, &msg, 0) != 0 ) {
-      WARNING("zmq_recv : %s", zmq_strerror(errno));
-      continue;
-    }
-    
-    data = zmq_msg_data(&msg);
-    data_size = zmq_msg_size(&msg);
-    
-    parse_packet(NULL, data, data_size,
-      /* flags = */ 0,
-      /* username = */ NULL);
-    
-    DEBUG("ZeroMQ: received data, parse returned %d", status);
-    
-    if( zmq_msg_close(&msg) != 0 ) {
-      ERROR("zmq_msg_close : %s", zmq_strerror(errno));
-    }
-    
-  }
-  
-  return NULL;
-}
-
 static int plugin_init (void)
 {
-  int major, minor, patch, status;
+  int major, minor, patch;
   zmq_version (&major, &minor, &patch);
   
-  
-  // init zeromq (1 I/O thread)
-  zmq_context = zmq_init(1);
-  if( zmq_context == NULL ) {
+  /* init zeromq (1 I/O thread) */
+  if (cmq_context == NULL)
+    cmq_context = zmq_init(1);
+
+  if( cmq_context == NULL ) {
     ERROR("zmq_init : %s", zmq_strerror(errno));
     return 1;
   }
   
-  // start listening socket
-  if( zmq_listen_on != NULL ) {
-    pull_socket = zmq_socket(zmq_context, ZMQ_PULL);
-    if( pull_socket == NULL ) {
-      ERROR("zmq_socket : %s", zmq_strerror(errno));
-      return 1;
-    }
-    
-    // and bind it to the inbound queue
-    if( zmq_bind(pull_socket, zmq_listen_on) != 0 ) {
-      ERROR("zmq_bind : %s", zmq_strerror(errno));
-      return 1;
-    }
-    
-    // listenining thread
-    status = pthread_create(&listen_thread_id, 
-      NULL /* no attributes */,
-      receive_thread,
-      NULL /* no argument */);
-    
-    if( status != 0 ) {
-      ERROR("network: pthread_create failed: %s", strerror(errno));
-      return 1;
-    }
-    
-    INFO("ZeroMQ listening on %s", zmq_listen_on);
-  }
-  
   // start send socket
   if( zmq_send_to != NULL ) {
-    push_socket = zmq_socket(zmq_context, ZMQ_PUSH);
+    push_socket = zmq_socket(cmq_context, ZMQ_PUSH);
     
     if( push_socket == NULL ) {
       ERROR("zmq_socket : %s", zmq_strerror(errno));
@@ -1035,13 +1071,13 @@ static int write_value (const data_set_t *ds, const value_list_t *vl,
 
 static int my_shutdown (void)
 {
-  if( zmq_context ) {
+  if( cmq_context ) {
     
     thread_running = 0;
     
     DEBUG("ZeroMQ: shutting down");
     
-    if( zmq_term(zmq_context) != 0 ) {
+    if( zmq_term(cmq_context) != 0 ) {
       ERROR("zmq_term : %s", zmq_strerror(errno));
       return 1;
     }