Use a separate dequeue thread to dispatch notifications
authorAndrew Bays <abays@redhat.com>
Wed, 17 Oct 2018 16:44:22 +0000 (12:44 -0400)
committerAndrew Bays <andrew.bays@gmail.com>
Thu, 5 Sep 2019 13:21:22 +0000 (09:21 -0400)
src/collectd.conf.pod
src/connectivity.c
src/types.db

index 77ca09e..102955a 100644 (file)
@@ -1600,6 +1600,13 @@ LoadPlugin connectivity
   Interface eth1
 </Plugin>
 
+This example shows C<connectivity plugin> monitoring all interfaces except "eth1".
+LoadPlugin connectivity
+<Plugin connectivity>
+  Interface eth1
+  IgnoreSelected true
+</Plugin>
+
 =over 4
 
 =item B<Interface> I<interface_name>
index e39bce1..f6356f0 100644 (file)
@@ -94,6 +94,7 @@
 /*
  * Private data types
  */
+
 struct interface_list_s {
   char *interface;
 
@@ -109,23 +110,37 @@ typedef struct interface_list_s interface_list_t;
 /*
  * Private variables
  */
+
 static ignorelist_t *ignorelist = NULL;
 
 static interface_list_t *interface_list_head = NULL;
 static int monitor_all_interfaces = 1;
 
-static int connectivity_thread_loop = 0;
-static int connectivity_thread_error = 0;
-static pthread_t connectivity_thread_id;
+static int connectivity_netlink_thread_loop = 0;
+static int connectivity_netlink_thread_error = 0;
+static pthread_t connectivity_netlink_thread_id;
+static int connectivity_dequeue_thread_loop = 0;
+static int connectivity_dequeue_thread_error = 0;
+static pthread_t connectivity_dequeue_thread_id;
 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
-static struct mnl_socket *sock;
+// static struct mnl_socket *sock;
+static int nl_sock = -1;
 static int event_id = 0;
 
-static const char *config_keys[] = {"Interface"};
+static const char *config_keys[] = {"Interface", "IgnoreSelected"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
 
 /*
+ * Prototype
+ */
+
+static void
+connectivity_dispatch_notification(const char *interface, const char *type,
+                                   gauge_t value, gauge_t old_value,
+                                   long long unsigned int timestamp);
+
+/*
  * Private functions
  */
 
@@ -517,151 +532,326 @@ static int msg_handler(struct nlmsghdr *msg) {
   return 0;
 }
 
-static int read_event(struct mnl_socket *nl,
-                      int (*msg_handler)(struct nlmsghdr *)) {
+// static int read_event(struct mnl_socket *nl,
+//                       int (*msg_handler)(struct nlmsghdr *)) {
+static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
   int status;
   int ret = 0;
   char buf[4096];
   struct nlmsghdr *h;
+  int recv_flags = MSG_DONTWAIT;
 
-  if (nl == NULL)
+  // if (nl == NULL)
+  //   return ret;
+
+  if (nl == -1)
     return ret;
 
-  status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
+  while (42) {
+    pthread_mutex_lock(&connectivity_lock);
 
-  if (status < 0) {
-    /* Socket non-blocking so bail out once we have read everything */
-    if (errno == EWOULDBLOCK || errno == EAGAIN)
+    if (connectivity_netlink_thread_loop <= 0) {
+      pthread_mutex_unlock(&connectivity_lock);
       return ret;
+    }
 
-    /* Anything else is an error */
-    ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
-          status);
-    return status;
-  }
+    pthread_mutex_unlock(&connectivity_lock);
 
-  if (status == 0) {
-    DEBUG("connectivity plugin: read_event: EOF\n");
-  }
+    status = recv(nl, buf, sizeof(buf), recv_flags);
 
-  /* We need to handle more than one message per 'recvmsg' */
-  for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
-       h = NLMSG_NEXT(h, status)) {
-    /* Finish reading */
-    if (h->nlmsg_type == NLMSG_DONE)
-      return ret;
+    if (status < 0) {
+
+      // If there were no more messages to drain from the socket,
+      // then signal the dequeue thread and allow it to dispatch
+      // any saved interface status changes.  Then continue, but
+      // block and wait for new messages
+      if (errno == EWOULDBLOCK || errno == EAGAIN) {
+        pthread_mutex_lock(&connectivity_lock);
+        pthread_cond_signal(&connectivity_cond);
+        pthread_mutex_unlock(&connectivity_lock);
+
+        recv_flags = 0;
+        continue;
+      }
 
-    /* Message is some kind of error */
-    if (h->nlmsg_type == NLMSG_ERROR) {
-      ERROR("connectivity plugin: read_event: Message is an error\n");
-      return -1; // Error
+      /* Anything else is an error */
+      // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
+      // %d\n",
+      //       status);
+      ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
+      return status;
     }
 
-    /* Call message handler */
-    if (msg_handler) {
-      ret = (*msg_handler)(h);
-      if (ret < 0) {
-        ERROR("connectivity plugin: read_event: Message handler error %d\n",
-              ret);
+    // Message received successfully, so we'll stop blocking on the
+    // receive call for now (until we get a "would block" error, which
+    // will be handled above)
+    recv_flags = MSG_DONTWAIT;
+
+    if (status == 0) {
+      DEBUG("connectivity plugin: read_event: EOF\n");
+    }
+
+    /* We need to handle more than one message per 'recvmsg' */
+    for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
+         h = NLMSG_NEXT(h, status)) {
+      /* Finish reading */
+      if (h->nlmsg_type == NLMSG_DONE)
         return ret;
+
+      /* Message is some kind of error */
+      if (h->nlmsg_type == NLMSG_ERROR) {
+        ERROR("connectivity plugin: read_event: Message is an error\n");
+        return -1; // Error
+      }
+
+      /* Call message handler */
+      if (msg_handler) {
+        ret = (*msg_handler)(h);
+        if (ret < 0) {
+          ERROR("connectivity plugin: read_event: Message handler error %d\n",
+                ret);
+          return ret;
+        }
+      } else {
+        ERROR("connectivity plugin: read_event: Error NULL message handler\n");
+        return -1;
       }
-    } else {
-      ERROR("connectivity plugin: read_event: Error NULL message handler\n");
-      return -1;
     }
   }
 
   return ret;
 }
 
-static void *connectivity_thread(void *arg) /* {{{ */
+static void send_interface_status() {
+  for (interface_list_t *il = interface_list_head; il != NULL;
+       il = il->next) /* {{{ */
+  {
+    uint32_t status;
+    uint32_t prev_status;
+    uint32_t sent;
+
+    status = il->status;
+    prev_status = il->prev_status;
+    sent = il->sent;
+
+    if (status != prev_status && sent == 0) {
+      connectivity_dispatch_notification(il->interface, "gauge", status,
+                                         prev_status, il->timestamp);
+      il->sent = 1;
+    }
+  } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+}
+
+static int read_interface_status() /* {{{ */
 {
   pthread_mutex_lock(&connectivity_lock);
 
-  while (connectivity_thread_loop > 0) {
+  // This first attempt is necessary because the netlink thread
+  // might have held the lock while this thread was blocked on
+  // the lock acquisition just above.  And while the netlink thread
+  // had the lock, it could have called pthread_cond_singal, which
+  // obviously wouldn't have woken this thread, since this thread
+  // was not yet waiting on the condition signal.  So we need to
+  // loop through the interfaces and check if any have changed
+  // status before we wait on the condition signal
+  send_interface_status();
+
+  pthread_cond_wait(&connectivity_cond, &connectivity_lock);
+
+  send_interface_status();
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return 0;
+} /* }}} int *read_interface_status */
+
+static void *connectivity_netlink_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_lock);
+
+  while (connectivity_netlink_thread_loop > 0) {
+    int status;
+
+    pthread_mutex_unlock(&connectivity_lock);
+
+    status = read_event(nl_sock, msg_handler);
+
+    pthread_mutex_lock(&connectivity_lock);
+
+    if (status < 0) {
+      connectivity_netlink_thread_error = 1;
+      break;
+    }
+
+    if (connectivity_netlink_thread_loop <= 0)
+      break;
+  } /* while (connectivity_netlink_thread_loop > 0) */
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return ((void *)0);
+} /* }}} void *connectivity_netlink_thread */
+
+static void *connectivity_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_lock);
+
+  while (connectivity_dequeue_thread_loop > 0) {
     int status;
 
     pthread_mutex_unlock(&connectivity_lock);
 
-    status = read_event(sock, msg_handler);
+    status = read_interface_status();
 
     pthread_mutex_lock(&connectivity_lock);
 
     if (status < 0) {
-      connectivity_thread_error = 1;
+      connectivity_dequeue_thread_error = 1;
       break;
     }
 
-    if (connectivity_thread_loop <= 0)
+    if (connectivity_dequeue_thread_loop <= 0)
       break;
-  } /* while (connectivity_thread_loop > 0) */
+  } /* while (connectivity_dequeue_thread_loop > 0) */
 
   pthread_mutex_unlock(&connectivity_lock);
 
   return ((void *)0);
