c_complain_t complaint_cantpublish;
pthread_mutex_t lock;
};
+typedef struct mqtt_client_conf mqtt_client_conf_t;
static char const *mosquitto_strerror (int code)
{
/*
* Functions
*/
-static int mqtt_reconnect_broker (struct mqtt_client_conf *conf)
+/* must hold conf->lock when calling. */
+static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
{
int status;
if (conf->connected)
return (0);
- pthread_mutex_lock (&conf->lock);
-
status = mosquitto_reconnect (conf->mosq);
-
- if (status != MOSQ_ERR_SUCCESS) {
+ if (status != MOSQ_ERR_SUCCESS)
+ {
ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
(status == MOSQ_ERR_ERRNO ?
strerror(errno) : mosquitto_strerror (status)));
- pthread_mutex_unlock (&conf->lock);
return (-1);
}
"mqtt plugin: successfully reconnected to broker \"%s:%d\"",
conf->host, conf->port);
- pthread_mutex_unlock (&conf->lock);
-
return (0);
} /* mqtt_reconnect_broker */
-static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic,
+static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
void const *payload, size_t payload_len)
{
char errbuf[1024];
int status;
+ pthread_mutex_lock (&conf->lock);
+
+ status = mqtt_reconnect_broker (conf);
+ if (status != 0) {
+ pthread_mutex_unlock (&conf->lock);
+ ERROR ("mqtt plugin: unable to reconnect to broker");
+ return (status);
+ }
+
status = mosquitto_publish(conf->mosq,
/* message id */ NULL,
topic,
payload,
/* qos */ 0,
/* retain */ false);
-
if (status != MOSQ_ERR_SUCCESS)
{
c_complain (LOG_ERR,
*/
conf->connected = false;
+ pthread_mutex_unlock (&conf->lock);
return (-1);
}
+ pthread_mutex_unlock (&conf->lock);
return (0);
} /* mqtt_publish_message */