}
// try to send the message
- if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+ if( zmq_send(cmq_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
if( errno == EAGAIN ) {
- WARNING("ZeroMQ: Cannot send message, queue is full");
+ WARNING("ZeroMQ: Unable to queue message, queue may be full");
+ return -1;
}
else {
ERROR("zmq_send : %s", zmq_strerror(errno));
- return 1;
+ return -1;
}
}
endpoints_num++;
continue;
} /* Endpoint */
+ else if( strcasecmp("HWM", child->key) == 0 )
+ {
+ int tmp;
+ uint64_t hwm;
+
+ status = cf_util_get_int(child, &tmp);
+ if( status != 0 )
+ continue;
+
+ hwm = (uint64_t) tmp;
+
+ status = zmq_setsockopt (cmq_socket, ZMQ_HWM, &hwm, sizeof(hwm));
+ if (status != 0) {
+ ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_HWM) failed: %s", zmq_strerror (errno));
+ (void) zmq_close (cmq_socket);
+ return (-1);
+ }
+
+ continue;
+ } /* HWM */
else
{
ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
* Config schema:
*
* <Plugin "zeromq">
+ * Threads 2
+ *
* <Socket Publish>
+ * HWM 300
* Endpoint "tcp://localhost:6666"
* </Socket>
* <Socket Subscribe>