From ad69e91032f89f605899fc41545d6dd96b6b53f4 Mon Sep 17 00:00:00 2001 From: Julien Ammous Date: Sat, 13 Nov 2010 18:30:20 +0100 Subject: [PATCH] zeromq plugin: added notifications support --- src/zeromq.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/src/zeromq.c b/src/zeromq.c index 69c1e26b..bc672dc2 100644 --- a/src/zeromq.c +++ b/src/zeromq.c @@ -110,6 +110,88 @@ static void *receive_thread (void *cmq_socket) /* {{{ */ #define PACKET_SIZE 512 +static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data) +{ + char buffer[PACKET_SIZE]; + char *buffer_ptr = buffer; + int buffer_free = sizeof (buffer); + int status; + zmq_msg_t msg; + + void *cmq_socket = user_data->data; + + memset (buffer, '\0', sizeof (buffer)); + + + status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time); + if (status != 0) + return (-1); + + status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY, (uint64_t) n->severity); + if (status != 0) + return (-1); + + if (strlen (n->host) > 0) + { + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, n->host, strlen (n->host)); + if (status != 0) + return (-1); + } + + if (strlen (n->plugin) > 0) + { + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, n->plugin, strlen (n->plugin)); + if (status != 0) + return (-1); + } + + if (strlen (n->plugin_instance) > 0) + { + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN_INSTANCE, n->plugin_instance, + strlen (n->plugin_instance)); + if (status != 0) + return (-1); + } + + if (strlen (n->type) > 0) + { + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, n->type, strlen (n->type)); + if (status != 0) + return (-1); + } + + if (strlen (n->type_instance) > 0) + { + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, n->type_instance, + strlen (n->type_instance)); + if (status != 0) + return (-1); + } + + status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE, n->message, strlen (n->message)); + if (status != 0) + return (-1); + + // zeromq will free the memory when needed by calling the free_data function + if( zmq_msg_init_data(&msg, buffer, sizeof(buffer) - buffer_free, free_data, NULL) != 0 ) { + ERROR("zmq_msg_init : %s", zmq_strerror(errno)); + return 1; + } + + // try to send the message + if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) { + if( errno == EAGAIN ) { + WARNING("ZeroMQ: Cannot send message, queue is full"); + } + else { + ERROR("zmq_send : %s", zmq_strerror(errno)); + return 1; + } + } + + return 0; +} + static int write_value (const data_set_t *ds, /* {{{ */ const value_list_t *vl, user_data_t *user_data) @@ -320,15 +402,19 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */ else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH)) { user_data_t ud = { NULL, NULL }; - char name[32]; + char name[20]; ud.data = cmq_socket; ud.free_func = cmq_close_callback; ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num); sending_sockets_num++; - + plugin_register_write (name, write_value, &ud); + + ssnprintf (name, sizeof (name), "zeromq/%i/notif", sending_sockets_num); + + plugin_register_notification (name, write_notification, &ud); } return (0); @@ -406,8 +492,6 @@ void module_register (void) { plugin_register_complex_config("zeromq", cmq_config); plugin_register_init("zeromq", plugin_init); - plugin_register_notification ("network", write_notification, - /* user_data = */ NULL); plugin_register_shutdown ("zeromq", my_shutdown); } -- 2.11.0