From: Andrew Bays Date: Thu, 18 Oct 2018 23:02:28 +0000 (-0400) Subject: octo code review changes X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=9b89847f17dd7fa088aa3ab34f3da16131d75eca;p=collectd.git octo code review changes --- diff --git a/src/connectivity.c b/src/connectivity.c index f6356f03..013ef1bf 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -122,11 +122,13 @@ static pthread_t connectivity_netlink_thread_id; static int connectivity_dequeue_thread_loop = 0; static int connectivity_dequeue_thread_error = 0; static pthread_t connectivity_dequeue_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; // static struct mnl_socket *sock; static int nl_sock = -1; static int event_id = 0; +static int unsent_statuses = 0; static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); @@ -378,17 +380,14 @@ static int gen_message_payload(int state, int old_state, const char *interface, if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); if (*buf == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during gen_message_payload: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); goto err; } - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); - yajl_gen_free(g); return 0; @@ -401,23 +400,19 @@ err: static interface_list_t *add_interface(const char *interface, int status, int prev_status) { - interface_list_t *il; - char *interface2; + interface_list_t *il = calloc(1, sizeof(*il)); - il = malloc(sizeof(*il)); if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); return NULL; } - interface2 = strdup(interface); + char *interface2 = strdup(interface); if (interface2 == NULL) { - char errbuf[1024]; sfree(il); ERROR("connectivity plugin: strdup failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); return NULL; } @@ -435,12 +430,10 @@ static interface_list_t *add_interface(const char *interface, int status, } static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); struct nlattr *attr; - const char *dev = NULL; - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); interface_list_t *il = NULL; @@ -453,11 +446,11 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); // Check the list of interfaces we should monitor, if we've chosen // a subset. If we don't care about this one, abort. @@ -492,10 +485,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) { il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); // If the new status is different than the previous status, - // store the previous status and set sent to zero + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch if (il->status != prev_status) { il->prev_status = prev_status; il->sent = 0; + unsent_statuses = 1; } DEBUG("connectivity plugin (%llu): Interface %s status is now %s", @@ -507,9 +502,9 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { @@ -525,39 +520,33 @@ static int msg_handler(struct nlmsghdr *msg) { connectivity_link_state(msg); break; default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", + ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d", msg->nlmsg_type); break; } return 0; } -// static int read_event(struct mnl_socket *nl, -// int (*msg_handler)(struct nlmsghdr *)) { static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { - int status; int ret = 0; - char buf[4096]; - struct nlmsghdr *h; int recv_flags = MSG_DONTWAIT; - // if (nl == NULL) - // return ret; - if (nl == -1) return ret; while (42) { - pthread_mutex_lock(&connectivity_lock); + char buf[4096]; + + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop <= 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ret; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = recv(nl, buf, sizeof(buf), recv_flags); + int status = recv(nl, buf, sizeof(buf), recv_flags); if (status < 0) { @@ -566,19 +555,14 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { // any saved interface status changes. Then continue, but // block and wait for new messages if (errno == EWOULDBLOCK || errno == EAGAIN) { - pthread_mutex_lock(&connectivity_lock); pthread_cond_signal(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); recv_flags = 0; continue; } /* Anything else is an error */ - // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: - // %d\n", - // status); - ERROR("connectivity plugin: read_event: Error recv: %d\n", status); + ERROR("connectivity plugin: read_event: Error recv: %d", status); return status; } @@ -588,19 +572,19 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { recv_flags = MSG_DONTWAIT; if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); + DEBUG("connectivity plugin: read_event: EOF"); } /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { /* Finish reading */ if (h->nlmsg_type == NLMSG_DONE) return ret; /* Message is some kind of error */ if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); + ERROR("connectivity plugin: read_event: Message is an error"); return -1; // Error } @@ -608,12 +592,12 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { if (msg_handler) { ret = (*msg_handler)(h); if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", + ERROR("connectivity plugin: read_event: Message handler error %d", ret); return ret; } } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); + ERROR("connectivity plugin: read_event: Error NULL message handler"); return -1; } } @@ -622,6 +606,7 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { return ret; } +// NOTE: Caller MUST hold connectivity_data_lock when calling this function static void send_interface_status() { for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) /* {{{ */ @@ -640,102 +625,82 @@ static void send_interface_status() { il->sent = 1; } } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + unsent_statuses = 0; } static int read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); - - // This first attempt is necessary because the netlink thread - // might have held the lock while this thread was blocked on - // the lock acquisition just above. And while the netlink thread - // had the lock, it could have called pthread_cond_singal, which - // obviously wouldn't have woken this thread, since this thread - // was not yet waiting on the condition signal. So we need to - // loop through the interfaces and check if any have changed - // status before we wait on the condition signal - send_interface_status(); + pthread_mutex_lock(&connectivity_data_lock); - pthread_cond_wait(&connectivity_cond, &connectivity_lock); + if (!unsent_statuses) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); send_interface_status(); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return 0; } /* }}} int *read_interface_status */ static void *connectivity_netlink_thread(void *arg) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); while (connectivity_netlink_thread_loop > 0) { - int status; + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_unlock(&connectivity_lock); + int status = read_event(nl_sock, msg_handler); - status = read_event(nl_sock, msg_handler); - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { connectivity_netlink_thread_error = 1; break; } - - if (connectivity_netlink_thread_loop <= 0) - break; } /* while (connectivity_netlink_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); } /* }}} void *connectivity_netlink_thread */ static void *connectivity_dequeue_thread(void *arg) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); while (connectivity_dequeue_thread_loop > 0) { - int status; - - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = read_interface_status(); + int status = read_interface_status(); - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { connectivity_dequeue_thread_error = 1; break; } - - if (connectivity_dequeue_thread_loop <= 0) - break; } /* while (connectivity_dequeue_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); } /* }}} void *connectivity_dequeue_thread */ static int nl_connect() { - int rc; - struct sockaddr_nl sa_nl; + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(), + }; nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); if (nl_sock == -1) { - ERROR("connectivity plugin: socket open failed: %d", errno); + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); return -1; } - sa_nl.nl_family = AF_NETLINK; - sa_nl.nl_groups = RTMGRP_LINK; - sa_nl.nl_pid = getpid(); - - rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); if (rc == -1) { - ERROR("connectivity plugin: socket bind failed: %d", errno); + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); close(nl_sock); return -1; } @@ -747,10 +712,10 @@ static int start_netlink_thread(void) /* {{{ */ { int status; - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return (0); } @@ -760,8 +725,10 @@ static int start_netlink_thread(void) /* {{{ */ if (nl_sock == -1) { status = nl_connect(); - if (status != 0) + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return status; + } } status = plugin_thread_create(&connectivity_netlink_thread_id, @@ -770,61 +737,58 @@ static int start_netlink_thread(void) /* {{{ */ if (status != 0) { connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); int status2 = close(nl_sock); if (status2 != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, - status2, strerror(errno)); + status2, STRERRNO); } else nl_sock = -1; return (-1); } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return status; } static int start_dequeue_thread(void) /* {{{ */ { - int status; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_dequeue_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return (0); } connectivity_dequeue_thread_loop = 1; connectivity_dequeue_thread_error = 0; - status = plugin_thread_create(&connectivity_dequeue_thread_id, - /* attr = */ NULL, connectivity_dequeue_thread, - /* arg = */ (void *)0, "connectivity"); + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); if (status != 0) { connectivity_dequeue_thread_loop = 0; ERROR("connectivity plugin: Starting dequeue thread failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return (-1); } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return status; } /* }}} int start_dequeue_thread */ static int start_threads(void) /* {{{ */ { - int status, status2; - - status = start_netlink_thread(); - status2 = start_dequeue_thread(); + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); - if (status < 0) + if (status != 0) return status; else return status2; @@ -832,28 +796,33 @@ static int start_threads(void) /* {{{ */ static int stop_netlink_thread(int shutdown) /* {{{ */ { - int status; + int socket_status, thread_stratus; if (nl_sock != -1) { - status = close(nl_sock); - if (status != 0) { + socket_status = close(nl_sock); + if (socket_status != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, - status, strerror(errno)); - return (-1); + socket_status, STRERRNO); } else nl_sock = -1; - } + } else + socket_status = 0; - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; } + // Set thread termination status connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); if (shutdown == 1) { // Since the thread is blocking, calling pthread_join @@ -869,47 +838,57 @@ static int stop_netlink_thread(int shutdown) /* {{{ */ DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_netlink_thread_id); + thread_stratus = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status); - status = -1; + if (thread_stratus != 0 && thread_stratus != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_stratus); + thread_stratus = -1; } else - status = 0; + thread_stratus = 0; } else { - status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Stopping netlink thread failed."); - status = -1; + thread_stratus = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_stratus != 0 && thread_stratus != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_stratus); + thread_stratus = -1; } else - return 0; + thread_stratus = 0; } - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); memset(&connectivity_netlink_thread_id, 0, sizeof(connectivity_netlink_thread_id)); connectivity_netlink_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); - return status; + if (socket_status != 0) + return socket_status; + else + return thread_stratus; } static int stop_dequeue_thread(int shutdown) /* {{{ */ { int status; - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_dequeue_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return (-1); } + // Set thread termination status connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); if (shutdown == 1) { // Calling pthread_cancel here in @@ -934,11 +913,11 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */ status = 0; } - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); memset(&connectivity_dequeue_thread_id, 0, sizeof(connectivity_dequeue_thread_id)); connectivity_dequeue_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); @@ -947,12 +926,10 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */ static int stop_threads(int shutdown) /* {{{ */ { - int status, status2; + int status = stop_netlink_thread(shutdown); + int status2 = stop_dequeue_thread(shutdown); - status = stop_netlink_thread(shutdown); - status2 = stop_dequeue_thread(shutdown); - - if (status < 0) + if (status != 0) return status; else return status2; @@ -1010,10 +987,8 @@ connectivity_dispatch_notification(const char *interface, const char *type, notification_meta_t *m = calloc(1, sizeof(*m)); if (m == NULL) { - char errbuf[1024]; sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO); return; } @@ -1031,18 +1006,18 @@ connectivity_dispatch_notification(const char *interface, const char *type, plugin_dispatch_notification(&n); plugin_notification_meta_free(n.meta); - // malloc'd in gen_message_payload + // strdup'd in gen_message_payload if (buf != NULL) sfree(buf); } static int connectivity_read(void) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_error != 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); @@ -1063,7 +1038,7 @@ static int connectivity_read(void) /* {{{ */ if (connectivity_dequeue_thread_error != 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); ERROR("connectivity plugin: The dequeue thread had a problem. Restarting " "it."); @@ -1075,20 +1050,18 @@ static int connectivity_read(void) /* {{{ */ return (-1); } /* if (connectivity_dequeue_thread_error != 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return (0); } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interface_list_t *il; - DEBUG("connectivity plugin: Shutting down thread."); if (stop_threads(1) < 0) return (-1); - il = interface_list_head; + interface_list_t *il = interface_list_head; while (il != NULL) { interface_list_t *il_next;