src/rrdd.c: A first take at implementing a cache.
authorFlorian Forster <octo@verplant.org>
Sat, 21 Jun 2008 13:44:43 +0000 (15:44 +0200)
committerFlorian Forster <octo@verplant.org>
Sat, 21 Jun 2008 13:44:43 +0000 (15:44 +0200)
This is WORK IN PROGRESS and not working correctly yet! Also in this commit:
Many other unstructured changes, since this we're still in the big bang phase
of this project ;)

src/rrdd.c

index 13fdc90..6625da3 100644 (file)
  *   Florian octo Forster <octo at verplant.org>
  **/
 
+#define RRDD_DEBUG 1
+
 #include "rrdd.h"
 
+#if RRDD_DEBUG
+# define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
+#else
+# define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
+#endif
+
 /*
  * Types
  */
@@ -31,6 +39,21 @@ struct listen_socket_s
 };
 typedef struct listen_socket_s listen_socket_t;
 
+struct cache_item_s;
+typedef struct cache_item_s cache_item_t;
+struct cache_item_s
+{
+  char *file;
+  char **values;
+  int values_num;
+  time_t last_flush_time;
+#define CI_FLAGS_IN_TREE  0x01
+#define CI_FLAGS_IN_QUEUE 0x02
+  int flags;
+
+  cache_item_t *next;
+};
+
 /*
  * Variables
  */
@@ -41,31 +64,352 @@ static int do_shutdown = 0;
 
 static pthread_t queue_thread;
 
+static pthread_t *connetion_threads = NULL;
+static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static int connetion_threads_num = 0;
+
+/* Cache stuff */
+static avl_tree_t     *cache_tree = NULL;
+static cache_item_t   *cache_queue_head = NULL;
+static cache_item_t   *cache_queue_tail = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
+
 /* 
  * Functions
  */
+static void sig_int_handler (int signal) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_int_handler */
+
+static void sig_term_handler (int signal) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_term_handler */
+
+static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
+{
+  cache_item_t *c0 = (cache_item_t *) v0;
+  cache_item_t *c1 = (cache_item_t *) v1;
+
+  assert (c0->file != NULL);
+  assert (c1->file != NULL);
+
+  return (strcmp (c0->file, c1->file));
+} /* }}} int cache_tree_compare */
+
+static void cache_tree_free (void *v) /* {{{ */
+{
+  cache_item_t *c = (cache_item_t *) v;
+
+  assert (c->values_num == 0);
+  assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
+  assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
+
+  free (c->file);
+  c->file = NULL;
+  free (c);
+} /* }}} void cache_tree_free */
+
 static void *queue_thread_main (void *args) /* {{{ */
 {
-  while (do_shutdown == 0)
+  pthread_mutex_lock (&cache_lock);
+  while ((do_shutdown == 0) || (cache_queue_head != NULL))
   {
-    syslog (LOG_DEBUG, "queue_thread_main: Just woke up.");
-    sleep (1);
+    cache_item_t *ci;
+
+    char *file;
+    char **values;
+    int values_num;
+    int i;
+
+    if (cache_queue_head == NULL)
+      pthread_cond_wait (&cache_cond, &cache_lock);
+
+    if (cache_queue_head == NULL)
+      continue;
+
+    ci = cache_queue_head;
+
+    /* copy the relevant parts */
+    file = strdup (ci->file);
+    if (file == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
+      continue;
+    }
+
+    values = ci->values;
+    values_num = ci->values_num;
+
+    ci->values = NULL;
+    ci->values_num = 0;
+
+    ci->last_flush_time = time (NULL);
+    ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+
+    cache_queue_head = ci->next;
+    if (cache_queue_head == NULL)
+      cache_queue_tail = NULL;
+    ci->next = NULL;
+
+    pthread_mutex_unlock (&cache_lock);
+
+    RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
+        file, values_num, (void *) values);
+
+    free (file);
+    for (i = 0; i < values_num; i++)
+      free (values[i]);
+
+    pthread_mutex_lock (&cache_lock);
   } /* while (do_shutdown == 0) */
+  pthread_mutex_unlock (&cache_lock);
 
-  syslog (LOG_DEBUG, "queue_thread_main: Exiting.");
+  RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
 
   return (NULL);
 } /* }}} void *queue_thread_main */
 
-static void sig_int_handler (int signal) /* {{{ */
+static int handle_request_update (int fd, /* {{{ */
+    char *buffer, int buffer_size)
 {
-  do_shutdown++;
-} /* }}} void sig_int_handler */
+  char *file;
+  char *value;
+  char *buffer_ptr;
+  int values_num = 0;
 
