mqtt plugin: Correctly check the return value of format_topic().
[collectd.git] / src / mqtt.c
index ef46e52..163bd4a 100644 (file)
@@ -47,7 +47,7 @@
 struct mqtt_client_conf
 {
     struct mosquitto    *mosq;
-    bool                connected;
+    _Bool               connected;
     char                *host;
     int                 port;
     char                *client_id;
@@ -95,26 +95,28 @@ static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
     status = mosquitto_reconnect (conf->mosq);
     if (status != MOSQ_ERR_SUCCESS)
     {
+        char errbuf[1024];
         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
-            (status == MOSQ_ERR_ERRNO ?
-                strerror(errno) : mosquitto_strerror (status)));
+                (status == MOSQ_ERR_ERRNO)
+                ? sstrerror(errno, errbuf, sizeof (errbuf))
+                : mosquitto_strerror (status));
         return (-1);
     }
 
-    conf->connected = true;
+    conf->connected = 1;
 
     c_release (LOG_INFO,
-        &conf->complaint_cantpublish,
-        "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
-        conf->host, conf->port);
+            &conf->complaint_cantpublish,
+            "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
+            conf->host, conf->port);
 
     return (0);
 } /* mqtt_reconnect_broker */
 
-static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
+static int publish (mqtt_client_conf_t *conf, char const *topic,
     void const *payload, size_t payload_len)
 {
-    char errbuf[1024];
+    int const qos = 0; /* TODO: Config option */
     int status;
 
     pthread_mutex_lock (&conf->lock);
@@ -127,25 +129,24 @@ static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
     }
 
     status = mosquitto_publish(conf->mosq,
-        /* message id */ NULL,
-        topic,
-        (int) payload_len,
-        payload,
-        /* qos */ 0,
-        /* retain */ false);
+            /* message id */ NULL,
+            topic,
+            (uint32_t) payload_len, payload,
+            /* qos */ qos,
+            /* retain */ false);
     if (status != MOSQ_ERR_SUCCESS)
     {
+        char errbuf[1024];
         c_complain (LOG_ERR,
-            &conf->complaint_cantpublish,
-            "plugin mqtt: mosquitto_publish failed: %s",
-            status == MOSQ_ERR_ERRNO ?
-            sstrerror(errno, errbuf, sizeof (errbuf)) :
+                &conf->complaint_cantpublish,
+                "plugin mqtt: mosquitto_publish failed: %s",
+                status == MOSQ_ERR_ERRNO ?
+                sstrerror(errno, errbuf, sizeof (errbuf)) :
                 mosquitto_strerror(status));
-        /*
-        Mark our connection "down" regardless of the error as a safety measure;
-        we will try to reconnect the next time we have to publish a message
-        */
-        conf->connected = false;
+        /* Mark our connection "down" regardless of the error as a safety
+         * measure; we will try to reconnect the next time we have to publish a
+         * message */
+        conf->connected = 0;
 
         pthread_mutex_unlock (&conf->lock);
         return (-1);
@@ -153,7 +154,7 @@ static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
 
     pthread_mutex_unlock (&conf->lock);
     return (0);
-} /* mqtt_publish_message */
+} /* int publish */
 
 static int format_topic (char *buf, size_t buf_len,
     data_set_t const *ds, value_list_t const *vl,
@@ -190,6 +191,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
     conf = user_data->data;
 
     status = format_topic (topic, sizeof (topic), ds, vl, conf);
+    if (status != 0)
     {
         ERROR ("mqtt plugin: format_topic failed with status %d.", status);
         return (status);
@@ -213,11 +215,20 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
     return (status);
 } /* mqtt_write */
 
+/*
+ * <Plugin mqtt>
+ *   Host "example.com"
+ *   Port 1883
+ *   Prefix "collectd"
+ *   ClientId "collectd"
+ * </Plugin>
+ */
 static int mqtt_config (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
     user_data_t user_data;
     int status;
+    int i;
 
     conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
@@ -226,13 +237,36 @@ static int mqtt_config (oconfig_item_t *ci)
         return (-1);
     }
 
-    conf->connected = false;
-    conf->host = MQTT_DEFAULT_HOST;
+    conf->connected = 0;
+    conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
-    conf->client_id = MQTT_DEFAULT_CLIENT_ID;
-    conf->topic_prefix = MQTT_DEFAULT_TOPIC_PREFIX;
+    conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID);
+    conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+        if (strcasecmp ("Host", child->key) == 0)
+            cf_util_get_string (child, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
+        {
+            int tmp = cf_util_get_port_number (child);
+            if (tmp < 0)
+            {
+                ERROR ("mqtt plugin: Invalid port number.");
+                continue;
+            }
+            conf->port = tmp;
+        }
+        else if (strcasecmp ("Prefix", child->key) == 0)
+            cf_util_get_string (child, &conf->topic_prefix);
+        else if (strcasecmp ("ClientId", child->key) == 0)
+            cf_util_get_string (child, &conf->client_id);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+    }
+
     memset (&user_data, 0, sizeof (user_data));
     user_data.data = conf;
 
@@ -260,7 +294,7 @@ static int mqtt_config (oconfig_item_t *ci)
     DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
         conf->host, conf->port);
 
-    conf->connected = true;
+    conf->connected = 1;
 
     plugin_register_write ("mqtt", mqtt_write, &user_data);