mqtt plugin: Change message format to one topic per metric.
[collectd.git] / src / mqtt.c
index 3bed0a5..ef46e52 100644 (file)
@@ -55,26 +55,49 @@ struct mqtt_client_conf
     c_complain_t        complaint_cantpublish;
     pthread_mutex_t     lock;
 };
+typedef struct mqtt_client_conf mqtt_client_conf_t;
+
+static char const *mosquitto_strerror (int code)
+{
+    switch (code)
+    {
+        case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
+        case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
+        case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
+        case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
+        case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
+        case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
+        case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
+        case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
+        case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
+        case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
+        case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
+        case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
+        case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
+        case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
+        case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
+    }
+
+    return "UNKNOWN ERROR CODE";
+}
 
 /*
  * Functions
  */
-static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
+/* must hold conf->lock when calling. */
+static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
 {
     int status;
 
     if (conf->connected)
         return (0);
 
-    pthread_mutex_lock (&conf->lock);
-
     status = mosquitto_reconnect (conf->mosq);
-
-    if (status != MOSQ_ERR_SUCCESS) {
+    if (status != MOSQ_ERR_SUCCESS)
+    {
         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
             (status == MOSQ_ERR_ERRNO ?
                 strerror(errno) : mosquitto_strerror (status)));
-        pthread_mutex_unlock (&conf->lock);
         return (-1);
     }
 
@@ -85,17 +108,24 @@ static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
         "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
         conf->host, conf->port);
 
-    pthread_mutex_unlock (&conf->lock);
-
     return (0);
 } /* mqtt_reconnect_broker */
 
-static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
-    char *payload, size_t payload_len)
+static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
+    void const *payload, size_t payload_len)
 {
     char errbuf[1024];
     int status;
 
+    pthread_mutex_lock (&conf->lock);
+
+    status = mqtt_reconnect_broker (conf);
+    if (status != 0) {
+        pthread_mutex_unlock (&conf->lock);
+        ERROR ("mqtt plugin: unable to reconnect to broker");
+        return (status);
+    }
+
     status = mosquitto_publish(conf->mosq,
         /* message id */ NULL,
         topic,
@@ -103,7 +133,6 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
         payload,
         /* qos */ 0,
         /* retain */ false);
-
     if (status != MOSQ_ERR_SUCCESS)
     {
         c_complain (LOG_ERR,
@@ -118,183 +147,67 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
         */
         conf->connected = false;
 
+        pthread_mutex_unlock (&conf->lock);
         return (-1);
     }
 
+    pthread_mutex_unlock (&conf->lock);
     return (0);
 } /* mqtt_publish_message */
 
-static int mqtt_format_metric_value (char *buf, size_t buf_len,
-    const data_set_t *data_set, const value_list_t *vl, int ds_num)
-{
-    gauge_t *rates = NULL;
-    gauge_t *value = NULL;
-    size_t metric_value_len;
-    int status = 0;
-
-    memset (buf, 0, buf_len);
-
-    if (data_set->ds[ds_num].type == DS_TYPE_GAUGE)
-        value = &vl->values[ds_num].gauge;
-    else {
-        rates = uc_get_rate (data_set, vl);
-        value = &rates[ds_num];
-    }
-
-    metric_value_len = ssnprintf (buf, buf_len, "%f", *value);
-
-    if (metric_value_len >= buf_len)
-        return (-ENOMEM);
-
-    if (rates)
-        sfree (rates);
-
-    return (status);
-} /* mqtt_format_metric_value */
-
-static int mqtt_format_message_topic (char *buf, size_t buf_len,
-    char const *prefix, const value_list_t *vl, const char *ds_name)
-{
-    size_t topic_buf_len;
-
-    memset (buf, 0, buf_len);
-
-    /*
-        MQTT message topic format:
-        [<prefix>/]<hostname>/<plugin>/<plugin instance>/<type>/<type instance>/<ds>/
-    */
-    topic_buf_len = (size_t) ssnprintf (buf, buf_len,
-        "%s/%s/%s/%s/%s/%s/%s",
-        prefix,
-        vl->host,
-        vl->plugin,
-        vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)",
-        vl->type,
-        vl->type_instance[0] != '\0' ? vl->type_instance : "(null)",
-        ds_name);
-
-    if (topic_buf_len >= buf_len)
-    {
-        ERROR ("mqtt_format_message_topic: topic buffer too small: "
-                "Need %zu bytes.", topic_buf_len + 1);
-        return (-ENOMEM);
-    }
-
-    return (0);
-} /* mqtt_format_message_topic */
-
-static int mqtt_format_payload (char *buf, size_t buf_len,
-    const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static int format_topic (char *buf, size_t buf_len,
+    data_set_t const *ds, value_list_t const *vl,
+    mqtt_client_conf_t *conf)
 {
-    char metric_path[10 * DATA_MAX_NAME_LEN];
-    char metric_value[512];
-    size_t payload_buf_len;
-    int status = 0;
+    char name[MQTT_MAX_TOPIC_SIZE];
+    int status;
 
-    memset (buf, 0, buf_len);
-
-    ssnprintf (metric_path, sizeof (metric_path),
-        "%s.%s%s%s.%s%s%s%s%s",
-        vl->host,
-        vl->plugin,
-        vl->plugin_instance[0] != '\0' ? "." : "",
-        vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "",
-        vl->type,
-        vl->type_instance[0] != '\0' ? "." : "",
-        vl->type_instance[0] != '\0' ? vl->type_instance : "",
-        strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "",
-        strcmp(data_set->ds[ds_num].name, "value") != 0 ?
-            data_set->ds[ds_num].name : "");
-
-    status = mqtt_format_metric_value (metric_value,
-        sizeof (metric_value),
-        data_set,
-        vl,
-        ds_num);
+    if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
+        return (FORMAT_VL (buf, buf_len, vl));
 
+    status = FORMAT_VL (name, sizeof (name), vl);
     if (status != 0)
-    {
-        ERROR ("mqtt_format_payload: error with mqtt_format_metric_value");
         return (status);
-    }
-
-    payload_buf_len = (size_t) ssnprintf (buf, buf_len,
-        "%s %s %u",
-        metric_path,
-        metric_value,
-        (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
 
-    if (payload_buf_len >= buf_len)
-    {
-        ERROR ("mqtt_format_payload: payload buffer too small: "
-                "Need %zu bytes.", payload_buf_len + 1);
-        return (-ENOMEM);
-    }
+    status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
+    if ((status < 0) || (((size_t) status) >= buf_len))
+        return (ENOMEM);
 
-    return (status);
-} /* mqtt_format_payload */
+    return (0);
+} /* int format_topic */
 
-static int mqtt_write (const data_set_t *data_set, const value_list_t *vl,
+static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
     user_data_t *user_data)
 {
-    struct mqtt_client_conf *conf;
-    char msg_topic[MQTT_MAX_TOPIC_SIZE];
-    char msg_payload[MQTT_MAX_MESSAGE_SIZE];
+    mqtt_client_conf_t *conf;
+    char topic[MQTT_MAX_TOPIC_SIZE];
+    char payload[MQTT_MAX_MESSAGE_SIZE];
     int status = 0;
-    int i;
+    _Bool const store_rates = 0; /* TODO: Config option */
 
-    if (user_data == NULL)
+    if ((user_data == NULL) || (user_data->data == NULL))
         return (EINVAL);
-
     conf = user_data->data;
 
-    if (!conf->connected)
+    status = format_topic (topic, sizeof (topic), ds, vl, conf);
     {
-        status = mqtt_reconnect_broker (conf);
+        ERROR ("mqtt plugin: format_topic failed with status %d.", status);
+        return (status);
+    }
 
-        if (status != 0) {
-            ERROR ("plugin mqtt: unable to reconnect to broker");
-            return (status);
-        }
+    status = format_values (payload, sizeof (payload),
+            ds, vl, store_rates);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: format_values failed with status %d.", status);
+        return (status);
     }
 
-    for (i = 0; i < data_set->ds_num; i++)
+    status = publish (conf, topic, payload, sizeof (payload));
+    if (status != 0)
     {
-        status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic),
-            conf->topic_prefix, vl, data_set->ds[i].name);
-        if (status != 0)
-        {
-            ERROR ("plugin mqtt: error with mqtt_format_message_topic");
-            return (status);
-        }
-
-        status = mqtt_format_payload (msg_payload,
-            sizeof (msg_payload),
-            data_set,
-            vl,
-            i);
-
-        if (status != 0)
-        {
-            ERROR ("mqtt_write: error with mqtt_format_payload");
-            return (status);
-        }
-
-        status = mqtt_publish_message (conf,
-            msg_topic,
-            msg_payload,
-            sizeof (msg_payload));
-        if (status != 0)
-        {
-            ERROR ("plugin mqtt: unable to publish message");
-            return (status);
-        }
-
-        DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m "
-            "published message: topic=%s payload=%s",
-            (unsigned)pthread_self(),
-            msg_topic,
-            msg_payload);
+        ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
+        return (status);
     }
 
     return (status);
