struct riemann_host {
#define F_CONNECT 0x01
- u_int8_t flags;
+ uint8_t flags;
pthread_mutex_t lock;
int delay;
char *node;
char *service;
int s;
+
+ int reference_count;
};
static char *riemann_tags[RIEMANN_EXTRA_TAGS];
{
u_char *buffer;
size_t buffer_len;
- ssize_t status;
+ int status;
+
+ pthread_mutex_lock (&host->lock);
+
+ status = riemann_connect (host);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&host->lock);
+ return status;
+ }
buffer_len = msg__get_packed_size(msg);
buffer = malloc (buffer_len);
if (buffer == NULL) {
+ pthread_mutex_unlock (&host->lock);
ERROR ("riemann plugin: malloc failed.");
return ENOMEM;
}
msg__pack(msg, buffer);
- status = swrite (host->s, buffer, buffer_len);
+ status = (int) swrite (host->s, buffer, buffer_len);
if (status != 0)
{
char errbuf[1024];
+
+ riemann_disconnect (host);
+ pthread_mutex_unlock (&host->lock);
+
ERROR ("riemann plugin: Sending to Riemann at %s:%s failed: %s",
host->node,
(host->service != NULL) ? host->service : RIEMANN_PORT,
sstrerror (errno, errbuf, sizeof (errbuf)));
- riemann_disconnect (host);
sfree (buffer);
return -1;
}
+ pthread_mutex_unlock (&host->lock);
sfree (buffer);
return 0;
}
return (msg);
} /* }}} Msg *riemann_notification_to_protobuf */
-static Event *riemann_value_to_protobuf (struct riemann_host *host, /* {{{ */
+static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
data_set_t const *ds,
value_list_t const *vl, size_t index,
gauge_t const *rates)
return (event);
} /* }}} Event *riemann_value_to_protobuf */
-static Msg *riemann_value_list_to_protobuf (struct riemann_host *host, /* {{{ */
+static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
data_set_t const *ds,
value_list_t const *vl)
{
struct riemann_host *host = ud->data;
Msg *msg;
- if ((status = riemann_connect(host)) != 0)
- return status;
-
msg = riemann_value_list_to_protobuf (host, ds, vl);
if (msg == NULL)
return (-1);
return status;
}
+/* host->lock must be held when calling this function. */
static int
riemann_connect(struct riemann_host *host)
{
}
for (ai = res; ai != NULL; ai = ai->ai_next) {
- pthread_mutex_lock(&host->lock);
/*
* check if another thread did not already succesfully connect
*/
if ((host->s = socket(ai->ai_family,
ai->ai_socktype,
ai->ai_protocol)) == -1) {
- pthread_mutex_unlock(&host->lock);
WARNING("riemann_connect: could not open socket");
freeaddrinfo(res);
return -1;
if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
close(host->s);
host->flags |= ~F_CONNECT;
- pthread_mutex_unlock(&host->lock);
freeaddrinfo(res);
return -1;
}
host->flags |= F_CONNECT;
DEBUG("riemann plugin: got a succesful connection for: %s",
host->node);
- pthread_mutex_unlock(&host->lock);
break;
}
return 0;
}
+/* host->lock must be held when calling this function. */
static int
riemann_disconnect (struct riemann_host *host)
{
- if (host == NULL)
- return (EINVAL);
-
if ((host->flags & F_CONNECT) == 0)
return (0);
if (host == NULL)
return;
+ pthread_mutex_lock (&host->lock);
+
+ host->reference_count--;
+ if (host->reference_count > 0)
+ {
+ pthread_mutex_unlock (&host->lock);
+ return;
+ }
+
riemann_disconnect (host);
sfree(host->service);
+ pthread_mutex_destroy (&host->lock);
sfree(host);
}
WARNING("riemann host allocation failed");
return ENOMEM;
}
- pthread_mutex_init(&host->lock, NULL);
+ pthread_mutex_init (&host->lock, NULL);
+ host->reference_count = 1;
host->node = NULL;
host->service = NULL;
host->delay = RIEMANN_DELAY;
}
}
if (status != 0) {
- sfree(host);
+ riemann_free (host);
return status;
}
ud.data = host;
ud.free_func = riemann_free;
- if ((status = plugin_register_write(w_cb_name, riemann_write, &ud)) != 0)
- riemann_free(host);
+ pthread_mutex_lock (&host->lock);
+
+ status = plugin_register_write (w_cb_name, riemann_write, &ud);
+ if (status != 0)
+ WARNING ("riemann plugin: plugin_register_write (\"%s\") "
+ "failed with status %i.",
+ w_cb_name, status);
+ else /* success */
+ host->reference_count++;
+
+ status = plugin_register_notification (n_cb_name,
+ riemann_notification, &ud);
+ if (status != 0)
+ WARNING ("riemann plugin: plugin_register_notification (\"%s\") "
+ "failed with status %i.",
+ n_cb_name, status);
+ else /* success */
+ host->reference_count++;
- if ((status = plugin_register_notification(n_cb_name,
- riemann_notification,
- &ud)) != 0) {
- plugin_unregister_write(w_cb_name);
- riemann_free(host);
+ if (host->reference_count <= 1)
+ {
+ /* Both callbacks failed => free memory.
+ * We need to unlock here, because riemann_free() will lock.
+ * This is not a race condition, because we're the only one
+ * holding a reference. */
+ pthread_mutex_unlock (&host->lock);
+ riemann_free (host);
+ return (-1);
}
+
+ host->reference_count--;
+ pthread_mutex_unlock (&host->lock);
+
return status;
}