/*
* Private data types
*/
+
struct interface_list_s {
char *interface;
/*
* Private variables
*/
+
static ignorelist_t *ignorelist = NULL;
static interface_list_t *interface_list_head = NULL;
static int monitor_all_interfaces = 1;
-static int connectivity_thread_loop = 0;
-static int connectivity_thread_error = 0;
-static pthread_t connectivity_thread_id;
+static int connectivity_netlink_thread_loop = 0;
+static int connectivity_netlink_thread_error = 0;
+static pthread_t connectivity_netlink_thread_id;
+static int connectivity_dequeue_thread_loop = 0;
+static int connectivity_dequeue_thread_error = 0;
+static pthread_t connectivity_dequeue_thread_id;
static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
-static struct mnl_socket *sock;
+// static struct mnl_socket *sock;
+static int nl_sock = -1;
static int event_id = 0;
-static const char *config_keys[] = {"Interface"};
+static const char *config_keys[] = {"Interface", "IgnoreSelected"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
+ * Prototype
+ */
+
+static void
+connectivity_dispatch_notification(const char *interface, const char *type,
+ gauge_t value, gauge_t old_value,
+ long long unsigned int timestamp);
+
+/*
* Private functions
*/
return 0;
}
-static int read_event(struct mnl_socket *nl,
- int (*msg_handler)(struct nlmsghdr *)) {
+// static int read_event(struct mnl_socket *nl,
+// int (*msg_handler)(struct nlmsghdr *)) {
+static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
int status;
int ret = 0;
char buf[4096];
struct nlmsghdr *h;
+ int recv_flags = MSG_DONTWAIT;
- if (nl == NULL)
+ // if (nl == NULL)
+ // return ret;
+
+ if (nl == -1)
return ret;
- status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
+ while (42) {
+ pthread_mutex_lock(&connectivity_lock);
- if (status < 0) {
- /* Socket non-blocking so bail out once we have read everything */
- if (errno == EWOULDBLOCK || errno == EAGAIN)
+ if (connectivity_netlink_thread_loop <= 0) {
+ pthread_mutex_unlock(&connectivity_lock);
return ret;
+ }
- /* Anything else is an error */
- ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
- status);
- return status;
- }
+ pthread_mutex_unlock(&connectivity_lock);
- if (status == 0) {
- DEBUG("connectivity plugin: read_event: EOF\n");
- }
+ status = recv(nl, buf, sizeof(buf), recv_flags);
- /* We need to handle more than one message per 'recvmsg' */
- for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
- h = NLMSG_NEXT(h, status)) {
- /* Finish reading */
- if (h->nlmsg_type == NLMSG_DONE)
- return ret;
+ if (status < 0) {
+
+ // If there were no more messages to drain from the socket,
+ // then signal the dequeue thread and allow it to dispatch
+ // any saved interface status changes. Then continue, but
+ // block and wait for new messages
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ pthread_mutex_lock(&connectivity_lock);
+ pthread_cond_signal(&connectivity_cond);
+ pthread_mutex_unlock(&connectivity_lock);
+
+ recv_flags = 0;
+ continue;
+ }
- /* Message is some kind of error */
- if (h->nlmsg_type == NLMSG_ERROR) {
- ERROR("connectivity plugin: read_event: Message is an error\n");
- return -1; // Error
+ /* Anything else is an error */
+ // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
+ // %d\n",
+ // status);
+ ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
+ return status;
}
- /* Call message handler */
- if (msg_handler) {
- ret = (*msg_handler)(h);
- if (ret < 0) {
- ERROR("connectivity plugin: read_event: Message handler error %d\n",
- ret);
+ // Message received successfully, so we'll stop blocking on the
+ // receive call for now (until we get a "would block" error, which
+ // will be handled above)
+ recv_flags = MSG_DONTWAIT;
+
+ if (status == 0) {
+ DEBUG("connectivity plugin: read_event: EOF\n");
+ }
+
+ /* We need to handle more than one message per 'recvmsg' */
+ for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
+ h = NLMSG_NEXT(h, status)) {
+ /* Finish reading */
+ if (h->nlmsg_type == NLMSG_DONE)
return ret;
+
+ /* Message is some kind of error */
+ if (h->nlmsg_type == NLMSG_ERROR) {
+ ERROR("connectivity plugin: read_event: Message is an error\n");
+ return -1; // Error
+ }
+
+ /* Call message handler */
+ if (msg_handler) {
+ ret = (*msg_handler)(h);
+ if (ret < 0) {
+ ERROR("connectivity plugin: read_event: Message handler error %d\n",
+ ret);
+ return ret;
+ }
+ } else {
+ ERROR("connectivity plugin: read_event: Error NULL message handler\n");
+ return -1;
}
- } else {
- ERROR("connectivity plugin: read_event: Error NULL message handler\n");
- return -1;
}
}
return ret;
}
-static void *connectivity_thread(void *arg) /* {{{ */
+static void send_interface_status() {
+ for (interface_list_t *il = interface_list_head; il != NULL;
+ il = il->next) /* {{{ */
+ {
+ uint32_t status;
+ uint32_t prev_status;
+ uint32_t sent;
+
+ status = il->status;
+ prev_status = il->prev_status;
+ sent = il->sent;
+
+ if (status != prev_status && sent == 0) {
+ connectivity_dispatch_notification(il->interface, "gauge", status,
+ prev_status, il->timestamp);
+ il->sent = 1;
+ }
+ } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+}
+
+static int read_interface_status() /* {{{ */
{
pthread_mutex_lock(&connectivity_lock);
- while (connectivity_thread_loop > 0) {
+ // This first attempt is necessary because the netlink thread
+ // might have held the lock while this thread was blocked on
+ // the lock acquisition just above. And while the netlink thread
+ // had the lock, it could have called pthread_cond_singal, which
+ // obviously wouldn't have woken this thread, since this thread
+ // was not yet waiting on the condition signal. So we need to
+ // loop through the interfaces and check if any have changed
+ // status before we wait on the condition signal
+ send_interface_status();
+
+ pthread_cond_wait(&connectivity_cond, &connectivity_lock);
+
+ send_interface_status();
+
+ pthread_mutex_unlock(&connectivity_lock);
+
+ return 0;
+} /* }}} int *read_interface_status */
+
+static void *connectivity_netlink_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_lock);
+
+ while (connectivity_netlink_thread_loop > 0) {
+ int status;
+
+ pthread_mutex_unlock(&connectivity_lock);
+
+ status = read_event(nl_sock, msg_handler);
+
+ pthread_mutex_lock(&connectivity_lock);
+
+ if (status < 0) {
+ connectivity_netlink_thread_error = 1;
+ break;
+ }
+
+ if (connectivity_netlink_thread_loop <= 0)
+ break;
+ } /* while (connectivity_netlink_thread_loop > 0) */
+
+ pthread_mutex_unlock(&connectivity_lock);
+
+ return ((void *)0);
+} /* }}} void *connectivity_netlink_thread */
+
+static void *connectivity_dequeue_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_lock);
+
+ while (connectivity_dequeue_thread_loop > 0) {
int status;
pthread_mutex_unlock(&connectivity_lock);
- status = read_event(sock, msg_handler);
+ status = read_interface_status();
pthread_mutex_lock(&connectivity_lock);
if (status < 0) {
- connectivity_thread_error = 1;
+ connectivity_dequeue_thread_error = 1;
break;
}
- if (connectivity_thread_loop <= 0)
+ if (connectivity_dequeue_thread_loop <= 0)
break;
- } /* while (connectivity_thread_loop > 0) */
+ } /* while (connectivity_dequeue_thread_loop > 0) */
pthread_mutex_unlock(&connectivity_lock);
return ((void *)0);
-} /* }}} void *connectivity_thread */
+} /* }}} void *connectivity_dequeue_thread */
+
+static int nl_connect() {
+ int rc;
+ struct sockaddr_nl sa_nl;
+
+ nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
+ if (nl_sock == -1) {
+ ERROR("connectivity plugin: socket open failed: %d", errno);
+ return -1;
+ }
+
+ sa_nl.nl_family = AF_NETLINK;
+ sa_nl.nl_groups = RTMGRP_LINK;
+ sa_nl.nl_pid = getpid();
+
+ rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+ if (rc == -1) {
+ ERROR("connectivity plugin: socket bind failed: %d", errno);
+ close(nl_sock);
+ return -1;
+ }
+
+ return 0;
+}
-static int start_thread(void) /* {{{ */
+static int start_netlink_thread(void) /* {{{ */
{
int status;
pthread_mutex_lock(&connectivity_lock);
- if (connectivity_thread_loop != 0) {
+ if (connectivity_netlink_thread_loop != 0) {
pthread_mutex_unlock(&connectivity_lock);
return (0);
}
- connectivity_thread_loop = 1;
- connectivity_thread_error = 0;
+ connectivity_netlink_thread_loop = 1;
+ connectivity_netlink_thread_error = 0;
- if (sock == NULL) {
- sock = mnl_socket_open(NETLINK_ROUTE);
- if (sock == NULL) {
- ERROR(
- "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
- pthread_mutex_unlock(&connectivity_lock);
- return (-1);
- }
+ if (nl_sock == -1) {
+ status = nl_connect();
- if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
- ERROR(
- "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
- pthread_mutex_unlock(&connectivity_lock);
- return (1);
- }
+ if (status != 0)
+ return status;
}
- status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
- connectivity_thread,
+ status = plugin_thread_create(&connectivity_netlink_thread_id,
+ /* attr = */ NULL, connectivity_netlink_thread,
/* arg = */ (void *)0, "connectivity");
if (status != 0) {
- connectivity_thread_loop = 0;
+ connectivity_netlink_thread_loop = 0;
ERROR("connectivity plugin: Starting thread failed.");
pthread_mutex_unlock(&connectivity_lock);
- mnl_socket_close(sock);
+
+ int status2 = close(nl_sock);
+
+ if (status2 != 0) {
+ ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+ status2, strerror(errno));
+ } else
+ nl_sock = -1;
+
return (-1);
}
pthread_mutex_unlock(&connectivity_lock);
- return (0);
-} /* }}} int start_thread */
-static int stop_thread(int shutdown) /* {{{ */
+ return status;
+}
+
+static int start_dequeue_thread(void) /* {{{ */
{
int status;
- if (sock != NULL)
- mnl_socket_close(sock);
+ pthread_mutex_lock(&connectivity_lock);
+
+ if (connectivity_dequeue_thread_loop != 0) {
+ pthread_mutex_unlock(&connectivity_lock);
+ return (0);
+ }
+
+ connectivity_dequeue_thread_loop = 1;
+ connectivity_dequeue_thread_error = 0;
+
+ status = plugin_thread_create(&connectivity_dequeue_thread_id,
+ /* attr = */ NULL, connectivity_dequeue_thread,
+ /* arg = */ (void *)0, "connectivity");
+ if (status != 0) {
+ connectivity_dequeue_thread_loop = 0;
+ ERROR("connectivity plugin: Starting dequeue thread failed.");
+ pthread_mutex_unlock(&connectivity_lock);
+ return (-1);
+ }
+
+ pthread_mutex_unlock(&connectivity_lock);
+
+ return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+ int status, status2;
+
+ status = start_netlink_thread();
+ status2 = start_dequeue_thread();
+
+ if (status < 0)
+ return status;
+ else
+ return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+ int status;
+
+ if (nl_sock != -1) {
+ status = close(nl_sock);
+ if (status != 0) {
+ ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+ status, strerror(errno));
+ return (-1);
+ } else
+ nl_sock = -1;
+ }
pthread_mutex_lock(&connectivity_lock);
- if (connectivity_thread_loop == 0) {
+ if (connectivity_netlink_thread_loop == 0) {
pthread_mutex_unlock(&connectivity_lock);
return (-1);
}
- connectivity_thread_loop = 0;
+ connectivity_netlink_thread_loop = 0;
pthread_cond_broadcast(&connectivity_cond);
pthread_mutex_unlock(&connectivity_lock);
// Since the thread is blocking, calling pthread_join
// doesn't actually succeed in stopping it. It will stick around
// until a NETLINK message is received on the socket (at which
- // it will realize that "connectivity_thread_loop" is 0 and will
+ // it will realize that "connectivity_netlink_thread_loop" is 0 and will
// break out of the read loop and be allowed to die). This is
// fine when the process isn't supposed to be exiting, but in
// the case of a process shutdown, we don't want to have an
// the case of a shutdown is just assures that the thread is
// gone and that the process has been fully terminated.
- DEBUG("connectivity plugin: Canceling thread for process shutdown");
+ DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(connectivity_thread_id);
+ status = pthread_cancel(connectivity_netlink_thread_id);
- if (status != 0) {
- ERROR("connectivity plugin: Unable to cancel thread: %d", status);
+ if (status != 0 && status != ESRCH) {
+ ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
status = -1;
- }
+ } else
+ status = 0;
} else {
- status = pthread_join(connectivity_thread_id, /* return = */ NULL);
- if (status != 0) {
- ERROR("connectivity plugin: Stopping thread failed.");
+ status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+ if (status != 0 && status != ESRCH) {
+ ERROR("connectivity plugin: Stopping netlink thread failed.");
status = -1;
- }
+ } else
+ return 0;
+ }
+
+ pthread_mutex_lock(&connectivity_lock);
+ memset(&connectivity_netlink_thread_id, 0,
+ sizeof(connectivity_netlink_thread_id));
+ connectivity_netlink_thread_error = 0;
+ pthread_mutex_unlock(&connectivity_lock);
+
+ DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
+
+ return status;
+}
+
+static int stop_dequeue_thread(int shutdown) /* {{{ */
+{
+ int status;
+
+ pthread_mutex_lock(&connectivity_lock);
+
+ if (connectivity_dequeue_thread_loop == 0) {
+ pthread_mutex_unlock(&connectivity_lock);
+ return (-1);
+ }
+
+ connectivity_dequeue_thread_loop = 0;
+ pthread_cond_broadcast(&connectivity_cond);
+ pthread_mutex_unlock(&connectivity_lock);
+
+ if (shutdown == 1) {
+ // Calling pthread_cancel here in
+ // the case of a shutdown just assures that the thread is
+ // gone and that the process has been fully terminated.
+
+ DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
+
+ status = pthread_cancel(connectivity_dequeue_thread_id);
+
+ if (status != 0 && status != ESRCH) {
+ ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
+ status = -1;
+ } else
+ status = 0;
+ } else {
+ status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
+ if (status != 0 && status != ESRCH) {
+ ERROR("connectivity plugin: Stopping dequeue thread failed.");
+ status = -1;
+ } else
+ status = 0;
}
pthread_mutex_lock(&connectivity_lock);
- memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
- connectivity_thread_error = 0;
+ memset(&connectivity_dequeue_thread_id, 0,
+ sizeof(connectivity_dequeue_thread_id));
+ connectivity_dequeue_thread_error = 0;
pthread_mutex_unlock(&connectivity_lock);
- DEBUG("connectivity plugin: Finished requesting stop of thread");
+ DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
return (status);
-} /* }}} int stop_thread */
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads(int shutdown) /* {{{ */
+{
+ int status, status2;
+
+ status = stop_netlink_thread(shutdown);
+ status2 = stop_dequeue_thread(shutdown);
+
+ if (status < 0)
+ return status;
+ else
+ return status2;
+} /* }}} int stop_threads */
static int connectivity_init(void) /* {{{ */
{
"be monitored");
}
- return (start_thread());
+ return (start_threads());
} /* }}} int connectivity_init */
static int connectivity_config(const char *key, const char *value) /* {{{ */
if (strcasecmp(key, "Interface") == 0) {
ignorelist_add(ignorelist, value);
monitor_all_interfaces = 0;
+ } else if (strcasecmp(key, "IgnoreSelected") == 0) {
+ int invert = 1;
+ if (IS_TRUE(value))
+ invert = 0;
+ ignorelist_set_invert(ignorelist, invert);
} else {
return (-1);
}
return (0);
} /* }}} int connectivity_config */
-static void connectivity_dispatch_notification(
- const char *interface, const char *type, /* {{{ */
- gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
+static void
+connectivity_dispatch_notification(const char *interface, const char *type,
+ gauge_t value, gauge_t old_value,
+ long long unsigned int timestamp) {
char *buf = NULL;
notification_t n = {
NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
static int connectivity_read(void) /* {{{ */
{
- if (connectivity_thread_error != 0) {
- ERROR("connectivity plugin: The interface thread had a problem. Restarting "
+ pthread_mutex_lock(&connectivity_lock);
+
+ if (connectivity_netlink_thread_error != 0) {
+
+ pthread_mutex_unlock(&connectivity_lock);
+
+ ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
"it.");
- stop_thread(0);
+ stop_netlink_thread(0);
for (interface_list_t *il = interface_list_head; il != NULL;
il = il->next) {
il->sent = 0;
}
- start_thread();
+ start_netlink_thread();
return (-1);
- } /* if (connectivity_thread_error != 0) */
+ } /* if (connectivity_netlink_thread_error != 0) */
- for (interface_list_t *il = interface_list_head; il != NULL;
- il = il->next) /* {{{ */
- {
- uint32_t status;
- uint32_t prev_status;
- uint32_t sent;
+ if (connectivity_dequeue_thread_error != 0) {
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_lock);
- status = il->status;
- prev_status = il->prev_status;
- sent = il->sent;
+ ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
+ "it.");
- if (status != prev_status && sent == 0) {
- connectivity_dispatch_notification(il->interface, "gauge", status,
- prev_status, il->timestamp);
- il->sent = 1;
- }
+ stop_dequeue_thread(0);
- pthread_mutex_unlock(&connectivity_lock);
- } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+ start_dequeue_thread();
+
+ return (-1);
+ } /* if (connectivity_dequeue_thread_error != 0) */
+
+ pthread_mutex_unlock(&connectivity_lock);
return (0);
} /* }}} int connectivity_read */
interface_list_t *il;
DEBUG("connectivity plugin: Shutting down thread.");
- if (stop_thread(1) < 0)
+ if (stop_threads(1) < 0)
return (-1);
il = interface_list_head;