/**
* collectd - src/mqtt.c
- * Copyright (C) 2014 Marc Falzon <marc at baha dot mu>
+ * Copyright (C) 2014 Marc Falzon
+ * Copyright (C) 2014,2015 Florian octo Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Marc Falzon <marc at baha dot mu>
+ * Florian octo Forster <octo at collectd.org>
+ * Jan-Piet Mens <jpmens at gmail.com>
**/
// Reference: http://mosquitto.org/api/files/mosquitto-h.html
#include "collectd.h"
+
#include "common.h"
#include "plugin.h"
-#include "utils_cache.h"
#include "utils_complain.h"
-#include <pthread.h>
-
#include <mosquitto.h>
#define MQTT_MAX_TOPIC_SIZE 1024
#define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
#define MQTT_DEFAULT_HOST "localhost"
#define MQTT_DEFAULT_PORT 1883
-#define MQTT_DEFAULT_CLIENT_ID "collectd"
#define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
+#define MQTT_DEFAULT_TOPIC "collectd/#"
+#ifndef MQTT_KEEPALIVE
+# define MQTT_KEEPALIVE 60
+#endif
+#ifndef SSL_VERIFY_PEER
+# define SSL_VERIFY_PEER 1
+#endif
+
/*
* Data types
*/
struct mqtt_client_conf
{
- struct mosquitto *mosq;
- bool connected;
- char *host;
+ _Bool publish;
+ char *name;
+
+ struct mosquitto *mosq;
+ _Bool connected;
+
+ char *host;
int port;
- char *client_id;
- char *topic_prefix;
+ char *client_id;
+ char *username;
+ char *password;
+ int qos;
+ char *cacertificatefile;
+ char *certificatefile;
+ char *certificatekeyfile;
+ char *tlsprotocol;
+ char *ciphersuite;
+
+ /* For publishing */
+ char *topic_prefix;
+ _Bool store_rates;
+ _Bool retain;
+
+ /* For subscribing */
+ pthread_t thread;
+ _Bool loop;
+ char *topic;
+ _Bool clean_session;
+
c_complain_t complaint_cantpublish;
pthread_mutex_t lock;
};
+typedef struct mqtt_client_conf mqtt_client_conf_t;
+
+static mqtt_client_conf_t **subscribers = NULL;
+static size_t subscribers_num = 0;
/*
* Functions
*/
-static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
+#if LIBMOSQUITTO_MAJOR == 0
+static char const *mosquitto_strerror (int code)
+{
+ switch (code)
+ {
+ case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
+ case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
+ case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
+ case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
+ case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
+ case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
+ case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
+ case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
+ case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
+ case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
+ case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
+ case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
+ case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
+ case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
+ case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
+ }
+
+ return "UNKNOWN ERROR CODE";
+}
+#else
+/* provided by libmosquitto */
+#endif
+
+static void mqtt_free (mqtt_client_conf_t *conf)
+{
+ if (conf == NULL)
+ return;
+
+ if (conf->connected)
+ (void) mosquitto_disconnect (conf->mosq);
+ conf->connected = 0;
+ (void) mosquitto_destroy (conf->mosq);
+
+ sfree (conf->host);
+ sfree (conf->username);
+ sfree (conf->password);
+ sfree (conf->client_id);
+ sfree (conf->topic_prefix);
+ sfree (conf);
+}
+
+static char *strip_prefix (char *topic)
+{
+ size_t num = 0;
+
+ for (size_t i = 0; topic[i] != 0; i++)
+ if (topic[i] == '/')
+ num++;
+
+ if (num < 2)
+ return (NULL);
+
+ while (num > 2)
+ {
+ char *tmp = strchr (topic, '/');
+ if (tmp == NULL)
+ return (NULL);
+ topic = tmp + 1;
+ num--;
+ }
+
+ return (topic);
+}
+
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+ __attribute__((unused)) struct mosquitto *m,
+#endif
+ __attribute__((unused)) void *arg,
+ const struct mosquitto_message *msg)
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ data_set_t const *ds;
+ char *topic;
+ char *name;
+ char *payload;
+ int status;
+
+ if (msg->payloadlen <= 0) {
+ DEBUG ("mqtt plugin: message has empty payload");
+ return;
+ }
+
+ topic = strdup (msg->topic);
+ name = strip_prefix (topic);
+
+ status = parse_identifier_vl (name, &vl);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
+ sfree (topic);
+ return;
+ }
+ sfree (topic);
+
+ ds = plugin_get_ds (vl.type);
+ if (ds == NULL)
+ {
+ ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
+ return;
+ }
+
+ vl.values = calloc (ds->ds_num, sizeof (*vl.values));
+ if (vl.values == NULL)
+ {
+ ERROR ("mqtt plugin: calloc failed.");
+ return;
+ }
+ vl.values_len = ds->ds_num;
+
+ payload = malloc (msg->payloadlen+1);
+ if (payload == NULL)
+ {
+ ERROR ("mqtt plugin: malloc for payload buffer failed.");
+ sfree (vl.values);
+ return;
+ }
+ memmove (payload, msg->payload, msg->payloadlen);
+ payload[msg->payloadlen] = 0;
+
+ DEBUG ("mqtt plugin: payload = \"%s\"", payload);
+ status = parse_values (payload, &vl, ds);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
+ sfree (payload);
+ sfree (vl.values);
+ return;
+ }
+ sfree (payload);
+
+ plugin_dispatch_values (&vl);
+ sfree (vl.values);
+} /* void on_message */
+
+/* must hold conf->lock when calling. */
+static int mqtt_reconnect (mqtt_client_conf_t *conf)
{
int status;
if (conf->connected)
return (0);
- pthread_mutex_lock (&conf->lock);
-
status = mosquitto_reconnect (conf->mosq);
-
- if (status != MOSQ_ERR_SUCCESS) {
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO ?
- strerror(errno) : mosquitto_strerror (status)));
- pthread_mutex_unlock (&conf->lock);
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
return (-1);
}
- conf->connected = true;
+ conf->connected = 1;
c_release (LOG_INFO,
- &conf->complaint_cantpublish,
- "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
- conf->host, conf->port);
-
- pthread_mutex_unlock (&conf->lock);
+ &conf->complaint_cantpublish,
+ "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
+ conf->host, conf->port);
return (0);
-} /* mqtt_reconnect_broker */
+} /* mqtt_reconnect */
-static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
- char *payload, size_t payload_len)
+/* must hold conf->lock when calling. */
+static int mqtt_connect (mqtt_client_conf_t *conf)
{
- char errbuf[1024];
+ char const *client_id;
int status;
- status = mosquitto_publish(conf->mosq,
- /* message id */ NULL,
- topic,
- (int) payload_len,
- payload,
- /* qos */ 0,
- /* retain */ false);
+ if (conf->mosq != NULL)
+ return mqtt_reconnect (conf);
- if (status != MOSQ_ERR_SUCCESS)
+ if (conf->client_id)
+ client_id = conf->client_id;
+ else
+ client_id = hostname_g;
+
+#if LIBMOSQUITTO_MAJOR == 0
+ conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+ conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
+ if (conf->mosq == NULL)
{
- c_complain (LOG_ERR,
- &conf->complaint_cantpublish,
- "plugin mqtt: mosquitto_publish failed: %s",
- status == MOSQ_ERR_ERRNO ?
- sstrerror(errno, errbuf, sizeof (errbuf)) :
- mosquitto_strerror(status));
- /*
- Mark our connection "down" regardless of the error as a safety measure;
- we will try to reconnect the next time we have to publish a message
- */
- conf->connected = false;
+ ERROR ("mqtt plugin: mosquitto_new failed");
+ return (-1);
+ }
+
+#if LIBMOSQUITTO_MAJOR != 0
+ if (conf->cacertificatefile) {
+ status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
+ conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status));
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+
+ status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status));
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ status = mosquitto_tls_insecure_set(conf->mosq, false);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status));
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
+#endif
+
+ if (conf->username && conf->password)
+ {
+ status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
+
+#if LIBMOSQUITTO_MAJOR == 0
+ status = mosquitto_connect (conf->mosq, conf->host, conf->port,
+ /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+ status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_connect failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
return (-1);
}
+ if (!conf->publish)
+ {
+ mosquitto_message_callback_set (conf->mosq, on_message);
+
+ status = mosquitto_subscribe (conf->mosq,
+ /* message_id = */ NULL,
+ conf->topic, conf->qos);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
+ conf->topic, mosquitto_strerror (status));
+
+ mosquitto_disconnect (conf->mosq);
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
+
+ conf->connected = 1;
return (0);
-} /* mqtt_publish_message */
+} /* mqtt_connect */
-static int mqtt_format_metric_value (char *buf, size_t buf_len,
- const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static void *subscribers_thread (void *arg)
{
- gauge_t *rates = NULL;
- gauge_t *value = NULL;
- size_t metric_value_len;
- int status = 0;
-
- memset (buf, 0, buf_len);
+ mqtt_client_conf_t *conf = arg;
+ int status;
- if (data_set->ds[ds_num].type == DS_TYPE_GAUGE)
- value = &vl->values[ds_num].gauge;
- else {
- rates = uc_get_rate (data_set, vl);
- value = &rates[ds_num];
- }
+ conf->loop = 1;
- metric_value_len = ssnprintf (buf, buf_len, "%f", *value);
+ while (conf->loop)
+ {
+ status = mqtt_connect (conf);
+ if (status != 0)
+ {
+ sleep (1);
+ continue;
+ }
- if (metric_value_len >= buf_len)
- return (-ENOMEM);
+ /* The documentation says "0" would map to the default (1000ms), but
+ * that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
+ status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+ status = mosquitto_loop (conf->mosq,
+ /* timeout[ms] = */ 1000,
+ /* max_packets = */ 100);
+#endif
+ if (status == MOSQ_ERR_CONN_LOST)
+ {
+ conf->connected = 0;
+ continue;
+ }
+ else if (status != MOSQ_ERR_SUCCESS)
+ {
+ ERROR ("mqtt plugin: mosquitto_loop failed: %s",
+ mosquitto_strerror (status));
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ conf->connected = 0;
+ continue;
+ }
- if (rates)
- sfree (rates);
+ DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
+ } /* while (conf->loop) */
- return (status);
-} /* mqtt_format_metric_value */
+ pthread_exit (0);
+} /* void *subscribers_thread */
-static int mqtt_format_message_topic (char *buf, size_t buf_len,
- char const *prefix, const value_list_t *vl, const char *ds_name)
+static int publish (mqtt_client_conf_t *conf, char const *topic,
+ void const *payload, size_t payload_len)
{
- size_t topic_buf_len;
-
- memset (buf, 0, buf_len);
-
- /*
- MQTT message topic format:
- [<prefix>/]<hostname>/<plugin>/<plugin instance>/<type>/<type instance>/<ds>/
- */
- topic_buf_len = (size_t) ssnprintf (buf, buf_len,
- "%s/%s/%s/%s/%s/%s/%s",
- prefix,
- vl->host,
- vl->plugin,
- vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)",
- vl->type,
- vl->type_instance[0] != '\0' ? vl->type_instance : "(null)",
- ds_name);
-
- if (topic_buf_len >= buf_len)
+ int status;
+
+ pthread_mutex_lock (&conf->lock);
+
+ status = mqtt_connect (conf);
+ if (status != 0) {
+ pthread_mutex_unlock (&conf->lock);
+ ERROR ("mqtt plugin: unable to reconnect to broker");
+ return (status);
+ }
+
+ status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
+ (uint32_t) payload_len, payload,
+#else
+ (int) payload_len, payload,
+#endif
+ conf->qos, conf->retain);
+ if (status != MOSQ_ERR_SUCCESS)
{
- ERROR ("mqtt_format_message_topic: topic buffer too small: "
- "Need %zu bytes.", topic_buf_len + 1);
- return (-ENOMEM);
+ char errbuf[1024];
+ c_complain (LOG_ERR,
+ &conf->complaint_cantpublish,
+ "mqtt plugin: mosquitto_publish failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror(status));
+ /* Mark our connection "down" regardless of the error as a safety
+ * measure; we will try to reconnect the next time we have to publish a
+ * message */
+ conf->connected = 0;
+
+ pthread_mutex_unlock (&conf->lock);
+ return (-1);
}
+ pthread_mutex_unlock (&conf->lock);
+ return (0);
+} /* int publish */
+
+static int format_topic (char *buf, size_t buf_len,
+ data_set_t const *ds, value_list_t const *vl,
+ mqtt_client_conf_t *conf)
+{
+ char name[MQTT_MAX_TOPIC_SIZE];
+ int status;
+
+ if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
+ return (FORMAT_VL (buf, buf_len, vl));
+
+ status = FORMAT_VL (name, sizeof (name), vl);
+ if (status != 0)
+ return (status);
+
+ status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
+ if ((status < 0) || (((size_t) status) >= buf_len))
+ return (ENOMEM);
+
return (0);
-} /* mqtt_format_message_topic */
+} /* int format_topic */
-static int mqtt_format_payload (char *buf, size_t buf_len,
- const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
+ user_data_t *user_data)
{
- char metric_path[10 * DATA_MAX_NAME_LEN];
- char metric_value[512];
- size_t payload_buf_len;
+ mqtt_client_conf_t *conf;
+ char topic[MQTT_MAX_TOPIC_SIZE];
+ char payload[MQTT_MAX_MESSAGE_SIZE];
int status = 0;
- memset (buf, 0, buf_len);
-
- ssnprintf (metric_path, sizeof (metric_path),
- "%s.%s%s%s.%s%s%s%s%s",
- vl->host,
- vl->plugin,
- vl->plugin_instance[0] != '\0' ? "." : "",
- vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "",
- vl->type,
- vl->type_instance[0] != '\0' ? "." : "",
- vl->type_instance[0] != '\0' ? vl->type_instance : "",
- strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "",
- strcmp(data_set->ds[ds_num].name, "value") != 0 ?
- data_set->ds[ds_num].name : "");
-
- status = mqtt_format_metric_value (metric_value,
- sizeof (metric_value),
- data_set,
- vl,
- ds_num);
+ if ((user_data == NULL) || (user_data->data == NULL))
+ return (EINVAL);
+ conf = user_data->data;
+ status = format_topic (topic, sizeof (topic), ds, vl, conf);
if (status != 0)
{
- ERROR ("mqtt_format_payload: error with mqtt_format_metric_value");
+ ERROR ("mqtt plugin: format_topic failed with status %d.", status);
return (status);
}
- payload_buf_len = (size_t) ssnprintf (buf, buf_len,
- "%s %s %u",
- metric_path,
- metric_value,
- (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+ status = format_values (payload, sizeof (payload),
+ ds, vl, conf->store_rates);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: format_values failed with status %d.", status);
+ return (status);
+ }
- if (payload_buf_len >= buf_len)
+ status = publish (conf, topic, payload, strlen (payload) + 1);
+ if (status != 0)
{
- ERROR ("mqtt_format_payload: payload buffer too small: "
- "Need %zu bytes.", payload_buf_len + 1);
- return (-ENOMEM);
+ ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
+ return (status);
}
return (status);
-} /* mqtt_format_payload */
+} /* mqtt_write */
-static int mqtt_write (const data_set_t *data_set, const value_list_t *vl,
- user_data_t *user_data)
+/*
+ * <Publish "name">
+ * Host "example.com"
+ * Port 1883
+ * ClientId "collectd"
+ * User "guest"
+ * Password "secret"
+ * Prefix "collectd"
+ * StoreRates true
+ * Retain false
+ * QoS 0
+ * CACert "ca.pem" Enables TLS if set
+ * CertificateFile "client-cert.pem" optional
+ * CertificateKeyFile "client-key.pem" optional
+ * TLSProtocol "tlsv1.2" optional
+ * </Publish>
+ */
+static int mqtt_config_publisher (oconfig_item_t *ci)
{
- struct mqtt_client_conf *conf;
- char msg_topic[MQTT_MAX_TOPIC_SIZE];
- char msg_payload[MQTT_MAX_MESSAGE_SIZE];
- int status = 0;
- int i;
-
- if (user_data == NULL)
- return (EINVAL);
-
- conf = user_data->data;
+ mqtt_client_conf_t *conf;
+ char cb_name[1024];
+ user_data_t user_data = { 0 };
+ int status;
- if (!conf->connected)
+ conf = calloc (1, sizeof (*conf));
+ if (conf == NULL)
{
- status = mqtt_reconnect_broker (conf);
+ ERROR ("mqtt plugin: calloc failed.");
+ return (-1);
+ }
+ conf->publish = 1;
- if (status != 0) {
- ERROR ("plugin mqtt: unable to reconnect to broker");
- return (status);
- }
+ conf->name = NULL;
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
}
- for (i = 0; i < data_set->ds_num; i++)
+ conf->host = strdup (MQTT_DEFAULT_HOST);
+ conf->port = MQTT_DEFAULT_PORT;
+ conf->client_id = NULL;
+ conf->qos = 0;
+ conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+ conf->store_rates = 1;
+
+ status = pthread_mutex_init (&conf->lock, NULL);
+ if (status != 0)
{
- status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic),
- conf->topic_prefix, vl, data_set->ds[i].name);
- if (status != 0)
- {
- ERROR ("plugin mqtt: error with mqtt_format_message_topic");
- return (status);
- }
+ mqtt_free (conf);
+ return (status);
+ }
- status = mqtt_format_payload (msg_payload,
- sizeof (msg_payload),
- data_set,
- vl,
- i);
+ C_COMPLAIN_INIT (&conf->complaint_cantpublish);
- if (status != 0)
+ for (int i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
{
- ERROR ("mqtt_write: error with mqtt_format_payload");
- return (status);
+ int tmp = cf_util_get_port_number (child);
+ if (tmp < 0)
+ ERROR ("mqtt plugin: Invalid port number.");
+ else
+ conf->port = tmp;
}
-
- status = mqtt_publish_message (conf,
- msg_topic,
- msg_payload,
- sizeof (msg_payload));
- if (status != 0)
+ else if (strcasecmp ("ClientId", child->key) == 0)
+ cf_util_get_string (child, &conf->client_id);
+ else if (strcasecmp ("User", child->key) == 0)
+ cf_util_get_string (child, &conf->username);
+ else if (strcasecmp ("Password", child->key) == 0)
+ cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("QoS", child->key) == 0)
{
- ERROR ("plugin mqtt: unable to publish message");
- return (status);
+ int tmp = -1;
+ status = cf_util_get_int (child, &tmp);
+ if ((status != 0) || (tmp < 0) || (tmp > 2))
+ ERROR ("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = tmp;
}
-
- DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m "
- "published message: topic=%s payload=%s",
- (unsigned)pthread_self(),
- msg_topic,
- msg_payload);
+ else if (strcasecmp ("Prefix", child->key) == 0)
+ cf_util_get_string (child, &conf->topic_prefix);
+ else if (strcasecmp ("StoreRates", child->key) == 0)
+ cf_util_get_boolean (child, &conf->store_rates);
+ else if (strcasecmp ("Retain", child->key) == 0)
+ cf_util_get_boolean (child, &conf->retain);
+ else if (strcasecmp ("CACert", child->key) == 0)
+ cf_util_get_string (child, &conf->cacertificatefile);
+ else if (strcasecmp ("CertificateFile", child->key) == 0)
+ cf_util_get_string (child, &conf->certificatefile);
+ else if (strcasecmp ("CertificateKeyFile", child->key) == 0)
+ cf_util_get_string (child, &conf->certificatekeyfile);
+ else if (strcasecmp ("TLSProtocol", child->key) == 0)
+ cf_util_get_string (child, &conf->tlsprotocol);
+ else if (strcasecmp ("CipherSuite", child->key) == 0)
+ cf_util_get_string (child, &conf->ciphersuite);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
- return (status);
-} /* mqtt_write */
+ ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
+ user_data.data = conf;
-static int mqtt_config (oconfig_item_t *ci)
+ plugin_register_write (cb_name, mqtt_write, &user_data);
+ return (0);
+} /* mqtt_config_publisher */
+
+/*
+ * <Subscribe "name">
+ * Host "example.com"
+ * Port 1883
+ * ClientId "collectd"
+ * User "guest"
+ * Password "secret"
+ * Topic "collectd/#"
+ * </Subscribe>
+ */
+static int mqtt_config_subscriber (oconfig_item_t *ci)
{
- struct mqtt_client_conf *conf;
- user_data_t user_data;
- char errbuf[1024];
+ mqtt_client_conf_t **tmp;
+ mqtt_client_conf_t *conf;
int status;
- DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ",
- (unsigned)pthread_self());
-
- conf = malloc (sizeof (*conf));
+ conf = calloc (1, sizeof (*conf));
if (conf == NULL)
{
- ERROR ("write_mqtt plugin: malloc failed.");
+ ERROR ("mqtt plugin: calloc failed.");
return (-1);
}
+ conf->publish = 0;
- memset (conf, 0, sizeof (*conf));
+ conf->name = NULL;
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
+ }
- conf->connected = false;
- conf->host = MQTT_DEFAULT_HOST;
+ conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
- conf->client_id = MQTT_DEFAULT_CLIENT_ID;
- conf->topic_prefix = MQTT_DEFAULT_TOPIC_PREFIX;
- C_COMPLAIN_INIT (&conf->complaint_cantpublish);
-
- memset (&user_data, 0, sizeof (user_data));
- user_data.data = conf;
+ conf->client_id = NULL;
+ conf->qos = 2;
+ conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+ conf->clean_session = 1;
- if ((conf->mosq = mosquitto_new (conf->client_id, true, NULL)) == NULL) {
- ERROR ("mqtt_config: mosquitto_new failed");
- return (-1);
+ status = pthread_mutex_init (&conf->lock, NULL);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
}
- status = mosquitto_connect (conf->mosq, conf->host, conf->port, 10);
+ C_COMPLAIN_INIT (&conf->complaint_cantpublish);
- if (status != MOSQ_ERR_SUCCESS) {
- ERROR ("mqtt_config: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO ?
- sstrerror(errno, errbuf, sizeof (errbuf)) :
- mosquitto_strerror (status)));
- return (-1);
+ for (int i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ status = cf_util_get_port_number (child);
+ if (status < 0)
+ ERROR ("mqtt plugin: Invalid port number.");
+ else
+ conf->port = status;
+ }
+ else if (strcasecmp ("ClientId", child->key) == 0)
+ cf_util_get_string (child, &conf->client_id);
+ else if (strcasecmp ("User", child->key) == 0)
+ cf_util_get_string (child, &conf->username);
+ else if (strcasecmp ("Password", child->key) == 0)
+ cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("QoS", child->key) == 0)
+ {
+ int qos = -1;
+ status = cf_util_get_int (child, &qos);
+ if ((status != 0) || (qos < 0) || (qos > 2))
+ ERROR ("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = qos;
+ }
+ else if (strcasecmp ("Topic", child->key) == 0)
+ cf_util_get_string (child, &conf->topic);
+ else if (strcasecmp ("CleanSession", child->key) == 0)
+ cf_util_get_boolean (child, &conf->clean_session);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
- DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
- conf->host, conf->port);
+ tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) );
+ if (tmp == NULL)
+ {
+ ERROR ("mqtt plugin: realloc failed.");
+ mqtt_free (conf);
+ return (-1);
+ }
+ subscribers = tmp;
+ subscribers[subscribers_num] = conf;
+ subscribers_num++;
- conf->connected = true;
+ return (0);
+} /* mqtt_config_subscriber */
- plugin_register_write ("mqtt", mqtt_write, &user_data);
+/*
+ * <Plugin mqtt>
+ * <Publish "name">
+ * # ...
+ * </Publish>
+ * <Subscribe "name">
+ * # ...
+ * </Subscribe>
+ * </Plugin>
+ */
+static int mqtt_config (oconfig_item_t *ci)
+{
+ for (int i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Publish", child->key) == 0)
+ mqtt_config_publisher (child);
+ else if (strcasecmp ("Subscribe", child->key) == 0)
+ mqtt_config_subscriber (child);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+ }
return (0);
-} /* mqtt_config */
+} /* int mqtt_config */
static int mqtt_init (void)
{
- mosquitto_lib_init();
+ mosquitto_lib_init ();
+
+ for (size_t i = 0; i < subscribers_num; i++)
+ {
+ int status;
+
+ if (subscribers[i]->loop)
+ continue;
+
+ status = plugin_thread_create (&subscribers[i]->thread,
+ /* attrs = */ NULL,
+ /* func = */ subscribers_thread,
+ /* args = */ subscribers[i]);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ continue;
+ }
+ }
return (0);
} /* mqtt_init */