mqtt plugin: Add support for multiple brokers.
authorFlorian Forster <octo@collectd.org>
Fri, 21 Nov 2014 11:05:39 +0000 (12:05 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 6 Jul 2015 12:07:10 +0000 (14:07 +0200)
Also adds support for authentication and configuring a couple of settings
(QoS, rates, retention).

src/mqtt.c

index 163bd4a..457d7bd 100644 (file)
  */
 struct mqtt_client_conf
 {
-    struct mosquitto    *mosq;
+    char               *name;
+
+    struct mosquitto   *mosq;
     _Bool               connected;
-    char                *host;
+
+    char               *host;
     int                 port;
-    char                *client_id;
-    char                *topic_prefix;
+    char               *username;
+    char               *password;
+
+    char               *client_id;
+    char               *topic_prefix;
+    _Bool               store_rates;
+    _Bool               retain;
+    int qos;
+
     c_complain_t        complaint_cantpublish;
     pthread_mutex_t     lock;
 };
@@ -81,6 +91,24 @@ static char const *mosquitto_strerror (int code)
     return "UNKNOWN ERROR CODE";
 }
 
+static void mqtt_free (mqtt_client_conf_t *conf)
+{
+    if (conf == NULL)
+        return;
+
+    if (conf->connected)
+        (void) mosquitto_disconnect (conf->mosq);
+    conf->connected = 0;
+    (void) mosquitto_destroy (conf->mosq);
+
+    sfree (conf->host);
+    sfree (conf->username);
+    sfree (conf->password);
+    sfree (conf->client_id);
+    sfree (conf->topic_prefix);
+    sfree (conf);
+}
+
 /*
  * Functions
  */
@@ -116,7 +144,6 @@ static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
 static int publish (mqtt_client_conf_t *conf, char const *topic,
     void const *payload, size_t payload_len)
 {
-    int const qos = 0; /* TODO: Config option */
     int status;
 
     pthread_mutex_lock (&conf->lock);
@@ -132,8 +159,8 @@ static int publish (mqtt_client_conf_t *conf, char const *topic,
             /* message id */ NULL,
             topic,
             (uint32_t) payload_len, payload,
-            /* qos */ qos,
-            /* retain */ false);
+            /* qos */ conf->qos,
+            /* retain */ conf->retain);
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -184,7 +211,6 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
     char topic[MQTT_MAX_TOPIC_SIZE];
     char payload[MQTT_MAX_MESSAGE_SIZE];
     int status = 0;
-    _Bool const store_rates = 0; /* TODO: Config option */
 
     if ((user_data == NULL) || (user_data->data == NULL))
         return (EINVAL);
@@ -198,7 +224,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
     }
 
     status = format_values (payload, sizeof (payload),
-            ds, vl, store_rates);
+            ds, vl, conf->store_rates);
     if (status != 0)
     {
         ERROR ("mqtt plugin: format_values failed with status %d.", status);
@@ -216,14 +242,19 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
 } /* mqtt_write */
 
 /*
- * <Plugin mqtt>
+ * <Publish "name">
  *   Host "example.com"
  *   Port 1883
  *   Prefix "collectd"
  *   ClientId "collectd"
- * </Plugin>
+ *   User "guest"
+ *   Password "secret"
+ *   StoreRates true
+ *   Retain false
+ *   QoS 0
+ * </Publish>
  */
-static int mqtt_config (oconfig_item_t *ci)
+static int mqtt_config_broker (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
     user_data_t user_data;
@@ -237,7 +268,14 @@ static int mqtt_config (oconfig_item_t *ci)
         return (-1);
     }
 
-    conf->connected = 0;
+    conf->name = NULL;
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        mqtt_free (conf);
+        return (status);
+    }
+
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID);
@@ -253,16 +291,31 @@ static int mqtt_config (oconfig_item_t *ci)
         {
             int tmp = cf_util_get_port_number (child);
             if (tmp < 0)
-            {
                 ERROR ("mqtt plugin: Invalid port number.");
-                continue;
-            }
-            conf->port = tmp;
+            else
+                conf->port = tmp;
         }
         else if (strcasecmp ("Prefix", child->key) == 0)
             cf_util_get_string (child, &conf->topic_prefix);
         else if (strcasecmp ("ClientId", child->key) == 0)
             cf_util_get_string (child, &conf->client_id);
+        else if (strcasecmp ("User", child->key) == 0)
+            cf_util_get_string (child, &conf->username);
+        else if (strcasecmp ("Password", child->key) == 0)
+            cf_util_get_string (child, &conf->password);
+        else if (strcasecmp ("StoreRates", child->key) == 0)
+            cf_util_get_boolean (child, &conf->store_rates);
+        else if (strcasecmp ("Retain", child->key) == 0)
+            cf_util_get_boolean (child, &conf->retain);
+        else if (strcasecmp ("QoS", child->key) == 0)
+        {
+            int tmp = -1;
+            status = cf_util_get_int (child, &tmp);
+            if ((status != 0) || (tmp < 0) || (tmp > 2))
+                ERROR ("mqtt plugin: Not a valid QoS setting.");
+            else
+                conf->qos = tmp;
+        }
         else
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
@@ -274,10 +327,25 @@ static int mqtt_config (oconfig_item_t *ci)
     if (conf->mosq == NULL)
     {
         ERROR ("mqtt plugin: mosquitto_new failed");
-        free (conf);
+        mqtt_free (conf);
         return (-1);
     }
 
+    if (conf->username && conf->password)
+    {
+        status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
+        if (status != MOSQ_ERR_SUCCESS)
+        {
+            char errbuf[1024];
+            ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
+                    (status == MOSQ_ERR_ERRNO)
+                    ? sstrerror (errno, errbuf, sizeof (errbuf))
+                    : mosquitto_strerror (status));
+            mqtt_free (conf);
+            return (-1);
+        }
+    }
+
     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
             /* keepalive = */ 10, /* clean session = */ 1);
     if (status != MOSQ_ERR_SUCCESS)
@@ -287,7 +355,7 @@ static int mqtt_config (oconfig_item_t *ci)
                 (status == MOSQ_ERR_ERRNO)
                 ? sstrerror (errno, errbuf, sizeof (errbuf))
                 : mosquitto_strerror (status));
-        free (conf);
+        mqtt_free (conf);
         return (-1);
     }
 
@@ -299,7 +367,31 @@ static int mqtt_config (oconfig_item_t *ci)
     plugin_register_write ("mqtt", mqtt_write, &user_data);
 
     return (0);
-} /* mqtt_config */
+} /* mqtt_config_broker */
+
+/*
+ * <Plugin mqtt>
+ *   <Publish "name">
+ *     # ...
+ *   </Publish>
+ * </Plugin>
+ */
+static int mqtt_config (oconfig_item_t *ci)
+{
+    int i;
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp ("Publish", child->key) == 0)
+            mqtt_config_broker (child);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+    }
+
+    return (0);
+} /* int mqtt_config */
 
 static int mqtt_init (void)
 {