zeromq plugin: added notifications support
authorJulien Ammous <j.ammous@gmail.com>
Sat, 13 Nov 2010 17:30:20 +0000 (18:30 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 16 Nov 2010 10:04:39 +0000 (11:04 +0100)
src/zeromq.c

index 69c1e26..bc672dc 100644 (file)
@@ -110,6 +110,88 @@ static void *receive_thread (void *cmq_socket) /* {{{ */
 
 #define PACKET_SIZE   512
 
+static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
+{
+  char        buffer[PACKET_SIZE];
+  char       *buffer_ptr = buffer;
+  int         buffer_free = sizeof (buffer);
+  int         status;
+  zmq_msg_t   msg;
+  
+  void *cmq_socket = user_data->data;
+
+  memset (buffer, '\0', sizeof (buffer));
+
+
+  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time);
+  if (status != 0)
+    return (-1);
+
+  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY, (uint64_t) n->severity);
+  if (status != 0)
+    return (-1);
+
+  if (strlen (n->host) > 0)
+  {
+    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, n->host, strlen (n->host));
+    if (status != 0)
+      return (-1);
+  }
+
+  if (strlen (n->plugin) > 0)
+  {
+    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, n->plugin, strlen (n->plugin));
+    if (status != 0)
+      return (-1);
+  }
+
+  if (strlen (n->plugin_instance) > 0)
+  {
+    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN_INSTANCE, n->plugin_instance,
+      strlen (n->plugin_instance));
+    if (status != 0)
+      return (-1);
+  }
+
+  if (strlen (n->type) > 0)
+  {
+    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, n->type, strlen (n->type));
+    if (status != 0)
+      return (-1);
+  }
+
+  if (strlen (n->type_instance) > 0)
+  {
+    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, n->type_instance,
+      strlen (n->type_instance));
+    if (status != 0)
+      return (-1);
+  }
+
+  status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE, n->message, strlen (n->message));
+  if (status != 0)
+    return (-1);
+  
+  // zeromq will free the memory when needed by calling the free_data function
+  if( zmq_msg_init_data(&msg, buffer, sizeof(buffer) - buffer_free, free_data, NULL) != 0 ) {
+    ERROR("zmq_msg_init : %s", zmq_strerror(errno));
+    return 1;
+  }
+
+  // try to send the message
+  if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+    if( errno == EAGAIN ) {
+      WARNING("ZeroMQ: Cannot send message, queue is full");
+    }
+    else {
+      ERROR("zmq_send : %s", zmq_strerror(errno));
+      return 1;
+    }
+  }
+  
+  return 0;
+}
+
 static int write_value (const data_set_t *ds, /* {{{ */
     const value_list_t *vl,
     user_data_t *user_data)
@@ -320,15 +402,19 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
   else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
   {
     user_data_t ud = { NULL, NULL };
-    char name[32];
+    char name[20];
 
     ud.data = cmq_socket;
     ud.free_func = cmq_close_callback;
 
     ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
     sending_sockets_num++;
-
+    
     plugin_register_write (name, write_value, &ud);
+    
+    ssnprintf (name, sizeof (name), "zeromq/%i/notif", sending_sockets_num);
+    
+    plugin_register_notification (name, write_notification, &ud);
   }
 
   return (0);
@@ -406,8 +492,6 @@ void module_register (void)
 {
   plugin_register_complex_config("zeromq", cmq_config);
   plugin_register_init("zeromq", plugin_init);
-  plugin_register_notification ("network", write_notification,
-      /* user_data = */ NULL);
   plugin_register_shutdown ("zeromq", my_shutdown);
 }