X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fmqtt.c;h=ef46e529375c9d3171562f5c1332c9541aa19dbb;hb=677f1d67d3ff255b1a28c2fbb5861ef50c1b2d3a;hp=3bed0a5e32e537b9898c4bb7cb3b63085f529ce7;hpb=ed93534ffd9b64f72fc4d9dcff866191cddf3de6;p=collectd.git diff --git a/src/mqtt.c b/src/mqtt.c index 3bed0a5e..ef46e529 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -55,26 +55,49 @@ struct mqtt_client_conf c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; +typedef struct mqtt_client_conf mqtt_client_conf_t; + +static char const *mosquitto_strerror (int code) +{ + switch (code) + { + case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS"; + case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM"; + case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL"; + case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL"; + case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN"; + case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED"; + case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND"; + case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST"; + case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL"; + case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE"; + case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED"; + case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH"; + case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED"; + case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN"; + case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO"; + } + + return "UNKNOWN ERROR CODE"; +} /* * Functions */ -static int mqtt_reconnect_broker (struct mqtt_client_conf *conf) +/* must hold conf->lock when calling. */ +static int mqtt_reconnect_broker (mqtt_client_conf_t *conf) { int status; if (conf->connected) return (0); - pthread_mutex_lock (&conf->lock); - status = mosquitto_reconnect (conf->mosq); - - if (status != MOSQ_ERR_SUCCESS) { + if (status != MOSQ_ERR_SUCCESS) + { ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s", (status == MOSQ_ERR_ERRNO ? strerror(errno) : mosquitto_strerror (status))); - pthread_mutex_unlock (&conf->lock); return (-1); } @@ -85,17 +108,24 @@ static int mqtt_reconnect_broker (struct mqtt_client_conf *conf) "mqtt plugin: successfully reconnected to broker \"%s:%d\"", conf->host, conf->port); - pthread_mutex_unlock (&conf->lock); - return (0); } /* mqtt_reconnect_broker */ -static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, - char *payload, size_t payload_len) +static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic, + void const *payload, size_t payload_len) { char errbuf[1024]; int status; + pthread_mutex_lock (&conf->lock); + + status = mqtt_reconnect_broker (conf); + if (status != 0) { + pthread_mutex_unlock (&conf->lock); + ERROR ("mqtt plugin: unable to reconnect to broker"); + return (status); + } + status = mosquitto_publish(conf->mosq, /* message id */ NULL, topic, @@ -103,7 +133,6 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, payload, /* qos */ 0, /* retain */ false); - if (status != MOSQ_ERR_SUCCESS) { c_complain (LOG_ERR, @@ -118,183 +147,67 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, */ conf->connected = false; + pthread_mutex_unlock (&conf->lock); return (-1); } + pthread_mutex_unlock (&conf->lock); return (0); } /* mqtt_publish_message */ -static int mqtt_format_metric_value (char *buf, size_t buf_len, - const data_set_t *data_set, const value_list_t *vl, int ds_num) -{ - gauge_t *rates = NULL; - gauge_t *value = NULL; - size_t metric_value_len; - int status = 0; - - memset (buf, 0, buf_len); - - if (data_set->ds[ds_num].type == DS_TYPE_GAUGE) - value = &vl->values[ds_num].gauge; - else { - rates = uc_get_rate (data_set, vl); - value = &rates[ds_num]; - } - - metric_value_len = ssnprintf (buf, buf_len, "%f", *value); - - if (metric_value_len >= buf_len) - return (-ENOMEM); - - if (rates) - sfree (rates); - - return (status); -} /* mqtt_format_metric_value */ - -static int mqtt_format_message_topic (char *buf, size_t buf_len, - char const *prefix, const value_list_t *vl, const char *ds_name) -{ - size_t topic_buf_len; - - memset (buf, 0, buf_len); - - /* - MQTT message topic format: - [/]////// - */ - topic_buf_len = (size_t) ssnprintf (buf, buf_len, - "%s/%s/%s/%s/%s/%s/%s", - prefix, - vl->host, - vl->plugin, - vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)", - vl->type, - vl->type_instance[0] != '\0' ? vl->type_instance : "(null)", - ds_name); - - if (topic_buf_len >= buf_len) - { - ERROR ("mqtt_format_message_topic: topic buffer too small: " - "Need %zu bytes.", topic_buf_len + 1); - return (-ENOMEM); - } - - return (0); -} /* mqtt_format_message_topic */ - -static int mqtt_format_payload (char *buf, size_t buf_len, - const data_set_t *data_set, const value_list_t *vl, int ds_num) +static int format_topic (char *buf, size_t buf_len, + data_set_t const *ds, value_list_t const *vl, + mqtt_client_conf_t *conf) { - char metric_path[10 * DATA_MAX_NAME_LEN]; - char metric_value[512]; - size_t payload_buf_len; - int status = 0; + char name[MQTT_MAX_TOPIC_SIZE]; + int status; - memset (buf, 0, buf_len); - - ssnprintf (metric_path, sizeof (metric_path), - "%s.%s%s%s.%s%s%s%s%s", - vl->host, - vl->plugin, - vl->plugin_instance[0] != '\0' ? "." : "", - vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "", - vl->type, - vl->type_instance[0] != '\0' ? "." : "", - vl->type_instance[0] != '\0' ? vl->type_instance : "", - strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "", - strcmp(data_set->ds[ds_num].name, "value") != 0 ? - data_set->ds[ds_num].name : ""); - - status = mqtt_format_metric_value (metric_value, - sizeof (metric_value), - data_set, - vl, - ds_num); + if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0)) + return (FORMAT_VL (buf, buf_len, vl)); + status = FORMAT_VL (name, sizeof (name), vl); if (status != 0) - { - ERROR ("mqtt_format_payload: error with mqtt_format_metric_value"); return (status); - } - - payload_buf_len = (size_t) ssnprintf (buf, buf_len, - "%s %s %u", - metric_path, - metric_value, - (unsigned int) CDTIME_T_TO_TIME_T (vl->time)); - if (payload_buf_len >= buf_len) - { - ERROR ("mqtt_format_payload: payload buffer too small: " - "Need %zu bytes.", payload_buf_len + 1); - return (-ENOMEM); - } + status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name); + if ((status < 0) || (((size_t) status) >= buf_len)) + return (ENOMEM); - return (status); -} /* mqtt_format_payload */ + return (0); +} /* int format_topic */ -static int mqtt_write (const data_set_t *data_set, const value_list_t *vl, +static int mqtt_write (const data_set_t *ds, const value_list_t *vl, user_data_t *user_data) { - struct mqtt_client_conf *conf; - char msg_topic[MQTT_MAX_TOPIC_SIZE]; - char msg_payload[MQTT_MAX_MESSAGE_SIZE]; + mqtt_client_conf_t *conf; + char topic[MQTT_MAX_TOPIC_SIZE]; + char payload[MQTT_MAX_MESSAGE_SIZE]; int status = 0; - int i; + _Bool const store_rates = 0; /* TODO: Config option */ - if (user_data == NULL) + if ((user_data == NULL) || (user_data->data == NULL)) return (EINVAL); - conf = user_data->data; - if (!conf->connected) + status = format_topic (topic, sizeof (topic), ds, vl, conf); { - status = mqtt_reconnect_broker (conf); + ERROR ("mqtt plugin: format_topic failed with status %d.", status); + return (status); + } - if (status != 0) { - ERROR ("plugin mqtt: unable to reconnect to broker"); - return (status); - } + status = format_values (payload, sizeof (payload), + ds, vl, store_rates); + if (status != 0) + { + ERROR ("mqtt plugin: format_values failed with status %d.", status); + return (status); } - for (i = 0; i < data_set->ds_num; i++) + status = publish (conf, topic, payload, sizeof (payload)); + if (status != 0) { - status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic), - conf->topic_prefix, vl, data_set->ds[i].name); - if (status != 0) - { - ERROR ("plugin mqtt: error with mqtt_format_message_topic"); - return (status); - } - - status = mqtt_format_payload (msg_payload, - sizeof (msg_payload), - data_set, - vl, - i); - - if (status != 0) - { - ERROR ("mqtt_write: error with mqtt_format_payload"); - return (status); - } - - status = mqtt_publish_message (conf, - msg_topic, - msg_payload, - sizeof (msg_payload)); - if (status != 0) - { - ERROR ("plugin mqtt: unable to publish message"); - return (status); - } - - DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m " - "published message: topic=%s payload=%s", - (unsigned)pthread_self(), - msg_topic, - msg_payload); + ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status)); + return (status); } return (status); @@ -302,23 +215,17 @@ static int mqtt_write (const data_set_t *data_set, const value_list_t *vl, static int mqtt_config (oconfig_item_t *ci) { - struct mqtt_client_conf *conf; + mqtt_client_conf_t *conf; user_data_t user_data; - char errbuf[1024]; int status; - DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ", - (unsigned)pthread_self()); - - conf = malloc (sizeof (*conf)); + conf = calloc (1, sizeof (*conf)); if (conf == NULL) { - ERROR ("write_mqtt plugin: malloc failed."); + ERROR ("mqtt plugin: malloc failed."); return (-1); } - memset (conf, 0, sizeof (*conf)); - conf->connected = false; conf->host = MQTT_DEFAULT_HOST; conf->port = MQTT_DEFAULT_PORT; @@ -329,18 +236,24 @@ static int mqtt_config (oconfig_item_t *ci) memset (&user_data, 0, sizeof (user_data)); user_data.data = conf; - if ((conf->mosq = mosquitto_new (conf->client_id, true, NULL)) == NULL) { - ERROR ("mqtt_config: mosquitto_new failed"); + conf->mosq = mosquitto_new (conf->client_id, /* user data = */ conf); + if (conf->mosq == NULL) + { + ERROR ("mqtt plugin: mosquitto_new failed"); + free (conf); return (-1); } - status = mosquitto_connect (conf->mosq, conf->host, conf->port, 10); - - if (status != MOSQ_ERR_SUCCESS) { - ERROR ("mqtt_config: mosquitto_connect failed: %s", - (status == MOSQ_ERR_ERRNO ? - sstrerror(errno, errbuf, sizeof (errbuf)) : - mosquitto_strerror (status))); + status = mosquitto_connect (conf->mosq, conf->host, conf->port, + /* keepalive = */ 10, /* clean session = */ 1); + if (status != MOSQ_ERR_SUCCESS) + { + char errbuf[1024]; + ERROR ("mqtt plugin: mosquitto_connect failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror (status)); + free (conf); return (-1); }