memcached plugin: Refactor the memcached_query_daemon() function.
authorFlorian Forster <octo@collectd.org>
Mon, 3 Sep 2012 06:22:16 +0000 (08:22 +0200)
committerFlorian Forster <octo@collectd.org>
Mon, 3 Sep 2012 06:22:16 +0000 (08:22 +0200)
The connecting code has been broken out in separate functions and the
writing and reading from the socket no longer uses poll(2),
non-blocking I/O and a custom built retry logic. Instead block on I/O and
let the read-thread-pool do its thing.

src/memcached.c

index b96845e..2df2887 100644 (file)
@@ -1,7 +1,7 @@
 /**
  * collectd - src/memcached.c, based on src/hddtemp.c
  * Copyright (C) 2007       Antony Dovgal
- * Copyright (C) 2007-2010  Florian Forster
+ * Copyright (C) 2007-2012  Florian Forster
  * Copyright (C) 2009       Doug MacEachern
  * Copyright (C) 2009       Franck Lombardi
  * Copyright (C) 2012       Nicolas Szalay
 #include "plugin.h"
 #include "configfile.h"
 
-# include <poll.h>
-# include <netdb.h>
-# include <sys/socket.h>
-# include <sys/un.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
-
-/* Hack to work around the missing define in AIX */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 
 #define MEMCACHED_DEF_HOST "127.0.0.1"
 #define MEMCACHED_DEF_PORT "11211"
 
-#define MEMCACHED_RETRY_COUNT 100
-
 struct memcached_s
 {
   char *name;
@@ -74,177 +66,172 @@ static void memcached_free (memcached_t *st)
   sfree (st->port);
 }
 
-static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data)
+static int memcached_connect_unix (memcached_t *st)
 {
-  int fd = -1;
-  ssize_t status;
-  int buffer_fill;
-  int i = 0;
+  struct sockaddr_un serv_addr;
+  int fd;
 
-  memcached_t *st;
-  st = user_data->data;
-  if (st->socket != NULL)
+  memset (&serv_addr, 0, sizeof (serv_addr));
+  serv_addr.sun_family = AF_UNIX;
+  sstrncpy (serv_addr.sun_path, st->socket,
+      sizeof (serv_addr.sun_path));
+
+  /* create our socket descriptor */
+  fd = socket (AF_UNIX, SOCK_STREAM, 0);
+  if (fd < 0)
   {
-    struct sockaddr_un serv_addr;
-
-     memset (&serv_addr, 0, sizeof (serv_addr));
-     serv_addr.sun_family = AF_UNIX;
-     sstrncpy (serv_addr.sun_path, st->socket,
-     sizeof (serv_addr.sun_path));
-
-     /* create our socket descriptor */
-     fd = socket (AF_UNIX, SOCK_STREAM, 0);
-     if (fd < 0) {
-       char errbuf[1024];
-       ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
-       sizeof (errbuf)));
-       return -1;
-     }
+    char errbuf[1024];
+    ERROR ("memcached: memcached_connect_unix: socket(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    return (-1);
   }
-  else
-  {
-    const char *host;
-    const char *port;
 
-    struct addrinfo  ai_hints;
-    struct addrinfo *ai_list, *ai_ptr;
-    int              ai_return = 0;
+  return (fd);
+} /* int memcached_connect_unix */
+
+static int memcached_connect_inet (memcached_t *st)
+{
+  char *host;
+  char *port;
 
-    memset (&ai_hints, '\0', sizeof (ai_hints));
-    ai_hints.ai_flags    = 0;
+  struct addrinfo  ai_hints;
+  struct addrinfo *ai_list, *ai_ptr;
+  int status;
+  int fd = -1;
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags    = 0;
 #ifdef AI_ADDRCONFIG
-    ai_hints.ai_flags   |= AI_ADDRCONFIG;
+  ai_hints.ai_flags   |= AI_ADDRCONFIG;
 #endif
-    ai_hints.ai_family   = AF_UNSPEC;
-    ai_hints.ai_socktype = SOCK_STREAM;
-    ai_hints.ai_protocol = 0;
+  ai_hints.ai_family   = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+  ai_hints.ai_protocol = 0;
 
-    host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
-    port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
+  host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
+  port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
 
-    if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
+  ai_list = NULL;
+  status = getaddrinfo (host, port, &ai_hints, &ai_list);
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached: memcached_connect_inet: getaddrinfo(%s,%s) failed: %s",
+        host, port,
+        (status == EAI_SYSTEM)
+        ? sstrerror (errno, errbuf, sizeof (errbuf))
+        : gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    /* create our socket descriptor */
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
       char errbuf[1024];
-      ERROR ("memcached: getaddrinfo (%s, %s): %s",
-          host, port,
-          (ai_return == EAI_SYSTEM)
-          ? sstrerror (errno, errbuf, sizeof (errbuf))
-          : gai_strerror (ai_return));
-      return -1;
+      WARNING ("memcached: memcached_connect_inet: socket(2) failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      continue;
     }
 
-    for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
-      /* create our socket descriptor */
-      fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
-      if (fd < 0) {
-        char errbuf[1024];
-        ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
-        continue;
-      }
-
-      /* connect to the memcached daemon */
-      status = (ssize_t) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
-      if (status != 0) {
-        shutdown (fd, SHUT_RDWR);
-        close (fd);
-        fd = -1;
-        continue;
-      }
-
-      /* A socket could be opened and connecting succeeded. We're
-       * done. */
-      break;
+    /* connect to the memcached daemon */
+    status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      shutdown (fd, SHUT_RDWR);
+      close (fd);
+      fd = -1;
+      continue;
     }
 