-static void sig_term_handler (int signal) /* {{{ */
+  time_t now;
+
+  avl_node_t *node;
+  cache_item_t ci_temp;
+  cache_item_t *ci;
+
+  char answer[4096];
+
+  now = time (NULL);
+
+  RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
+      fd, (void *) buffer, buffer_size);
+
+  buffer_ptr = buffer;
+
+  file = buffer_ptr;
+  buffer_ptr += strlen (file) + 1;
+
+  ci_temp.file = file;
+
+  pthread_mutex_lock (&cache_lock);
+
+  node = avl_search (cache_tree, (void *) &ci_temp);
+  if (node == NULL)
+  {
+    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
+    if (ci == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      return (-1);
+    }
+    memset (ci, 0, sizeof (cache_item_t));
+
+    ci->file = strdup (file);
+    if (ci->file == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      free (ci);
+      return (-1);
+    }
+
+    ci->values = NULL;
+    ci->values_num = 0;
+    ci->last_flush_time = now;
+    ci->flags = CI_FLAGS_IN_TREE;
+
+    if (avl_insert (cache_tree, (void *) ci) == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed.");
+      free (ci->file);
+      free (ci);
+      return (-1);
+    }
+
+    RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.",
+        ci->file);
+  }
+  else /* if (ci != NULL) */
+  {
+    ci = (cache_item_t *) node->item;
+  }
+  assert (ci != NULL);
+
+  while (*buffer_ptr != 0)
+  {
+    char **temp;
+
+    value = buffer_ptr;
+    buffer_ptr += strlen (value) + 1;
+
+    temp = (char **) realloc (ci->values,
+        sizeof (char *) * (ci->values_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+      continue;
+    }
+    ci->values = temp;
+
+    ci->values[ci->values_num] = strdup (value);
+    if (ci->values[ci->values_num] == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+      continue;
+    }
+    ci->values_num++;
+
+    values_num++;
+  }
+
+  /* FIXME: Timeout should not be hard-coded. */
+  if (((now - ci->last_flush_time) > 300)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
+  {
+    RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
+        ci->file);
+
+    assert (ci->next == NULL);
+
+    if (cache_queue_tail == NULL)
+      cache_queue_head = ci;
+    else
+      cache_queue_tail->next = ci;
+    cache_queue_tail = ci;
+
+    pthread_cond_signal (&cache_cond);
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+
+  snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
+  answer[sizeof (answer) - 1] = 0;
+
+  write (fd, answer, sizeof (answer));
+
+  return (0);
+} /* }}} int handle_request_update */
+
+static int handle_request (int fd) /* {{{ */
 {
-  do_shutdown++;
-} /* }}} void sig_term_handler */
+  char buffer[4096];
+  int buffer_size;
+
+  RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
+
+  buffer_size = read (fd, buffer, sizeof (buffer));
+  if (buffer_size < 1)
+  {
+    RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
+    return (-1);
+  }
+  assert (((size_t) buffer_size) <= sizeof (buffer));
+
+  if ((buffer[buffer_size - 2] != 0)
+      || (buffer[buffer_size - 1] != 0))
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
+    return (-1);
+  }
+
+  /* fields in the buffer a separated by null bytes. */
+  if (strcmp (buffer, "update") == 0)
+  {
+    int offset = strlen ("update") + 1;
+    return (handle_request_update (fd, buffer + offset,
+          buffer_size - offset));
+  }
+  else
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
+    return (-1);
+  }
+} /* }}} int handle_request */
+
+static void *connection_thread_main (void *args) /* {{{ */
+{
+  pthread_t self;
+  int i;
+  int fd;
+  
+  fd = *((int *) args);
+
+  while (do_shutdown == 0)
+  {
+    struct pollfd pollfd;
+    int status;
+
+    pollfd.fd = fd;
+    pollfd.events = POLLIN | POLLPRI;
+    pollfd.revents = 0;
+
+    status = poll (&pollfd, 1, /* timeout = */ 500);
+    if (status == 0) /* timeout */
+      continue;
+    else if (status < 0) /* error */
+    {
+      status = errno;
+      if (status == EINTR)
+        continue;
+      RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
+      continue;
+    }
+
+    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
+    {
+      close (fd);
+      break;
+    }
+    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
+    {
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned "
+          "something unexpected.");
+      close (fd);
+      break;
+    }
+
+    status = handle_request (fd);
+    if (status != 0)
+    {
+      close (fd);
+      break;
+    }
+  }
+
+  self = pthread_self ();
+  /* Remove this thread from the connection threads list */
+  pthread_mutex_lock (&connetion_threads_lock);
+  /* Find out own index in the array */
+  for (i = 0; i < connetion_threads_num; i++)
+    if (pthread_equal (connetion_threads[i], self) != 0)
+      break;
+  assert (i < connetion_threads_num);
+
+  /* Move the trailing threads forward. */
+  if (i < (connetion_threads_num - 1))
+  {
+    memmove (connetion_threads + i,
+        connetion_threads + i + 1,
+        sizeof (pthread_t) * (connetion_threads_num - i - 1));
+  }
+
+  connetion_threads_num--;
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  free (args);
+  return (NULL);
+} /* }}} void *connection_thread_main */
 
 static int open_listen_socket (const char *path) /* {{{ */
 {
@@ -78,7 +422,7 @@ static int open_listen_socket (const char *path) /* {{{ */
       sizeof (listen_fds[0]) * (listen_fds_num + 1));
   if (temp == NULL)
   {
-    syslog (LOG_ERR, "open_listen_socket: realloc failed.\n");
+    RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
     return (-1);
   }
   listen_fds = temp;
