static pthread_t *receive_thread_ids = NULL;
static size_t receive_thread_num = 0;
+static int sending_sockets_num = 0;
/* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
////////////////////////////
-
-
-static const char *config_keys[] =
-{
- "ListenOn",
- "SendDataTo"
-};
-static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
-
// config data
static char *zmq_send_to = NULL;
static pthread_t listen_thread_id;
static void *push_socket = NULL;
+static void cmq_close_callback (void *socket) /* {{{ */
+{
+ if (socket != NULL)
+ (void) zmq_close (socket);
+} /* }}} void cmq_close_callback */
+
+static void free_data (void *data, void *hint) /* {{{ */
+{
+ free (data);
+} /* }}} void free_data */
+
static void *receive_thread (void *cmq_socket) /* {{{ */
{
int status;
return (NULL);
} /* }}} void *receive_thread */
-static int my_config (const char *key, const char *value)
+#define PACKET_SIZE 512
+
+static int write_value (const data_set_t *ds, /* {{{ */
+ const value_list_t *vl,
+ user_data_t *user_data)
+{
+ void *cmq_socket = user_data->data;
+
+ zmq_msg_t msg;
+ char *send_buffer;
+ int send_buffer_size = PACKET_SIZE, real_size;
+
+ send_buffer = malloc(PACKET_SIZE);
+ if( send_buffer == NULL ) {
+ ERROR("Unable to allocate memory for send_buffer, aborting write");
+ return 1;
+ }
+
+ // empty buffer
+ memset(send_buffer, 0, PACKET_SIZE);
+
+ real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
+
+ // zeromq will free the memory when needed by calling the free_data function
+ if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
+ ERROR("zmq_msg_init : %s", zmq_strerror(errno));
+ return 1;
+ }
+
+ // try to send the message
+ if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+ if( errno == EAGAIN ) {
+ WARNING("ZeroMQ: Cannot send message, queue is full");
+ }
+ else {
+ ERROR("zmq_send : %s", zmq_strerror(errno));
+ return 1;
+ }
+ }
+
+ DEBUG("ZeroMQ: data sent");
+
+ return 0;
+} /* }}} int write_value */
+
+static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
+{
+ char buffer[64] = "";
+ int status;
+
+ status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
+ if (status != 0)
+ return (-1);
+
+ if (strcasecmp ("Publish", buffer) == 0)
+ return (ZMQ_PUB);
+ else if (strcasecmp ("Subscribe", buffer) == 0)
+ return (ZMQ_SUB);
+ else if (strcasecmp ("Push", buffer) == 0)
+ return (ZMQ_PUSH);
+ else if (strcasecmp ("Pull", buffer) == 0)
+ return (ZMQ_PULL);
+
+ ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
+ buffer);
+ return (-1);
+} /* }}} int cmq_config_mode */
+
+static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
{
+ int type;
+ int status;
+ int i;
+ int connections_num;
+ void *cmq_socket;
+
+ type = cmq_config_mode (ci);
+ if (type < 0)
+ return (-1);
+
if (cmq_context == NULL)
{
cmq_context = zmq_init (cmq_threads_num);
}
}
- if (strcasecmp(key, "ListenOn") == 0)
+ /* Create a new socket */
+ cmq_socket = zmq_socket (cmq_context, type);
+ if (cmq_socket == NULL)
+ {
+ ERROR ("zeromq plugin: zmq_socket failed: %s",
+ zmq_strerror (errno));
+ return (-1);
+ }
+
+ if (type == ZMQ_SUB)
+ {
+ /* Subscribe to all messages */
+ status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
+ /* prefix = */ "", /* prefix length = */ 0);
+ if (status != 0)
+ {
+ ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
+ zmq_strerror (errno));
+ (void) zmq_close (cmq_socket);
+ return (-1);
+ }
+ }
+
+ /* Iterate over all children and do all the binds and connects requested. */
+ connections_num = 0;
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Bind", child->key) == 0)
+ {
+ char *value = NULL;
+
+ if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
+ {
+ ERROR ("zeromq plugin: The \"Bind\" option is not available for "
+ "publish and push sockets. Try \"Connect\".");
+ continue;
+ }
+
+ status = cf_util_get_string (child, &value);
+ if (status != 0)
+ continue;
+
+ status = zmq_bind (cmq_socket, value);
+ if (status != 0)
+ {
+ ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
+ value, zmq_strerror (errno));
+ continue;
+ }
+
+ connections_num++;
+ continue;
+ } /* Bind */
+
+ if (strcasecmp ("Connect", child->key) == 0)
+ {
+ char *value = NULL;
+
+ if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
+ {
+ ERROR ("zeromq plugin: The \"Connect\" option is not available for "
+ "subscribe and pull sockets. Try \"Bind\".");
+ continue;
+ }
+
+ status = cf_util_get_string (child, &value);
+ if (status != 0)
+ continue;
+
+ status = zmq_connect (cmq_socket, value);
+ if (status != 0)
+ {
+ ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
+ value, zmq_strerror (errno));
+ continue;
+ }
+
+ connections_num++;
+ continue;
+ } /* Connect */
+
+ ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
+ child->key);
+ } /* for (i = 0; i < ci->children_num; i++) */
+
+ if (connections_num == 0)
+ {
+ ERROR ("zeromq plugin: No (valid) \"%s\" options were found in this "
+ "\"Socket\" block.",
+ ((type == ZMQ_SUB) || (type == ZMQ_PULL)) ? "Bind" : "Connect");
+ (void) zmq_close (cmq_socket);
+ return (-1);
+ }
+
+ /* If this is a receiving socket, create a new receive thread */
+ if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
{
pthread_t *thread_ptr;
- void *cmq_socket;
- int status;
thread_ptr = realloc (receive_thread_ids,
sizeof (*receive_thread_ids) * (receive_thread_num + 1));
receive_thread_ids = thread_ptr;
thread_ptr = receive_thread_ids + receive_thread_num;
- cmq_socket = zmq_socket (cmq_context, /* type = */ ZMQ_PULL);
- if (cmq_socket == NULL)
- {
- ERROR ("zeromq plugin: zmq_socket failed: %s",
- zmq_strerror (errno));
- return (-1);
- }
-
- status = zmq_bind (cmq_socket, value);
- if (status != 0)
- {
- ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
- value, zmq_strerror (errno));
- (void) zmq_close (cmq_socket);
- return (-1);
- }
-
status = pthread_create (thread_ptr,
/* attr = */ NULL,
/* func = */ receive_thread,
}
receive_thread_num++;
- return (0);
}
- else if( strcasecmp(key, "SendDataTo") == 0 ) {
- if( zmq_send_to != NULL ) {
- free(zmq_send_to);
- zmq_send_to = NULL;
+
+ /* If this is a sending socket, register a new write function */
+ else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
+ {
+ user_data_t ud = { NULL, NULL };
+ char name[32];
+
+ ud.data = cmq_socket;
+ ud.free_func = cmq_close_callback;
+
+ ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
+ sending_sockets_num++;
+
+ plugin_register_write (name, write_value, &ud);
+ }
+
+ return (0);
+} /* }}} int cmq_config_socket */
+
+/*
+ * Config schema:
+ *
+ * <Plugin "zeromq">
+ * <Socket Publish>
+ * Connect "tcp://localhost:6666"
+ * </Socket>
+ * <Socket Subscribe>
+ * Bind "tcp://eth0:6666"
+ * Bind "tcp://collectd.example.com:6666"
+ * </Socket>
+ * </Plugin>
+ */
+static int cmq_config (oconfig_item_t *ci) /* {{{ */
+{
+ int status;
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Socket", child->key) == 0)
+ status = cmq_config_socket (child);
+ else if (strcasecmp ("Threads", child->key) == 0)
+ {
+ int tmp = 0;
+ status = cf_util_get_int (child, &tmp);
+ if ((status == 0) && (tmp >= 1))
+ cmq_threads_num = tmp;
}
-
- if( (zmq_send_to = strdup(value)) == NULL ) {
- return 1;
+ else
+ {
+ WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
+ child->key);
}
}
-
-
- return 0;
-}
+
+ return (0);
+} /* }}} int cmq_config */
static int plugin_init (void)
{
return 0;
}
-void free_data(void *data, void *hint)
-{
- free(data);
-}
-
-#define PACKET_SIZE 512
-
-static int write_value (const data_set_t *ds, const value_list_t *vl,
- user_data_t __attribute__((unused)) *user_data)
-{
- zmq_msg_t msg;
- char *send_buffer;
- int send_buffer_size = PACKET_SIZE, real_size;
-
- // do nothing if no socket open
- if( zmq_send_to == NULL )
- return 0;
-
- send_buffer = malloc(PACKET_SIZE);
- if( send_buffer == NULL ) {
- ERROR("Unable to allocate memory for send_buffer, aborting write");
- return 1;
- }
-
- // empty buffer
- memset(send_buffer, 0, PACKET_SIZE);
-
- real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
-
- // zeromq will free the memory when needed by calling the free_data function
- if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
- ERROR("zmq_msg_init : %s", zmq_strerror(errno));
- return 1;
- }
-
- // try to send the message
- if( zmq_send(push_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
- if( errno == EAGAIN ) {
- WARNING("ZeroMQ: Cannot send message, queue is full");
- }
- else {
- ERROR("zmq_send : %s", zmq_strerror(errno));
- return 1;
- }
- }
-
- DEBUG("ZeroMQ: data sent");
-
- return 0;
-}
-
static int my_shutdown (void)
{
if( cmq_context ) {
void module_register (void)
{
- plugin_register_config("zeromq", my_config, config_keys, config_keys_num);
+ plugin_register_complex_config("zeromq", cmq_config);
plugin_register_init("zeromq", plugin_init);
plugin_register_write("zeromq", write_value,
/* user_data = */ NULL);