From fe93da5fe8f84b486fcc60e0ae713e6333f1da35 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 6 Nov 2010 18:32:13 +0100 Subject: [PATCH] zeromq plugin: Made the configuration a lot more flexible. It is not possible to bind / connect each socket to multiple endpoints. The code is not tested and probably buggy. --- src/zeromq.c | 340 ++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 246 insertions(+), 94 deletions(-) diff --git a/src/zeromq.c b/src/zeromq.c index 6bb06acc..5dabc3f9 100644 --- a/src/zeromq.c +++ b/src/zeromq.c @@ -55,6 +55,7 @@ static void *cmq_context = NULL; static pthread_t *receive_thread_ids = NULL; static size_t receive_thread_num = 0; +static int sending_sockets_num = 0; /* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 @@ -837,15 +838,6 @@ static int parse_packet (void *se, /* {{{ */ //////////////////////////// - - -static const char *config_keys[] = -{ - "ListenOn", - "SendDataTo" -}; -static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); - // config data static char *zmq_send_to = NULL; @@ -854,6 +846,17 @@ static int thread_running = 1; static pthread_t listen_thread_id; static void *push_socket = NULL; +static void cmq_close_callback (void *socket) /* {{{ */ +{ + if (socket != NULL) + (void) zmq_close (socket); +} /* }}} void cmq_close_callback */ + +static void free_data (void *data, void *hint) /* {{{ */ +{ + free (data); +} /* }}} void free_data */ + static void *receive_thread (void *cmq_socket) /* {{{ */ { int status; @@ -895,8 +898,86 @@ static void *receive_thread (void *cmq_socket) /* {{{ */ return (NULL); } /* }}} void *receive_thread */ -static int my_config (const char *key, const char *value) +#define PACKET_SIZE 512 + +static int write_value (const data_set_t *ds, /* {{{ */ + const value_list_t *vl, + user_data_t *user_data) +{ + void *cmq_socket = user_data->data; + + zmq_msg_t msg; + char *send_buffer; + int send_buffer_size = PACKET_SIZE, real_size; + + send_buffer = malloc(PACKET_SIZE); + if( send_buffer == NULL ) { + ERROR("Unable to allocate memory for send_buffer, aborting write"); + return 1; + } + + // empty buffer + memset(send_buffer, 0, PACKET_SIZE); + + real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl); + + // zeromq will free the memory when needed by calling the free_data function + if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) { + ERROR("zmq_msg_init : %s", zmq_strerror(errno)); + return 1; + } + + // try to send the message + if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) { + if( errno == EAGAIN ) { + WARNING("ZeroMQ: Cannot send message, queue is full"); + } + else { + ERROR("zmq_send : %s", zmq_strerror(errno)); + return 1; + } + } + + DEBUG("ZeroMQ: data sent"); + + return 0; +} /* }}} int write_value */ + +static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */ +{ + char buffer[64] = ""; + int status; + + status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer)); + if (status != 0) + return (-1); + + if (strcasecmp ("Publish", buffer) == 0) + return (ZMQ_PUB); + else if (strcasecmp ("Subscribe", buffer) == 0) + return (ZMQ_SUB); + else if (strcasecmp ("Push", buffer) == 0) + return (ZMQ_PUSH); + else if (strcasecmp ("Pull", buffer) == 0) + return (ZMQ_PULL); + + ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"", + buffer); + return (-1); +} /* }}} int cmq_config_mode */ + +static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */ { + int type; + int status; + int i; + int connections_num; + void *cmq_socket; + + type = cmq_config_mode (ci); + if (type < 0) + return (-1); + if (cmq_context == NULL) { cmq_context = zmq_init (cmq_threads_num); @@ -908,11 +989,106 @@ static int my_config (const char *key, const char *value) } } - if (strcasecmp(key, "ListenOn") == 0) + /* Create a new socket */ + cmq_socket = zmq_socket (cmq_context, type); + if (cmq_socket == NULL) + { + ERROR ("zeromq plugin: zmq_socket failed: %s", + zmq_strerror (errno)); + return (-1); + } + + if (type == ZMQ_SUB) + { + /* Subscribe to all messages */ + status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE, + /* prefix = */ "", /* prefix length = */ 0); + if (status != 0) + { + ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s", + zmq_strerror (errno)); + (void) zmq_close (cmq_socket); + return (-1); + } + } + + /* Iterate over all children and do all the binds and connects requested. */ + connections_num = 0; + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Bind", child->key) == 0) + { + char *value = NULL; + + if ((type == ZMQ_PUB) || (type == ZMQ_PUSH)) + { + ERROR ("zeromq plugin: The \"Bind\" option is not available for " + "publish and push sockets. Try \"Connect\"."); + continue; + } + + status = cf_util_get_string (child, &value); + if (status != 0) + continue; + + status = zmq_bind (cmq_socket, value); + if (status != 0) + { + ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s", + value, zmq_strerror (errno)); + continue; + } + + connections_num++; + continue; + } /* Bind */ + + if (strcasecmp ("Connect", child->key) == 0) + { + char *value = NULL; + + if ((type == ZMQ_SUB) || (type == ZMQ_PULL)) + { + ERROR ("zeromq plugin: The \"Connect\" option is not available for " + "subscribe and pull sockets. Try \"Bind\"."); + continue; + } + + status = cf_util_get_string (child, &value); + if (status != 0) + continue; + + status = zmq_connect (cmq_socket, value); + if (status != 0) + { + ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s", + value, zmq_strerror (errno)); + continue; + } + + connections_num++; + continue; + } /* Connect */ + + ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.", + child->key); + } /* for (i = 0; i < ci->children_num; i++) */ + + if (connections_num == 0) + { + ERROR ("zeromq plugin: No (valid) \"%s\" options were found in this " + "\"Socket\" block.", + ((type == ZMQ_SUB) || (type == ZMQ_PULL)) ? "Bind" : "Connect"); + (void) zmq_close (cmq_socket); + return (-1); + } + + /* If this is a receiving socket, create a new receive thread */ + if ((type == ZMQ_SUB) || (type == ZMQ_PULL)) { pthread_t *thread_ptr; - void *cmq_socket; - int status; thread_ptr = realloc (receive_thread_ids, sizeof (*receive_thread_ids) * (receive_thread_num + 1)); @@ -924,23 +1100,6 @@ static int my_config (const char *key, const char *value) receive_thread_ids = thread_ptr; thread_ptr = receive_thread_ids + receive_thread_num; - cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL); - if (cmq_socket == NULL) - { - ERROR ("zeromq plugin: zmq_socket failed: %s", - zmq_strerror (errno)); - return (-1); - } - - status = zmq_bind (cmq_socket, value); - if (status != 0) - { - ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s", - value, zmq_strerror (errno)); - (void) zmq_close (cmq_socket); - return (-1); - } - status = pthread_create (thread_ptr, /* attr = */ NULL, /* func = */ receive_thread, @@ -955,22 +1114,66 @@ static int my_config (const char *key, const char *value) } receive_thread_num++; - return (0); } - else if( strcasecmp(key, "SendDataTo") == 0 ) { - if( zmq_send_to != NULL ) { - free(zmq_send_to); - zmq_send_to = NULL; + + /* If this is a sending socket, register a new write function */ + else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH)) + { + user_data_t ud = { NULL, NULL }; + char name[32]; + + ud.data = cmq_socket; + ud.free_func = cmq_close_callback; + + ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num); + sending_sockets_num++; + + plugin_register_write (name, write_value, &ud); + } + + return (0); +} /* }}} int cmq_config_socket */ + +/* + * Config schema: + * + * + * + * Connect "tcp://localhost:6666" + * + * + * Bind "tcp://eth0:6666" + * Bind "tcp://collectd.example.com:6666" + * + * + */ +static int cmq_config (oconfig_item_t *ci) /* {{{ */ +{ + int status; + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Socket", child->key) == 0) + status = cmq_config_socket (child); + else if (strcasecmp ("Threads", child->key) == 0) + { + int tmp = 0; + status = cf_util_get_int (child, &tmp); + if ((status == 0) && (tmp >= 1)) + cmq_threads_num = tmp; } - - if( (zmq_send_to = strdup(value)) == NULL ) { - return 1; + else + { + WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.", + child->key); } } - - - return 0; -} + + return (0); +} /* }}} int cmq_config */ static int plugin_init (void) { @@ -1018,57 +1221,6 @@ static int write_notification (const notification_t *n, user_data_t __attribute_ return 0; } -void free_data(void *data, void *hint) -{ - free(data); -} - -#define PACKET_SIZE 512 - -static int write_value (const data_set_t *ds, const value_list_t *vl, - user_data_t __attribute__((unused)) *user_data) -{ - zmq_msg_t msg; - char *send_buffer; - int send_buffer_size = PACKET_SIZE, real_size; - - // do nothing if no socket open - if( zmq_send_to == NULL ) - return 0; - - send_buffer = malloc(PACKET_SIZE); - if( send_buffer == NULL ) { - ERROR("Unable to allocate memory for send_buffer, aborting write"); - return 1; - } - - // empty buffer - memset(send_buffer, 0, PACKET_SIZE); - - real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl); - - // zeromq will free the memory when needed by calling the free_data function - if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) { - ERROR("zmq_msg_init : %s", zmq_strerror(errno)); - return 1; - } - - // try to send the message - if( zmq_send(push_socket, &msg, ZMQ_NOBLOCK) != 0 ) { - if( errno == EAGAIN ) { - WARNING("ZeroMQ: Cannot send message, queue is full"); - } - else { - ERROR("zmq_send : %s", zmq_strerror(errno)); - return 1; - } - } - - DEBUG("ZeroMQ: data sent"); - - return 0; -} - static int my_shutdown (void) { if( cmq_context ) { @@ -1090,7 +1242,7 @@ static int my_shutdown (void) void module_register (void) { - plugin_register_config("zeromq", my_config, config_keys, config_keys_num); + plugin_register_complex_config("zeromq", cmq_config); plugin_register_init("zeromq", plugin_init); plugin_register_write("zeromq", write_value, /* user_data = */ NULL); -- 2.11.0