projects
/
collectd.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
write_riemann: add a BatchFlushTimeout option
[collectd.git]
/
src
/
write_riemann.c
diff --git
a/src/write_riemann.c
b/src/write_riemann.c
index
fd82650
..
1836b6b
100644
(file)
--- a/
src/write_riemann.c
+++ b/
src/write_riemann.c
@@
-46,6
+46,7
@@
#define RIEMANN_BATCH_MAX 8192
struct riemann_host {
#define RIEMANN_BATCH_MAX 8192
struct riemann_host {
+ c_complain_t init_complaint;
char *name;
char *event_service_prefix;
pthread_mutex_t lock;
char *name;
char *event_service_prefix;
pthread_mutex_t lock;
@@
-56,13
+57,12
@@
struct riemann_host {
_Bool always_append_ds;
char *node;
int port;
_Bool always_append_ds;
char *node;
int port;
- c_complain_t init_complaint;
- c_complain_t init_send_complaint;
riemann_client_type_t client_type;
riemann_client_t *client;
double ttl_factor;
cdtime_t batch_init;
int batch_max;
riemann_client_type_t client_type;
riemann_client_t *client;
double ttl_factor;
cdtime_t batch_init;
int batch_max;
+ int batch_timeout;
int reference_count;
riemann_message_t *batch_msg;
char *tls_ca_file;
int reference_count;
riemann_message_t *batch_msg;
char *tls_ca_file;
@@
-516,6
+516,7
@@
static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
riemann_message_t *msg;
size_t len;
int ret;
riemann_message_t *msg;
size_t len;
int ret;
+ cdtime_t timeout;
msg = wrr_value_list_to_message(host, ds, vl, statuses);
if (msg == NULL)
msg = wrr_value_list_to_message(host, ds, vl, statuses);
if (msg == NULL)
@@
-547,7
+548,12
@@
static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
ret = 0;
if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
ret = wrr_batch_flush_nolock(0, host);
ret = 0;
if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
ret = wrr_batch_flush_nolock(0, host);
- }
+ } else {
+ if (host->batch_timeout > 0) {
+ timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
+ ret = wrr_batch_flush_nolock(timeout, host);
+ }
+ }
pthread_mutex_unlock(&host->lock);
return ret;
pthread_mutex_unlock(&host->lock);
return ret;
@@
-660,6
+666,7
@@
static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
host->batch_mode = 1;
host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
host->batch_init = cdtime();
host->batch_mode = 1;
host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
host->batch_init = cdtime();
+ host->batch_timeout = 0;
host->ttl_factor = RIEMANN_TTL_FACTOR;
host->client = NULL;
host->client_type = RIEMANN_CLIENT_TCP;
host->ttl_factor = RIEMANN_TTL_FACTOR;
host->client = NULL;
host->client_type = RIEMANN_CLIENT_TCP;
@@
-705,6
+712,10
@@
static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
status = cf_util_get_int(child, &host->batch_max);
if (status != 0)
break;
status = cf_util_get_int(child, &host->batch_max);
if (status != 0)
break;
+ } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
+ status = cf_util_get_int(child, &host->batch_timeout);
+ if (status != 0)
+ break;
} else if (strcasecmp("Timeout", child->key) == 0) {
status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
if (status != 0)
} else if (strcasecmp("Timeout", child->key) == 0) {
status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
if (status != 0)