*/
static ignorelist_t *ignorelist = NULL;
-static int procevent_thread_loop = 0;
-static int procevent_thread_error = 0;
-static pthread_t procevent_thread_id;
+static int procevent_netlink_thread_loop = 0;
+static int procevent_netlink_thread_error = 0;
+static pthread_t procevent_netlink_thread_id;
+static int procevent_dequeue_thread_loop = 0;
+static int procevent_dequeue_thread_error = 0;
+static pthread_t procevent_dequeue_thread_id;
static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
+ * Prototypes
+ */
+
+static void procevent_dispatch_notification(long pid, const char *type,
+ gauge_t value, char *process,
+ long long unsigned int timestamp);
+
+/*
* Private functions
*/
return 0;
}
+// Read from netlink socket and write to ring buffer
static int read_event() {
int status;
int ret = 0;
int proc_id = -1;
int proc_status = -1;
int proc_extra = -1;
+ int recv_flags = MSG_DONTWAIT;
struct __attribute__((aligned(NLMSG_ALIGNTO))) {
struct nlmsghdr nl_hdr;
struct __attribute__((__packed__)) {
pthread_mutex_lock(&procevent_lock);
- if (procevent_thread_loop <= 0)
+ if (procevent_netlink_thread_loop <= 0) {
+ pthread_mutex_unlock(&procevent_lock);
return ret;
+ }
pthread_mutex_unlock(&procevent_lock);
- status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+ status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
if (status == 0) {
return 0;
- } else if (status == -1) {
- if (errno != EINTR) {
+ } else if (status < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ pthread_mutex_lock(&procevent_lock);
+
+ // There was nothing more to receive for now, so...
+ // If ring head does not equal ring tail, there is data
+ // in the ring buffer for the dequeue thread to read, so
+ // signal it
+ if (ring.head != ring.tail)
+ pthread_cond_signal(&procevent_cond);
+
+ pthread_mutex_unlock(&procevent_lock);
+
+ // Since there was nothing to receive, set recv to block and
+ // try again
+ recv_flags = 0;
+ continue;
+ } else if (errno != EINTR) {
ERROR("procevent plugin: socket receive error: %d", errno);
return -1;
}
}
+ // We successfully received a message, so don't block on the next
+ // read in case there are more (and if there aren't, it will be
+ // handled above in the error-checking)
+ recv_flags = MSG_DONTWAIT;
+
switch (nlcn_msg.proc_ev.what) {
case PROC_EVENT_NONE:
case PROC_EVENT_FORK:
next = 0;
if (next == ring.tail) {
+ // Buffer is full, signal the dequeue thread to process the buffer
+ // and clean it out, and then sleep
WARNING("procevent plugin: ring buffer full");
+
+ pthread_cond_signal(&procevent_cond);
+ pthread_mutex_unlock(&procevent_lock);
+
+ usleep(1000);
+ continue;
} else {
DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
(proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
return ret;
}
-static void *procevent_thread(void *arg) /* {{{ */
+// Read from ring buffer and dispatch to write plugins
+static int read_ring_buffer() {
+ pthread_mutex_lock(&procevent_lock);
+
+ // If there's currently nothing to read from the buffer,
+ // then wait
+ if (ring.head == ring.tail)
+ pthread_cond_wait(&procevent_cond, &procevent_lock);
+
+ while (ring.head != ring.tail) {
+ int next = ring.tail + 1;
+
+ if (next >= ring.maxLen)
+ next = 0;
+
+ if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
+ processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+
+ if (pl != NULL) {
+ // This process is of interest to us, so publish its EXITED status
+ procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
+ ring.buffer[ring.tail][1], pl->process,
+ ring.buffer[ring.tail][3]);
+ DEBUG(
+ "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
+ "list",
+ pl->pid, pl->process);
+ pl->pid = -1;
+ pl->last_status = -1;
+ }
+ } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
+ // a new process has started, so check if we should monitor it
+ processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+
+ // If we had already seen this process name and pid combo before,
+ // and the last message was a "process started" message, don't send
+ // the notfication again
+
+ if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
+ // This process is of interest to us, so publish its STARTED status
+ procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
+ ring.buffer[ring.tail][1], pl->process,
+ ring.buffer[ring.tail][3]);
+
+ pl->last_status = PROCEVENT_STARTED;
+
+ DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
+ "list",
+ pl->pid, pl->process);
+ }
+ }
+
+ ring.tail = next;
+ }
+
+ pthread_mutex_unlock(&procevent_lock);
+
+ return 0;
+}
+
+// Entry point for thread responsible for listening
+// to netlink socket and writing data to ring buffer
+static void *procevent_netlink_thread(void *arg) /* {{{ */
{
pthread_mutex_lock(&procevent_lock);
- while (procevent_thread_loop > 0) {
+ while (procevent_netlink_thread_loop > 0) {
int status;
pthread_mutex_unlock(&procevent_lock);
pthread_mutex_lock(&procevent_lock);
if (status < 0) {
- procevent_thread_error = 1;
+ procevent_netlink_thread_error = 1;
+ break;
+ }
+
+ if (procevent_netlink_thread_loop <= 0)
+ break;
+ } /* while (procevent_netlink_thread_loop > 0) */
+
+ pthread_mutex_unlock(&procevent_lock);
+
+ return ((void *)0);
+} /* }}} void *procevent_netlink_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *procevent_dequeue_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&procevent_lock);
+
+ while (procevent_dequeue_thread_loop > 0) {
+ int status;
+
+ pthread_mutex_unlock(&procevent_lock);
+
+ status = read_ring_buffer();
+
+ pthread_mutex_lock(&procevent_lock);
+
+ if (status < 0) {
+ procevent_dequeue_thread_error = 1;
break;
}
- if (procevent_thread_loop <= 0)
+ if (procevent_dequeue_thread_loop <= 0)
break;
- } /* while (procevent_thread_loop > 0) */
+ } /* while (procevent_dequeue_thread_loop > 0) */
pthread_mutex_unlock(&procevent_lock);
return ((void *)0);
-} /* }}} void *procevent_thread */
+} /* }}} void *procevent_dequeue_thread */
-static int start_thread(void) /* {{{ */
+static int start_netlink_thread(void) /* {{{ */
{
int status;
pthread_mutex_lock(&procevent_lock);
- if (procevent_thread_loop != 0) {
+ if (procevent_netlink_thread_loop != 0) {
pthread_mutex_unlock(&procevent_lock);
return (0);
}
DEBUG("procevent plugin: socket created and bound");
- procevent_thread_loop = 1;
- procevent_thread_error = 0;
+ procevent_netlink_thread_loop = 1;
+ procevent_netlink_thread_error = 0;
- status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
- procevent_thread,
+ status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
+ procevent_netlink_thread,
/* arg = */ (void *)0, "procevent");
if (status != 0) {
- procevent_thread_loop = 0;
- ERROR("procevent plugin: Starting thread failed.");
+ procevent_netlink_thread_loop = 0;
+ ERROR("procevent plugin: Starting netlink thread failed.");
pthread_mutex_unlock(&procevent_lock);
return (-1);
}
pthread_mutex_unlock(&procevent_lock);
- return (0);
-} /* }}} int start_thread */
-static int stop_thread(int shutdown) /* {{{ */
+ return status;
+} /* }}} int start_netlink_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+ int status;
+
+ pthread_mutex_lock(&procevent_lock);
+
+ if (procevent_dequeue_thread_loop != 0) {
+ pthread_mutex_unlock(&procevent_lock);
+ return (0);
+ }
+
+ procevent_dequeue_thread_loop = 1;
+ procevent_dequeue_thread_error = 0;
+
+ status = plugin_thread_create(&procevent_dequeue_thread_id, /* attr = */ NULL,
+ procevent_dequeue_thread,
+ /* arg = */ (void *)0, "procevent");
+ if (status != 0) {
+ procevent_dequeue_thread_loop = 0;
+ ERROR("procevent plugin: Starting dequeue thread failed.");
+ pthread_mutex_unlock(&procevent_lock);
+ return (-1);
+ }
+
+ pthread_mutex_unlock(&procevent_lock);
+
+ return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+ int status, status2;
+
+ status = start_netlink_thread();
+ status2 = start_dequeue_thread();
+
+ if (status < 0)
+ return status;
+ else
+ return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
{
int status;
pthread_mutex_lock(&procevent_lock);
- if (procevent_thread_loop == 0) {
+ if (procevent_netlink_thread_loop == 0) {
pthread_mutex_unlock(&procevent_lock);
return (-1);
}
- procevent_thread_loop = 0;
+ procevent_netlink_thread_loop = 0;
pthread_cond_broadcast(&procevent_cond);
pthread_mutex_unlock(&procevent_lock);
// the case of a shutdown just assures that the thread is
// gone and that the process has been fully terminated.
- DEBUG("procevent plugin: Canceling thread for process shutdown");
+ DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(procevent_thread_id);
+ status = pthread_cancel(procevent_netlink_thread_id);
- if (status != 0) {
- ERROR("procevent plugin: Unable to cancel thread: %d", status);
+ if (status != 0 && status != ESRCH) {
+ ERROR("procevent plugin: Unable to cancel netlink thread: %d", status);
status = -1;
- }
+ } else
+ status = 0;
} else {
- status = pthread_join(procevent_thread_id, /* return = */ NULL);
- if (status != 0) {
- ERROR("procevent plugin: Stopping thread failed.");
+ status = pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+ if (status != 0 && status != ESRCH) {
+ ERROR("procevent plugin: Stopping netlink thread failed.");
status = -1;
- }
+ } else
+ status = 0;
+ }
+
+ pthread_mutex_lock(&procevent_lock);
+ memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
+ procevent_netlink_thread_error = 0;
+ pthread_mutex_unlock(&procevent_lock);
+
+ DEBUG("procevent plugin: Finished requesting stop of netlink thread");
+
+ return (status);
+} /* }}} int stop_netlink_thread */
+
+static int stop_dequeue_thread(int shutdown) /* {{{ */
+{
+ int status;
+
+ pthread_mutex_lock(&procevent_lock);
+
+ if (procevent_dequeue_thread_loop == 0) {
+ pthread_mutex_unlock(&procevent_lock);
+ return (-1);
+ }
+
+ procevent_dequeue_thread_loop = 0;
+ pthread_cond_broadcast(&procevent_cond);
+ pthread_mutex_unlock(&procevent_lock);
+
+ if (shutdown == 1) {
+ // Calling pthread_cancel here in
+ // the case of a shutdown just assures that the thread is
+ // gone and that the process has been fully terminated.
+
+ DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
+
+ status = pthread_cancel(procevent_dequeue_thread_id);
+
+ if (status != 0 && status != ESRCH) {
+ ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
+ status = -1;
+ } else
+ status = 0;
+ } else {
+ status = pthread_join(procevent_dequeue_thread_id, /* return = */ NULL);
+ if (status != 0 && status != ESRCH) {
+ ERROR("procevent plugin: Stopping dequeue thread failed.");
+ status = -1;
+ } else
+ status = 0;
}
pthread_mutex_lock(&procevent_lock);
- memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
- procevent_thread_error = 0;
+ memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
+ procevent_dequeue_thread_error = 0;
pthread_mutex_unlock(&procevent_lock);
- DEBUG("procevent plugin: Finished requesting stop of thread");
+ DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
return (status);
-} /* }}} int stop_thread */
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads(int shutdown) /* {{{ */
+{
+ int status, status2;
+
+ status = stop_netlink_thread(shutdown);
+ status2 = stop_dequeue_thread(shutdown);
+
+ if (status < 0)
+ return status;
+ else
+ return status2;
+} /* }}} int stop_threads */
static int procevent_init(void) /* {{{ */
{
return (-1);
}
- return (start_thread());
+ return (start_threads());
} /* }}} int procevent_init */
static int procevent_config(const char *key, const char *value) /* {{{ */
static int procevent_read(void) /* {{{ */
{
- if (procevent_thread_error != 0) {
- ERROR(
- "procevent plugin: The interface thread had a problem. Restarting it.");
+ pthread_mutex_lock(&procevent_lock);
- stop_thread(0);
+ if (procevent_netlink_thread_error != 0) {
- start_thread();
+ pthread_mutex_unlock(&procevent_lock);
- return (-1);
- } /* if (procevent_thread_error != 0) */
+ ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
- pthread_mutex_lock(&procevent_lock);
+ stop_netlink_thread(0);
- while (ring.head != ring.tail) {
- int next = ring.tail + 1;
-
- if (next >= ring.maxLen)
- next = 0;
+ start_netlink_thread();
- if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
- processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+ return (-1);
+ } /* if (procevent_netlink_thread_error != 0) */
- if (pl != NULL) {
- // This process is of interest to us, so publish its EXITED status
- procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
- ring.buffer[ring.tail][1], pl->process,
- ring.buffer[ring.tail][3]);
- DEBUG(
- "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
- "list",
- pl->pid, pl->process);
- pl->pid = -1;
- pl->last_status = -1;
- }
- } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
- // a new process has started, so check if we should monitor it
- processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+ if (procevent_dequeue_thread_error != 0) {
- // If we had already seen this process name and pid combo before,
- // and the last message was a "process started" message, don't send
- // the notfication again
+ pthread_mutex_unlock(&procevent_lock);
- if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
- // This process is of interest to us, so publish its STARTED status
- procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
- ring.buffer[ring.tail][1], pl->process,
- ring.buffer[ring.tail][3]);
+ ERROR("procevent plugin: The dequeue thread had a problem. Restarting it.");
- pl->last_status = PROCEVENT_STARTED;
+ stop_dequeue_thread(0);
- DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
- "list",
- pl->pid, pl->process);
- }
- }
+ start_dequeue_thread();
- ring.tail = next;
- }
+ return (-1);
+ } /* if (procevent_dequeue_thread_error != 0) */
pthread_mutex_unlock(&procevent_lock);
static int procevent_shutdown(void) /* {{{ */
{
+ int status;
processlist_t *pl;
- DEBUG("procevent plugin: Shutting down thread.");
+ DEBUG("procevent plugin: Shutting down threads.");
- if (stop_thread(1) < 0)
- return (-1);
+ status = stop_threads(1);
for (int i = 0; i < buffer_length; i++) {
free(ring.buffer[i]);
pl = pl_next;
}
- return (0);
+ return status;
} /* }}} int procevent_shutdown */
void module_register(void) {