7f785dc86b8a89249141ea37a3866e45cd23978f
[collectd.git] / src / mqtt.c
1 /**
2  * collectd - src/mqtt.c
3  * Copyright (C) 2014       Marc Falzon
4  * Copyright (C) 2014,2015  Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Marc Falzon <marc at baha dot mu>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
30
31
32 #include "collectd.h"
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37
38 #include <pthread.h>
39
40 #include <mosquitto.h>
41
42 #define MQTT_MAX_TOPIC_SIZE         1024
43 #define MQTT_MAX_MESSAGE_SIZE       MQTT_MAX_TOPIC_SIZE + 1024
44 #define MQTT_DEFAULT_HOST           "localhost"
45 #define MQTT_DEFAULT_PORT           1883
46 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
47 #define MQTT_DEFAULT_TOPIC          "collectd/#"
48
49 /*
50  * Data types
51  */
52 struct mqtt_client_conf
53 {
54     _Bool               publish;
55     char               *name;
56
57     struct mosquitto   *mosq;
58     _Bool               connected;
59
60     char               *host;
61     int                 port;
62     char               *client_id;
63     char               *username;
64     char               *password;
65     int                 qos;
66
67     /* For publishing */
68     char               *topic_prefix;
69     _Bool               store_rates;
70     _Bool               retain;
71
72     /* For subscribing */
73     pthread_t           thread;
74     _Bool               loop;
75     char               *topic;
76     _Bool               clean_session;
77
78     c_complain_t        complaint_cantpublish;
79     pthread_mutex_t     lock;
80 };
81 typedef struct mqtt_client_conf mqtt_client_conf_t;
82
83 static mqtt_client_conf_t **subscribers = NULL;
84 static size_t subscribers_num = 0;
85
86 /*
87  * Functions
88  */
89 static char const *mosquitto_strerror (int code)
90 {
91     switch (code)
92     {
93         case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
94         case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
95         case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
96         case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
97         case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
98         case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
99         case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
100         case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
101         case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
102         case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
103         case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
104         case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
105         case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
106         case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
107         case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
108     }
109
110     return "UNKNOWN ERROR CODE";
111 }
112
113 static void mqtt_free (mqtt_client_conf_t *conf)
114 {
115     if (conf == NULL)
116         return;
117
118     if (conf->connected)
119         (void) mosquitto_disconnect (conf->mosq);
120     conf->connected = 0;
121     (void) mosquitto_destroy (conf->mosq);
122
123     sfree (conf->host);
124     sfree (conf->username);
125     sfree (conf->password);
126     sfree (conf->client_id);
127     sfree (conf->topic_prefix);
128     sfree (conf);
129 }
130
131 static char *strip_prefix (char *topic)
132 {
133     size_t num;
134     size_t i;
135
136     num = 0;
137     for (i = 0; topic[i] != 0; i++)
138         if (topic[i] == '/')
139             num++;
140
141     if (num < 2)
142         return (NULL);
143
144     while (num > 2)
145     {
146         char *tmp = strchr (topic, '/');
147         if (tmp == NULL)
148             return (NULL);
149         topic = tmp + 1;
150         num--;
151     }
152
153     return (topic);
154 }
155
156 static void on_message (__attribute__((unused)) void *arg,
157         const struct mosquitto_message *msg)
158 {
159     value_list_t vl = VALUE_LIST_INIT;
160     data_set_t const *ds;
161     char *topic;
162     char *name;
163     char *payload;
164     int status;
165
166     if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
167         return;
168
169     topic = strdup (msg->topic);
170     name = strip_prefix (topic);
171
172     status = parse_identifier_vl (name, &vl);
173     if (status != 0)
174     {
175         ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
176         sfree (topic);
177         return;
178     }
179     sfree (topic);
180
181     ds = plugin_get_ds (vl.type);
182     if (ds == NULL)
183     {
184         ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
185         return;
186     }
187
188     vl.values = calloc (ds->ds_num, sizeof (*vl.values));
189     if (vl.values == NULL)
190     {
191         ERROR ("mqtt plugin: calloc failed.");
192         return;
193     }
194     vl.values_len = ds->ds_num;
195
196     payload = strdup ((void *) msg->payload);
197     DEBUG ("mqtt plugin: payload = \"%s\"", payload);
198     status = parse_values (payload, &vl, ds);
199     if (status != 0)
200     {
201         ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
202         sfree (payload);
203         sfree (vl.values);
204         return;
205     }
206     sfree (payload);
207
208     plugin_dispatch_values (&vl);
209     sfree (vl.values);
210 } /* void on_message */
211
212 /* must hold conf->lock when calling. */
213 static int mqtt_reconnect (mqtt_client_conf_t *conf)
214 {
215     int status;
216
217     if (conf->connected)
218         return (0);
219
220     status = mosquitto_reconnect (conf->mosq);
221     if (status != MOSQ_ERR_SUCCESS)
222     {
223         char errbuf[1024];
224         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
225                 (status == MOSQ_ERR_ERRNO)
226                 ? sstrerror(errno, errbuf, sizeof (errbuf))
227                 : mosquitto_strerror (status));
228         return (-1);
229     }
230
231     conf->connected = 1;
232
233     c_release (LOG_INFO,
234             &conf->complaint_cantpublish,
235             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
236             conf->host, conf->port);
237
238     return (0);
239 } /* mqtt_reconnect */
240
241 /* must hold conf->lock when calling. */
242 static int mqtt_connect (mqtt_client_conf_t *conf)
243 {
244     char const *client_id;
245     int status;
246
247     if (conf->mosq != NULL)
248         return mqtt_reconnect (conf);
249
250     if (conf->client_id)
251         client_id = conf->client_id;
252     else
253         client_id = hostname_g;
254
255     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
256     if (conf->mosq == NULL)
257     {
258         ERROR ("mqtt plugin: mosquitto_new failed");
259         return (-1);
260     }
261
262     if (conf->username && conf->password)
263     {
264         status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
265         if (status != MOSQ_ERR_SUCCESS)
266         {
267             char errbuf[1024];
268             ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
269                     (status == MOSQ_ERR_ERRNO)
270                     ? sstrerror (errno, errbuf, sizeof (errbuf))
271                     : mosquitto_strerror (status));
272
273             mosquitto_destroy (conf->mosq);
274             conf->mosq = NULL;
275             return (-1);
276         }
277     }
278
279     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
280             /* keepalive = */ 10, /* clean session = */ conf->clean_session);
281     if (status != MOSQ_ERR_SUCCESS)
282     {
283         char errbuf[1024];
284         ERROR ("mqtt plugin: mosquitto_connect failed: %s",
285                 (status == MOSQ_ERR_ERRNO)
286                 ? sstrerror (errno, errbuf, sizeof (errbuf))
287                 : mosquitto_strerror (status));
288
289         mosquitto_destroy (conf->mosq);
290         conf->mosq = NULL;
291         return (-1);
292     }
293
294     if (!conf->publish)
295     {
296         mosquitto_message_callback_set (conf->mosq, on_message);
297
298         status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
299                 conf->topic, conf->qos);
300         if (status != MOSQ_ERR_SUCCESS)
301         {
302             ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
303                     conf->topic, mosquitto_strerror (status));
304
305             mosquitto_disconnect (conf->mosq);
306             mosquitto_destroy (conf->mosq);
307             conf->mosq = NULL;
308             return (-1);
309         }
310     }
311
312     conf->connected = 1;
313     return (0);
314 } /* mqtt_connect */
315
316 static void *subscribers_thread (void *arg)
317 {
318     mqtt_client_conf_t *conf = arg;
319     int status;
320
321     conf->loop = 1;
322
323     while (conf->loop)
324     {
325         status = mqtt_connect (conf);
326         if (status != 0)
327         {
328             sleep (1);
329             continue;
330         }
331
332         /* The documentation says "0" would map to the default (1000ms), but
333          * that does not work on some versions. */
334         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
335         if (status == MOSQ_ERR_CONN_LOST)
336         {
337             conf->connected = 0;
338             continue;
339         }
340         else if (status != MOSQ_ERR_SUCCESS)
341         {
342             ERROR ("mqtt plugin: mosquitto_loop failed: %s",
343                     mosquitto_strerror (status));
344             mosquitto_destroy (conf->mosq);
345             conf->mosq = NULL;
346             conf->connected = 0;
347             continue;
348         }
349
350         DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
351     } /* while (conf->loop) */
352
353     pthread_exit (0);
354 } /* void *subscribers_thread */
355
356 static int publish (mqtt_client_conf_t *conf, char const *topic,
357     void const *payload, size_t payload_len)
358 {
359     int status;
360
361     pthread_mutex_lock (&conf->lock);
362
363     status = mqtt_connect (conf);
364     if (status != 0) {
365         pthread_mutex_unlock (&conf->lock);
366         ERROR ("mqtt plugin: unable to reconnect to broker");
367         return (status);
368     }
369
370     status = mosquitto_publish(conf->mosq,
371             /* message id */ NULL,
372             topic,
373             (uint32_t) payload_len, payload,
374             /* qos */ conf->qos,
375             /* retain */ conf->retain);
376     if (status != MOSQ_ERR_SUCCESS)
377     {
378         char errbuf[1024];
379         c_complain (LOG_ERR,
380                 &conf->complaint_cantpublish,
381                 "plugin mqtt: mosquitto_publish failed: %s",
382                 status == MOSQ_ERR_ERRNO ?
383                 sstrerror(errno, errbuf, sizeof (errbuf)) :
384                 mosquitto_strerror(status));
385         /* Mark our connection "down" regardless of the error as a safety
386          * measure; we will try to reconnect the next time we have to publish a
387          * message */
388         conf->connected = 0;
389
390         pthread_mutex_unlock (&conf->lock);
391         return (-1);
392     }
393
394     pthread_mutex_unlock (&conf->lock);
395     return (0);
396 } /* int publish */
397
398 static int format_topic (char *buf, size_t buf_len,
399     data_set_t const *ds, value_list_t const *vl,
400     mqtt_client_conf_t *conf)
401 {
402     char name[MQTT_MAX_TOPIC_SIZE];
403     int status;
404
405     if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
406         return (FORMAT_VL (buf, buf_len, vl));
407
408     status = FORMAT_VL (name, sizeof (name), vl);
409     if (status != 0)
410         return (status);
411
412     status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
413     if ((status < 0) || (((size_t) status) >= buf_len))
414         return (ENOMEM);
415
416     return (0);
417 } /* int format_topic */
418
419 static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
420     user_data_t *user_data)
421 {
422     mqtt_client_conf_t *conf;
423     char topic[MQTT_MAX_TOPIC_SIZE];
424     char payload[MQTT_MAX_MESSAGE_SIZE];
425     int status = 0;
426
427     if ((user_data == NULL) || (user_data->data == NULL))
428         return (EINVAL);
429     conf = user_data->data;
430
431     status = format_topic (topic, sizeof (topic), ds, vl, conf);
432     if (status != 0)
433     {
434         ERROR ("mqtt plugin: format_topic failed with status %d.", status);
435         return (status);
436     }
437
438     status = format_values (payload, sizeof (payload),
439             ds, vl, conf->store_rates);
440     if (status != 0)
441     {
442         ERROR ("mqtt plugin: format_values failed with status %d.", status);
443         return (status);
444     }
445
446     status = publish (conf, topic, payload, strlen (payload) + 1);
447     if (status != 0)
448     {
449         ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
450         return (status);
451     }
452
453     return (status);
454 } /* mqtt_write */
455
456 /*
457  * <Publish "name">
458  *   Host "example.com"
459  *   Port 1883
460  *   ClientId "collectd"
461  *   User "guest"
462  *   Password "secret"
463  *   Prefix "collectd"
464  *   StoreRates true
465  *   Retain false
466  *   QoS 0
467  * </Publish>
468  */
469 static int mqtt_config_publisher (oconfig_item_t *ci)
470 {
471     mqtt_client_conf_t *conf;
472     user_data_t user_data;
473     int status;
474     int i;
475
476     conf = calloc (1, sizeof (*conf));
477     if (conf == NULL)
478     {
479         ERROR ("mqtt plugin: malloc failed.");
480         return (-1);
481     }
482     conf->publish = 1;
483
484     conf->name = NULL;
485     status = cf_util_get_string (ci, &conf->name);
486     if (status != 0)
487     {
488         mqtt_free (conf);
489         return (status);
490     }
491
492     conf->host = strdup (MQTT_DEFAULT_HOST);
493     conf->port = MQTT_DEFAULT_PORT;
494     conf->client_id = NULL;
495     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
496
497     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
498
499     for (i = 0; i < ci->children_num; i++)
500     {
501         oconfig_item_t *child = ci->children + i;
502         if (strcasecmp ("Host", child->key) == 0)
503             cf_util_get_string (child, &conf->host);
504         else if (strcasecmp ("Port", child->key) == 0)
505         {
506             int tmp = cf_util_get_port_number (child);
507             if (tmp < 0)
508                 ERROR ("mqtt plugin: Invalid port number.");
509             else
510                 conf->port = tmp;
511         }
512         else if (strcasecmp ("ClientId", child->key) == 0)
513             cf_util_get_string (child, &conf->client_id);
514         else if (strcasecmp ("User", child->key) == 0)
515             cf_util_get_string (child, &conf->username);
516         else if (strcasecmp ("Password", child->key) == 0)
517             cf_util_get_string (child, &conf->password);
518         else if (strcasecmp ("QoS", child->key) == 0)
519         {
520             int tmp = -1;
521             status = cf_util_get_int (child, &tmp);
522             if ((status != 0) || (tmp < 0) || (tmp > 2))
523                 ERROR ("mqtt plugin: Not a valid QoS setting.");
524             else
525                 conf->qos = tmp;
526         }
527         else if (strcasecmp ("Prefix", child->key) == 0)
528             cf_util_get_string (child, &conf->topic_prefix);
529         else if (strcasecmp ("StoreRates", child->key) == 0)
530             cf_util_get_boolean (child, &conf->store_rates);
531         else if (strcasecmp ("Retain", child->key) == 0)
532             cf_util_get_boolean (child, &conf->retain);
533         else
534             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
535     }
536
537     memset (&user_data, 0, sizeof (user_data));
538     user_data.data = conf;
539
540     plugin_register_write ("mqtt", mqtt_write, &user_data);
541     return (0);
542 } /* mqtt_config_publisher */
543
544 /*
545  * <Subscribe "name">
546  *   Host "example.com"
547  *   Port 1883
548  *   ClientId "collectd"
549  *   User "guest"
550  *   Password "secret"
551  *   Topic "collectd/#"
552  * </Publish>
553  */
554 static int mqtt_config_subscriber (oconfig_item_t *ci)
555 {
556     mqtt_client_conf_t **tmp;
557     mqtt_client_conf_t *conf;
558     int status;
559     int i;
560
561     conf = calloc (1, sizeof (*conf));
562     if (conf == NULL)
563     {
564         ERROR ("mqtt plugin: malloc failed.");
565         return (-1);
566     }
567     conf->publish = 0;
568
569     conf->name = NULL;
570     status = cf_util_get_string (ci, &conf->name);
571     if (status != 0)
572     {
573         mqtt_free (conf);
574         return (status);
575     }
576
577     conf->host = strdup (MQTT_DEFAULT_HOST);
578     conf->port = MQTT_DEFAULT_PORT;
579     conf->client_id = NULL;
580     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
581
582     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
583
584     for (i = 0; i < ci->children_num; i++)
585     {
586         oconfig_item_t *child = ci->children + i;
587         if (strcasecmp ("Host", child->key) == 0)
588             cf_util_get_string (child, &conf->host);
589         else if (strcasecmp ("Port", child->key) == 0)
590         {
591             int tmp = cf_util_get_port_number (child);
592             if (tmp < 0)
593                 ERROR ("mqtt plugin: Invalid port number.");
594             else
595                 conf->port = tmp;
596         }
597         else if (strcasecmp ("ClientId", child->key) == 0)
598             cf_util_get_string (child, &conf->client_id);
599         else if (strcasecmp ("User", child->key) == 0)
600             cf_util_get_string (child, &conf->username);
601         else if (strcasecmp ("Password", child->key) == 0)
602             cf_util_get_string (child, &conf->password);
603         else if (strcasecmp ("QoS", child->key) == 0)
604         {
605             int tmp = -1;
606             status = cf_util_get_int (child, &tmp);
607             if ((status != 0) || (tmp < 0) || (tmp > 2))
608                 ERROR ("mqtt plugin: Not a valid QoS setting.");
609             else
610                 conf->qos = tmp;
611         }
612         else if (strcasecmp ("Topic", child->key) == 0)
613             cf_util_get_string (child, &conf->topic);
614         else if (strcasecmp ("CleanSession", child->key) == 0)
615             cf_util_get_boolean (child, &conf->clean_session);
616         else
617             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
618     }
619
620     tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
621     if (tmp == NULL)
622     {
623         ERROR ("mqtt plugin: realloc failed.");
624         mqtt_free (conf);
625         return (-1);
626     }
627     subscribers = tmp;
628     subscribers[subscribers_num] = conf;
629     subscribers_num++;
630
631     return (0);
632 } /* mqtt_config_subscriber */
633
634 /*
635  * <Plugin mqtt>
636  *   <Publish "name">
637  *     # ...
638  *   </Publish>
639  *   <Subscribe "name">
640  *     # ...
641  *   </Subscribe>
642  * </Plugin>
643  */
644 static int mqtt_config (oconfig_item_t *ci)
645 {
646     int i;
647
648     for (i = 0; i < ci->children_num; i++)
649     {
650         oconfig_item_t *child = ci->children + i;
651
652         if (strcasecmp ("Publish", child->key) == 0)
653             mqtt_config_publisher (child);
654         else if (strcasecmp ("Subscribe", child->key) == 0)
655             mqtt_config_subscriber (child);
656         else
657             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
658     }
659
660     return (0);
661 } /* int mqtt_config */
662
663 static int mqtt_init (void)
664 {
665     size_t i;
666
667     mosquitto_lib_init ();
668
669     for (i = 0; i < subscribers_num; i++)
670     {
671         int status;
672
673         if (subscribers[i]->loop)
674             continue;
675
676         status = plugin_thread_create (&subscribers[i]->thread,
677                 /* attrs = */ NULL,
678                 /* func  = */ subscribers_thread,
679                 /* args  = */ subscribers[i]);
680         if (status != 0)
681         {
682             char errbuf[1024];
683             ERROR ("mqtt plugin: pthread_create failed: %s",
684                     sstrerror (errno, errbuf, sizeof (errbuf)));
685             continue;
686         }
687     }
688
689     return (0);
690 } /* mqtt_init */
691
692 void module_register (void)
693 {
694     plugin_register_complex_config ("mqtt", mqtt_config);
695     plugin_register_init ("mqtt", mqtt_init);
696 } /* void module_register */
697
698 /* vim: set sw=4 sts=4 et fdm=marker : */