+
+/*
+ * Always call while holding host->lock !
+ */
+static int riemann_batch_flush_nolock (cdtime_t timeout,
+ struct riemann_host *host)
+{
+ cdtime_t now;
+ int status = 0;
+
+ if (timeout > 0) {
+ now = cdtime ();
+ if ((host->batch_init + timeout) > now)
+ return status;
+ }
+ riemann_send_msg(host, host->batch_msg);
+ riemann_msg_protobuf_free(host->batch_msg);
+
+ if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0))
+ riemann_disconnect (host);
+
+ host->batch_init = cdtime();
+ host->batch_msg = NULL;
+ return status;
+}
+
+static int riemann_batch_flush (cdtime_t timeout,
+ const char *identifier __attribute__((unused)),
+ user_data_t *user_data)
+{
+ struct riemann_host *host;
+ int status;
+
+ if (user_data == NULL)
+ return (-EINVAL);
+
+ host = user_data->data;
+ pthread_mutex_lock (&host->lock);
+ status = riemann_batch_flush_nolock (timeout, host);
+ if (status != 0)
+ ERROR ("write_riemann plugin: riemann_send failed with status %i",
+ status);
+
+ pthread_mutex_unlock(&host->lock);
+ return status;
+}
+
+static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl,
+ int *statuses)
+{
+ size_t i;
+ Event **events;
+ Msg *msg;
+ size_t len;
+ int ret;
+
+ msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+ if (msg == NULL)
+ return -1;
+
+ pthread_mutex_lock(&host->lock);
+
+ if (host->batch_msg == NULL) {
+ host->batch_msg = msg;
+ } else {
+ len = msg->n_events + host->batch_msg->n_events;
+ events = realloc(host->batch_msg->events,
+ (len * sizeof(*host->batch_msg->events)));
+ if (events == NULL) {
+ pthread_mutex_unlock(&host->lock);
+ ERROR ("write_riemann plugin: out of memory");
+ riemann_msg_protobuf_free (msg);
+ return -1;
+ }
+ host->batch_msg->events = events;
+
+ for (i = host->batch_msg->n_events; i < len; i++)
+ host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events];
+
+ host->batch_msg->n_events = len;
+ sfree (msg->events);
+ msg->n_events = 0;
+ sfree (msg);
+ }
+
+ len = msg__get_packed_size(host->batch_msg);
+ ret = 0;
+ if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
+ ret = riemann_batch_flush_nolock(0, host);
+ }
+
+ pthread_mutex_unlock(&host->lock);
+ return ret;
+} /* }}} Msg *riemann_batch_add_value_list */
+