@@ -302,23 +215,17 @@ static int mqtt_write (const data_set_t *data_set, const value_list_t *vl,
 
 static int mqtt_config (oconfig_item_t *ci)
 {
-    struct mqtt_client_conf *conf;
+    mqtt_client_conf_t *conf;
     user_data_t user_data;
-    char errbuf[1024];
     int status;
 
-    DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ",
-        (unsigned)pthread_self());
-
-    conf = malloc (sizeof (*conf));
+    conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("write_mqtt plugin: malloc failed.");
+        ERROR ("mqtt plugin: malloc failed.");
         return (-1);
     }
 
-    memset (conf, 0, sizeof (*conf));
-
     conf->connected = false;
     conf->host = MQTT_DEFAULT_HOST;
     conf->port = MQTT_DEFAULT_PORT;
@@ -329,18 +236,24 @@ static int mqtt_config (oconfig_item_t *ci)
     memset (&user_data, 0, sizeof (user_data));
     user_data.data = conf;
 
-    if ((conf->mosq = mosquitto_new (conf->client_id, true, NULL)) == NULL) {
-        ERROR ("mqtt_config: mosquitto_new failed");
+    conf->mosq = mosquitto_new (conf->client_id, /* user data = */ conf);
+    if (conf->mosq == NULL)
+    {
+        ERROR ("mqtt plugin: mosquitto_new failed");
+        free (conf);
         return (-1);
     }
 
-    status = mosquitto_connect (conf->mosq, conf->host, conf->port, 10);
-
-    if (status != MOSQ_ERR_SUCCESS) {
-        ERROR ("mqtt_config: mosquitto_connect failed: %s",
-            (status == MOSQ_ERR_ERRNO ?
-                sstrerror(errno, errbuf, sizeof (errbuf)) :
-                mosquitto_strerror (status)));
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port,
+            /* keepalive = */ 10, /* clean session = */ 1);
+    if (status != MOSQ_ERR_SUCCESS)
+    {
+        char errbuf[1024];
+        ERROR ("mqtt plugin: mosquitto_connect failed: %s",
+                (status == MOSQ_ERR_ERRNO)
+                ? sstrerror (errno, errbuf, sizeof (errbuf))
+                : mosquitto_strerror (status));
+        free (conf);
         return (-1);
     }