2 * collectd - src/connectivity.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
25 * Aneesh Puttur <aputtur at redhat.com>
32 #include "utils_complain.h"
34 #include <asm/types.h>
37 #include <netinet/in.h>
41 #include <sys/socket.h>
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
48 #include <yajl/yajl_common.h>
49 #include <yajl/yajl_gen.h>
50 #if HAVE_YAJL_YAJL_VERSION_H
51 #include <yajl/yajl_version.h>
53 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
54 #define HAVE_YAJL_V2 1
/* Alias for the netlink protocol family.
 * NOTE(review): no use of MYPROTO is visible in this part of the file
 * (mnl_socket_open() is passed NETLINK_ROUTE directly) -- confirm it is
 * still needed. */
57 #define MYPROTO NETLINK_ROUTE
/* Field names and fixed values of the common event header written by
 * gen_message_payload(). */
59 #define CONNECTIVITY_DOMAIN_FIELD "domain"
60 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
61 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
62 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
63 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
64 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
65 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
66 #define CONNECTIVITY_PRIORITY_FIELD "priority"
67 #define CONNECTIVITY_PRIORITY_VALUE "high"
68 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
69 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
70 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
// Passed to yajl_gen_number() rather than yajl_gen_string().
71 #define CONNECTIVITY_SEQUENCE_VALUE "0"
72 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
73 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define CONNECTIVITY_VERSION_FIELD "version"
// Passed to yajl_gen_number() rather than yajl_gen_string().
75 #define CONNECTIVITY_VERSION_VALUE "1.0"
/* Field names and fixed values of the nested "stateChangeFields" map written
 * by gen_message_payload(). The value strings for the old and the new state
 * are the same pair ("outOfService" / "inService"). */
77 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
78 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
79 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
80 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
81 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
82 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
83 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
84 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD \
85 "stateChangeFieldsVersion"
// Passed to yajl_gen_number() rather than yajl_gen_string().
86 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
87 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
/*
 * One node of the singly linked list of monitored interfaces. Nodes are
 * created and prepended to the list in connectivity_config().
 * Other functions in this file also access `interface`, `status` and
 * `prev_status` members through this type; their declarations are not
 * visible in this part of the file.
 */
