Drain netlink socket before sleeping, then filter repeated process start msgs
authorAndrew Bays <andrew.bays@gmail.com>
Fri, 12 Oct 2018 17:47:43 +0000 (13:47 -0400)
committerAndrew Bays <andrew.bays@gmail.com>
Fri, 12 Oct 2018 17:47:43 +0000 (13:47 -0400)
src/procevent.c

index aba6617..78c9182 100644 (file)
@@ -112,7 +112,8 @@ typedef struct {
 struct processlist_s {
   char *process;
 
-  uint32_t pid;
+  long pid;
+  int32_t last_status;
 
   struct processlist_s *next;
 };
@@ -345,8 +346,9 @@ static int gen_message_payload(int state, int pid, char *process,
     goto err;
 
   if (yajl_gen_string(
-          g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
-                                   : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
+                                : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
       yajl_gen_status_ok)
@@ -405,8 +407,9 @@ static int gen_message_payload(int state, int pid, char *process,
     goto err;
 
   if (yajl_gen_string(
-          g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
-                                   : PROCEVENT_VF_STATUS_NORMAL_VALUE),
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
+                                : PROCEVENT_VF_STATUS_NORMAL_VALUE),
           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
       yajl_gen_status_ok)
@@ -510,11 +513,17 @@ static processlist_t *process_check(int pid) {
 
       if (pl->pid == pid) {
         // this is a match, and we've already stored the exact pid/name combo
+        DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
+              "incoming PID %d",
+              pl->process, pl->pid, pid);
         match = pl;
         break;
       } else if (pl->pid == -1) {
         // this is a match, and we've found a candidate processlist_t to store
         // this new pid/name combo
+        DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
+              "PID %d",
+              pl->pid, pid);
         pl->pid = pid;
         match = pl;
         break;
@@ -522,6 +531,9 @@ static processlist_t *process_check(int pid) {
         // this is a match, but another instance of this process has already
         // claimed this pid/name combo,
         // so keep looking
+        DEBUG("procevent plugin: found pl object with matching name for "
+              "incoming PID %d, but object is in use by PID %ld",
+              pid, pl->pid);
         match = pl;
         continue;
       }
@@ -703,7 +715,7 @@ static int nl_connect() {
 
   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
   if (nl_sock == -1) {
-    ERROR("procevent plugin: socket open failed.");
+    ERROR("procevent plugin: socket open failed: %d", errno);
     return -1;
   }
 
@@ -713,7 +725,7 @@ static int nl_connect() {
 
   rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
   if (rc == -1) {
-    ERROR("procevent plugin: socket bind failed.");
+    ERROR("procevent plugin: socket bind failed: %d", errno);
     close(nl_sock);
     return -1;
   }
@@ -744,7 +756,8 @@ static int set_proc_ev_listen(bool enable) {
 
   rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
   if (rc == -1) {
-    ERROR("procevent plugin: subscribing to netlink process events failed.");
+    ERROR("procevent plugin: subscribing to netlink process events failed: %d",
+          errno);
     return -1;
   }
 
@@ -768,72 +781,82 @@ static int read_event() {
   if (nl_sock == -1)
     return ret;
 
-  status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+  while (42) {
 
-  if (status == 0) {
-    return 0;
-  } else if (status == -1) {
-    if (errno != EINTR) {
-      ERROR("procevent plugin: socket receive error: %d", errno);
-      return -1;
+    pthread_mutex_lock(&procevent_lock);
+
+    if (procevent_thread_loop <= 0)
+      return ret;
+
+    pthread_mutex_unlock(&procevent_lock);
+
+    status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+
+    if (status == 0) {
+      return 0;
+    } else if (status == -1) {
+      if (errno != EINTR) {
+        ERROR("procevent plugin: socket receive error: %d", errno);
+        return -1;
+      }
     }
-  }
 
-  switch (nlcn_msg.proc_ev.what) {
-  case PROC_EVENT_NONE:
-  case PROC_EVENT_FORK:
-  case PROC_EVENT_UID:
-  case PROC_EVENT_GID:
-    // Not of interest in current version
-    break;
-  case PROC_EVENT_EXEC:
-    proc_status = PROCEVENT_STARTED;
-    proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
-    break;
-  case PROC_EVENT_EXIT:
-    proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
-    proc_status = PROCEVENT_EXITED;
-    proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
-    break;
-  default:
-    break;
-  }
+    switch (nlcn_msg.proc_ev.what) {
+    case PROC_EVENT_NONE:
+    case PROC_EVENT_FORK:
+    case PROC_EVENT_UID:
+    case PROC_EVENT_GID:
+      // Not of interest in current version
+      break;
+    case PROC_EVENT_EXEC:
+      proc_status = PROCEVENT_STARTED;
+      proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
+      break;
+    case PROC_EVENT_EXIT:
+      proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
+      proc_status = PROCEVENT_EXITED;
+      proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
+      break;
+    default:
+      break;
+    }
 
-  // If we're interested in this process status event, place the event
-  // in the ring buffer for consumption by the main polling thread.
+    // If we're interested in this process status event, place the event
+    // in the ring buffer for consumption by the main polling thread.
 
-  if (proc_status != -1) {
-    pthread_mutex_lock(&procevent_lock);
+    if (proc_status != -1) {
+      pthread_mutex_lock(&procevent_lock);
 
-    int next = ring.head + 1;
-    if (next >= ring.maxLen)
-      next = 0;
+      int next = ring.head + 1;
+      if (next >= ring.maxLen)
+        next = 0;
 
-    if (next == ring.tail) {
-      WARNING("procevent plugin: ring buffer full");
-    } else {
-      DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
-            (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
-            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
-
-      if (proc_status == PROCEVENT_EXITED) {
-        ring.buffer[ring.head][0] = proc_id;
-        ring.buffer[ring.head][1] = proc_status;
-        ring.buffer[ring.head][2] = proc_extra;
-        ring.buffer[ring.head][3] =
-            (long long unsigned int)CDTIME_T_TO_US(cdtime());
+      if (next == ring.tail) {
+        WARNING("procevent plugin: ring buffer full");
       } else {
-        ring.buffer[ring.head][0] = proc_id;
-        ring.buffer[ring.head][1] = proc_status;
-        ring.buffer[ring.head][2] = 0;
-        ring.buffer[ring.head][3] =
-            (long long unsigned int)CDTIME_T_TO_US(cdtime());
+        DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
+              (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
+              (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+
+        if (proc_status == PROCEVENT_EXITED) {
+          ring.buffer[ring.head][0] = proc_id;
+          ring.buffer[ring.head][1] = proc_status;
+          ring.buffer[ring.head][2] = proc_extra;
+          ring.buffer[ring.head][3] =
+              (long long unsigned int)CDTIME_T_TO_US(cdtime());
+        } else {
+          ring.buffer[ring.head][0] = proc_id;
+          ring.buffer[ring.head][1] = proc_status;
+          ring.buffer[ring.head][2] = 0;
+          ring.buffer[ring.head][3] =
+              (long long unsigned int)CDTIME_T_TO_US(cdtime());
+        }
+
+        ring.head = next;
       }
 
-      ring.head = next;
+      pthread_mutex_unlock(&procevent_lock);
     }
-
-    pthread_mutex_unlock(&procevent_lock);
   }
 
   return ret;
@@ -1104,19 +1127,27 @@ static int procevent_read(void) /* {{{ */
               "list",
               pl->pid, pl->process);
         pl->pid = -1;
+        pl->last_status = -1;
       }
     } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
       // a new process has started, so check if we should monitor it
       processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
 
-      if (pl != NULL) {
+      // If we had already seen this process name and pid combo before,
+      // and the last message was a "process started" message, don't send
+      // the notfication again
+
+      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
         // This process is of interest to us, so publish its STARTED status
         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
                                         ring.buffer[ring.tail][1], pl->process,
                                         ring.buffer[ring.tail][3]);
-        DEBUG(
-            "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
-            pl->pid, pl->process);
+
+        pl->last_status = PROCEVENT_STARTED;
+
+        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
+              "list",
+              pl->pid, pl->process);
       }
     }