static int procevent_dequeue_thread_loop = 0;
static int procevent_dequeue_thread_error = 0;
static pthread_t procevent_dequeue_thread_id;
-static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
static int nl_sock = -1;
static int buffer_length;
static circbuf_t ring;
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
goto err;
- *buf = malloc(strlen((char *)buf2) + 1);
+ *buf = strdup((char *)buf2);
if (*buf == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: malloc failed during gen_message_payload: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
+ STRERRNO);
goto err;
}
- sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
-
yajl_gen_free(g);
return 0;
}
// Does /proc/<pid>/comm contain a process name we are interested in?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
static processlist_t *process_check(long pid) {
- int len, is_match, retval;
char file[BUFSIZE];
FILE *fh;
char buffer[BUFSIZE];
- len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
+ int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
if ((len < 0) || (len >= BUFSIZE)) {
WARNING("procevent process_check: process name too large");
return NULL;
}
- retval = fscanf(fh, "%[^\n]", buffer);
+ int retval = fscanf(fh, "%[^\n]", buffer);
if (retval < 0) {
WARNING("procevent process_check: unable to read comm file for pid %ld",
// associate <pid> with it (with the same process name as the existing).
//
- pthread_mutex_lock(&procevent_list_lock);
-
- processlist_t *pl;
processlist_t *match = NULL;
- for (pl = processlist_head; pl != NULL; pl = pl->next) {
+ for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
- is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
+ int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
if (is_match == 1) {
DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
"(%s)",
pid, buffer);
- processlist_t *pl2;
- char *process;
-
- pl2 = malloc(sizeof(*pl2));
+ processlist_t *pl2 = calloc(1, sizeof(*pl2));
if (pl2 == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: malloc failed during process_check: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- pthread_mutex_unlock(&procevent_list_lock);
+ ERROR("procevent plugin: calloc failed during process_check: %s",
+ STRERRNO);
return NULL;
}
- process = strdup(buffer);
+ char *process = strdup(buffer);
if (process == NULL) {
- char errbuf[1024];
sfree(pl2);
ERROR("procevent plugin: strdup failed during process_check: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- pthread_mutex_unlock(&procevent_list_lock);
+ STRERRNO);
return NULL;
}
match = pl2;
}
- pthread_mutex_unlock(&procevent_list_lock);
-
return match;
}
// Does our map have this PID or name?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
static processlist_t *process_map_check(long pid, char *process) {
- processlist_t *pl;
-
- pthread_mutex_lock(&procevent_list_lock);
-
- for (pl = processlist_head; pl != NULL; pl = pl->next) {
+ for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
int match_pid = 0;
int match_process = 0;
int match = 0;
match = 1;
if (match == 1) {
- pthread_mutex_unlock(&procevent_list_lock);
return pl;
}
}
- pthread_mutex_unlock(&procevent_list_lock);
-
return NULL;
}
static int process_map_refresh(void) {
- DIR *proc;
-
errno = 0;
- proc = opendir(PROCDIR);
+ DIR *proc = opendir(PROCDIR);
+
if (proc == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
return -1;
}
while (42) {
- struct dirent *dent;
- int len;
char file[BUFSIZE];
struct stat statbuf;
- int status;
-
errno = 0;
- dent = readdir(proc);
+ struct dirent *dent = readdir(proc);
if (dent == NULL) {
- char errbuf[4096];
-
if (errno == 0) /* end of directory */
break;
ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ STRERRNO);
closedir(proc);
return -1;
}
if (dent->d_name[0] == '.')
continue;
- len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
+ int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
if ((len < 0) || (len >= BUFSIZE))
continue;
- status = stat(file, &statbuf);
+ int status = stat(file, &statbuf);
if (status != 0) {
- char errbuf[4096];
- WARNING("procevent plugin: stat (%s) failed: %s", file,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
continue;
}
// Check if we need to store this pid/name combo in our processlist_t linked
// list
int this_pid = atoi(dent->d_name);
+ pthread_mutex_lock(&procevent_data_lock);
processlist_t *pl = process_check(this_pid);
+ pthread_mutex_unlock(&procevent_data_lock);
if (pl != NULL)
DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
}
static int nl_connect() {
- int rc;
- struct sockaddr_nl sa_nl;
+ struct sockaddr_nl sa_nl = {
+ .nl_family = AF_NETLINK, .nl_groups = CN_IDX_PROC, .nl_pid = getpid(),
+ };
nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
if (nl_sock == -1) {
return -1;
}
- sa_nl.nl_family = AF_NETLINK;
- sa_nl.nl_groups = CN_IDX_PROC;
- sa_nl.nl_pid = getpid();
-
- rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+ int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
if (rc == -1) {
ERROR("procevent plugin: socket bind failed: %d", errno);
close(nl_sock);
}
static int set_proc_ev_listen(bool enable) {
- int rc;
struct __attribute__((aligned(NLMSG_ALIGNTO))) {
struct nlmsghdr nl_hdr;
struct __attribute__((__packed__)) {
nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
- rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+ int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
if (rc == -1) {
ERROR("procevent plugin: subscribing to netlink process events failed: %d",
errno);
// Read from netlink socket and write to ring buffer
static int read_event() {
- int status;
- int ret = 0;
- int proc_id = -1;
- int proc_status = -1;
- int proc_extra = -1;
int recv_flags = MSG_DONTWAIT;
struct __attribute__((aligned(NLMSG_ALIGNTO))) {
struct nlmsghdr nl_hdr;
} nlcn_msg;
if (nl_sock == -1)
- return ret;
+ return 0;
while (42) {
+ int proc_id = -1;
+ int proc_status = -1;
+ int proc_extra = -1;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop <= 0) {
- pthread_mutex_unlock(&procevent_lock);
- return ret;
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return 0;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
+ int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
if (status == 0) {
return 0;
} else if (status < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_data_lock);
// There was nothing more to receive for now, so...
// If ring head does not equal ring tail, there is data
if (ring.head != ring.tail)
pthread_cond_signal(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
// Since there was nothing to receive, set recv to block and
// try again
// in the ring buffer for consumption by the main polling thread.
if (proc_status != -1) {
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_data_lock);
int next = ring.head + 1;
if (next >= ring.maxLen)
WARNING("procevent plugin: ring buffer full");
pthread_cond_signal(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
usleep(1000);
continue;
ring.head = next;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
}
}
- return ret;
+ return 0;
}
// Read from ring buffer and dispatch to write plugins
static int read_ring_buffer() {
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_data_lock);
// If there's currently nothing to read from the buffer,
// then wait
if (ring.head == ring.tail)
- pthread_cond_wait(&procevent_cond, &procevent_lock);
+ pthread_cond_wait(&procevent_cond, &procevent_data_lock);
while (ring.head != ring.tail) {
int next = ring.tail + 1;
ring.tail = next;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
return 0;
}
// to netlink socket and writing data to ring buffer
static void *procevent_netlink_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
while (procevent_netlink_thread_loop > 0) {
- int status;
-
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- usleep(1000);
+ int status = read_event();
- status = read_event();
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (status < 0) {
procevent_netlink_thread_error = 1;
break;
}
-
- if (procevent_netlink_thread_loop <= 0)
- break;
} /* while (procevent_netlink_thread_loop > 0) */
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return ((void *)0);
} /* }}} void *procevent_netlink_thread */
// ring buffer and dispatching notifications
static void *procevent_dequeue_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
while (procevent_dequeue_thread_loop > 0) {
- int status;
-
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- status = read_ring_buffer();
+ int status = read_ring_buffer();
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (status < 0) {
procevent_dequeue_thread_error = 1;
break;
}
-
- if (procevent_dequeue_thread_loop <= 0)
- break;
} /* while (procevent_dequeue_thread_loop > 0) */
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return ((void *)0);
} /* }}} void *procevent_dequeue_thread */
{
int status;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop != 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (0);
}
if (nl_sock == -1) {
status = nl_connect();
- if (status != 0)
+ if (status != 0) {
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
+ }
status = set_proc_ev_listen(true);
- if (status == -1)
+ if (status == -1) {
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
+ }
}
DEBUG("procevent plugin: socket created and bound");
if (status != 0) {
procevent_netlink_thread_loop = 0;
ERROR("procevent plugin: Starting netlink thread failed.");
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
+
+ int status2 = close(nl_sock);
+
+ if (status2 != 0) {
+ ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
+ status2, STRERRNO);
+ } else
+ nl_sock = -1;
+
return (-1);
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
} /* }}} int start_netlink_thread */
{
int status;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_dequeue_thread_loop != 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (0);
}
if (status != 0) {
procevent_dequeue_thread_loop = 0;
ERROR("procevent plugin: Starting dequeue thread failed.");
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (-1);
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
} /* }}} int start_dequeue_thread */
static int start_threads(void) /* {{{ */
{
- int status, status2;
+ int status = start_netlink_thread();
+ int status2 = start_dequeue_thread();
- status = start_netlink_thread();
- status2 = start_dequeue_thread();
-
- if (status < 0)
+ if (status != 0)
return status;
else
return status2;
static int stop_netlink_thread(int shutdown) /* {{{ */
{
- int status;
+ int socket_status, thread_status;
if (nl_sock != -1) {
- status = close(nl_sock);
- if (status != 0) {
+ socket_status = close(nl_sock);
+ if (socket_status != 0) {
ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
- status, strerror(errno));
+ socket_status, strerror(errno));
return (-1);
} else
nl_sock = -1;
- }
+ } else
+ socket_status = 0;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop == 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (-1);
}
procevent_netlink_thread_loop = 0;
+ pthread_mutex_unlock(&procevent_thread_lock);
+
pthread_cond_broadcast(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
if (shutdown == 1) {
// Calling pthread_cancel here in
DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(procevent_netlink_thread_id);
+ thread_status = pthread_cancel(procevent_netlink_thread_id);
- if (status != 0 && status != ESRCH) {
- ERROR("procevent plugin: Unable to cancel netlink thread: %d", status);
- status = -1;
+ if (thread_status != 0 && thread_status != ESRCH) {
+ ERROR("procevent plugin: Unable to cancel netlink thread: %d",
+ thread_status);
+ thread_status = -1;
} else
- status = 0;
+ thread_status = 0;
} else {
- status = pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
- if (status != 0 && status != ESRCH) {
+ thread_status =
+ pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+ if (thread_status != 0 && thread_status != ESRCH) {
ERROR("procevent plugin: Stopping netlink thread failed.");
- status = -1;
+ thread_status = -1;
} else
- status = 0;
+ thread_status = 0;
}
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
procevent_netlink_thread_error = 0;
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
DEBUG("procevent plugin: Finished requesting stop of netlink thread");
- return (status);
+ if (socket_status != 0)
+ return socket_status;
+ else
+ return thread_status;
} /* }}} int stop_netlink_thread */
static int stop_dequeue_thread(int shutdown) /* {{{ */
{
int status;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_dequeue_thread_loop == 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (-1);
}
procevent_dequeue_thread_loop = 0;
+ pthread_mutex_unlock(&procevent_thread_lock);
+
pthread_cond_broadcast(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
if (shutdown == 1) {
// Calling pthread_cancel here in
status = 0;
}
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
procevent_dequeue_thread_error = 0;
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
static int stop_threads(int shutdown) /* {{{ */
{
- int status, status2;
-
- status = stop_netlink_thread(shutdown);
- status2 = stop_dequeue_thread(shutdown);
+ int status = stop_netlink_thread(shutdown);
+ int status2 = stop_dequeue_thread(shutdown);
if (status < 0)
return status;
static int procevent_init(void) /* {{{ */
{
- int status;
-
ring.head = 0;
ring.tail = 0;
ring.maxLen = buffer_length;
- ring.buffer = (long long unsigned int **)malloc(
- buffer_length * sizeof(long long unsigned int *));
+ ring.buffer = (long long unsigned int **)calloc(
+ buffer_length, sizeof(long long unsigned int *));
for (int i = 0; i < buffer_length; i++) {
- ring.buffer[i] = (long long unsigned int *)malloc(
- PROCEVENT_FIELDS * sizeof(long long unsigned int));
+ ring.buffer[i] = (long long unsigned int *)calloc(
+ PROCEVENT_FIELDS, sizeof(long long unsigned int));
}
- status = process_map_refresh();
+ int status = process_map_refresh();
if (status == -1) {
ERROR("procevent plugin: Initial process mapping failed.");
static int procevent_config(const char *key, const char *value) /* {{{ */
{
- int status;
-
if (ignorelist == NULL)
ignorelist = ignorelist_create(/* invert = */ 1);
ignorelist_add(ignorelist, value);
} else if (strcasecmp(key, "ProcessRegex") == 0) {
#if HAVE_REGEX_H
- status = ignorelist_add(ignorelist, value);
+ int status = ignorelist_add(ignorelist, value);
if (status != 0) {
ERROR("procevent plugin: invalid regular expression: %s", value);
notification_meta_t *m = calloc(1, sizeof(*m));
if (m == NULL) {
- char errbuf[1024];
sfree(buf);
- ERROR("procevent plugin: unable to allocate metadata: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("procevent plugin: unable to allocate metadata: %s", STRERRNO);
return;
}
plugin_dispatch_notification(&n);
plugin_notification_meta_free(n.meta);
- // malloc'd in gen_message_payload
+ // strdup'd in gen_message_payload
if (buf != NULL)
sfree(buf);
}
static int procevent_read(void) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_error != 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
if (procevent_dequeue_thread_error != 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
ERROR("procevent plugin: The dequeue thread had a problem. Restarting it.");
return (-1);
} /* if (procevent_dequeue_thread_error != 0) */
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return (0);
} /* }}} int procevent_read */
static int procevent_shutdown(void) /* {{{ */
{
- int status;
- processlist_t *pl;
-
DEBUG("procevent plugin: Shutting down threads.");
- status = stop_threads(1);
+ int status = stop_threads(1);
for (int i = 0; i < buffer_length; i++) {
free(ring.buffer[i]);
free(ring.buffer);
- pl = processlist_head;
+ processlist_t *pl = processlist_head;
while (pl != NULL) {
processlist_t *pl_next;