static int connectivity_dequeue_thread_loop = 0;
static int connectivity_dequeue_thread_error = 0;
static pthread_t connectivity_dequeue_thread_id;
-static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
// static struct mnl_socket *sock;
static int nl_sock = -1;
static int event_id = 0;
+static int unsent_statuses = 0;
static const char *config_keys[] = {"Interface", "IgnoreSelected"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
goto err;
- *buf = malloc(strlen((char *)buf2) + 1);
+ *buf = strdup((char *)buf2);
if (*buf == NULL) {
- char errbuf[1024];
- ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
+ STRERRNO);
goto err;
}
- sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
-
yajl_gen_free(g);
return 0;
static interface_list_t *add_interface(const char *interface, int status,
int prev_status) {
- interface_list_t *il;
- char *interface2;
+ interface_list_t *il = calloc(1, sizeof(*il));
- il = malloc(sizeof(*il));
if (il == NULL) {
- char errbuf[1024];
- ERROR("connectivity plugin: malloc failed during add_interface: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("connectivity plugin: calloc failed during add_interface: %s",
+ STRERRNO);
return NULL;
}
- interface2 = strdup(interface);
+ char *interface2 = strdup(interface);
if (interface2 == NULL) {
- char errbuf[1024];
sfree(il);
ERROR("connectivity plugin: strdup failed during add_interface: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ STRERRNO);
return NULL;
}
}
static int connectivity_link_state(struct nlmsghdr *msg) {
- int retval = 0;
struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
struct nlattr *attr;
- const char *dev = NULL;
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_data_lock);
interface_list_t *il = NULL;
ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
"mnl_attr_validate "
"failed.");
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_data_lock);
return MNL_CB_ERROR;
}
- dev = mnl_attr_get_str(attr);
+ const char *dev = mnl_attr_get_str(attr);
// Check the list of interfaces we should monitor, if we've chosen
// a subset. If we don't care about this one, abort.
il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
// If the new status is different than the previous status,
- // store the previous status and set sent to zero
+ // store the previous status and set sent to zero, and set the
+ // global flag to indicate there are statuses to dispatch
if (il->status != prev_status) {
il->prev_status = prev_status;
il->sent = 0;
+ unsent_statuses = 1;
}
DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
break;
}
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_data_lock);
- return retval;
+ return 0;
}
static int msg_handler(struct nlmsghdr *msg) {
connectivity_link_state(msg);
break;
default:
- ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
+ ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d",
msg->nlmsg_type);
break;
}
return 0;
}
-// static int read_event(struct mnl_socket *nl,
-// int (*msg_handler)(struct nlmsghdr *)) {
static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
- int status;
int ret = 0;
- char buf[4096];
- struct nlmsghdr *h;
int recv_flags = MSG_DONTWAIT;
- // if (nl == NULL)
- // return ret;
-
if (nl == -1)
return ret;
while (42) {
- pthread_mutex_lock(&connectivity_lock);
+ char buf[4096];
+
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_netlink_thread_loop <= 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return ret;
}
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
- status = recv(nl, buf, sizeof(buf), recv_flags);
+ int status = recv(nl, buf, sizeof(buf), recv_flags);
if (status < 0) {
// any saved interface status changes. Then continue, but
// block and wait for new messages
if (errno == EWOULDBLOCK || errno == EAGAIN) {
- pthread_mutex_lock(&connectivity_lock);
pthread_cond_signal(&connectivity_cond);
- pthread_mutex_unlock(&connectivity_lock);
recv_flags = 0;
continue;
}
/* Anything else is an error */
- // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
- // %d\n",
- // status);
- ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
+ ERROR("connectivity plugin: read_event: Error recv: %d", status);
return status;
}
recv_flags = MSG_DONTWAIT;
if (status == 0) {
- DEBUG("connectivity plugin: read_event: EOF\n");
+ DEBUG("connectivity plugin: read_event: EOF");
}
/* We need to handle more than one message per 'recvmsg' */
- for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
- h = NLMSG_NEXT(h, status)) {
+ for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
+ NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
/* Finish reading */
if (h->nlmsg_type == NLMSG_DONE)
return ret;
/* Message is some kind of error */
if (h->nlmsg_type == NLMSG_ERROR) {
- ERROR("connectivity plugin: read_event: Message is an error\n");
+ ERROR("connectivity plugin: read_event: Message is an error");
return -1; // Error
}
if (msg_handler) {
ret = (*msg_handler)(h);
if (ret < 0) {
- ERROR("connectivity plugin: read_event: Message handler error %d\n",
+ ERROR("connectivity plugin: read_event: Message handler error %d",
ret);
return ret;
}
} else {
- ERROR("connectivity plugin: read_event: Error NULL message handler\n");
+ ERROR("connectivity plugin: read_event: Error NULL message handler");
return -1;
}
}
return ret;
}
+// NOTE: Caller MUST hold connectivity_data_lock when calling this function
static void send_interface_status() {
for (interface_list_t *il = interface_list_head; il != NULL;
il = il->next) /* {{{ */
il->sent = 1;
}
} /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+
+ unsent_statuses = 0;
}
static int read_interface_status() /* {{{ */
{
- pthread_mutex_lock(&connectivity_lock);
-
- // This first attempt is necessary because the netlink thread
- // might have held the lock while this thread was blocked on
- // the lock acquisition just above. And while the netlink thread
- // had the lock, it could have called pthread_cond_singal, which
- // obviously wouldn't have woken this thread, since this thread
- // was not yet waiting on the condition signal. So we need to
- // loop through the interfaces and check if any have changed
- // status before we wait on the condition signal
- send_interface_status();
+ pthread_mutex_lock(&connectivity_data_lock);
- pthread_cond_wait(&connectivity_cond, &connectivity_lock);
+ if (!unsent_statuses)
+ pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
send_interface_status();
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_data_lock);
return 0;
} /* }}} int *read_interface_status */
static void *connectivity_netlink_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
while (connectivity_netlink_thread_loop > 0) {
- int status;
+ pthread_mutex_unlock(&connectivity_threads_lock);
- pthread_mutex_unlock(&connectivity_lock);
+ int status = read_event(nl_sock, msg_handler);
- status = read_event(nl_sock, msg_handler);
-
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (status < 0) {
connectivity_netlink_thread_error = 1;
break;
}
-
- if (connectivity_netlink_thread_loop <= 0)
- break;
} /* while (connectivity_netlink_thread_loop > 0) */
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return ((void *)0);
} /* }}} void *connectivity_netlink_thread */
static void *connectivity_dequeue_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
while (connectivity_dequeue_thread_loop > 0) {
- int status;
-
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
- status = read_interface_status();
+ int status = read_interface_status();
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (status < 0) {
connectivity_dequeue_thread_error = 1;
break;
}
-
- if (connectivity_dequeue_thread_loop <= 0)
- break;
} /* while (connectivity_dequeue_thread_loop > 0) */
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return ((void *)0);
} /* }}} void *connectivity_dequeue_thread */
static int nl_connect() {
- int rc;
- struct sockaddr_nl sa_nl;
+ struct sockaddr_nl sa_nl = {
+ .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
+ };
nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
if (nl_sock == -1) {
- ERROR("connectivity plugin: socket open failed: %d", errno);
+ ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
return -1;
}
- sa_nl.nl_family = AF_NETLINK;
- sa_nl.nl_groups = RTMGRP_LINK;
- sa_nl.nl_pid = getpid();
-
- rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+ int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
if (rc == -1) {
- ERROR("connectivity plugin: socket bind failed: %d", errno);
+ ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
close(nl_sock);
return -1;
}
{
int status;
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_netlink_thread_loop != 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return (0);
}
if (nl_sock == -1) {
status = nl_connect();
- if (status != 0)
+ if (status != 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
return status;
+ }
}
status = plugin_thread_create(&connectivity_netlink_thread_id,
if (status != 0) {
connectivity_netlink_thread_loop = 0;
ERROR("connectivity plugin: Starting thread failed.");
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
int status2 = close(nl_sock);
if (status2 != 0) {
ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
- status2, strerror(errno));
+ status2, STRERRNO);
} else
nl_sock = -1;
return (-1);
}
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return status;
}
static int start_dequeue_thread(void) /* {{{ */
{
- int status;
-
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_dequeue_thread_loop != 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return (0);
}
connectivity_dequeue_thread_loop = 1;
connectivity_dequeue_thread_error = 0;
- status = plugin_thread_create(&connectivity_dequeue_thread_id,
- /* attr = */ NULL, connectivity_dequeue_thread,
- /* arg = */ (void *)0, "connectivity");
+ int status =
+ plugin_thread_create(&connectivity_dequeue_thread_id,
+ /* attr = */ NULL, connectivity_dequeue_thread,
+ /* arg = */ (void *)0, "connectivity");
if (status != 0) {
connectivity_dequeue_thread_loop = 0;
ERROR("connectivity plugin: Starting dequeue thread failed.");
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return (-1);
}
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return status;
} /* }}} int start_dequeue_thread */
static int start_threads(void) /* {{{ */
{
- int status, status2;
-
- status = start_netlink_thread();
- status2 = start_dequeue_thread();
+ int status = start_netlink_thread();
+ int status2 = start_dequeue_thread();
- if (status < 0)
+ if (status != 0)
return status;
else
return status2;
static int stop_netlink_thread(int shutdown) /* {{{ */
{
- int status;
+ int socket_status, thread_stratus;
if (nl_sock != -1) {
- status = close(nl_sock);
- if (status != 0) {
+ socket_status = close(nl_sock);
+ if (socket_status != 0) {
ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
- status, strerror(errno));
- return (-1);
+ socket_status, STRERRNO);
} else
nl_sock = -1;
- }
+ } else
+ socket_status = 0;
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_netlink_thread_loop == 0) {
- pthread_mutex_unlock(&connectivity_lock);
- return (-1);
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ // Thread has already been terminated, nothing more to attempt
+ return socket_status;
}
+ // Set thread termination status
connectivity_netlink_thread_loop = 0;
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ // Let threads waiting on access to the interface list know to move
+ // on such that they'll see the threads termination status
pthread_cond_broadcast(&connectivity_cond);
- pthread_mutex_unlock(&connectivity_lock);
if (shutdown == 1) {
// Since the thread is blocking, calling pthread_join
DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(connectivity_netlink_thread_id);
+ thread_stratus = pthread_cancel(connectivity_netlink_thread_id);
- if (status != 0 && status != ESRCH) {
- ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
- status = -1;
+ if (thread_stratus != 0 && thread_stratus != ESRCH) {
+ ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
+ thread_stratus);
+ thread_stratus = -1;
} else
- status = 0;
+ thread_stratus = 0;
} else {
- status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
- if (status != 0 && status != ESRCH) {
- ERROR("connectivity plugin: Stopping netlink thread failed.");
- status = -1;
+ thread_stratus =
+ pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+ if (thread_stratus != 0 && thread_stratus != ESRCH) {
+ ERROR("connectivity plugin: Stopping netlink thread failed: %d",
+ thread_stratus);
+ thread_stratus = -1;
} else
- return 0;
+ thread_stratus = 0;
}
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
memset(&connectivity_netlink_thread_id, 0,
sizeof(connectivity_netlink_thread_id));
connectivity_netlink_thread_error = 0;
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
- return status;
+ if (socket_status != 0)
+ return socket_status;
+ else
+ return thread_stratus;
}
static int stop_dequeue_thread(int shutdown) /* {{{ */
{
int status;
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_dequeue_thread_loop == 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return (-1);
}
+ // Set thread termination status
connectivity_dequeue_thread_loop = 0;
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ // Let threads waiting on access to the interface list know to move
+ // on such that they'll see the threads termination status
pthread_cond_broadcast(&connectivity_cond);
- pthread_mutex_unlock(&connectivity_lock);
if (shutdown == 1) {
// Calling pthread_cancel here in
status = 0;
}
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
memset(&connectivity_dequeue_thread_id, 0,
sizeof(connectivity_dequeue_thread_id));
connectivity_dequeue_thread_error = 0;
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
static int stop_threads(int shutdown) /* {{{ */
{
- int status, status2;
+ int status = stop_netlink_thread(shutdown);
+ int status2 = stop_dequeue_thread(shutdown);
- status = stop_netlink_thread(shutdown);
- status2 = stop_dequeue_thread(shutdown);
-
- if (status < 0)
+ if (status != 0)
return status;
else
return status2;
notification_meta_t *m = calloc(1, sizeof(*m));
if (m == NULL) {
- char errbuf[1024];
sfree(buf);
- ERROR("connectivity plugin: unable to allocate metadata: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO);
return;
}
plugin_dispatch_notification(&n);
plugin_notification_meta_free(n.meta);
- // malloc'd in gen_message_payload
+ // strdup'd in gen_message_payload
if (buf != NULL)
sfree(buf);
}
static int connectivity_read(void) /* {{{ */
{
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
if (connectivity_netlink_thread_error != 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
"it.");
if (connectivity_dequeue_thread_error != 0) {
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
"it.");
return (-1);
} /* if (connectivity_dequeue_thread_error != 0) */
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_threads_lock);
return (0);
} /* }}} int connectivity_read */
static int connectivity_shutdown(void) /* {{{ */
{
- interface_list_t *il;
-
DEBUG("connectivity plugin: Shutting down thread.");
if (stop_threads(1) < 0)
return (-1);
- il = interface_list_head;
+ interface_list_t *il = interface_list_head;
while (il != NULL) {
interface_list_t *il_next;