-} /* }}} void *connectivity_thread */
+} /* }}} void *connectivity_dequeue_thread */
+
+static int nl_connect() {
+  int rc;
+  struct sockaddr_nl sa_nl;
+
+  nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
+  if (nl_sock == -1) {
+    ERROR("connectivity plugin: socket open failed: %d", errno);
+    return -1;
+  }
+
+  sa_nl.nl_family = AF_NETLINK;
+  sa_nl.nl_groups = RTMGRP_LINK;
+  sa_nl.nl_pid = getpid();
+
+  rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+  if (rc == -1) {
+    ERROR("connectivity plugin: socket bind failed: %d", errno);
+    close(nl_sock);
+    return -1;
+  }
+
+  return 0;
+}
 
-static int start_thread(void) /* {{{ */
+static int start_netlink_thread(void) /* {{{ */
 {
   int status;
 
   pthread_mutex_lock(&connectivity_lock);
 
-  if (connectivity_thread_loop != 0) {
+  if (connectivity_netlink_thread_loop != 0) {
     pthread_mutex_unlock(&connectivity_lock);
     return (0);
   }
 
-  connectivity_thread_loop = 1;
-  connectivity_thread_error = 0;
+  connectivity_netlink_thread_loop = 1;
+  connectivity_netlink_thread_error = 0;
 
-  if (sock == NULL) {
-    sock = mnl_socket_open(NETLINK_ROUTE);
-    if (sock == NULL) {
-      ERROR(
-          "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
-      pthread_mutex_unlock(&connectivity_lock);
-      return (-1);
-    }
+  if (nl_sock == -1) {
+    status = nl_connect();
 
-    if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
-      ERROR(
-          "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
-      pthread_mutex_unlock(&connectivity_lock);
-      return (1);
-    }
+    if (status != 0)
+      return status;
   }
 
-  status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
-                                connectivity_thread,
+  status = plugin_thread_create(&connectivity_netlink_thread_id,
+                                /* attr = */ NULL, connectivity_netlink_thread,
                                 /* arg = */ (void *)0, "connectivity");
   if (status != 0) {
-    connectivity_thread_loop = 0;
+    connectivity_netlink_thread_loop = 0;
     ERROR("connectivity plugin: Starting thread failed.");
     pthread_mutex_unlock(&connectivity_lock);
-    mnl_socket_close(sock);
+
+    int status2 = close(nl_sock);
+
+    if (status2 != 0) {
+      ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+            status2, strerror(errno));
+    } else
+      nl_sock = -1;
+
     return (-1);
   }
 
   pthread_mutex_unlock(&connectivity_lock);
-  return (0);
-} /* }}} int start_thread */
 
-static int stop_thread(int shutdown) /* {{{ */
+  return status;
+}
+
+static int start_dequeue_thread(void) /* {{{ */
 {
   int status;
 
-  if (sock != NULL)
-    mnl_socket_close(sock);
+  pthread_mutex_lock(&connectivity_lock);
+
+  if (connectivity_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&connectivity_lock);
+    return (0);
+  }
+
+  connectivity_dequeue_thread_loop = 1;
+  connectivity_dequeue_thread_error = 0;
+
+  status = plugin_thread_create(&connectivity_dequeue_thread_id,
+                                /* attr = */ NULL, connectivity_dequeue_thread,
+                                /* arg = */ (void *)0, "connectivity");
+  if (status != 0) {
+    connectivity_dequeue_thread_loop = 0;
+    ERROR("connectivity plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&connectivity_lock);
+    return (-1);
+  }
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status, status2;
+
+  status = start_netlink_thread();
+  status2 = start_dequeue_thread();
+
+  if (status < 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+  int status;
+
+  if (nl_sock != -1) {
+    status = close(nl_sock);
+    if (status != 0) {
+      ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+            status, strerror(errno));
+      return (-1);
+    } else
+      nl_sock = -1;
+  }
 
   pthread_mutex_lock(&connectivity_lock);
 
-  if (connectivity_thread_loop == 0) {
+  if (connectivity_netlink_thread_loop == 0) {
     pthread_mutex_unlock(&connectivity_lock);
     return (-1);
   }
 
-  connectivity_thread_loop = 0;
+  connectivity_netlink_thread_loop = 0;
   pthread_cond_broadcast(&connectivity_cond);
   pthread_mutex_unlock(&connectivity_lock);
 
@@ -669,7 +859,7 @@ static int stop_thread(int shutdown) /* {{{ */
     // Since the thread is blocking, calling pthread_join
     // doesn't actually succeed in stopping it.  It will stick around
     // until a NETLINK message is received on the socket (at which
-    // it will realize that "connectivity_thread_loop" is 0 and will
+    // it will realize that "connectivity_netlink_thread_loop" is 0 and will
     // break out of the read loop and be allowed to die).  This is
     // fine when the process isn't supposed to be exiting, but in
     // the case of a process shutdown, we don't want to have an
@@ -677,31 +867,96 @@ static int stop_thread(int shutdown) /* {{{ */
     // the case of a shutdown is just assures that the thread is
     // gone and that the process has been fully terminated.
 
-    DEBUG("connectivity plugin: Canceling thread for process shutdown");
+    DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
 
-    status = pthread_cancel(connectivity_thread_id);
+    status = pthread_cancel(connectivity_netlink_thread_id);
 
-    if (status != 0) {
-      ERROR("connectivity plugin: Unable to cancel thread: %d", status);
+    if (status != 0 && status != ESRCH) {
+      ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
       status = -1;
-    }
+    } else
+      status = 0;
   } else {
-    status = pthread_join(connectivity_thread_id, /* return = */ NULL);
-    if (status != 0) {
-      ERROR("connectivity plugin: Stopping thread failed.");
+    status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("connectivity plugin: Stopping netlink thread failed.");
       status = -1;
-    }
+    } else
+      return 0;
+  }
+
+  pthread_mutex_lock(&connectivity_lock);
+  memset(&connectivity_netlink_thread_id, 0,
+         sizeof(connectivity_netlink_thread_id));
+  connectivity_netlink_thread_error = 0;
+  pthread_mutex_unlock(&connectivity_lock);
+
+  DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
+
+  return status;
+}
+
+static int stop_dequeue_thread(int shutdown) /* {{{ */
+{
+  int status;
+
+  pthread_mutex_lock(&connectivity_lock);
+
+  if (connectivity_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&connectivity_lock);
+    return (-1);
+  }
+
+  connectivity_dequeue_thread_loop = 0;
+  pthread_cond_broadcast(&connectivity_cond);
+  pthread_mutex_unlock(&connectivity_lock);
+
+  if (shutdown == 1) {
+    // Calling pthread_cancel here in
+    // the case of a shutdown just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
+
+    status = pthread_cancel(connectivity_dequeue_thread_id);
+
+    if (status != 0 && status != ESRCH) {
+      ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
+      status = -1;
+    } else
+      status = 0;
+  } else {
+    status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("connectivity plugin: Stopping dequeue thread failed.");
+      status = -1;
+    } else
+      status = 0;
   }
 
   pthread_mutex_lock(&connectivity_lock);
