From: Andrew Bays Date: Wed, 17 Oct 2018 14:20:39 +0000 (-0400) Subject: Use a separate dequeue thread to dispatch notifications X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=0ad3a0a1b687b79e5ce09e1e54e9a7dc4c924461;p=collectd.git Use a separate dequeue thread to dispatch notifications --- diff --git a/src/procevent.c b/src/procevent.c index a7a0107a..fc8e4302 100644 --- a/src/procevent.c +++ b/src/procevent.c @@ -124,9 +124,12 @@ typedef struct processlist_s processlist_t; */ static ignorelist_t *ignorelist = NULL; -static int procevent_thread_loop = 0; -static int procevent_thread_error = 0; -static pthread_t procevent_thread_id; +static int procevent_netlink_thread_loop = 0; +static int procevent_netlink_thread_error = 0; +static pthread_t procevent_netlink_thread_id; +static int procevent_dequeue_thread_loop = 0; +static int procevent_dequeue_thread_error = 0; +static pthread_t procevent_dequeue_thread_id; static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER; @@ -140,6 +143,14 @@ static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* + * Prototypes + */ + +static void procevent_dispatch_notification(long pid, const char *type, + gauge_t value, char *process, + long long unsigned int timestamp); + +/* * Private functions */ @@ -762,12 +773,14 @@ static int set_proc_ev_listen(bool enable) { return 0; } +// Read from netlink socket and write to ring buffer static int read_event() { int status; int ret = 0; int proc_id = -1; int proc_status = -1; int proc_extra = -1; + int recv_flags = MSG_DONTWAIT; struct __attribute__((aligned(NLMSG_ALIGNTO))) { struct nlmsghdr nl_hdr; struct __attribute__((__packed__)) { @@ -783,22 +796,45 @@ static int read_event() { pthread_mutex_lock(&procevent_lock); - if (procevent_thread_loop <= 0) + if (procevent_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&procevent_lock); return ret; + } pthread_mutex_unlock(&procevent_lock); - status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); + status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags); if (status == 0) { return 0; - } else if (status == -1) { - if (errno != EINTR) { + } else if (status < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + pthread_mutex_lock(&procevent_lock); + + // There was nothing more to receive for now, so... + // If ring head does not equal ring tail, there is data + // in the ring buffer for the dequeue thread to read, so + // signal it + if (ring.head != ring.tail) + pthread_cond_signal(&procevent_cond); + + pthread_mutex_unlock(&procevent_lock); + + // Since there was nothing to receive, set recv to block and + // try again + recv_flags = 0; + continue; + } else if (errno != EINTR) { ERROR("procevent plugin: socket receive error: %d", errno); return -1; } } + // We successfully received a message, so don't block on the next + // read in case there are more (and if there aren't, it will be + // handled above in the error-checking) + recv_flags = MSG_DONTWAIT; + switch (nlcn_msg.proc_ev.what) { case PROC_EVENT_NONE: case PROC_EVENT_FORK: @@ -830,7 +866,15 @@ static int read_event() { next = 0; if (next == ring.tail) { + // Buffer is full, signal the dequeue thread to process the buffer + // and clean it out, and then sleep WARNING("procevent plugin: ring buffer full"); + + pthread_cond_signal(&procevent_cond); + pthread_mutex_unlock(&procevent_lock); + + usleep(1000); + continue; } else { DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id, (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"), @@ -860,11 +904,73 @@ static int read_event() { return ret; } -static void *procevent_thread(void *arg) /* {{{ */ +// Read from ring buffer and dispatch to write plugins +static int read_ring_buffer() { + pthread_mutex_lock(&procevent_lock); + + // If there's currently nothing to read from the buffer, + // then wait + if (ring.head == ring.tail) + pthread_cond_wait(&procevent_cond, &procevent_lock); + + while (ring.head != ring.tail) { + int next = ring.tail + 1; + + if (next >= ring.maxLen) + next = 0; + + if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) { + processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL); + + if (pl != NULL) { + // This process is of interest to us, so publish its EXITED status + procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", + ring.buffer[ring.tail][1], pl->process, + ring.buffer[ring.tail][3]); + DEBUG( + "procevent plugin: PID %ld (%s) EXITED, removing PID from process " + "list", + pl->pid, pl->process); + pl->pid = -1; + pl->last_status = -1; + } + } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) { + // a new process has started, so check if we should monitor it + processlist_t *pl = process_check(ring.buffer[ring.tail][0]); + + // If we had already seen this process name and pid combo before, + // and the last message was a "process started" message, don't send + // the notfication again + + if (pl != NULL && pl->last_status != PROCEVENT_STARTED) { + // This process is of interest to us, so publish its STARTED status + procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", + ring.buffer[ring.tail][1], pl->process, + ring.buffer[ring.tail][3]); + + pl->last_status = PROCEVENT_STARTED; + + DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process " + "list", + pl->pid, pl->process); + } + } + + ring.tail = next; + } + + pthread_mutex_unlock(&procevent_lock); + + return 0; +} + +// Entry point for thread responsible for listening +// to netlink socket and writing data to ring buffer +static void *procevent_netlink_thread(void *arg) /* {{{ */ { pthread_mutex_lock(&procevent_lock); - while (procevent_thread_loop > 0) { + while (procevent_netlink_thread_loop > 0) { int status; pthread_mutex_unlock(&procevent_lock); @@ -876,26 +982,55 @@ static void *procevent_thread(void *arg) /* {{{ */ pthread_mutex_lock(&procevent_lock); if (status < 0) { - procevent_thread_error = 1; + procevent_netlink_thread_error = 1; + break; + } + + if (procevent_netlink_thread_loop <= 0) + break; + } /* while (procevent_netlink_thread_loop > 0) */ + + pthread_mutex_unlock(&procevent_lock); + + return ((void *)0); +} /* }}} void *procevent_netlink_thread */ + +// Entry point for thread responsible for reading from +// ring buffer and dispatching notifications +static void *procevent_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&procevent_lock); + + while (procevent_dequeue_thread_loop > 0) { + int status; + + pthread_mutex_unlock(&procevent_lock); + + status = read_ring_buffer(); + + pthread_mutex_lock(&procevent_lock); + + if (status < 0) { + procevent_dequeue_thread_error = 1; break; } - if (procevent_thread_loop <= 0) + if (procevent_dequeue_thread_loop <= 0) break; - } /* while (procevent_thread_loop > 0) */ + } /* while (procevent_dequeue_thread_loop > 0) */ pthread_mutex_unlock(&procevent_lock); return ((void *)0); -} /* }}} void *procevent_thread */ +} /* }}} void *procevent_dequeue_thread */ -static int start_thread(void) /* {{{ */ +static int start_netlink_thread(void) /* {{{ */ { int status; pthread_mutex_lock(&procevent_lock); - if (procevent_thread_loop != 0) { + if (procevent_netlink_thread_loop != 0) { pthread_mutex_unlock(&procevent_lock); return (0); } @@ -913,24 +1048,67 @@ static int start_thread(void) /* {{{ */ DEBUG("procevent plugin: socket created and bound"); - procevent_thread_loop = 1; - procevent_thread_error = 0; + procevent_netlink_thread_loop = 1; + procevent_netlink_thread_error = 0; - status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL, - procevent_thread, + status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL, + procevent_netlink_thread, /* arg = */ (void *)0, "procevent"); if (status != 0) { - procevent_thread_loop = 0; - ERROR("procevent plugin: Starting thread failed."); + procevent_netlink_thread_loop = 0; + ERROR("procevent plugin: Starting netlink thread failed."); pthread_mutex_unlock(&procevent_lock); return (-1); } pthread_mutex_unlock(&procevent_lock); - return (0); -} /* }}} int start_thread */ -static int stop_thread(int shutdown) /* {{{ */ + return status; +} /* }}} int start_netlink_thread */ + +static int start_dequeue_thread(void) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&procevent_lock); + + if (procevent_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&procevent_lock); + return (0); + } + + procevent_dequeue_thread_loop = 1; + procevent_dequeue_thread_error = 0; + + status = plugin_thread_create(&procevent_dequeue_thread_id, /* attr = */ NULL, + procevent_dequeue_thread, + /* arg = */ (void *)0, "procevent"); + if (status != 0) { + procevent_dequeue_thread_loop = 0; + ERROR("procevent plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&procevent_lock); + return (-1); + } + + pthread_mutex_unlock(&procevent_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status, status2; + + status = start_netlink_thread(); + status2 = start_dequeue_thread(); + + if (status < 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ { int status; @@ -946,12 +1124,12 @@ static int stop_thread(int shutdown) /* {{{ */ pthread_mutex_lock(&procevent_lock); - if (procevent_thread_loop == 0) { + if (procevent_netlink_thread_loop == 0) { pthread_mutex_unlock(&procevent_lock); return (-1); } - procevent_thread_loop = 0; + procevent_netlink_thread_loop = 0; pthread_cond_broadcast(&procevent_cond); pthread_mutex_unlock(&procevent_lock); @@ -960,31 +1138,94 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("procevent plugin: Canceling thread for process shutdown"); + DEBUG("procevent plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(procevent_thread_id); + status = pthread_cancel(procevent_netlink_thread_id); - if (status != 0) { - ERROR("procevent plugin: Unable to cancel thread: %d", status); + if (status != 0 && status != ESRCH) { + ERROR("procevent plugin: Unable to cancel netlink thread: %d", status); status = -1; - } + } else + status = 0; } else { - status = pthread_join(procevent_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("procevent plugin: Stopping thread failed."); + status = pthread_join(procevent_netlink_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("procevent plugin: Stopping netlink thread failed."); status = -1; - } + } else + status = 0; + } + + pthread_mutex_lock(&procevent_lock); + memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id)); + procevent_netlink_thread_error = 0; + pthread_mutex_unlock(&procevent_lock); + + DEBUG("procevent plugin: Finished requesting stop of netlink thread"); + + return (status); +} /* }}} int stop_netlink_thread */ + +static int stop_dequeue_thread(int shutdown) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&procevent_lock); + + if (procevent_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&procevent_lock); + return (-1); + } + + procevent_dequeue_thread_loop = 0; + pthread_cond_broadcast(&procevent_cond); + pthread_mutex_unlock(&procevent_lock); + + if (shutdown == 1) { + // Calling pthread_cancel here in + // the case of a shutdown just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("procevent plugin: Canceling dequeue thread for process shutdown"); + + status = pthread_cancel(procevent_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; + } else { + status = pthread_join(procevent_dequeue_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("procevent plugin: Stopping dequeue thread failed."); + status = -1; + } else + status = 0; } pthread_mutex_lock(&procevent_lock); - memset(&procevent_thread_id, 0, sizeof(procevent_thread_id)); - procevent_thread_error = 0; + memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id)); + procevent_dequeue_thread_error = 0; pthread_mutex_unlock(&procevent_lock); - DEBUG("procevent plugin: Finished requesting stop of thread"); + DEBUG("procevent plugin: Finished requesting stop of dequeue thread"); return (status); -} /* }}} int stop_thread */ +} /* }}} int stop_dequeue_thread */ + +static int stop_threads(int shutdown) /* {{{ */ +{ + int status, status2; + + status = stop_netlink_thread(shutdown); + status2 = stop_dequeue_thread(shutdown); + + if (status < 0) + return status; + else + return status2; +} /* }}} int stop_threads */ static int procevent_init(void) /* {{{ */ { @@ -1013,7 +1254,7 @@ static int procevent_init(void) /* {{{ */ return (-1); } - return (start_thread()); + return (start_threads()); } /* }}} int procevent_init */ static int procevent_config(const char *key, const char *value) /* {{{ */ @@ -1095,64 +1336,33 @@ static void procevent_dispatch_notification(long pid, static int procevent_read(void) /* {{{ */ { - if (procevent_thread_error != 0) { - ERROR( - "procevent plugin: The interface thread had a problem. Restarting it."); + pthread_mutex_lock(&procevent_lock); - stop_thread(0); + if (procevent_netlink_thread_error != 0) { - start_thread(); + pthread_mutex_unlock(&procevent_lock); - return (-1); - } /* if (procevent_thread_error != 0) */ + ERROR("procevent plugin: The netlink thread had a problem. Restarting it."); - pthread_mutex_lock(&procevent_lock); + stop_netlink_thread(0); - while (ring.head != ring.tail) { - int next = ring.tail + 1; - - if (next >= ring.maxLen) - next = 0; + start_netlink_thread(); - if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) { - processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL); + return (-1); + } /* if (procevent_netlink_thread_error != 0) */ - if (pl != NULL) { - // This process is of interest to us, so publish its EXITED status - procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", - ring.buffer[ring.tail][1], pl->process, - ring.buffer[ring.tail][3]); - DEBUG( - "procevent plugin: PID %ld (%s) EXITED, removing PID from process " - "list", - pl->pid, pl->process); - pl->pid = -1; - pl->last_status = -1; - } - } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) { - // a new process has started, so check if we should monitor it - processlist_t *pl = process_check(ring.buffer[ring.tail][0]); + if (procevent_dequeue_thread_error != 0) { - // If we had already seen this process name and pid combo before, - // and the last message was a "process started" message, don't send - // the notfication again + pthread_mutex_unlock(&procevent_lock); - if (pl != NULL && pl->last_status != PROCEVENT_STARTED) { - // This process is of interest to us, so publish its STARTED status - procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", - ring.buffer[ring.tail][1], pl->process, - ring.buffer[ring.tail][3]); + ERROR("procevent plugin: The dequeue thread had a problem. Restarting it."); - pl->last_status = PROCEVENT_STARTED; + stop_dequeue_thread(0); - DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process " - "list", - pl->pid, pl->process); - } - } + start_dequeue_thread(); - ring.tail = next; - } + return (-1); + } /* if (procevent_dequeue_thread_error != 0) */ pthread_mutex_unlock(&procevent_lock); @@ -1161,12 +1371,12 @@ static int procevent_read(void) /* {{{ */ static int procevent_shutdown(void) /* {{{ */ { + int status; processlist_t *pl; - DEBUG("procevent plugin: Shutting down thread."); + DEBUG("procevent plugin: Shutting down threads."); - if (stop_thread(1) < 0) - return (-1); + status = stop_threads(1); for (int i = 0; i < buffer_length; i++) { free(ring.buffer[i]); @@ -1186,7 +1396,7 @@ static int procevent_shutdown(void) /* {{{ */ pl = pl_next; } - return (0); + return status; } /* }}} int procevent_shutdown */ void module_register(void) {