@@ -87,7 +431,7 @@ static int open_listen_socket (const char *path) /* {{{ */
   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
   if (fd < 0)
   {
-    syslog (LOG_ERR, "open_listen_socket: socket(2) failed.\n");
+    RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
     return (-1);
   }
 
@@ -98,8 +442,18 @@ static int open_listen_socket (const char *path) /* {{{ */
   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
   if (status != 0)
   {
-    syslog (LOG_ERR, "open_listen_socket: bind(2) failed.\n");
+    RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
     close (fd);
+    unlink (path);
+    return (-1);
+  }
+
+  status = listen (fd, /* backlog = */ 10);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
+    close (fd);
+    unlink (path);
     return (-1);
   }
   
@@ -132,41 +486,92 @@ static void *listen_thread_main (void *args) /* {{{ */
 {
   char buffer[4096];
   int status;
+  int i;
 
   status = open_listen_socket (RRDD_SOCK_PATH);
   if (status != 0)
   {
-    syslog (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
+    RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
     return (NULL);
   }
 
   while (do_shutdown == 0)
   {
-    syslog (LOG_DEBUG, "listen_thread_main: Just woke up.");
+    int *client_sd;
+    struct sockaddr_un client_sa;
+    socklen_t client_sa_size;
+    pthread_t tid;
 
-    status = recv (listen_fds[0].fd, buffer, sizeof (buffer),
-        /* flags = */ 0);
-    if (status == 0)
+    client_sd = (int *) malloc (sizeof (int));
+    if (client_sd == NULL)
     {
+      RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+      sleep (120);
       continue;
     }
-    else if (status < 0)
+
+    client_sa_size = sizeof (client_sa);
+    /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */
+    *client_sd = accept (listen_fds[0].fd,
+        (struct sockaddr *) &client_sa, &client_sa_size);
+    if (*client_sd < 0)
     {
-      if (errno == EINTR)
-        continue;
+      RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
+      continue;
+    }
+
+    RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
+        *client_sd);
+
+    status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
+        /* args = */ (void *) client_sd);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
+      close (*client_sd);
+      free (client_sd);
+      continue;
+    }
+
+    /* FIXME: bug: if the new thread is run first, it may run into the
+     *   assert (i < connetion_threads_num);
+     * assertion. */
+    pthread_mutex_lock (&connetion_threads_lock);
+    {
+      pthread_t *temp;
+
+      temp = (pthread_t *) realloc (connetion_threads,
+          sizeof (pthread_t) * (connetion_threads_num + 1));
+      if (temp == NULL)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: realloc failed.");
+      }
       else
       {
-        syslog (LOG_ERR, "listen_thread_main: recv failed.");
-        continue;
+        connetion_threads = temp;
+        connetion_threads[connetion_threads_num] = tid;
+        connetion_threads_num++;
       }
     }
-
-    syslog (LOG_DEBUG, "listen_thread_main: Received %i bytes.\n", status);
+    pthread_mutex_unlock (&connetion_threads_lock);
   } /* while (do_shutdown == 0) */
 
   close_listen_sockets ();
 
-  syslog (LOG_DEBUG, "listen_thread_main: Exiting.");
+  pthread_mutex_lock (&connetion_threads_lock);
+  while (connetion_threads_num > 0)
+  {
+    pthread_t wait_for;
+
+    wait_for = connetion_threads[0];
+
+    pthread_mutex_unlock (&connetion_threads_lock);
+    pthread_join (wait_for, /* retval = */ NULL);
+    pthread_mutex_lock (&connetion_threads_lock);
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
 
   return (NULL);
 } /* }}} void *listen_thread_main */
@@ -176,6 +581,7 @@ static int daemonize (void) /* {{{ */
   pid_t child;
   int status;
 
+#if !RRDD_DEBUG
   child = fork ();
   if (child < 0)
   {
@@ -201,6 +607,7 @@ static int daemonize (void) /* {{{ */
   open ("/dev/null", O_RDWR);
   dup (0);
   dup (0);
+#endif /* RRDD_DEBUG */
 
   {
     struct sigaction sa;
@@ -216,12 +623,19 @@ static int daemonize (void) /* {{{ */
 
   openlog ("rrdd", LOG_PID, LOG_DAEMON);
 
+  cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free);
+  if (cache_tree == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed.");
+    return (-1);
+  }
+
   memset (&queue_thread, 0, sizeof (queue_thread));
   status = pthread_create (&queue_thread, /* attr = */ NULL,
       queue_thread_main, /* args = */ NULL);
   if (status != 0)
   {
-    syslog (LOG_ERR, "daemonize: pthread_create failed.");
+    RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
     return (-1);
   }
 
@@ -230,6 +644,8 @@ static int daemonize (void) /* {{{ */
 
 static int cleanup (void) /* {{{ */
 {
+  RRDD_LOG (LOG_DEBUG, "cleanup ()");
+
   do_shutdown++;
 
   pthread_join (queue_thread, /* return = */ NULL);