92 struct interfacelist_s {
// Time in microseconds, taken from CDTIME_T_TO_US(cdtime()) when the node is
// created and again whenever connectivity_link_state() updates its status.
98 long long unsigned int timestamp;
// Next node in the list; the list ends with NULL.
100 struct interfacelist_s *next;
102 typedef struct interfacelist_s interfacelist_t;
// Head of the list of monitored interfaces; NULL until the first "Interface"
// option has been processed by connectivity_config().
107 static interfacelist_t *interfacelist_head = NULL;
// Set to 1 by start_thread() and to 0 by stop_thread(); the listener thread
// keeps looping while it is greater than zero.
109 static int connectivity_thread_loop = 0;
// Set to 1 by connectivity_thread() and checked by connectivity_read().
110 static int connectivity_thread_error = 0;
// Handle of the listener thread created in start_thread().
111 static pthread_t connectivity_thread_id;
// Taken around accesses to the thread flags and to the interface list entries.
112 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
// Broadcast by stop_thread().
// NOTE(review): no wait on this condition variable is visible in this part of
// the file -- confirm it has a consumer.
113 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
// libmnl netlink socket: opened and bound in start_thread(), read in
// connectivity_thread(), closed in stop_thread().
114 static struct mnl_socket *sock;
// Counter incremented once per payload in gen_message_payload() and written
// as the "eventId" field.
115 static int event_id = 0;
// Configuration options accepted by this plugin.
117 static const char *config_keys[] = {"Interface"};
118 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
 * Build the JSON event describing a link state change of one interface with
 * the yajl generator and hand a heap-allocated copy of the text back through
 * *buf.
 *
 *   state      new link state; 0 selects the "down" / "outOfService" strings,
 *              any other value the "up" / "inService" strings
 *   old_state  previous link state, mapped the same way for "oldState"
 *   interface  interface name; used for eventName, sourceName and
 *              stateInterface
 *   timestamp  value written as "startEpochMicrosec"
 *   buf        out: receives a malloc()ed copy of the generated text; the
 *              caller in this file frees it after dispatching
 *
 * The document is one map holding the common event header fields and a nested
 * "stateChangeFields" map. The ERROR at the end of the function reports a
 * failure to generate the JSON.
 * NOTE(review): the return statements are not visible in this part of the
 * file -- confirm the exact return values before relying on them.
 */
124 static int gen_message_payload(int state, int old_state, const char *interface,
125 long long unsigned int timestamp, char **buf) {
126 const unsigned char *buf2;
// The generator is set up through the yajl 1.x configuration struct or
// through the yajl 2.x yajl_gen_config() call, depending on HAVE_YAJL_V2;
// beautified output is switched off in the 2.x call.
129 #if !defined(HAVE_YAJL_V2)
130 yajl_gen_config conf = {};
137 g = yajl_gen_alloc(NULL);
138 yajl_gen_config(g, yajl_gen_beautify, 0);
141 g = yajl_gen_alloc(&conf, NULL);
146 // *** BEGIN common event header ***
148 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// domain
152 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
153 strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
156 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
157 strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
// eventId
161 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
162 strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
// Each generated payload gets the next value of the file-level counter.
166 event_id = event_id + 1;
167 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
// NOTE(review): the malloc() result is used by the next statement without a
// NULL check.
168 char *event_id_str = malloc(event_id_len);
169 snprintf(event_id_str, event_id_len, "%d", event_id);
171 if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
172 yajl_gen_status_ok) {
// eventName: "interface <name> down" or "interface <name> up"
180 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
181 strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
185 int event_name_len = 0;
186 event_name_len = event_name_len + strlen(interface); // interface name
187 event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
189 event_name_len + 12; // "interface", 2 spaces and null-terminator
// NOTE(review): the malloc() result is used by the next statement without a
// NULL check.
190 char *event_name_str = malloc(event_name_len);
191 memset(event_name_str, '\0', event_name_len);
192 snprintf(event_name_str, event_name_len, "interface %s %s", interface,
193 (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
194 : CONNECTIVITY_EVENT_NAME_UP_VALUE));
// The temporary string is released on the failure branch and again on the
// normal path below.
196 if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
197 yajl_gen_status_ok) {
198 sfree(event_name_str);
202 sfree(event_name_str);
// lastEpochMicrosec: the current time, converted to microseconds
205 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
206 strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
210 int last_epoch_microsec_len =
211 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
// NOTE(review): the malloc() result is used by the next statement without a
// NULL check.
212 char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
213 snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
214 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
216 if (yajl_gen_number(g, last_epoch_microsec_str,
217 strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
218 sfree(last_epoch_microsec_str);
222 sfree(last_epoch_microsec_str);
// priority
225 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
226 strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
230 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
231 strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
235 // reportingEntityName
236 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
237 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
241 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
242 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
// sequence: always the constant CONNECTIVITY_SEQUENCE_VALUE
247 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
248 strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
252 if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
253 strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
// sourceName: the interface name
258 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
259 strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
263 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
267 // startEpochMicrosec
268 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
269 strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
273 int start_epoch_microsec_len =
274 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
// NOTE(review): the malloc() result is used by the next statement without a
// NULL check.
275 char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
276 snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
277 (long long unsigned int)timestamp);
279 if (yajl_gen_number(g, start_epoch_microsec_str,
280 strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
281 sfree(start_epoch_microsec_str);
285 sfree(start_epoch_microsec_str);
// version
288 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
289 strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
292 if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
293 strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
296 // *** END common event header ***
298 // *** BEGIN state change fields ***
300 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
301 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
305 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// newState: chosen by `state` (0 means the DOWN value)
309 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
310 strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
315 (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
316 : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
319 g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
320 : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
321 new_state_len) != yajl_gen_status_ok)
// oldState: chosen by `old_state` (0 means the DOWN value)
325 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
326 strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
331 (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
332 : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
335 g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
336 : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
337 old_state_len) != yajl_gen_status_ok)
340 // stateChangeFieldsVersion
341 if (yajl_gen_string(g,
342 (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
343 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
347 if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
348 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
// stateInterface: the interface name again
353 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
354 strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
358 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
// Close the nested "stateChangeFields" map.
362 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
365 // *** END state change fields ***
// Close the outer map.
367 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
// Fetch the generator's buffer and copy it into fresh memory for the caller.
// The copy length is taken from strlen() on the buffer; the `len` reported by
// yajl_gen_get_buf() is not used for it.
// NOTE(review): no NULL check of *buf is visible between the malloc() and the
// copy -- confirm.
370 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
373 *buf = malloc(strlen((char *)buf2) + 1);
375 sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
383 ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
/*
 * Process one netlink link message: find its IFLA_IFNAME attribute, look the
 * name up in the list of monitored interfaces and, when it is found, record
 * the new running state and the time of the update.
 *
 *   msg  netlink message whose payload is read as a struct ifinfomsg
 *
 * The whole scan runs with connectivity_lock held; the lock is released on
 * the validation-failure path and at the end of the function.
 * NOTE(review): the return statements are not visible in this part of the
 * file -- confirm the return values.
 */
387 static int connectivity_link_state(struct nlmsghdr *msg) {
389 struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
391 const char *dev = NULL;
393 pthread_mutex_lock(&connectivity_lock);
397 /* Scan attribute list for device name. */
398 mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
// Only the interface-name attribute is of interest.
399 if (mnl_attr_get_type(attr) != IFLA_IFNAME)
402 if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
403 ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
406 pthread_mutex_unlock(&connectivity_lock);
410 dev = mnl_attr_get_str(attr);
// Search the configured interfaces for an exact name match.
412 for (il = interfacelist_head; il != NULL; il = il->next)
413 if (strcmp(dev, il->interface) == 0)
417 DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
421 uint32_t prev_status;
// New status is 1 when IFF_RUNNING is set in the message flags, 0 otherwise.
423 prev_status = il->status;
424 il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0);
425 il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
427 // If the new status is different than the previous status,
428 // store the previous status and set sent to zero
429 if (il->status != prev_status) {
430 il->prev_status = prev_status;
434 DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
436 ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
439 // no need to loop again, we found the interface name attr
440 // (otherwise the first if-statement in the loop would
441 // have moved us on with 'continue')
445 pthread_mutex_unlock(&connectivity_lock);
/*
 * Dispatch one netlink message by its nlmsg_type. Link messages are passed to
 * connectivity_link_state(); an unrecognised type is logged as an error.
 * NOTE(review): the case labels and return statements are not visible in this
 * part of the file -- confirm which types are handled and what is returned.
 */
450 static int msg_handler(struct nlmsghdr *msg) {
451 switch (msg->nlmsg_type) {
// The result of connectivity_link_state() is not used here.
461 connectivity_link_state(msg);
466 ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
/*
 * Receive one batch of data from the netlink socket and feed every message
 * contained in it to the supplied handler.
 *
 *   nl           libmnl socket to receive from
 *   msg_handler  callback invoked for each message; this parameter has the
 *                same name as the file-level msg_handler() function and
 *                hides it inside this body
 *
 * NOTE(review): the return statements are not visible in this part of the
 * file -- confirm the return values.
 */
473 static int read_event(struct mnl_socket *nl,
474 int (*msg_handler)(struct nlmsghdr *)) {
483 status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
// NOTE(review): this comment calls the socket non-blocking while the comment
// in stop_thread() describes the reader thread as blocking, and no code that
// makes the socket non-blocking is visible here -- confirm which is right.
486 /* Socket non-blocking so bail out once we have read everything */
487 if (errno == EWOULDBLOCK || errno == EAGAIN)
490 /* Anything else is an error */
491 ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
497 DEBUG("connectivity plugin: read_event: EOF\n");
500 /* We need to handle more than one message per 'recvmsg' */
// `status` holds the number of bytes still unprocessed; NLMSG_NEXT reduces it
// as the walk advances.
501 for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
502 h = NLMSG_NEXT(h, status)) {
504 if (h->nlmsg_type == NLMSG_DONE)
507 /* Message is some kind of error */
508 if (h->nlmsg_type == NLMSG_ERROR) {
509 ERROR("connectivity plugin: read_event: Message is an error\n");
513 /* Call message handler */
515 ret = (*msg_handler)(h);
517 ERROR("connectivity plugin: read_event: Message handler error %d\n",
522 ERROR("connectivity plugin: read_event: Error NULL message handler\n");
/*
 * Body of the listener thread. While connectivity_thread_loop is greater than
 * zero it calls read_event() on the shared socket with msg_handler() as the
 * callback. connectivity_lock is held while the loop flag is examined and is
 * released for the duration of each read_event() call.
 *
 *   arg  thread argument
 */
530 static void *connectivity_thread(void *arg) /* {{{ */
532 pthread_mutex_lock(&connectivity_lock);
534 while (connectivity_thread_loop > 0) {
// Do not hold the lock while reading from the socket.
537 pthread_mutex_unlock(&connectivity_lock);
539 status = read_event(sock, msg_handler);
541 pthread_mutex_lock(&connectivity_lock);
// Flag a problem for connectivity_read(), which checks this variable.
// NOTE(review): the condition guarding this assignment is not visible here.
544 connectivity_thread_error = 1;
// Re-check the flag: stop_thread() may have cleared it during the read.
548 if (connectivity_thread_loop <= 0)
550 } /* while (connectivity_thread_loop > 0) */
552 pthread_mutex_unlock(&connectivity_lock);
555 } /* }}} void *connectivity_thread */
/*
 * Start the listener thread unless it is already marked as running: open a
 * NETLINK_ROUTE socket, bind it to the RTMGRP_LINK group and create the
 * thread that runs connectivity_thread(). All of this happens with
 * connectivity_lock held.
 * NOTE(review): the return statements are not visible in this part of the
 * file -- confirm the return values.
 */
557 static int start_thread(void) /* {{{ */
561 pthread_mutex_lock(&connectivity_lock);
// Nothing to do when the loop flag says the thread is already running.
563 if (connectivity_thread_loop != 0) {
564 pthread_mutex_unlock(&connectivity_lock);
// NOTE(review): the flag is raised before the socket exists. The thread
// creation failure path below resets it to 0, but no reset is visible on the
// mnl_socket_open / mnl_socket_bind failure paths -- confirm that a failed
// start cannot leave the flag at 1.
568 connectivity_thread_loop = 1;
569 connectivity_thread_error = 0;
572 sock = mnl_socket_open(NETLINK_ROUTE);
575 "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
576 pthread_mutex_unlock(&connectivity_lock);
// NOTE(review): no mnl_socket_close(sock) is visible on the bind failure
// path -- confirm the socket is not leaked.
580 if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
582 "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
583 pthread_mutex_unlock(&connectivity_lock);
588 status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
590 /* arg = */ (void *)0, "connectivity");
// Thread creation failed: clear the flag and close the socket again.
592 connectivity_thread_loop = 0;
593 ERROR("connectivity plugin: Starting thread failed.");
594 pthread_mutex_unlock(&connectivity_lock);
595 mnl_socket_close(sock);
599 pthread_mutex_unlock(&connectivity_lock);
601 } /* }}} int start_thread */
/*
 * Ask the listener thread to stop and wait for it with pthread_join().
 *
 *   shutdown  per the comment below, pthread_cancel() is used in the case of
 *             a process shutdown; connectivity_shutdown() passes 1
 *
 * NOTE(review): the return statements are not visible in this part of the
 * file -- confirm the return values.
 */
603 static int stop_thread(int shutdown) /* {{{ */
// NOTE(review): the socket is closed before the lock is taken and before the
// "thread not running" check below, so this runs even when start_thread()
// never opened a socket, and possibly while the listener thread is still
// reading from it -- confirm this is safe.
608 mnl_socket_close(sock);
610 pthread_mutex_lock(&connectivity_lock);
612 if (connectivity_thread_loop == 0) {
613 pthread_mutex_unlock(&connectivity_lock);
// Clear the loop flag so the thread leaves its loop on the next check.
617 connectivity_thread_loop = 0;
618 pthread_cond_broadcast(&connectivity_cond);
619 pthread_mutex_unlock(&connectivity_lock);
622 // Since the thread is blocking, calling pthread_join
623 // doesn't actually succeed in stopping it. It will stick around
624 // until a NETLINK message is received on the socket (at which point
625 // it will realize that "connectivity_thread_loop" is 0 and will
626 // break out of the read loop and be allowed to die). This is
627 // fine when the process isn't supposed to be exiting, but in
628 // the case of a process shutdown, we don't want to have an
629 // idle thread hanging around. Calling pthread_cancel here in
630 // the case of a shutdown just assures that the thread is
631 // gone and that the process has been fully terminated.
633 DEBUG("connectivity plugin: Canceling thread for process shutdown");
635 status = pthread_cancel(connectivity_thread_id);
638 ERROR("connectivity plugin: Unable to cancel thread: %d", status);
642 status = pthread_join(connectivity_thread_id, /* return = */ NULL);
644 ERROR("connectivity plugin: Stopping thread failed.");
// Forget the old thread handle and clear the error flag.
649 pthread_mutex_lock(&connectivity_lock);
650 memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
651 connectivity_thread_error = 0;
652 pthread_mutex_unlock(&connectivity_lock);
654 DEBUG("connectivity plugin: Finished requesting stop of thread");
657 } /* }}} int stop_thread */
/*
 * Init callback registered in module_register(). Logs a notice when no
 * interface has been configured; otherwise returns the result of
 * start_thread().
 * NOTE(review): what is returned after the notice is not visible in this part
 * of the file -- confirm.
 */
659 static int connectivity_init(void) /* {{{ */
661 if (interfacelist_head == NULL) {
662 NOTICE("connectivity plugin: No interfaces have been configured.");
666 return (start_thread());
667 } /* }}} int connectivity_init */
/*
 * Config callback registered in module_register(). For the "Interface" option
 * (matched case-insensitively) it allocates a list node, stores a copy of the
 * value as the interface name and prepends the node to the interface list.
 *
 *   key    option name
 *   value  option value; duplicated with strdup()
 *
 * NOTE(review): the return statements and the handling of other keys are not
 * visible in this part of the file -- confirm.
 */
669 static int connectivity_config(const char *key, const char *value) /* {{{ */
671 if (strcasecmp(key, "Interface") == 0) {
675 il = malloc(sizeof(*il));
678 ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
679 sstrerror(errno, errbuf, sizeof(errbuf)));
683 interface = strdup(value);
684 if (interface == NULL) {
687 ERROR("connectivity plugin: strdup failed connectivity_config: %s",
688 sstrerror(errno, errbuf, sizeof(errbuf)));
// A new entry starts in the "unknown" state, stamped with the current time.
692 il->interface = interface;
693 il->status = 2; // "unknown"
695 il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
// Prepend the node to the list.
697 il->next = interfacelist_head;
698 interfacelist_head = il;
705 } /* }}} int connectivity_config */
/*
 * Build and dispatch a collectd notification for one interface state change.
 * The notification carries plugin "connectivity", the interface name as
 * plugin instance, type "gauge", type instance "interface_status" and, as a
 * string meta entry named "ves", the JSON text from gen_message_payload().
 *
 *   interface  interface name
 *   type       NOTE(review): not referenced in the visible lines -- confirm
 *              whether it is used at all
 *   value      new state; passed on to gen_message_payload()
 *   old_value  previous state; passed on to gen_message_payload()
 *   timestamp  passed on to gen_message_payload()
 */
707 static void connectivity_dispatch_notification(
708 const char *interface, const char *type, /* {{{ */
709 gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
// The notification starts out as NOTIF_FAILURE.
712 NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
// NOTE(review): the condition selecting NOTIF_OKAY is not visible here.
715 n.severity = NOTIF_OKAY;
// The result of gethostname() is not checked.
718 gethostname(hostname, sizeof(hostname));
720 sstrncpy(n.host, hostname, sizeof(n.host));
721 sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
722 sstrncpy(n.type, "gauge", sizeof(n.type));
723 sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
// NOTE(review): the result of gen_message_payload() is ignored, yet `buf` is
// handed to sstrdup() below -- confirm what `buf` holds when JSON generation
// fails.
725 gen_message_payload(value, old_value, interface, timestamp, &buf);
727 notification_meta_t *m = calloc(1, sizeof(*m));
732 ERROR("connectivity plugin: unable to allocate metadata: %s",
733 sstrerror(errno, errbuf, sizeof(errbuf)));
// The meta entry gets its own copy of the JSON text.
737 sstrncpy(m->name, "ves", sizeof(m->name));
738 m->nm_value.nm_string = sstrdup(buf);
739 m->type = NM_TYPE_STRING;
742 DEBUG("connectivity plugin: notification message: %s",
743 n.meta->nm_value.nm_string);
745 DEBUG("connectivity plugin: dispatching state %d for interface %s",
746 (int)value, interface);
// The meta list is released right after the dispatch call.
748 plugin_dispatch_notification(&n);
749 plugin_notification_meta_free(n.meta);
751 // malloc'd in gen_message_payload
/*
 * Read callback registered in module_register(). When the listener thread
 * has flagged an error, the problem is logged and every interface is put
 * back into the "unknown" state. Otherwise each configured interface is
 * examined and a notification is dispatched for a state change that has not
 * been sent yet.
 * NOTE(review): the thread restart announced by the log message and the
 * return statements are not visible in this part of the file -- confirm.
 */
756 static int connectivity_read(void) /* {{{ */
758 if (connectivity_thread_error != 0) {
759 ERROR("connectivity plugin: The interface thread had a problem. Restarting "
764 for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) {
765 il->status = 2; // signifies "unknown"
773 } /* if (connectivity_thread_error != 0) */
775 for (interfacelist_t *il = interfacelist_head; il != NULL;
776 il = il->next) /* {{{ */
779 uint32_t prev_status;
// connectivity_lock is held from here until after the dispatch call below.
782 pthread_mutex_lock(&connectivity_lock);
785 prev_status = il->prev_status;
// `status` and `sent` are presumably copies of il->status and il->sent taken
// just above -- TODO confirm.
788 if (status != prev_status && sent == 0) {
789 connectivity_dispatch_notification(il->interface, "gauge", status,
790 prev_status, il->timestamp);
794 pthread_mutex_unlock(&connectivity_lock);
795 } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */
798 } /* }}} int connectivity_read */
/*
 * Shutdown callback registered in module_register(). Stops the listener
 * thread with the shutdown flag set, then walks the interface list from its
 * head and frees each stored interface name.
 * NOTE(review): the loop structure, the freeing of the nodes themselves and
 * the return statements are not visible in this part of the file -- confirm.
 */
800 static int connectivity_shutdown(void) /* {{{ */
804 DEBUG("connectivity plugin: Shutting down thread.");
// A negative result from stop_thread() is treated as a failure.
805 if (stop_thread(1) < 0)
808 il = interfacelist_head;
810 interfacelist_t *il_next;
814 sfree(il->interface);
821 } /* }}} int connectivity_shutdown */
/*
 * Plugin entry point: registers the config, init, read and shutdown
 * callbacks of this plugin under the name "connectivity".
 */
823 void module_register(void) {
824 plugin_register_config("connectivity", connectivity_config, config_keys,
826 plugin_register_init("connectivity", connectivity_init);
827 plugin_register_read("connectivity", connectivity_read);
828 plugin_register_shutdown("connectivity", connectivity_shutdown);
829 } /* void module_register */