zeromq plugin: Made the configuration a lot more flexible.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 6 Nov 2010 17:32:13 +0000 (18:32 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 6 Nov 2010 17:32:13 +0000 (18:32 +0100)
It is not possible to bind / connect each socket to multiple endpoints.
The code is not tested and probably buggy.

src/zeromq.c

index 6bb06ac..5dabc3f 100644 (file)
@@ -55,6 +55,7 @@ static void *cmq_context = NULL;
 
 static pthread_t *receive_thread_ids = NULL;
 static size_t     receive_thread_num = 0;
+static int        sending_sockets_num = 0;
 
 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
@@ -837,15 +838,6 @@ static int parse_packet (void *se, /* {{{ */
 ////////////////////////////
 
 
-
-
-static const char *config_keys[] =
-{
-  "ListenOn",
-  "SendDataTo"
-};
-static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
-
 // config data
 static char *zmq_send_to = NULL;
 
@@ -854,6 +846,17 @@ static int thread_running = 1;
 static pthread_t listen_thread_id;
 static void *push_socket = NULL;
 
+static void cmq_close_callback (void *socket) /* {{{ */
+{
+  if (socket != NULL)
+    (void) zmq_close (socket);
+} /* }}} void cmq_close_callback */
+
+static void free_data (void *data, void *hint) /* {{{ */
+{
+  free (data);
+} /* }}} void free_data */
+
 static void *receive_thread (void *cmq_socket) /* {{{ */
 {
   int status;
@@ -895,8 +898,86 @@ static void *receive_thread (void *cmq_socket) /* {{{ */
   return (NULL);
 } /* }}} void *receive_thread */
 
-static int my_config (const char *key, const char *value)
+#define PACKET_SIZE   512
+
+static int write_value (const data_set_t *ds, /* {{{ */
+    const value_list_t *vl,
+    user_data_t *user_data)
+{
+  void *cmq_socket = user_data->data;
+
+  zmq_msg_t msg;
+  char      *send_buffer;
+  int       send_buffer_size = PACKET_SIZE, real_size;
+
+  send_buffer = malloc(PACKET_SIZE);
+  if( send_buffer == NULL ) {
+    ERROR("Unable to allocate memory for send_buffer, aborting write");
+    return 1;
+  }
+
+  // empty buffer
+  memset(send_buffer, 0, PACKET_SIZE);
+
+  real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
+
+  // zeromq will free the memory when needed by calling the free_data function
+  if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
+    ERROR("zmq_msg_init : %s", zmq_strerror(errno));
+    return 1;
+  }
+
+  // try to send the message
+  if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+    if( errno == EAGAIN ) {
+      WARNING("ZeroMQ: Cannot send message, queue is full");
+    }
+    else {
+      ERROR("zmq_send : %s", zmq_strerror(errno));
+      return 1;
+    }
+  }
+
+  DEBUG("ZeroMQ: data sent");
+
+  return 0;
+} /* }}} int write_value */
+
+static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
+{
+  char buffer[64] = "";
+  int status;
+
+  status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
+  if (status != 0)
+    return (-1);
+
+  if (strcasecmp ("Publish", buffer) == 0)
+    return (ZMQ_PUB);
+  else if (strcasecmp ("Subscribe", buffer) == 0)
+    return (ZMQ_SUB);
+  else if (strcasecmp ("Push", buffer) == 0)
+    return (ZMQ_PUSH);
+  else if (strcasecmp ("Pull", buffer) == 0)
+    return (ZMQ_PULL);
+  
+  ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
+      buffer);
+  return (-1);
+} /* }}} int cmq_config_mode */
+
+static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
 {
+  int type;
+  int status;
+  int i;
+  int connections_num;
+  void *cmq_socket;
+
+  type = cmq_config_mode (ci);
+  if (type < 0)
+    return (-1);
+
   if (cmq_context == NULL)
   {
     cmq_context = zmq_init (cmq_threads_num);
@@ -908,11 +989,106 @@ static int my_config (const char *key, const char *value)
     }
   }
 
