struct mqtt_client_conf
{
struct mosquitto *mosq;
- bool connected;
+ _Bool connected;
char *host;
int port;
char *client_id;
status = mosquitto_reconnect (conf->mosq);
if (status != MOSQ_ERR_SUCCESS)
{
+ char errbuf[1024];
ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO ?
- strerror(errno) : mosquitto_strerror (status)));
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
return (-1);
}
- conf->connected = true;
+ conf->connected = 1;
c_release (LOG_INFO,
- &conf->complaint_cantpublish,
- "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
- conf->host, conf->port);
+ &conf->complaint_cantpublish,
+ "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
+ conf->host, conf->port);
return (0);
} /* mqtt_reconnect_broker */
-static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
+static int publish (mqtt_client_conf_t *conf, char const *topic,
void const *payload, size_t payload_len)
{
- char errbuf[1024];
+ int const qos = 0; /* TODO: Config option */
int status;
pthread_mutex_lock (&conf->lock);
}
status = mosquitto_publish(conf->mosq,
- /* message id */ NULL,
- topic,
- (int) payload_len,
- payload,
- /* qos */ 0,
- /* retain */ false);
+ /* message id */ NULL,
+ topic,
+ (uint32_t) payload_len, payload,
+ /* qos */ qos,
+ /* retain */ false);
if (status != MOSQ_ERR_SUCCESS)
{
+ char errbuf[1024];
c_complain (LOG_ERR,
- &conf->complaint_cantpublish,
- "plugin mqtt: mosquitto_publish failed: %s",
- status == MOSQ_ERR_ERRNO ?
- sstrerror(errno, errbuf, sizeof (errbuf)) :
+ &conf->complaint_cantpublish,
+ "plugin mqtt: mosquitto_publish failed: %s",
+ status == MOSQ_ERR_ERRNO ?
+ sstrerror(errno, errbuf, sizeof (errbuf)) :
mosquitto_strerror(status));
- /*
- Mark our connection "down" regardless of the error as a safety measure;
- we will try to reconnect the next time we have to publish a message
- */
- conf->connected = false;
+ /* Mark our connection "down" regardless of the error as a safety
+ * measure; we will try to reconnect the next time we have to publish a
+ * message */
+ conf->connected = 0;
pthread_mutex_unlock (&conf->lock);
return (-1);
pthread_mutex_unlock (&conf->lock);
return (0);
-} /* mqtt_publish_message */
+} /* int publish */
static int format_topic (char *buf, size_t buf_len,
data_set_t const *ds, value_list_t const *vl,
conf = user_data->data;
status = format_topic (topic, sizeof (topic), ds, vl, conf);
+ if (status != 0)
{
ERROR ("mqtt plugin: format_topic failed with status %d.", status);
return (status);
return (status);
} /* mqtt_write */
+/*
+ * <Plugin mqtt>
+ * Host "example.com"
+ * Port 1883
+ * Prefix "collectd"
+ * ClientId "collectd"
+ * </Plugin>
+ */
static int mqtt_config (oconfig_item_t *ci)
{
mqtt_client_conf_t *conf;
user_data_t user_data;
int status;
+ int i;
conf = calloc (1, sizeof (*conf));
if (conf == NULL)
return (-1);
}
- conf->connected = false;
- conf->host = MQTT_DEFAULT_HOST;
+ conf->connected = 0;
+ conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
- conf->client_id = MQTT_DEFAULT_CLIENT_ID;
- conf->topic_prefix = MQTT_DEFAULT_TOPIC_PREFIX;
+ conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID);
+ conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ int tmp = cf_util_get_port_number (child);
+ if (tmp < 0)
+ {
+ ERROR ("mqtt plugin: Invalid port number.");
+ continue;
+ }
+ conf->port = tmp;
+ }
+ else if (strcasecmp ("Prefix", child->key) == 0)
+ cf_util_get_string (child, &conf->topic_prefix);
+ else if (strcasecmp ("ClientId", child->key) == 0)
+ cf_util_get_string (child, &conf->client_id);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+ }
+
memset (&user_data, 0, sizeof (user_data));
user_data.data = conf;
DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
conf->host, conf->port);
- conf->connected = true;
+ conf->connected = 1;
plugin_register_write ("mqtt", mqtt_write, &user_data);