#define PACKET_SIZE 512
+static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
+{
+ char buffer[PACKET_SIZE];
+ char *buffer_ptr = buffer;
+ int buffer_free = sizeof (buffer);
+ int status;
+ zmq_msg_t msg;
+
+ void *cmq_socket = user_data->data;
+
+ memset (buffer, '\0', sizeof (buffer));
+
+
+ status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time);
+ if (status != 0)
+ return (-1);
+
+ status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY, (uint64_t) n->severity);
+ if (status != 0)
+ return (-1);
+
+ if (strlen (n->host) > 0)
+ {
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, n->host, strlen (n->host));
+ if (status != 0)
+ return (-1);
+ }
+
+ if (strlen (n->plugin) > 0)
+ {
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, n->plugin, strlen (n->plugin));
+ if (status != 0)
+ return (-1);
+ }
+
+ if (strlen (n->plugin_instance) > 0)
+ {
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN_INSTANCE, n->plugin_instance,
+ strlen (n->plugin_instance));
+ if (status != 0)
+ return (-1);
+ }
+
+ if (strlen (n->type) > 0)
+ {
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, n->type, strlen (n->type));
+ if (status != 0)
+ return (-1);
+ }
+
+ if (strlen (n->type_instance) > 0)
+ {
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, n->type_instance,
+ strlen (n->type_instance));
+ if (status != 0)
+ return (-1);
+ }
+
+ status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE, n->message, strlen (n->message));
+ if (status != 0)
+ return (-1);
+
+ // zeromq will free the memory when needed by calling the free_data function
+ if( zmq_msg_init_data(&msg, buffer, sizeof(buffer) - buffer_free, free_data, NULL) != 0 ) {
+ ERROR("zmq_msg_init : %s", zmq_strerror(errno));
+ return 1;
+ }
+
+ // try to send the message
+ if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+ if( errno == EAGAIN ) {
+ WARNING("ZeroMQ: Cannot send message, queue is full");
+ }
+ else {
+ ERROR("zmq_send : %s", zmq_strerror(errno));
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
static int write_value (const data_set_t *ds, /* {{{ */
const value_list_t *vl,
user_data_t *user_data)
else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
{
user_data_t ud = { NULL, NULL };
- char name[32];
+ char name[20];
ud.data = cmq_socket;
ud.free_func = cmq_close_callback;
ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
sending_sockets_num++;
-
+
plugin_register_write (name, write_value, &ud);
+
+ ssnprintf (name, sizeof (name), "zeromq/%i/notif", sending_sockets_num);
+
+ plugin_register_notification (name, write_notification, &ud);
}
return (0);
{
plugin_register_complex_config("zeromq", cmq_config);
plugin_register_init("zeromq", plugin_init);
- plugin_register_notification ("network", write_notification,
- /* user_data = */ NULL);
plugin_register_shutdown ("zeromq", my_shutdown);
}