#define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
#define MQTT_DEFAULT_HOST "localhost"
#define MQTT_DEFAULT_PORT 1883
-#define MQTT_DEFAULT_CLIENT_ID "collectd"
#define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
/*
* Functions
*/
/* must hold conf->lock when calling. */
-static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
+static int mqtt_reconnect (mqtt_client_conf_t *conf)
{
int status;
conf->host, conf->port);
return (0);
-} /* mqtt_reconnect_broker */
+} /* mqtt_reconnect */
+
+/* must hold conf->lock when calling. */
+static int mqtt_connect (mqtt_client_conf_t *conf)
+{
+ char const *client_id;
+ int status;
+
+ if (conf->mosq != NULL)
+ return mqtt_reconnect (conf);
+
+ if (conf->client_id)
+ client_id = conf->client_id;
+ else
+ client_id = hostname_g;
+
+ conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+ if (conf->mosq == NULL)
+ {
+ ERROR ("mqtt plugin: mosquitto_new failed");
+ return (-1);
+ }
+
+ if (conf->username && conf->password)
+ {
+ status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
+
+ status = mosquitto_connect (conf->mosq, conf->host, conf->port,
+ /* keepalive = */ 10, /* clean session = */ 1);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_connect failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+
+ conf->connected = 1;
+ return (0);
+} /* mqtt_connect */
static int publish (mqtt_client_conf_t *conf, char const *topic,
void const *payload, size_t payload_len)
pthread_mutex_lock (&conf->lock);
- status = mqtt_reconnect_broker (conf);
+ status = mqtt_connect (conf);
if (status != 0) {
pthread_mutex_unlock (&conf->lock);
ERROR ("mqtt plugin: unable to reconnect to broker");
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
- conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID);
+ conf->client_id = NULL;
conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
memset (&user_data, 0, sizeof (user_data));
user_data.data = conf;
- conf->mosq = mosquitto_new (conf->client_id, /* user data = */ conf);
- if (conf->mosq == NULL)
- {
- ERROR ("mqtt plugin: mosquitto_new failed");
- mqtt_free (conf);
- return (-1);
- }
-
- if (conf->username && conf->password)
- {
- status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror (status));
- mqtt_free (conf);
- return (-1);
- }
- }
-
- status = mosquitto_connect (conf->mosq, conf->host, conf->port,
- /* keepalive = */ 10, /* clean session = */ 1);
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- ERROR ("mqtt plugin: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror (status));
- mqtt_free (conf);
- return (-1);
- }
-
DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
conf->host, conf->port);
- conf->connected = 1;
-
plugin_register_write ("mqtt", mqtt_write, &user_data);
-
return (0);
} /* mqtt_config_broker */