From 1f2b236c9aa0aec7c5ca9d69cc36d9f7742578e3 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 21 Nov 2014 17:41:17 +0100 Subject: [PATCH] mqtt plugin: Add proof-of-concept subscriber code. Publisher and subscriber should now be able to send metrics to one another. --- src/mqtt.c | 305 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 286 insertions(+), 19 deletions(-) diff --git a/src/mqtt.c b/src/mqtt.c index bfa2fdb9..7a7ba59d 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -39,12 +39,14 @@ #define MQTT_DEFAULT_HOST "localhost" #define MQTT_DEFAULT_PORT 1883 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd" +#define MQTT_DEFAULT_TOPIC "collectd/#" /* * Data types */ struct mqtt_client_conf { + _Bool publish; char *name; struct mosquitto *mosq; @@ -52,20 +54,33 @@ struct mqtt_client_conf char *host; int port; + char *client_id; char *username; char *password; + int qos; - char *client_id; + /* For publishing */ char *topic_prefix; _Bool store_rates; _Bool retain; - int qos; + + /* For subscribing */ + pthread_t thread; + _Bool loop; + char *topic; + _Bool clean_session; c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; typedef struct mqtt_client_conf mqtt_client_conf_t; +static mqtt_client_conf_t **subscribers = NULL; +static size_t subscribers_num = 0; + +/* + * Functions + */ static char const *mosquitto_strerror (int code) { switch (code) @@ -108,9 +123,87 @@ static void mqtt_free (mqtt_client_conf_t *conf) sfree (conf); } -/* - * Functions - */ +static char *strip_prefix (char *topic) +{ + size_t num; + size_t i; + + num = 0; + for (i = 0; topic[i] != 0; i++) + if (topic[i] == '/') + num++; + + if (num < 2) + return (NULL); + + while (num > 2) + { + char *tmp = strchr (topic, '/'); + if (tmp == NULL) + return (NULL); + topic = tmp + 1; + num--; + } + + return (topic); +} + +static void on_message (__attribute__((unused)) void *arg, + const struct mosquitto_message *msg) +{ + value_list_t vl = VALUE_LIST_INIT; + data_set_t const *ds; + char *topic; + char *name; + char *payload; + int status; + + if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0)) + return; + + topic = strdup (msg->topic); + name = strip_prefix (topic); + + status = parse_identifier_vl (name, &vl); + if (status != 0) + { + ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic); + sfree (topic); + return; + } + sfree (topic); + + ds = plugin_get_ds (vl.type); + if (ds == NULL) + { + ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type); + return; + } + + vl.values = calloc (ds->ds_num, sizeof (*vl.values)); + if (vl.values == NULL) + { + ERROR ("mqtt plugin: calloc failed."); + return; + } + vl.values_len = ds->ds_num; + + payload = strdup ((void *) msg->payload); + DEBUG ("mqtt plugin: payload = \"%s\"", payload); + status = parse_values (payload, &vl, ds); + if (status != 0) + { + ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload); + sfree (payload); + sfree (vl.values); + return; + } + sfree (payload); + + plugin_dispatch_values (&vl); + sfree (vl.values); +} /* void on_message */ + /* must hold conf->lock when calling. */ static int mqtt_reconnect (mqtt_client_conf_t *conf) { @@ -179,7 +272,7 @@ static int mqtt_connect (mqtt_client_conf_t *conf) } status = mosquitto_connect (conf->mosq, conf->host, conf->port, - /* keepalive = */ 10, /* clean session = */ 1); + /* keepalive = */ 10, /* clean session = */ conf->clean_session); if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; @@ -193,10 +286,68 @@ static int mqtt_connect (mqtt_client_conf_t *conf) return (-1); } + if (!conf->publish) + { + mosquitto_message_callback_set (conf->mosq, on_message); + + status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL, + conf->topic, conf->qos); + if (status != MOSQ_ERR_SUCCESS) + { + ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s", + conf->topic, mosquitto_strerror (status)); + + mosquitto_disconnect (conf->mosq); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + return (-1); + } + } + conf->connected = 1; return (0); } /* mqtt_connect */ +static void *subscribers_thread (void *arg) +{ + mqtt_client_conf_t *conf = arg; + int status; + + conf->loop = 1; + + while (conf->loop) + { + status = mqtt_connect (conf); + if (status != 0) + { + sleep (1); + continue; + } + + /* The documentation says "0" would map to the default (1000ms), but + * that does not work on some versions. */ + status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */); + if (status == MOSQ_ERR_CONN_LOST) + { + conf->connected = 0; + continue; + } + else if (status != MOSQ_ERR_SUCCESS) + { + ERROR ("mqtt plugin: mosquitto_loop failed: %s", + mosquitto_strerror (status)); + mosquitto_destroy (conf->mosq); + conf->mosq = NULL; + conf->connected = 0; + continue; + } + + DEBUG ("mqtt plugin: mosquitto_loop succeeded."); + } /* while (conf->loop) */ + + pthread_exit (0); +} /* void *subscribers_thread */ + static int publish (mqtt_client_conf_t *conf, char const *topic, void const *payload, size_t payload_len) { @@ -301,16 +452,16 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl, * * Host "example.com" * Port 1883 - * Prefix "collectd" * ClientId "collectd" * User "guest" * Password "secret" + * Prefix "collectd" * StoreRates true * Retain false * QoS 0 * */ -static int mqtt_config_broker (oconfig_item_t *ci) +static int mqtt_config_publisher (oconfig_item_t *ci) { mqtt_client_conf_t *conf; user_data_t user_data; @@ -323,6 +474,7 @@ static int mqtt_config_broker (oconfig_item_t *ci) ERROR ("mqtt plugin: malloc failed."); return (-1); } + conf->publish = 1; conf->name = NULL; status = cf_util_get_string (ci, &conf->name); @@ -336,6 +488,7 @@ static int mqtt_config_broker (oconfig_item_t *ci) conf->port = MQTT_DEFAULT_PORT; conf->client_id = NULL; conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX); + C_COMPLAIN_INIT (&conf->complaint_cantpublish); for (i = 0; i < ci->children_num; i++) @@ -351,18 +504,97 @@ static int mqtt_config_broker (oconfig_item_t *ci) else conf->port = tmp; } - else if (strcasecmp ("Prefix", child->key) == 0) - cf_util_get_string (child, &conf->topic_prefix); else if (strcasecmp ("ClientId", child->key) == 0) cf_util_get_string (child, &conf->client_id); else if (strcasecmp ("User", child->key) == 0) cf_util_get_string (child, &conf->username); else if (strcasecmp ("Password", child->key) == 0) cf_util_get_string (child, &conf->password); + else if (strcasecmp ("QoS", child->key) == 0) + { + int tmp = -1; + status = cf_util_get_int (child, &tmp); + if ((status != 0) || (tmp < 0) || (tmp > 2)) + ERROR ("mqtt plugin: Not a valid QoS setting."); + else + conf->qos = tmp; + } + else if (strcasecmp ("Prefix", child->key) == 0) + cf_util_get_string (child, &conf->topic_prefix); else if (strcasecmp ("StoreRates", child->key) == 0) cf_util_get_boolean (child, &conf->store_rates); else if (strcasecmp ("Retain", child->key) == 0) cf_util_get_boolean (child, &conf->retain); + else + ERROR ("mqtt plugin: Unknown config option: %s", child->key); + } + + memset (&user_data, 0, sizeof (user_data)); + user_data.data = conf; + + plugin_register_write ("mqtt", mqtt_write, &user_data); + return (0); +} /* mqtt_config_publisher */ + +/* + * + * Host "example.com" + * Port 1883 + * ClientId "collectd" + * User "guest" + * Password "secret" + * Topic "collectd/#" + * + */ +static int mqtt_config_subscriber (oconfig_item_t *ci) +{ + mqtt_client_conf_t **tmp; + mqtt_client_conf_t *conf; + int status; + int i; + + conf = calloc (1, sizeof (*conf)); + if (conf == NULL) + { + ERROR ("mqtt plugin: malloc failed."); + return (-1); + } + conf->publish = 0; + + conf->name = NULL; + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + mqtt_free (conf); + return (status); + } + + conf->host = strdup (MQTT_DEFAULT_HOST); + conf->port = MQTT_DEFAULT_PORT; + conf->client_id = NULL; + conf->topic = strdup (MQTT_DEFAULT_TOPIC); + + C_COMPLAIN_INIT (&conf->complaint_cantpublish); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("Host", child->key) == 0) + cf_util_get_string (child, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) + { + int tmp = cf_util_get_port_number (child); + if (tmp < 0) + ERROR ("mqtt plugin: Invalid port number."); + else + conf->port = tmp; + } + else if (strcasecmp ("ClientId", child->key) == 0) + cf_util_get_string (child, &conf->client_id); + else if (strcasecmp ("User", child->key) == 0) + cf_util_get_string (child, &conf->username); + else if (strcasecmp ("Password", child->key) == 0) + cf_util_get_string (child, &conf->password); else if (strcasecmp ("QoS", child->key) == 0) { int tmp = -1; @@ -372,25 +604,36 @@ static int mqtt_config_broker (oconfig_item_t *ci) else conf->qos = tmp; } + else if (strcasecmp ("Topic", child->key) == 0) + cf_util_get_string (child, &conf->topic); + else if (strcasecmp ("CleanSession", child->key) == 0) + cf_util_get_boolean (child, &conf->clean_session); else ERROR ("mqtt plugin: Unknown config option: %s", child->key); } - memset (&user_data, 0, sizeof (user_data)); - user_data.data = conf; - - DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"", - conf->host, conf->port); + tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num); + if (tmp == NULL) + { + ERROR ("mqtt plugin: realloc failed."); + mqtt_free (conf); + return (-1); + } + subscribers = tmp; + subscribers[subscribers_num] = conf; + subscribers_num++; - plugin_register_write ("mqtt", mqtt_write, &user_data); return (0); -} /* mqtt_config_broker */ +} /* mqtt_config_subscriber */ /* * * * # ... * + * + * # ... + * * */ static int mqtt_config (oconfig_item_t *ci) @@ -402,7 +645,9 @@ static int mqtt_config (oconfig_item_t *ci) oconfig_item_t *child = ci->children + i; if (strcasecmp ("Publish", child->key) == 0) - mqtt_config_broker (child); + mqtt_config_publisher (child); + else if (strcasecmp ("Subscribe", child->key) == 0) + mqtt_config_subscriber (child); else ERROR ("mqtt plugin: Unknown config option: %s", child->key); } @@ -412,7 +657,29 @@ static int mqtt_config (oconfig_item_t *ci) static int mqtt_init (void) { - mosquitto_lib_init(); + size_t i; + + mosquitto_lib_init (); + + for (i = 0; i < subscribers_num; i++) + { + int status; + + if (subscribers[i]->loop) + continue; + + status = plugin_thread_create (&subscribers[i]->thread, + /* attrs = */ NULL, + /* func = */ subscribers_thread, + /* args = */ subscribers[i]); + if (status != 0) + { + char errbuf[1024]; + ERROR ("mqtt plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + continue; + } + } return (0); } /* mqtt_init */ -- 2.11.0