-  if (strcasecmp(key, "ListenOn") == 0)
+  /* Create a new socket */
+  cmq_socket = zmq_socket (cmq_context, type);
+  if (cmq_socket == NULL)
+  {
+    ERROR ("zeromq plugin: zmq_socket failed: %s",
+        zmq_strerror (errno));
+    return (-1);
+  }
+
+  if (type == ZMQ_SUB)
+  {
+    /* Subscribe to all messages */
+    status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
+        /* prefix = */ "", /* prefix length = */ 0);
+    if (status != 0)
+    {
+      ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
+          zmq_strerror (errno));
+      (void) zmq_close (cmq_socket);
+      return (-1);
+    }
+  }
+
+  /* Iterate over all children and do all the binds and connects requested. */
+  connections_num = 0;
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Bind", child->key) == 0)
+    {
+      char *value = NULL;
+
+      if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
+      {
+        ERROR ("zeromq plugin: The \"Bind\" option is not available for "
+            "publish and push sockets. Try \"Connect\".");
+        continue;
+      }
+
+      status = cf_util_get_string (child, &value);
+      if (status != 0)
+        continue;
+
+      status = zmq_bind (cmq_socket, value);
+      if (status != 0)
+      {
+        ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
+            value, zmq_strerror (errno));
+        continue;
+      }
+
+      connections_num++;
+      continue;
+    } /* Bind */
+
+    if (strcasecmp ("Connect", child->key) == 0)
+    {
+      char *value = NULL;
+
+      if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
+      {
+        ERROR ("zeromq plugin: The \"Connect\" option is not available for "
+            "subscribe and pull sockets. Try \"Bind\".");
+        continue;
+      }
+
+      status = cf_util_get_string (child, &value);
+      if (status != 0)
+        continue;
+
+      status = zmq_connect (cmq_socket, value);
+      if (status != 0)
+      {
+        ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
+            value, zmq_strerror (errno));
+        continue;
+      }
+
+      connections_num++;
+      continue;
+    } /* Connect */
+
+    ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
+        child->key);
+  } /* for (i = 0; i < ci->children_num; i++) */
+
+  if (connections_num == 0)
+  {
+    ERROR ("zeromq plugin: No (valid) \"%s\" options were found in this "
+        "\"Socket\" block.",
+        ((type == ZMQ_SUB) || (type == ZMQ_PULL)) ? "Bind" : "Connect");
+    (void) zmq_close (cmq_socket);
+    return (-1);
+  }
+
+  /* If this is a receiving socket, create a new receive thread */
+  if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
   {
     pthread_t *thread_ptr;
-    void *cmq_socket;
-    int status;
 
     thread_ptr = realloc (receive_thread_ids,
         sizeof (*receive_thread_ids) * (receive_thread_num + 1));
@@ -924,23 +1100,6 @@ static int my_config (const char *key, const char *value)
     receive_thread_ids = thread_ptr;
     thread_ptr = receive_thread_ids + receive_thread_num;
 
-    cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL);
-    if (cmq_socket == NULL)
-    {
-      ERROR ("zeromq plugin: zmq_socket failed: %s",
-          zmq_strerror (errno));
-      return (-1);
-    }
-
-    status = zmq_bind (cmq_socket, value);
-    if (status != 0)
-    {
-      ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
-          value, zmq_strerror (errno));
-      (void) zmq_close (cmq_socket);
-      return (-1);
-    }
-
     status = pthread_create (thread_ptr,
         /* attr = */ NULL,
         /* func = */ receive_thread,
@@ -955,22 +1114,66 @@ static int my_config (const char *key, const char *value)
     }
 
     receive_thread_num++;
-    return (0);
   }
-  else if( strcasecmp(key, "SendDataTo") == 0 ) {
-    if( zmq_send_to != NULL ) {
-      free(zmq_send_to);
-      zmq_send_to = NULL;
+
+  /* If this is a sending socket, register a new write function */
+  else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
+  {
+    user_data_t ud = { NULL, NULL };
+    char name[32];
+
+    ud.data = cmq_socket;
+    ud.free_func = cmq_close_callback;
+
+    ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
+    sending_sockets_num++;
+
+    plugin_register_write (name, write_value, &ud);
+  }
+
+  return (0);
+} /* }}} int cmq_config_socket */
+
+/*
+ * Config schema:
+ *
+ * <Plugin "zeromq">
+ *   <Socket Publish>
+ *     Connect "tcp://localhost:6666"
+ *   </Socket>
+ *   <Socket Subscribe>
+ *     Bind "tcp://eth0:6666"
+ *     Bind "tcp://collectd.example.com:6666"
+ *   </Socket>
+ * </Plugin>
+ */
+static int cmq_config (oconfig_item_t *ci) /* {{{ */
+{
+  int status;
+  int i;
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Socket", child->key) == 0)
+      status = cmq_config_socket (child);
+    else if (strcasecmp ("Threads", child->key) == 0)
+    {
+      int tmp = 0;
+      status = cf_util_get_int (child, &tmp);
+      if ((status == 0) && (tmp >= 1))
+        cmq_threads_num = tmp;
     }
-    
-    if( (zmq_send_to = strdup(value)) == NULL ) {
-      return 1;
+    else
+    {
+      WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
+          child->key);
     }
   }
-  
-  
-  return 0;
-}
+
+  return (0);
+} /* }}} int cmq_config */
 
 static int plugin_init (void)
 {
@@ -1018,57 +1221,6 @@ static int write_notification (const notification_t *n, user_data_t __attribute_
   return 0;
 }
 
-void free_data(void *data, void *hint)
-{
-  free(data);
-}
-
-#define PACKET_SIZE   512
-
-static int write_value (const data_set_t *ds, const value_list_t *vl,
-               user_data_t __attribute__((unused)) *user_data)
-{
-  zmq_msg_t msg;
-  char      *send_buffer;
-  int       send_buffer_size = PACKET_SIZE, real_size;
-  
-  // do nothing if no socket open
-  if( zmq_send_to == NULL )
-    return 0;
-  
-  send_buffer = malloc(PACKET_SIZE);
-  if( send_buffer == NULL ) {
-    ERROR("Unable to allocate memory for send_buffer, aborting write");
-    return 1;
-  }
-  
-  // empty buffer
-  memset(send_buffer, 0, PACKET_SIZE);
-  
-  real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
-  
-  // zeromq will free the memory when needed by calling the free_data function
-  if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
-    ERROR("zmq_msg_init : %s", zmq_strerror(errno));
-    return 1;
-  }
-  
-  // try to send the message
-  if( zmq_send(push_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
-    if( errno == EAGAIN ) {
-      WARNING("ZeroMQ: Cannot send message, queue is full");
-    }
-    else {
-      ERROR("zmq_send : %s", zmq_strerror(errno));
-      return 1;
-    }
-  }
-  
-  DEBUG("ZeroMQ: data sent");
-  
-  return 0;
-}
-
 static int my_shutdown (void)
 {
   if( cmq_context ) {
@@ -1090,7 +1242,7 @@ static int my_shutdown (void)
 
 void module_register (void)
 {
-  plugin_register_config("zeromq", my_config, config_keys, config_keys_num);
+  plugin_register_complex_config("zeromq", cmq_config);
   plugin_register_init("zeromq", plugin_init);
   plugin_register_write("zeromq", write_value,
       /* user_data = */ NULL);