-    freeaddrinfo (ai_list);
+    /* A socket could be opened and connecting succeeded. We're done. */
+    break;
   }
 
+  freeaddrinfo (ai_list);
+  return (fd);
+} /* int memcached_connect_inet */
+
+static int memcached_connect (memcached_t *st)
+{
+  if (st->socket != NULL)
+    return (memcached_connect_unix (st));
+  else
+    return (memcached_connect_inet (st));
+}
+
+static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
+{
+  int fd = -1;
+  int status;
+  size_t buffer_fill;
+
+  fd = memcached_connect (st);
   if (fd < 0) {
     ERROR ("memcached: Could not connect to daemon.");
     return -1;
   }
 
-  if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
-    ERROR ("memcached: Could not send command to the memcached daemon.");
-    return -1;
-  }
-
+  status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
+  if (status != 0)
   {
-    struct pollfd p;
-    int status;
-
-    memset (&p, 0, sizeof (p));
-    p.fd = fd;
-    p.events = POLLIN | POLLERR | POLLHUP;
-    p.revents = 0;
-
-    status = poll (&p, /* nfds = */ 1,
-        /* timeout = */ CDTIME_T_TO_MS (interval_g));
-    if (status <= 0)
-    {
-      if (status == 0)
-      {
-        ERROR ("memcached: poll(2) timed out after %.3f seconds.",
-            CDTIME_T_TO_DOUBLE (interval_g));
-      }
-      else
-      {
-        char errbuf[1024];
-        ERROR ("memcached: poll(2) failed: %s",
-            sstrerror (errno, errbuf, sizeof (errbuf)));
-      }
-      shutdown (fd, SHUT_RDWR);
-      close (fd);
-      return (-1);
-    }
+    char errbuf[1024];
+    ERROR ("memcached plugin: write(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    shutdown(fd, SHUT_RDWR);
+    close (fd);
+    return (-1);
   }
 
   /* receive data from the memcached daemon */
-  memset (buffer, '\0', buffer_size);
+  memset (buffer, 0, buffer_size);
 
   buffer_fill = 0;
-  while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
-    if (i > MEMCACHED_RETRY_COUNT) {
-      ERROR("recv() timed out");
-      break;
-    }
-    i++;
-
-    if (status == -1) {
+  while ((status = (int) recv (fd, buffer + buffer_fill,
+          buffer_size - buffer_fill, /* flags = */ 0)) != 0)
+  {
+    char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
+    if (status < 0)
+    {
       char errbuf[1024];
 
-      if (errno == EAGAIN) {
-        continue;
-      }
+      if ((errno == EAGAIN) || (errno == EINTR))
+          continue;
 
       ERROR ("memcached: Error reading from socket: %s",
           sstrerror (errno, errbuf, sizeof (errbuf)));
       shutdown(fd, SHUT_RDWR);
       close (fd);
-      return -1;
+      return (-1);
     }
-    buffer_fill += status;
 
-    if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
-      /* we got all the data */
+    buffer_fill += (size_t) status;
+    if (buffer_fill > buffer_size)
+    {
+      buffer_fill = buffer_size;
+      WARNING ("memcached plugin: Message was truncated.");
       break;
     }
-  }
 
-  if (buffer_fill >= buffer_size) {
-    buffer[buffer_size - 1] = '\0';
-    WARNING ("memcached: Message from memcached has been truncated.");
-  } else if (buffer_fill == 0) {
-    WARNING ("memcached: Peer has unexpectedly shut down the socket. "
-        "Buffer: `%s'", buffer);
-    shutdown(fd, SHUT_RDWR);
-    close(fd);
-    return -1;
+    /* If buffer ends in end_token, we have all the data. */
+    if (memcmp (buffer + buffer_fill - sizeof (end_token),
+          end_token, sizeof (end_token)) == 0)
+      break;
+  } /* while (recv) */
+
+  status = 0;
+  if (buffer_fill == 0)
+  {
+    WARNING ("memcached plugin: No data returned by memcached.");
+    status = -1;
   }
 
   shutdown(fd, SHUT_RDWR);
   close(fd);
-  return 0;
-}
+  return (status);
+} /* int memcached_query_daemon */
 
 static int memcached_add_read_callback (memcached_t *st)
 {
@@ -508,7 +495,7 @@ static int memcached_read (user_data_t *user_data)
   st = user_data->data;
 
   /* get data from daemon */
-  if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) {
+  if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
     return -1;
   }
 
@@ -645,7 +632,7 @@ static int memcached_read (user_data_t *user_data)
   }
 
   return 0;
-}
+} /* int memcached_read */
 
 void module_register (void)
 {