X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fmqtt.c;h=ad889953a1127a561234af42463fc040cba2bf7c;hb=4d370741101aeb037ae52f3529a4a0869e0dc08a;hp=3bed0a5e32e537b9898c4bb7cb3b63085f529ce7;hpb=ed93534ffd9b64f72fc4d9dcff866191cddf3de6;p=collectd.git diff --git a/src/mqtt.c b/src/mqtt.c index 3bed0a5e..ad889953 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -1,6 +1,7 @@ /** * collectd - src/mqtt.c - * Copyright (C) 2014 Marc Falzon + * Copyright (C) 2014 Marc Falzon + * Copyright (C) 2014,2015 Florian octo Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -19,344 +20,773 @@ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Marc Falzon + * Florian octo Forster + * Jan-Piet Mens **/ // Reference: http://mosquitto.org/api/files/mosquitto-h.html #include "collectd.h" + #include "common.h" #include "plugin.h" -#include "utils_cache.h" #include "utils_complain.h" -#include - #include #define MQTT_MAX_TOPIC_SIZE 1024 #define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024 #define MQTT_DEFAULT_HOST "localhost" #define MQTT_DEFAULT_PORT 1883 -#define MQTT_DEFAULT_CLIENT_ID "collectd" #define MQTT_DEFAULT_TOPIC_PREFIX "collectd" +#define MQTT_DEFAULT_TOPIC "collectd/#" +#ifndef MQTT_KEEPALIVE +# define MQTT_KEEPALIVE 60 +#endif +#ifndef SSL_VERIFY_PEER +# define SSL_VERIFY_PEER 1 +#endif + /* * Data types */ struct mqtt_client_conf { - struct mosquitto *mosq; - bool connected; - char *host; + _Bool publish; + char *name; + + struct mosquitto *mosq; + _Bool connected; + + char *host; int port; - char *client_id; - char *topic_prefix; + char *client_id; + char *username; + char *password; + int qos; + char *cacertificatefile; + char *certificatefile; + char *certificatekeyfile; + char *tlsprotocol; + char *ciphersuite; + + /* For publishing */ + char *topic_prefix; + _Bool store_rates; + _Bool retain; + + /* For subscribing */ + pthread_t thread; + _Bool loop; + char *topic; + _Bool clean_session; + c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; +typedef struct mqtt_client_conf mqtt_client_conf_t; + +static mqtt_client_conf_t **subscribers = NULL; +static size_t subscribers_num = 0; /* * Functions */ -static int mqtt_reconnect_broker (struct mqtt_client_conf *conf) +#if LIBMOSQUITTO_MAJOR == 0 +static char const *mosquitto_strerror (int code) +{ + switch (code) + { + case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS"; + case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM"; + case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL"; + case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL"; + case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN"; + case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED"; + case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND"; + case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST"; + case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL"; + case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE"; + case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED"; + case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH"; + case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED"; + case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN"; + case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO"; + } + + return "UNKNOWN ERROR CODE"; +} +#else +/* provided by libmosquitto */ +#endif + +static void mqtt_free (mqtt_client_conf_t *conf) +{ + if (conf == NULL) + return; + + if (conf->connected) + (void) mosquitto_disconnect (conf->mosq); + conf->connected = 0; + (void) mosquitto_destroy (conf->mosq); + + sfree (conf->host); + sfree (conf->username); + sfree (conf->password); + sfree (conf->client_id); + sfree (conf->topic_prefix); + sfree (conf); +} + +static char *strip_prefix (char *topic) +{ + size_t num = 0; + + for (size_t i = 0; topic[i] != 0; i++) + if (topic[i] == '/') + num++; + + if (num < 2) + return (NULL); + + while (num > 2) + { + char *tmp = strchr (topic, '/'); + if (tmp == NULL) + return (NULL); + topic = tmp + 1; + num--; + } + + return (topic); +} + +static void on_message ( +#if LIBMOSQUITTO_MAJOR == 0 +#else + __attribute__((unused)) struct mosquitto *m, +#endif + __attribute__((unused)) void *arg, + const struct mosquitto_message *msg) +{ + value_list_t vl = VALUE_LIST_INIT; + data_set_t const *ds; + char *topic; + char *name; + char *payload; + int status; + + if (msg->payloadlen <= 0) { + DEBUG ("mqtt plugin: message has empty payload"); + return; + } + + topic = strdup (msg->topic); + name = strip_prefix (topic); + + status = parse_identifier_vl (name, &vl); + if (status != 0) + { + ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic); + sfree (topic); + return; + } + sfree (topic); + + ds = plugin_get_ds (vl.type); + if (ds == NULL) + { + ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type); + return; + } + + vl.values = calloc (ds->ds_num, sizeof (*vl.values)); + if (vl.values == NULL) + { + ERROR ("mqtt plugin: calloc failed."); + return; + } + vl.values_len = ds->ds_num; + + payload = malloc (msg->payloadlen+1); + if (payload == NULL) + { + ERROR ("mqtt plugin: malloc for payload buffer failed."); + sfree (vl.values); + return; + } + memmove (payload, msg->payload, msg->payloadlen); + payload[msg->payloadlen] = 0; + + DEBUG ("mqtt plugin: payload = \"%s\"", payload); + status = parse_values (payload, &vl, ds); + if (status != 0) + { + ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload); + sfree (payload); + sfree (vl.values); + return; + } + sfree (payload); + + plugin_dispatch_values (&vl); + sfree (vl.values); +} /* void on_message */ + +/* must hold conf->lock when calling. */ +static int mqtt_reconnect (mqtt_client_conf_t *conf) { int status; if (conf->connected) return (0); - pthread_mutex_lock (&conf->lock); - status = mosquitto_reconnect (conf->mosq); - - if (status != MOSQ_ERR_SUCCESS) { + if (status != MOSQ_ERR_SUCCESS) + { + char errbuf[1024]; ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s", - (status == MOSQ_ERR_ERRNO ? - strerror(errno) : mosquitto_strerror (status))); - pthread_mutex_unlock (&conf->lock); + (status == MOSQ_ERR_ERRNO) + ? sstrerror(errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror (status)); return (-1); } - conf->connected = true; + conf->connected = 1; c_release (LOG_INFO, - &conf->complaint_cantpublish, - "mqtt plugin: successfully reconnected to broker \"%s:%d\"", - conf->host, conf->port); - - pthread_mutex_unlock (&conf->lock); + &conf->complaint_cantpublish, + "mqtt plugin: successfully reconnected to broker \"%s:%d\"", + conf->host, conf->port); return (0); -} /* mqtt_reconnect_broker */ +} /* mqtt_reconnect */ -static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, - char *payload, size_t payload_len) +/* must hold conf->lock when calling. */ +static int mqtt_connect (mqtt_client_conf_t *conf) { - char errbuf[1024]; + char const *client_id; int status; - status = mosquitto_publish(conf->mosq, - /* message id */ NULL, - topic, - (int) payload_len, - payload, - /* qos */ 0, - /* retain */ false); + if (conf->mosq != NULL) + return mqtt_reconnect (conf); - if (status != MOSQ_ERR_SUCCESS) + if (conf->client_id) + client_id = conf->client_id; + else + client_id = hostname_g; + +#if LIBMOSQUITTO_MAJOR == 0 + conf->mosq = mosquitto_new (client_id, /* user data = */ conf); +#else + conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf); +#endif + if (conf->mosq == NULL) { - c_complain (LOG_ERR, - &conf->complaint_cantpublish, - "plugin mqtt: mosquitto_publish failed: %s", - status == MOSQ_ERR_ERRNO ? - sstrerror(errno, errbuf, sizeof (errbuf)) : - mosquitto_strerror(status)); - /* - Mark our connection "down" regardless of the error as a safety measure; - we will try to reconnect the next time we have to publish a message - */ - conf->connected = false; + ERROR ("mqtt plugin: mosquitto_new failed"); + return (-1); + } +#if LIBMOSQUITTO_MAJOR != 0 + if (conf->cacertificatefile) { + status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL, + conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL); + if (status != MOSQ_ERR_SUCCESS) { + ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status)); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + + status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite); + if (status != MOSQ_ERR_SUCCESS) { + ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status)); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + + status = mosquitto_tls_insecure_set(conf->mosq, false); + if (status != MOSQ_ERR_SUCCESS) { + ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status)); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + } +#endif + + if (conf->username && conf->password) + { + status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password); + if (status != MOSQ_ERR_SUCCESS) + { + char errbuf[1024]; + ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror (status)); + + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + } + +#if LIBMOSQUITTO_MAJOR == 0 + status = mosquitto_connect (conf->mosq, conf->host, conf->port, + /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session); +#else + status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE); +#endif + if (status != MOSQ_ERR_SUCCESS) + { + char errbuf[1024]; + ERROR ("mqtt plugin: mosquitto_connect failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror (status)); + + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; return (-1); } + if (!conf->publish) + { + mosquitto_message_callback_set (conf->mosq, on_message); + + status = mosquitto_subscribe (conf->mosq, + /* message_id = */ NULL, + conf->topic, conf->qos); + if (status != MOSQ_ERR_SUCCESS) + { + ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s", + conf->topic, mosquitto_strerror (status)); + + mosquitto_disconnect (conf->mosq); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + } + + conf->connected = 1; return (0); -} /* mqtt_publish_message */ +} /* mqtt_connect */ -static int mqtt_format_metric_value (char *buf, size_t buf_len, - const data_set_t *data_set, const value_list_t *vl, int ds_num) +static void *subscribers_thread (void *arg) { - gauge_t *rates = NULL; - gauge_t *value = NULL; - size_t metric_value_len; - int status = 0; - - memset (buf, 0, buf_len); + mqtt_client_conf_t *conf = arg; + int status; - if (data_set->ds[ds_num].type == DS_TYPE_GAUGE) - value = &vl->values[ds_num].gauge; - else { - rates = uc_get_rate (data_set, vl); - value = &rates[ds_num]; - } + conf->loop = 1; - metric_value_len = ssnprintf (buf, buf_len, "%f", *value); + while (conf->loop) + { + status = mqtt_connect (conf); + if (status != 0) + { + sleep (1); + continue; + } - if (metric_value_len >= buf_len) - return (-ENOMEM); + /* The documentation says "0" would map to the default (1000ms), but + * that does not work on some versions. */ +#if LIBMOSQUITTO_MAJOR == 0 + status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */); +#else + status = mosquitto_loop (conf->mosq, + /* timeout[ms] = */ 1000, + /* max_packets = */ 100); +#endif + if (status == MOSQ_ERR_CONN_LOST) + { + conf->connected = 0; + continue; + } + else if (status != MOSQ_ERR_SUCCESS) + { + ERROR ("mqtt plugin: mosquitto_loop failed: %s", + mosquitto_strerror (status)); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + conf->connected = 0; + continue; + } - if (rates) - sfree (rates); + DEBUG ("mqtt plugin: mosquitto_loop succeeded."); + } /* while (conf->loop) */ - return (status); -} /* mqtt_format_metric_value */ + pthread_exit (0); +} /* void *subscribers_thread */ -static int mqtt_format_message_topic (char *buf, size_t buf_len, - char const *prefix, const value_list_t *vl, const char *ds_name) +static int publish (mqtt_client_conf_t *conf, char const *topic, + void const *payload, size_t payload_len) { - size_t topic_buf_len; - - memset (buf, 0, buf_len); - - /* - MQTT message topic format: - [/]////// - */ - topic_buf_len = (size_t) ssnprintf (buf, buf_len, - "%s/%s/%s/%s/%s/%s/%s", - prefix, - vl->host, - vl->plugin, - vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)", - vl->type, - vl->type_instance[0] != '\0' ? vl->type_instance : "(null)", - ds_name); - - if (topic_buf_len >= buf_len) + int status; + + pthread_mutex_lock (&conf->lock); + + status = mqtt_connect (conf); + if (status != 0) { + pthread_mutex_unlock (&conf->lock); + ERROR ("mqtt plugin: unable to reconnect to broker"); + return (status); + } + + status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic, +#if LIBMOSQUITTO_MAJOR == 0 + (uint32_t) payload_len, payload, +#else + (int) payload_len, payload, +#endif + conf->qos, conf->retain); + if (status != MOSQ_ERR_SUCCESS) { - ERROR ("mqtt_format_message_topic: topic buffer too small: " - "Need %zu bytes.", topic_buf_len + 1); - return (-ENOMEM); + char errbuf[1024]; + c_complain (LOG_ERR, + &conf->complaint_cantpublish, + "mqtt plugin: mosquitto_publish failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror(errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror(status)); + /* Mark our connection "down" regardless of the error as a safety + * measure; we will try to reconnect the next time we have to publish a + * message */ + conf->connected = 0; + + pthread_mutex_unlock (&conf->lock); + return (-1); } + pthread_mutex_unlock (&conf->lock); + return (0); +} /* int publish */ + +static int format_topic (char *buf, size_t buf_len, + data_set_t const *ds, value_list_t const *vl, + mqtt_client_conf_t *conf) +{ + char name[MQTT_MAX_TOPIC_SIZE]; + int status; + + if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0)) + return (FORMAT_VL (buf, buf_len, vl)); + + status = FORMAT_VL (name, sizeof (name), vl); + if (status != 0) + return (status); + + status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name); + if ((status < 0) || (((size_t) status) >= buf_len)) + return (ENOMEM); + return (0); -} /* mqtt_format_message_topic */ +} /* int format_topic */ -static int mqtt_format_payload (char *buf, size_t buf_len, - const data_set_t *data_set, const value_list_t *vl, int ds_num) +static int mqtt_write (const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data) { - char metric_path[10 * DATA_MAX_NAME_LEN]; - char metric_value[512]; - size_t payload_buf_len; + mqtt_client_conf_t *conf; + char topic[MQTT_MAX_TOPIC_SIZE]; + char payload[MQTT_MAX_MESSAGE_SIZE]; int status = 0; - memset (buf, 0, buf_len); - - ssnprintf (metric_path, sizeof (metric_path), - "%s.%s%s%s.%s%s%s%s%s", - vl->host, - vl->plugin, - vl->plugin_instance[0] != '\0' ? "." : "", - vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "", - vl->type, - vl->type_instance[0] != '\0' ? "." : "", - vl->type_instance[0] != '\0' ? vl->type_instance : "", - strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "", - strcmp(data_set->ds[ds_num].name, "value") != 0 ? - data_set->ds[ds_num].name : ""); - - status = mqtt_format_metric_value (metric_value, - sizeof (metric_value), - data_set, - vl, - ds_num); + if ((user_data == NULL) || (user_data->data == NULL)) + return (EINVAL); + conf = user_data->data; + status = format_topic (topic, sizeof (topic), ds, vl, conf); if (status != 0) { - ERROR ("mqtt_format_payload: error with mqtt_format_metric_value"); + ERROR ("mqtt plugin: format_topic failed with status %d.", status); return (status); } - payload_buf_len = (size_t) ssnprintf (buf, buf_len, - "%s %s %u", - metric_path, - metric_value, - (unsigned int) CDTIME_T_TO_TIME_T (vl->time)); + status = format_values (payload, sizeof (payload), + ds, vl, conf->store_rates); + if (status != 0) + { + ERROR ("mqtt plugin: format_values failed with status %d.", status); + return (status); + } - if (payload_buf_len >= buf_len) + status = publish (conf, topic, payload, strlen (payload) + 1); + if (status != 0) { - ERROR ("mqtt_format_payload: payload buffer too small: " - "Need %zu bytes.", payload_buf_len + 1); - return (-ENOMEM); + ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status)); + return (status); } return (status); -} /* mqtt_format_payload */ +} /* mqtt_write */ -static int mqtt_write (const data_set_t *data_set, const value_list_t *vl, - user_data_t *user_data) +/* + * + * Host "example.com" + * Port 1883 + * ClientId "collectd" + * User "guest" + * Password "secret" + * Prefix "collectd" + * StoreRates true + * Retain false + * QoS 0 + * CACert "ca.pem" Enables TLS if set + * CertificateFile "client-cert.pem" optional + * CertificateKeyFile "client-key.pem" optional + * TLSProtocol "tlsv1.2" optional + * + */ +static int mqtt_config_publisher (oconfig_item_t *ci) { - struct mqtt_client_conf *conf; - char msg_topic[MQTT_MAX_TOPIC_SIZE]; - char msg_payload[MQTT_MAX_MESSAGE_SIZE]; - int status = 0; - int i; - - if (user_data == NULL) - return (EINVAL); - - conf = user_data->data; + mqtt_client_conf_t *conf; + char cb_name[1024]; + int status; - if (!conf->connected) + conf = calloc (1, sizeof (*conf)); + if (conf == NULL) { - status = mqtt_reconnect_broker (conf); + ERROR ("mqtt plugin: calloc failed."); + return (-1); + } + conf->publish = 1; - if (status != 0) { - ERROR ("plugin mqtt: unable to reconnect to broker"); - return (status); - } + conf->name = NULL; + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + mqtt_free (conf); + return (status); } - for (i = 0; i < data_set->ds_num; i++) + conf->host = strdup (MQTT_DEFAULT_HOST); + conf->port = MQTT_DEFAULT_PORT; + conf->client_id = NULL; + conf->qos = 0; + conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX); + conf->store_rates = 1; + + status = pthread_mutex_init (&conf->lock, NULL); + if (status != 0) { - status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic), - conf->topic_prefix, vl, data_set->ds[i].name); - if (status != 0) - { - ERROR ("plugin mqtt: error with mqtt_format_message_topic"); - return (status); - } + mqtt_free (conf); + return (status); + } - status = mqtt_format_payload (msg_payload, - sizeof (msg_payload), - data_set, - vl, - i); + C_COMPLAIN_INIT (&conf->complaint_cantpublish); - if (status != 0) + for (int i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("Host", child->key) == 0) + cf_util_get_string (child, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) { - ERROR ("mqtt_write: error with mqtt_format_payload"); - return (status); + int tmp = cf_util_get_port_number (child); + if (tmp < 0) + ERROR ("mqtt plugin: Invalid port number."); + else + conf->port = tmp; } - - status = mqtt_publish_message (conf, - msg_topic, - msg_payload, - sizeof (msg_payload)); - if (status != 0) + else if (strcasecmp ("ClientId", child->key) == 0) + cf_util_get_string (child, &conf->client_id); + else if (strcasecmp ("User", child->key) == 0) + cf_util_get_string (child, &conf->username); + else if (strcasecmp ("Password", child->key) == 0) + cf_util_get_string (child, &conf->password); + else if (strcasecmp ("QoS", child->key) == 0) { - ERROR ("plugin mqtt: unable to publish message"); - return (status); + int tmp = -1; + status = cf_util_get_int (child, &tmp); + if ((status != 0) || (tmp < 0) || (tmp > 2)) + ERROR ("mqtt plugin: Not a valid QoS setting."); + else + conf->qos = tmp; } - - DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m " - "published message: topic=%s payload=%s", - (unsigned)pthread_self(), - msg_topic, - msg_payload); + else if (strcasecmp ("Prefix", child->key) == 0) + cf_util_get_string (child, &conf->topic_prefix); + else if (strcasecmp ("StoreRates", child->key) == 0) + cf_util_get_boolean (child, &conf->store_rates); + else if (strcasecmp ("Retain", child->key) == 0) + cf_util_get_boolean (child, &conf->retain); + else if (strcasecmp ("CACert", child->key) == 0) + cf_util_get_string (child, &conf->cacertificatefile); + else if (strcasecmp ("CertificateFile", child->key) == 0) + cf_util_get_string (child, &conf->certificatefile); + else if (strcasecmp ("CertificateKeyFile", child->key) == 0) + cf_util_get_string (child, &conf->certificatekeyfile); + else if (strcasecmp ("TLSProtocol", child->key) == 0) + cf_util_get_string (child, &conf->tlsprotocol); + else if (strcasecmp ("CipherSuite", child->key) == 0) + cf_util_get_string (child, &conf->ciphersuite); + else + ERROR ("mqtt plugin: Unknown config option: %s", child->key); } - return (status); -} /* mqtt_write */ + ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name); + user_data_t user_data = { + .data = conf + }; -static int mqtt_config (oconfig_item_t *ci) + plugin_register_write (cb_name, mqtt_write, &user_data); + return (0); +} /* mqtt_config_publisher */ + +/* + * + * Host "example.com" + * Port 1883 + * ClientId "collectd" + * User "guest" + * Password "secret" + * Topic "collectd/#" + * + */ +static int mqtt_config_subscriber (oconfig_item_t *ci) { - struct mqtt_client_conf *conf; - user_data_t user_data; - char errbuf[1024]; + mqtt_client_conf_t **tmp; + mqtt_client_conf_t *conf; int status; - DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ", - (unsigned)pthread_self()); - - conf = malloc (sizeof (*conf)); + conf = calloc (1, sizeof (*conf)); if (conf == NULL) { - ERROR ("write_mqtt plugin: malloc failed."); + ERROR ("mqtt plugin: calloc failed."); return (-1); } + conf->publish = 0; - memset (conf, 0, sizeof (*conf)); + conf->name = NULL; + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + mqtt_free (conf); + return (status); + } - conf->connected = false; - conf->host = MQTT_DEFAULT_HOST; + conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; - conf->client_id = MQTT_DEFAULT_CLIENT_ID; - conf->topic_prefix = MQTT_DEFAULT_TOPIC_PREFIX; - C_COMPLAIN_INIT (&conf->complaint_cantpublish); - - memset (&user_data, 0, sizeof (user_data)); - user_data.data = conf; + conf->client_id = NULL; + conf->qos = 2; + conf->topic = strdup (MQTT_DEFAULT_TOPIC); + conf->clean_session = 1; - if ((conf->mosq = mosquitto_new (conf->client_id, true, NULL)) == NULL) { - ERROR ("mqtt_config: mosquitto_new failed"); - return (-1); + status = pthread_mutex_init (&conf->lock, NULL); + if (status != 0) + { + mqtt_free (conf); + return (status); } - status = mosquitto_connect (conf->mosq, conf->host, conf->port, 10); + C_COMPLAIN_INIT (&conf->complaint_cantpublish); - if (status != MOSQ_ERR_SUCCESS) { - ERROR ("mqtt_config: mosquitto_connect failed: %s", - (status == MOSQ_ERR_ERRNO ? - sstrerror(errno, errbuf, sizeof (errbuf)) : - mosquitto_strerror (status))); - return (-1); + for (int i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("Host", child->key) == 0) + cf_util_get_string (child, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) + { + status = cf_util_get_port_number (child); + if (status < 0) + ERROR ("mqtt plugin: Invalid port number."); + else + conf->port = status; + } + else if (strcasecmp ("ClientId", child->key) == 0) + cf_util_get_string (child, &conf->client_id); + else if (strcasecmp ("User", child->key) == 0) + cf_util_get_string (child, &conf->username); + else if (strcasecmp ("Password", child->key) == 0) + cf_util_get_string (child, &conf->password); + else if (strcasecmp ("QoS", child->key) == 0) + { + int qos = -1; + status = cf_util_get_int (child, &qos); + if ((status != 0) || (qos < 0) || (qos > 2)) + ERROR ("mqtt plugin: Not a valid QoS setting."); + else + conf->qos = qos; + } + else if (strcasecmp ("Topic", child->key) == 0) + cf_util_get_string (child, &conf->topic); + else if (strcasecmp ("CleanSession", child->key) == 0) + cf_util_get_boolean (child, &conf->clean_session); + else + ERROR ("mqtt plugin: Unknown config option: %s", child->key); } - DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"", - conf->host, conf->port); + tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) ); + if (tmp == NULL) + { + ERROR ("mqtt plugin: realloc failed."); + mqtt_free (conf); + return (-1); + } + subscribers = tmp; + subscribers[subscribers_num] = conf; + subscribers_num++; - conf->connected = true; + return (0); +} /* mqtt_config_subscriber */ - plugin_register_write ("mqtt", mqtt_write, &user_data); +/* + * + * + * # ... + * + * + * # ... + * + * + */ +static int mqtt_config (oconfig_item_t *ci) +{ + for (int i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Publish", child->key) == 0) + mqtt_config_publisher (child); + else if (strcasecmp ("Subscribe", child->key) == 0) + mqtt_config_subscriber (child); + else + ERROR ("mqtt plugin: Unknown config option: %s", child->key); + } return (0); -} /* mqtt_config */ +} /* int mqtt_config */ static int mqtt_init (void) { - mosquitto_lib_init(); + mosquitto_lib_init (); + + for (size_t i = 0; i < subscribers_num; i++) + { + int status; + + if (subscribers[i]->loop) + continue; + + status = plugin_thread_create (&subscribers[i]->thread, + /* attrs = */ NULL, + /* func = */ subscribers_thread, + /* args = */ subscribers[i]); + if (status != 0) + { + char errbuf[1024]; + ERROR ("mqtt plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + continue; + } + } return (0); } /* mqtt_init */