-  memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
-  connectivity_thread_error = 0;
+  memset(&connectivity_dequeue_thread_id, 0,
+         sizeof(connectivity_dequeue_thread_id));
+  connectivity_dequeue_thread_error = 0;
   pthread_mutex_unlock(&connectivity_lock);
 
-  DEBUG("connectivity plugin: Finished requesting stop of thread");
+  DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
 
   return (status);
-} /* }}} int stop_thread */
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads(int shutdown) /* {{{ */
+{
+  int status, status2;
+
+  status = stop_netlink_thread(shutdown);
+  status2 = stop_dequeue_thread(shutdown);
+
+  if (status < 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
 
 static int connectivity_init(void) /* {{{ */
 {
@@ -710,7 +965,7 @@ static int connectivity_init(void) /* {{{ */
            "be monitored");
   }
 
-  return (start_thread());
+  return (start_threads());
 } /* }}} int connectivity_init */
 
 static int connectivity_config(const char *key, const char *value) /* {{{ */
@@ -722,6 +977,11 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
   if (strcasecmp(key, "Interface") == 0) {
     ignorelist_add(ignorelist, value);
     monitor_all_interfaces = 0;
+  } else if (strcasecmp(key, "IgnoreSelected") == 0) {
+    int invert = 1;
+    if (IS_TRUE(value))
+      invert = 0;
+    ignorelist_set_invert(ignorelist, invert);
   } else {
     return (-1);
   }
