From 1d260d242e7a6fb82e567b4f66df7d899f313076 Mon Sep 17 00:00:00 2001 From: Julien Ammous Date: Sat, 13 Nov 2010 18:31:06 +0100 Subject: [PATCH] zeromq plugin: Added support for High Water Mark socket option --- src/zeromq.c | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/zeromq.c b/src/zeromq.c index bc672dc2..6b75a9d7 100644 --- a/src/zeromq.c +++ b/src/zeromq.c @@ -220,13 +220,14 @@ static int write_value (const data_set_t *ds, /* {{{ */ } // try to send the message - if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) { + if( zmq_send(cmq_socket, &msg, ZMQ_NOBLOCK) != 0 ) { if( errno == EAGAIN ) { - WARNING("ZeroMQ: Cannot send message, queue is full"); + WARNING("ZeroMQ: Unable to queue message, queue may be full"); + return -1; } else { ERROR("zmq_send : %s", zmq_strerror(errno)); - return 1; + return -1; } } @@ -352,6 +353,26 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */ endpoints_num++; continue; } /* Endpoint */ + else if( strcasecmp("HWM", child->key) == 0 ) + { + int tmp; + uint64_t hwm; + + status = cf_util_get_int(child, &tmp); + if( status != 0 ) + continue; + + hwm = (uint64_t) tmp; + + status = zmq_setsockopt (cmq_socket, ZMQ_HWM, &hwm, sizeof(hwm)); + if (status != 0) { + ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_HWM) failed: %s", zmq_strerror (errno)); + (void) zmq_close (cmq_socket); + return (-1); + } + + continue; + } /* HWM */ else { ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.", @@ -424,7 +445,10 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */ * Config schema: * * + * Threads 2 + * * + * HWM 300 * Endpoint "tcp://localhost:6666" * * -- 2.11.0