From: Andrew Bays Date: Wed, 17 Oct 2018 16:44:22 +0000 (-0400) Subject: Use a separate dequeue thread to dispatch notifications X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=7023393d66c024facbcc5f6603e52bc86a6a930e;p=collectd.git Use a separate dequeue thread to dispatch notifications --- diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 4f12d3b5..b5e6e09b 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -1600,6 +1600,13 @@ LoadPlugin connectivity Interface eth1 +This example shows C monitoring all interfaces except "eth1". +LoadPlugin connectivity + + Interface eth1 + IgnoreSelected true + + =over 4 =item B I diff --git a/src/connectivity.c b/src/connectivity.c index e39bce17..f6356f03 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -94,6 +94,7 @@ /* * Private data types */ + struct interface_list_s { char *interface; @@ -109,23 +110,37 @@ typedef struct interface_list_s interface_list_t; /* * Private variables */ + static ignorelist_t *ignorelist = NULL; static interface_list_t *interface_list_head = NULL; static int monitor_all_interfaces = 1; -static int connectivity_thread_loop = 0; -static int connectivity_thread_error = 0; -static pthread_t connectivity_thread_id; +static int connectivity_netlink_thread_loop = 0; +static int connectivity_netlink_thread_error = 0; +static pthread_t connectivity_netlink_thread_id; +static int connectivity_dequeue_thread_loop = 0; +static int connectivity_dequeue_thread_error = 0; +static pthread_t connectivity_dequeue_thread_id; static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -static struct mnl_socket *sock; +// static struct mnl_socket *sock; +static int nl_sock = -1; static int event_id = 0; -static const char *config_keys[] = {"Interface"}; +static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* + * Prototype + */ + +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp); + +/* * Private functions */ @@ -517,151 +532,326 @@ static int msg_handler(struct nlmsghdr *msg) { return 0; } -static int read_event(struct mnl_socket *nl, - int (*msg_handler)(struct nlmsghdr *)) { +// static int read_event(struct mnl_socket *nl, +// int (*msg_handler)(struct nlmsghdr *)) { +static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { int status; int ret = 0; char buf[4096]; struct nlmsghdr *h; + int recv_flags = MSG_DONTWAIT; - if (nl == NULL) + // if (nl == NULL) + // return ret; + + if (nl == -1) return ret; - status = mnl_socket_recvfrom(nl, buf, sizeof(buf)); + while (42) { + pthread_mutex_lock(&connectivity_lock); - if (status < 0) { - /* Socket non-blocking so bail out once we have read everything */ - if (errno == EWOULDBLOCK || errno == EAGAIN) + if (connectivity_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&connectivity_lock); return ret; + } - /* Anything else is an error */ - ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n", - status); - return status; - } + pthread_mutex_unlock(&connectivity_lock); - if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); - } + status = recv(nl, buf, sizeof(buf), recv_flags); - /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { - /* Finish reading */ - if (h->nlmsg_type == NLMSG_DONE) - return ret; + if (status < 0) { + + // If there were no more messages to drain from the socket, + // then signal the dequeue thread and allow it to dispatch + // any saved interface status changes. Then continue, but + // block and wait for new messages + if (errno == EWOULDBLOCK || errno == EAGAIN) { + pthread_mutex_lock(&connectivity_lock); + pthread_cond_signal(&connectivity_cond); + pthread_mutex_unlock(&connectivity_lock); + + recv_flags = 0; + continue; + } - /* Message is some kind of error */ - if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); - return -1; // Error + /* Anything else is an error */ + // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: + // %d\n", + // status); + ERROR("connectivity plugin: read_event: Error recv: %d\n", status); + return status; } - /* Call message handler */ - if (msg_handler) { - ret = (*msg_handler)(h); - if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", - ret); + // Message received successfully, so we'll stop blocking on the + // receive call for now (until we get a "would block" error, which + // will be handled above) + recv_flags = MSG_DONTWAIT; + + if (status == 0) { + DEBUG("connectivity plugin: read_event: EOF\n"); + } + + /* We need to handle more than one message per 'recvmsg' */ + for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); + h = NLMSG_NEXT(h, status)) { + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) return ret; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) { + ERROR("connectivity plugin: read_event: Message is an error\n"); + return -1; // Error + } + + /* Call message handler */ + if (msg_handler) { + ret = (*msg_handler)(h); + if (ret < 0) { + ERROR("connectivity plugin: read_event: Message handler error %d\n", + ret); + return ret; + } + } else { + ERROR("connectivity plugin: read_event: Error NULL message handler\n"); + return -1; } - } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); - return -1; } } return ret; } -static void *connectivity_thread(void *arg) /* {{{ */ +static void send_interface_status() { + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) /* {{{ */ + { + uint32_t status; + uint32_t prev_status; + uint32_t sent; + + status = il->status; + prev_status = il->prev_status; + sent = il->sent; + + if (status != prev_status && sent == 0) { + connectivity_dispatch_notification(il->interface, "gauge", status, + prev_status, il->timestamp); + il->sent = 1; + } + } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ +} + +static int read_interface_status() /* {{{ */ { pthread_mutex_lock(&connectivity_lock); - while (connectivity_thread_loop > 0) { + // This first attempt is necessary because the netlink thread + // might have held the lock while this thread was blocked on + // the lock acquisition just above. And while the netlink thread + // had the lock, it could have called pthread_cond_singal, which + // obviously wouldn't have woken this thread, since this thread + // was not yet waiting on the condition signal. So we need to + // loop through the interfaces and check if any have changed + // status before we wait on the condition signal + send_interface_status(); + + pthread_cond_wait(&connectivity_cond, &connectivity_lock); + + send_interface_status(); + + pthread_mutex_unlock(&connectivity_lock); + + return 0; +} /* }}} int *read_interface_status */ + +static void *connectivity_netlink_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_lock); + + while (connectivity_netlink_thread_loop > 0) { + int status; + + pthread_mutex_unlock(&connectivity_lock); + + status = read_event(nl_sock, msg_handler); + + pthread_mutex_lock(&connectivity_lock); + + if (status < 0) { + connectivity_netlink_thread_error = 1; + break; + } + + if (connectivity_netlink_thread_loop <= 0) + break; + } /* while (connectivity_netlink_thread_loop > 0) */ + + pthread_mutex_unlock(&connectivity_lock); + + return ((void *)0); +} /* }}} void *connectivity_netlink_thread */ + +static void *connectivity_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_lock); + + while (connectivity_dequeue_thread_loop > 0) { int status; pthread_mutex_unlock(&connectivity_lock); - status = read_event(sock, msg_handler); + status = read_interface_status(); pthread_mutex_lock(&connectivity_lock); if (status < 0) { - connectivity_thread_error = 1; + connectivity_dequeue_thread_error = 1; break; } - if (connectivity_thread_loop <= 0) + if (connectivity_dequeue_thread_loop <= 0) break; - } /* while (connectivity_thread_loop > 0) */ + } /* while (connectivity_dequeue_thread_loop > 0) */ pthread_mutex_unlock(&connectivity_lock); return ((void *)0); -} /* }}} void *connectivity_thread */ +} /* }}} void *connectivity_dequeue_thread */ + +static int nl_connect() { + int rc; + struct sockaddr_nl sa_nl; + + nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); + if (nl_sock == -1) { + ERROR("connectivity plugin: socket open failed: %d", errno); + return -1; + } + + sa_nl.nl_family = AF_NETLINK; + sa_nl.nl_groups = RTMGRP_LINK; + sa_nl.nl_pid = getpid(); + + rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + if (rc == -1) { + ERROR("connectivity plugin: socket bind failed: %d", errno); + close(nl_sock); + return -1; + } + + return 0; +} -static int start_thread(void) /* {{{ */ +static int start_netlink_thread(void) /* {{{ */ { int status; pthread_mutex_lock(&connectivity_lock); - if (connectivity_thread_loop != 0) { + if (connectivity_netlink_thread_loop != 0) { pthread_mutex_unlock(&connectivity_lock); return (0); } - connectivity_thread_loop = 1; - connectivity_thread_error = 0; + connectivity_netlink_thread_loop = 1; + connectivity_netlink_thread_error = 0; - if (sock == NULL) { - sock = mnl_socket_open(NETLINK_ROUTE); - if (sock == NULL) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_open failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); - } + if (nl_sock == -1) { + status = nl_connect(); - if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_bind failed."); - pthread_mutex_unlock(&connectivity_lock); - return (1); - } + if (status != 0) + return status; } - status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL, - connectivity_thread, + status = plugin_thread_create(&connectivity_netlink_thread_id, + /* attr = */ NULL, connectivity_netlink_thread, /* arg = */ (void *)0, "connectivity"); if (status != 0) { - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); pthread_mutex_unlock(&connectivity_lock); - mnl_socket_close(sock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, strerror(errno)); + } else + nl_sock = -1; + return (-1); } pthread_mutex_unlock(&connectivity_lock); - return (0); -} /* }}} int start_thread */ -static int stop_thread(int shutdown) /* {{{ */ + return status; +} + +static int start_dequeue_thread(void) /* {{{ */ { int status; - if (sock != NULL) - mnl_socket_close(sock); + pthread_mutex_lock(&connectivity_lock); + + if (connectivity_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_lock); + return (0); + } + + connectivity_dequeue_thread_loop = 1; + connectivity_dequeue_thread_error = 0; + + status = plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); + if (status != 0) { + connectivity_dequeue_thread_loop = 0; + ERROR("connectivity plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&connectivity_lock); + return (-1); + } + + pthread_mutex_unlock(&connectivity_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status, status2; + + status = start_netlink_thread(); + status2 = start_dequeue_thread(); + + if (status < 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int status; + + if (nl_sock != -1) { + status = close(nl_sock); + if (status != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status, strerror(errno)); + return (-1); + } else + nl_sock = -1; + } pthread_mutex_lock(&connectivity_lock); - if (connectivity_thread_loop == 0) { + if (connectivity_netlink_thread_loop == 0) { pthread_mutex_unlock(&connectivity_lock); return (-1); } - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; pthread_cond_broadcast(&connectivity_cond); pthread_mutex_unlock(&connectivity_lock); @@ -669,7 +859,7 @@ static int stop_thread(int shutdown) /* {{{ */ // Since the thread is blocking, calling pthread_join // doesn't actually succeed in stopping it. It will stick around // until a NETLINK message is received on the socket (at which - // it will realize that "connectivity_thread_loop" is 0 and will + // it will realize that "connectivity_netlink_thread_loop" is 0 and will // break out of the read loop and be allowed to die). This is // fine when the process isn't supposed to be exiting, but in // the case of a process shutdown, we don't want to have an @@ -677,31 +867,96 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_thread_id); + status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0) { - ERROR("connectivity plugin: Unable to cancel thread: %d", status); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status); status = -1; - } + } else + status = 0; } else { - status = pthread_join(connectivity_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("connectivity plugin: Stopping thread failed."); + status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed."); status = -1; - } + } else + return 0; + } + + pthread_mutex_lock(&connectivity_lock); + memset(&connectivity_netlink_thread_id, 0, + sizeof(connectivity_netlink_thread_id)); + connectivity_netlink_thread_error = 0; + pthread_mutex_unlock(&connectivity_lock); + + DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); + + return status; +} + +static int stop_dequeue_thread(int shutdown) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&connectivity_lock); + + if (connectivity_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_lock); + return (-1); + } + + connectivity_dequeue_thread_loop = 0; + pthread_cond_broadcast(&connectivity_cond); + pthread_mutex_unlock(&connectivity_lock); + + if (shutdown == 1) { + // Calling pthread_cancel here in + // the case of a shutdown just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); + + status = pthread_cancel(connectivity_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; + } else { + status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Stopping dequeue thread failed."); + status = -1; + } else + status = 0; } pthread_mutex_lock(&connectivity_lock); - memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id)); - connectivity_thread_error = 0; + memset(&connectivity_dequeue_thread_id, 0, + sizeof(connectivity_dequeue_thread_id)); + connectivity_dequeue_thread_error = 0; pthread_mutex_unlock(&connectivity_lock); - DEBUG("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); return (status); -} /* }}} int stop_thread */ +} /* }}} int stop_dequeue_thread */ + +static int stop_threads(int shutdown) /* {{{ */ +{ + int status, status2; + + status = stop_netlink_thread(shutdown); + status2 = stop_dequeue_thread(shutdown); + + if (status < 0) + return status; + else + return status2; +} /* }}} int stop_threads */ static int connectivity_init(void) /* {{{ */ { @@ -710,7 +965,7 @@ static int connectivity_init(void) /* {{{ */ "be monitored"); } - return (start_thread()); + return (start_threads()); } /* }}} int connectivity_init */ static int connectivity_config(const char *key, const char *value) /* {{{ */ @@ -722,6 +977,11 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ if (strcasecmp(key, "Interface") == 0) { ignorelist_add(ignorelist, value); monitor_all_interfaces = 0; + } else if (strcasecmp(key, "IgnoreSelected") == 0) { + int invert = 1; + if (IS_TRUE(value)) + invert = 0; + ignorelist_set_invert(ignorelist, invert); } else { return (-1); } @@ -729,9 +989,10 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ return (0); } /* }}} int connectivity_config */ -static void connectivity_dispatch_notification( - const char *interface, const char *type, /* {{{ */ - gauge_t value, gauge_t old_value, long long unsigned int timestamp) { +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp) { char *buf = NULL; notification_t n = { NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; @@ -777,11 +1038,16 @@ static void connectivity_dispatch_notification( static int connectivity_read(void) /* {{{ */ { - if (connectivity_thread_error != 0) { - ERROR("connectivity plugin: The interface thread had a problem. Restarting " + pthread_mutex_lock(&connectivity_lock); + + if (connectivity_netlink_thread_error != 0) { + + pthread_mutex_unlock(&connectivity_lock); + + ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); - stop_thread(0); + stop_netlink_thread(0); for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) { @@ -790,32 +1056,26 @@ static int connectivity_read(void) /* {{{ */ il->sent = 0; } - start_thread(); + start_netlink_thread(); return (-1); - } /* if (connectivity_thread_error != 0) */ + } /* if (connectivity_netlink_thread_error != 0) */ - for (interface_list_t *il = interface_list_head; il != NULL; - il = il->next) /* {{{ */ - { - uint32_t status; - uint32_t prev_status; - uint32_t sent; + if (connectivity_dequeue_thread_error != 0) { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_lock); - status = il->status; - prev_status = il->prev_status; - sent = il->sent; + ERROR("connectivity plugin: The dequeue thread had a problem. Restarting " + "it."); - if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); - il->sent = 1; - } + stop_dequeue_thread(0); - pthread_mutex_unlock(&connectivity_lock); - } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + start_dequeue_thread(); + + return (-1); + } /* if (connectivity_dequeue_thread_error != 0) */ + + pthread_mutex_unlock(&connectivity_lock); return (0); } /* }}} int connectivity_read */ @@ -825,7 +1085,7 @@ static int connectivity_shutdown(void) /* {{{ */ interface_list_t *il; DEBUG("connectivity plugin: Shutting down thread."); - if (stop_thread(1) < 0) + if (stop_threads(1) < 0) return (-1); il = interface_list_head; diff --git a/src/types.db b/src/types.db index 3f8b5818..f4933ee3 100644 --- a/src/types.db +++ b/src/types.db @@ -33,7 +33,6 @@ compression uncompressed:DERIVE:0:U, compressed:DERIVE:0:U compression_ratio value:GAUGE:0:2 commands value:DERIVE:0:U connections value:DERIVE:0:U -connectivity value:GAUGE:0:2 conntrack value:GAUGE:0:4294967295 contextswitch value:DERIVE:0:U cookies value:DERIVE:0:U