*/
#include "collectd.h"
+
#include "common.h"
#include "plugin.h"
-#include "configfile.h"
-#include "utils_cache.h"
#include "utils_complain.h"
#include "utils_format_graphite.h"
-/* Folks without pthread will need to disable this plugin. */
-#include <pthread.h>
-
#include <netdb.h>
#define WG_DEFAULT_NODE "localhost"
c_complain_t init_complaint;
cdtime_t last_connect_time;
- /*Force reconnect useful for load balanced environments*/
- cdtime_t last_force_reconnect_time;
- int force_reconnect_timeout;
+ /* Force reconnect useful for load balanced environments */
+ cdtime_t last_reconnect_time;
+ cdtime_t reconnect_interval;
+ _Bool reconnect_interval_reached;
};
-/*
-* Force Reconnect functions
-*/
-
-static void wg_force_reconnect_check(struct wg_callback *cb)
+/* wg_force_reconnect_check closes cb->sock_fd when it was open for longer
+ * than cb->reconnect_interval. Must hold cb->send_lock when calling. */
+static void wg_force_reconnect_check (struct wg_callback *cb)
{
cdtime_t now;
- if(!cb->force_reconnect_timeout) return;
- //check if address changes if addr_timeout
+
+ if (cb->reconnect_interval == 0)
+ return;
+
+ /* check if address changes if addr_timeout */
now = cdtime ();
- DEBUG("wg_force_reconnect_check: now %ld last: %ld ",CDTIME_T_TO_TIME_T(now),CDTIME_T_TO_TIME_T(cb->last_force_reconnect_time));
- if ((now - cb->last_force_reconnect_time) < TIME_T_TO_CDTIME_T(cb->force_reconnect_timeout)){
- return;
- }
- //here we should close connection on next
+ if ((now - cb->last_reconnect_time) < cb->reconnect_interval)
+ return;
+
+ /* here we should close connection on next */
close (cb->sock_fd);
cb->sock_fd = -1;
- INFO("Connection Forced closed after %ld seconds ",CDTIME_T_TO_TIME_T(now - cb->last_force_reconnect_time));
- cb->last_force_reconnect_time = now;
-}
-
+ cb->last_reconnect_time = now;
+ cb->reconnect_interval_reached = 1;
+ INFO ("write_graphite plugin: Connection closed after %.3f seconds.",
+ CDTIME_T_TO_DOUBLE (now - cb->last_reconnect_time));
+}
/*
* Functions
static int wg_send_buffer (struct wg_callback *cb)
{
- ssize_t status = 0;
+ ssize_t status;
+
+ if (cb->sock_fd < 0)
+ return (-1);
status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf));
- if (status < 0)
+ if (status != 0)
{
if (cb->log_send_errors)
{
return (0);
}
- if (cb->send_buf_fill <= 0)
+ if (cb->send_buf_fill == 0)
{
cb->send_buf_init_time = cdtime ();
return (0);
static int wg_callback_init (struct wg_callback *cb)
{
- struct addrinfo ai_hints;
struct addrinfo *ai_list;
- struct addrinfo *ai_ptr;
cdtime_t now;
int status;
return (EAGAIN);
cb->last_connect_time = now;
- memset (&ai_hints, 0, sizeof (ai_hints));
-#ifdef AI_ADDRCONFIG
- ai_hints.ai_flags |= AI_ADDRCONFIG;
-#endif
- ai_hints.ai_family = AF_UNSPEC;
+ struct addrinfo ai_hints = {
+ .ai_family = AF_UNSPEC,
+ .ai_flags = AI_ADDRCONFIG
+ };
if (0 == strcasecmp ("tcp", cb->protocol))
ai_hints.ai_socktype = SOCK_STREAM;
else
ai_hints.ai_socktype = SOCK_DGRAM;
- ai_list = NULL;
-
status = getaddrinfo (cb->node, cb->service, &ai_hints, &ai_list);
if (status != 0)
{
}
assert (ai_list != NULL);
- for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
{
cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
ai_ptr->ai_protocol);
continue;
}
+ set_sock_opts (cb->sock_fd);
+
status = connect (cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
if (status != 0)
{
cb->node, cb->service, cb->protocol);
}
- wg_reset_buffer (cb);
+ /* wg_force_reconnect_check does not flush the buffer before closing a
+ * sending socket, so only call wg_reset_buffer() if the socket was closed
+ * for a different reason (tracked in cb->reconnect_interval_reached). */
+ if (!cb->reconnect_interval_reached || (cb->send_buf_free == 0))
+ wg_reset_buffer (cb);
+ else
+ cb->reconnect_interval_reached = 0;
return (0);
}
pthread_mutex_lock (&cb->send_lock);
- wg_force_reconnect_check(cb);
+ wg_force_reconnect_check (cb);
if (cb->sock_fd < 0)
{
static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
struct wg_callback *cb)
{
- char buffer[WG_SEND_BUF_SIZE];
+ char buffer[WG_SEND_BUF_SIZE] = { 0 };
int status;
if (0 != strcmp (ds->type, vl->type))
return -1;
}
- memset (buffer, 0, sizeof (buffer));
status = format_graphite (buffer, sizeof (buffer), ds, vl,
cb->prefix, cb->postfix, cb->escape_char, cb->format_flags);
if (status != 0) /* error message has been printed already. */
static int config_set_char (char *dest,
oconfig_item_t *ci)
{
- char buffer[4];
+ char buffer[4] = { 0 };
int status;
- memset (buffer, 0, sizeof (buffer));
-
status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
if (status != 0)
return (status);
static int wg_config_node (oconfig_item_t *ci)
{
struct wg_callback *cb;
- user_data_t user_data;
char callback_name[DATA_MAX_NAME_LEN];
- int i;
int status = 0;
- cb = malloc (sizeof (*cb));
+ cb = calloc (1, sizeof (*cb));
if (cb == NULL)
{
- ERROR ("write_graphite plugin: malloc failed.");
+ ERROR ("write_graphite plugin: calloc failed.");
return (-1);
}
- memset (cb, 0, sizeof (*cb));
cb->sock_fd = -1;
cb->name = NULL;
cb->node = strdup (WG_DEFAULT_NODE);
cb->service = strdup (WG_DEFAULT_SERVICE);
cb->protocol = strdup (WG_DEFAULT_PROTOCOL);
- cb->last_force_reconnect_time=cdtime();
- cb->force_reconnect_timeout=0;
+ cb->last_reconnect_time = cdtime();
+ cb->reconnect_interval = 0;
+ cb->reconnect_interval_reached = 0;
cb->log_send_errors = WG_DEFAULT_LOG_SEND_ERRORS;
cb->prefix = NULL;
cb->postfix = NULL;
pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
C_COMPLAIN_INIT (&cb->init_complaint);
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
status = -1;
}
}
- else if (strcasecmp ("ForceReconnectTimeout", child->key) == 0)
- cf_util_get_int (child,&cb->force_reconnect_timeout);
+ else if (strcasecmp ("ReconnectInterval", child->key) == 0)
+ cf_util_get_cdtime (child, &cb->reconnect_interval);
else if (strcasecmp ("LogSendErrors", child->key) == 0)
cf_util_get_boolean (child, &cb->log_send_errors);
else if (strcasecmp ("Prefix", child->key) == 0)
ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s",
cb->name);
- memset (&user_data, 0, sizeof (user_data));
- user_data.data = cb;
- user_data.free_func = wg_callback_free;
- plugin_register_write (callback_name, wg_write, &user_data);
+ plugin_register_write (callback_name, wg_write,
+ &(user_data_t) {
+ .data = cb,
+ .free_func = wg_callback_free,
+ });
- user_data.free_func = NULL;
- plugin_register_flush (callback_name, wg_flush, &user_data);
+ plugin_register_flush (callback_name, wg_flush, &(user_data_t) { .data = cb });
return (0);
}
static int wg_config (oconfig_item_t *ci)
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;