mqtt plugin: Concurrency fixes, pick up conf->lock in publish.
authorFlorian Forster <octo@collectd.org>
Thu, 20 Nov 2014 17:19:37 +0000 (18:19 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 6 Jul 2015 12:07:10 +0000 (14:07 +0200)
src/mqtt.c

index 7adc7f7..5c844a7 100644 (file)
@@ -55,6 +55,7 @@ struct mqtt_client_conf
     c_complain_t        complaint_cantpublish;
     pthread_mutex_t     lock;
 };
+typedef struct mqtt_client_conf mqtt_client_conf_t;
 
 static char const *mosquitto_strerror (int code)
 {
@@ -83,22 +84,20 @@ static char const *mosquitto_strerror (int code)
 /*
  * Functions
  */
-static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
+/* must hold conf->lock when calling. */
+static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
 {
     int status;
 
     if (conf->connected)
         return (0);
 
-    pthread_mutex_lock (&conf->lock);
-
     status = mosquitto_reconnect (conf->mosq);
-
-    if (status != MOSQ_ERR_SUCCESS) {
+    if (status != MOSQ_ERR_SUCCESS)
+    {
         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
             (status == MOSQ_ERR_ERRNO ?
                 strerror(errno) : mosquitto_strerror (status)));
-        pthread_mutex_unlock (&conf->lock);
         return (-1);
     }
 
@@ -109,17 +108,24 @@ static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
         "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
         conf->host, conf->port);
 
-    pthread_mutex_unlock (&conf->lock);
-
     return (0);
 } /* mqtt_reconnect_broker */
 
-static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
+static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
     void const *payload, size_t payload_len)
 {
     char errbuf[1024];
     int status;
 
+    pthread_mutex_lock (&conf->lock);
+
+    status = mqtt_reconnect_broker (conf);
+    if (status != 0) {
+        pthread_mutex_unlock (&conf->lock);
+        ERROR ("mqtt plugin: unable to reconnect to broker");
+        return (status);
+    }
+
     status = mosquitto_publish(conf->mosq,
         /* message id */ NULL,
         topic,
@@ -127,7 +133,6 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
         payload,
         /* qos */ 0,
         /* retain */ false);
-
     if (status != MOSQ_ERR_SUCCESS)
     {
         c_complain (LOG_ERR,
@@ -142,9 +147,11 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
         */
         conf->connected = false;
 
+        pthread_mutex_unlock (&conf->lock);
         return (-1);
     }
 
+    pthread_mutex_unlock (&conf->lock);
     return (0);
 } /* mqtt_publish_message */