From d91da120502dff291ebc57d45ee3d222d5ad317e Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 21 Nov 2014 12:05:39 +0100 Subject: [PATCH] mqtt plugin: Add support for multiple brokers. Also adds support for authentication and configuring a couple of settings (QoS, rates, retention). --- src/mqtt.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 112 insertions(+), 20 deletions(-) diff --git a/src/mqtt.c b/src/mqtt.c index 163bd4ad..457d7bd8 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -46,12 +46,22 @@ */ struct mqtt_client_conf { - struct mosquitto *mosq; + char *name; + + struct mosquitto *mosq; _Bool connected; - char *host; + + char *host; int port; - char *client_id; - char *topic_prefix; + char *username; + char *password; + + char *client_id; + char *topic_prefix; + _Bool store_rates; + _Bool retain; + int qos; + c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; @@ -81,6 +91,24 @@ static char const *mosquitto_strerror (int code) return "UNKNOWN ERROR CODE"; } +static void mqtt_free (mqtt_client_conf_t *conf) +{ + if (conf == NULL) + return; + + if (conf->connected) + (void) mosquitto_disconnect (conf->mosq); + conf->connected = 0; + (void) mosquitto_destroy (conf->mosq); + + sfree (conf->host); + sfree (conf->username); + sfree (conf->password); + sfree (conf->client_id); + sfree (conf->topic_prefix); + sfree (conf); +} + /* * Functions */ @@ -116,7 +144,6 @@ static int mqtt_reconnect_broker (mqtt_client_conf_t *conf) static int publish (mqtt_client_conf_t *conf, char const *topic, void const *payload, size_t payload_len) { - int const qos = 0; /* TODO: Config option */ int status; pthread_mutex_lock (&conf->lock); @@ -132,8 +159,8 @@ static int publish (mqtt_client_conf_t *conf, char const *topic, /* message id */ NULL, topic, (uint32_t) payload_len, payload, - /* qos */ qos, - /* retain */ false); + /* qos */ conf->qos, + /* retain */ conf->retain); if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; @@ -184,7 +211,6 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl, char topic[MQTT_MAX_TOPIC_SIZE]; char payload[MQTT_MAX_MESSAGE_SIZE]; int status = 0; - _Bool const store_rates = 0; /* TODO: Config option */ if ((user_data == NULL) || (user_data->data == NULL)) return (EINVAL); @@ -198,7 +224,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl, } status = format_values (payload, sizeof (payload), - ds, vl, store_rates); + ds, vl, conf->store_rates); if (status != 0) { ERROR ("mqtt plugin: format_values failed with status %d.", status); @@ -216,14 +242,19 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl, } /* mqtt_write */ /* - * + * * Host "example.com" * Port 1883 * Prefix "collectd" * ClientId "collectd" - * + * User "guest" + * Password "secret" + * StoreRates true + * Retain false + * QoS 0 + * */ -static int mqtt_config (oconfig_item_t *ci) +static int mqtt_config_broker (oconfig_item_t *ci) { mqtt_client_conf_t *conf; user_data_t user_data; @@ -237,7 +268,14 @@ static int mqtt_config (oconfig_item_t *ci) return (-1); } - conf->connected = 0; + conf->name = NULL; + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + mqtt_free (conf); + return (status); + } + conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID); @@ -253,16 +291,31 @@ static int mqtt_config (oconfig_item_t *ci) { int tmp = cf_util_get_port_number (child); if (tmp < 0) - { ERROR ("mqtt plugin: Invalid port number."); - continue; - } - conf->port = tmp; + else + conf->port = tmp; } else if (strcasecmp ("Prefix", child->key) == 0) cf_util_get_string (child, &conf->topic_prefix); else if (strcasecmp ("ClientId", child->key) == 0) cf_util_get_string (child, &conf->client_id); + else if (strcasecmp ("User", child->key) == 0) + cf_util_get_string (child, &conf->username); + else if (strcasecmp ("Password", child->key) == 0) + cf_util_get_string (child, &conf->password); + else if (strcasecmp ("StoreRates", child->key) == 0) + cf_util_get_boolean (child, &conf->store_rates); + else if (strcasecmp ("Retain", child->key) == 0) + cf_util_get_boolean (child, &conf->retain); + else if (strcasecmp ("QoS", child->key) == 0) + { + int tmp = -1; + status = cf_util_get_int (child, &tmp); + if ((status != 0) || (tmp < 0) || (tmp > 2)) + ERROR ("mqtt plugin: Not a valid QoS setting."); + else + conf->qos = tmp; + } else ERROR ("mqtt plugin: Unknown config option: %s", child->key); } @@ -274,10 +327,25 @@ static int mqtt_config (oconfig_item_t *ci) if (conf->mosq == NULL) { ERROR ("mqtt plugin: mosquitto_new failed"); - free (conf); + mqtt_free (conf); return (-1); } + if (conf->username && conf->password) + { + status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password); + if (status != MOSQ_ERR_SUCCESS) + { + char errbuf[1024]; + ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror (status)); + mqtt_free (conf); + return (-1); + } + } + status = mosquitto_connect (conf->mosq, conf->host, conf->port, /* keepalive = */ 10, /* clean session = */ 1); if (status != MOSQ_ERR_SUCCESS) @@ -287,7 +355,7 @@ static int mqtt_config (oconfig_item_t *ci) (status == MOSQ_ERR_ERRNO) ? sstrerror (errno, errbuf, sizeof (errbuf)) : mosquitto_strerror (status)); - free (conf); + mqtt_free (conf); return (-1); } @@ -299,7 +367,31 @@ static int mqtt_config (oconfig_item_t *ci) plugin_register_write ("mqtt", mqtt_write, &user_data); return (0); -} /* mqtt_config */ +} /* mqtt_config_broker */ + +/* + * + * + * # ... + * + * + */ +static int mqtt_config (oconfig_item_t *ci) +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Publish", child->key) == 0) + mqtt_config_broker (child); + else + ERROR ("mqtt plugin: Unknown config option: %s", child->key); + } + + return (0); +} /* int mqtt_config */ static int mqtt_init (void) { -- 2.11.0