@@ -729,9 +989,10 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
   return (0);
 } /* }}} int connectivity_config */
 
-static void connectivity_dispatch_notification(
-    const char *interface, const char *type, /* {{{ */
-    gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
+static void
+connectivity_dispatch_notification(const char *interface, const char *type,
+                                   gauge_t value, gauge_t old_value,
+                                   long long unsigned int timestamp) {
   char *buf = NULL;
   notification_t n = {
       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
@@ -777,11 +1038,16 @@ static void connectivity_dispatch_notification(
 
 static int connectivity_read(void) /* {{{ */
 {
-  if (connectivity_thread_error != 0) {
-    ERROR("connectivity plugin: The interface thread had a problem. Restarting "
+  pthread_mutex_lock(&connectivity_lock);
+
+  if (connectivity_netlink_thread_error != 0) {
+
+    pthread_mutex_unlock(&connectivity_lock);
+
+    ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
           "it.");
 
-    stop_thread(0);
+    stop_netlink_thread(0);
 
     for (interface_list_t *il = interface_list_head; il != NULL;
          il = il->next) {
@@ -790,32 +1056,26 @@ static int connectivity_read(void) /* {{{ */
       il->sent = 0;
     }
 
-    start_thread();
+    start_netlink_thread();
 
     return (-1);
-  } /* if (connectivity_thread_error != 0) */
+  } /* if (connectivity_netlink_thread_error != 0) */
 
-  for (interface_list_t *il = interface_list_head; il != NULL;
-       il = il->next) /* {{{ */
-  {
-    uint32_t status;
-    uint32_t prev_status;
-    uint32_t sent;
+  if (connectivity_dequeue_thread_error != 0) {
 
-    pthread_mutex_lock(&connectivity_lock);
+    pthread_mutex_unlock(&connectivity_lock);
 
-    status = il->status;
-    prev_status = il->prev_status;
-    sent = il->sent;
+    ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
+          "it.");
 
-    if (status != prev_status && sent == 0) {
-      connectivity_dispatch_notification(il->interface, "gauge", status,
-                                         prev_status, il->timestamp);
-      il->sent = 1;
-    }
+    stop_dequeue_thread(0);
 
-    pthread_mutex_unlock(&connectivity_lock);
-  } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+    start_dequeue_thread();
+
+    return (-1);
+  } /* if (connectivity_dequeue_thread_error != 0) */
+
+  pthread_mutex_unlock(&connectivity_lock);
 
   return (0);
 } /* }}} int connectivity_read */
@@ -825,7 +1085,7 @@ static int connectivity_shutdown(void) /* {{{ */
   interface_list_t *il;
 
   DEBUG("connectivity plugin: Shutting down thread.");
-  if (stop_thread(1) < 0)
+  if (stop_threads(1) < 0)
     return (-1);
 
   il = interface_list_head;
index 51b80d5..6c08936 100644 (file)
@@ -33,7 +33,6 @@ compression             uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
 compression_ratio       value:GAUGE:0:2
 commands                value:DERIVE:0:U
 connections             value:DERIVE:0:U
-connectivity            value:GAUGE:0:2
 conntrack               value:GAUGE:0:4294967295
 contextswitch           value:DERIVE:0:U
 cookies                 value:DERIVE:0:U