population_set_replacement_method: New function.
[libpopulation.git] / src / libpopulation.c
index c5d3c4e..c27fd9b 100644 (file)
@@ -1,6 +1,6 @@
 /**
- * libevolve - src/evolve.c
- * Copyright (C) 2008 Florian octo Forster
+ * libpopulation - src/evolve.c
+ * Copyright (C) 2008,2009  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -61,6 +61,7 @@
 #include "population.h"
 
 #include <stdlib.h>
+#include <assert.h>
 #include <errno.h>
 #include <stdint.h>
 #include <inttypes.h>
@@ -74,6 +75,9 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netdb.h>
+#include <signal.h>
+
+#define NETWORK_BUFFER_SIZE 1450
 
 /*
  * Data types
@@ -101,18 +105,49 @@ struct population_s
   int *peers;
   size_t peers_num;
 
+#define POPULATION_FLAG_LISTEN   0x01
+#define POPULATION_FLAG_SHUTDOWN 0x02
+#define POPULATION_FLAG_EXPLORE  0x10
+  int flags;
+  pthread_t listen_thread_id;
+
   individual_t fittest;
 
   individual_t *individuals;
   size_t individuals_num;
 };
 
+struct listen_thread_args_s
+{
+  population_t *population;
+  char *node;
+  char *service;
+};
+typedef struct listen_thread_args_s listen_thread_args_t;
+
 /*
  * Private functions
  */
+static char *population_strdup (const char *src)
+{
+  size_t s;
+  char *ret;
+
+  if (src == NULL)
+    return (NULL);
+
+  s = strlen (src) + 1;
+  ret = (char *) malloc (s);
+  if (ret == NULL)
+    return (NULL);
+
+  memcpy (ret, src, s);
+  return (ret);
+} /* char *population_strdup */
+
 static int population_send_to_peer (population_t *p, void *pi) /* {{{ */
 {
-  char buffer[1450];
+  char buffer[NETWORK_BUFFER_SIZE];
   size_t buffer_size;
   size_t buffer_free;
   char *buffer_ptr;
@@ -186,9 +221,132 @@ static int population_send_to_peer (population_t *p, void *pi) /* {{{ */
   }
 
   pthread_mutex_unlock (&p->lock);
+
+#if 0
+  printf ("population_send_to_peer: Sent individual with rating %i to peer #%i.\n",
+      p->rate (pi), i);
+#endif
+
   return (0);
 } /* }}} int population_send_to_peer */
 
+static void *listen_thread (void *data)
+{
+  listen_thread_args_t *args;
+  population_t *p;
+  char *node;
+  char *service;
+  int status;
+  int fd;
+
+  struct addrinfo  ai_hints;
+  struct addrinfo *ai_list;
+  struct addrinfo *ai_ptr;
+
+  args    = (listen_thread_args_t *) data;
+  p       = args->population;
+  node    = args->node;
+  service = args->service;
+
+  ai_list = NULL;
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = AI_PASSIVE;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_DGRAM;
+  ai_hints.ai_protocol = 0;
+
+  status = getaddrinfo (node, 
+      (service != NULL) ? service : POPULATION_DEFAULT_PORT,
+      &ai_hints, &ai_list);
+  if (status != 0)
+  {
+    fprintf (stderr, "listen_thread: getaddrinfo (%s) failed: %s\n",
+        (node != NULL) ? node : "NULL", gai_strerror (status));
+    return ((void *) -1);
+  }
+
+  fd = -1;
+  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+      continue;
+
+    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      close (fd);
+      fd = -1;
+      continue;
+    }
+
+    break;
+  }
+
+  freeaddrinfo (ai_list);
+
+  if (fd < 0)
+  {
+    fprintf (stderr, "listen_thread: No socket could be opened.\n");
+    return ((void *) -1);
+  }
+
+  pthread_mutex_lock (&p->lock);
+  p->flags |= POPULATION_FLAG_LISTEN;
+  while ((p->flags & POPULATION_FLAG_SHUTDOWN) == 0)
+  {
+    /* Allocate one extra byte to null-terminate the data. */
+    char buffer[NETWORK_BUFFER_SIZE + 1];
+    void *pi;
+
+    pthread_mutex_unlock (&p->lock);
+
+    status = recvfrom (fd, buffer, sizeof (buffer) - 1, /* flags = */ 0,
+        /* from = */ NULL, /* fromlen = */ NULL);
+    if (status < 1)
+    {
+      fprintf (stderr, "listen_thread: recvfrom(2) failed: status = %i; "
+          "errno = %i;\n", status, errno);
+      pthread_mutex_lock (&p->lock);
+      continue;
+    }
+    assert (status < sizeof (buffer));
+    buffer[sizeof (buffer) - 1] = 0;
+
+    pi = p->unserialize (buffer, (size_t) status);
+    if (pi == NULL)
+    {
+      fprintf (stderr, "listen_thread: p->unserialize returned NULL.\n");
+      pthread_mutex_lock (&p->lock);
+      continue;
+    }
+
+#if 0
+    printf ("listen_thread: Received individual with rating %i.\n",
+        p->rate (pi));
+#endif
+
+    population_insert (p, pi);
+
+    p->free (pi);
+
+    pthread_mutex_lock (&p->lock);
+  } /* while (42) */
+
+  close (fd);
+  fd = -1;
+
+  /* clear the listen flag */
+  p->flags &= ~(POPULATION_FLAG_LISTEN);
+
+  pthread_mutex_unlock (&p->lock);
+  return ((void *) 0);
+} /* void *listen_thread */
+
 /*
  * Constructor and destructor
  */
@@ -235,6 +393,16 @@ void population_destroy (population_t *p) /* {{{ */
   if (p == NULL)
     return;
 
+  pthread_mutex_lock (&p->lock);
+  p->flags |= POPULATION_FLAG_SHUTDOWN;
+  if ((p->flags & POPULATION_FLAG_LISTEN) != 0)
+  {
+    pthread_kill (p->listen_thread_id, SIGTERM);
+    pthread_mutex_unlock (&p->lock);
+    pthread_join (p->listen_thread_id, /* return = */ NULL);
+    pthread_mutex_lock (&p->lock);
+  }
+
   if (p->fittest.ptr != NULL)
     p->free (p->fittest.ptr);
   p->fittest.ptr = NULL;
@@ -307,8 +475,8 @@ int population_set_size (population_t *p, /* {{{ */
   return (0);
 } /* }}} */
 
-int population_set_serialization (population_t *p,
-    pi_serialize_f serialize, pi_unserialize_f unserialize) /* {{{ */
+int population_set_serialization (population_t *p, /* {{{ */
+    pi_serialize_f serialize, pi_unserialize_f unserialize)
 {
   if (p == NULL)
     return (-1);
@@ -322,6 +490,27 @@ int population_set_serialization (population_t *p,
   return (0);
 } /* }}} int population_set_serialization */
 
+int population_set_replacement_method (population_t *p, int method) /* {{{ */
+{
+  int status = 0;
+
+  if (p == NULL)
+    return (EINVAL);
+
+  pthread_mutex_lock (&p->lock);
+
+  if (method == POPULATION_REPLACEMENT_EXPLOIT)
+    p->flags &= ~POPULATION_FLAG_EXPLORE;
+  else if (method == POPULATION_REPLACEMENT_EXPLORE)
+    p->flags |= POPULATION_FLAG_EXPLORE;
+  else
+    status = EINVAL;
+
+  pthread_mutex_unlock (&p->lock);
+
+  return (0);
+} /* }}} int population_set_replacement_method */
+
 int population_add_peer (population_t *p, const char *node, /* {{{ */
     const char *port)
 {
@@ -394,6 +583,9 @@ int population_add_peer (population_t *p, const char *node, /* {{{ */
     }
 
     p->peers_num++;
+
+    printf ("population_add_peer: Successfully added peer #%i.\n",
+        p->peers_num - 1);
   }
   pthread_mutex_unlock (&p->lock);
 
@@ -402,6 +594,39 @@ int population_add_peer (population_t *p, const char *node, /* {{{ */
   return (0);
 } /* }}} int population_add_peer */
 
+int population_start_listen_thread (population_t *p, /* {{{ */
+    const char *node, const char *service)
+{
+  listen_thread_args_t *args;
+
+  pthread_mutex_lock (&p->lock);
+  if ((p->flags & POPULATION_FLAG_LISTEN) != 0)
+  {
+    pthread_mutex_unlock (&p->lock);
+    fprintf (stderr, "population_start_listen_thread: "
+        "Listen thread already started.\n");
+    return (-EALREADY);
+  }
+
+  args = (listen_thread_args_t *) malloc (sizeof (listen_thread_args_t));
+  if (args == NULL)
+  {
+    fprintf (stderr, "population_start_listen_thread: malloc failed.\n");
+    return (-1);
+  }
+
+  memset (args, 0, sizeof (listen_thread_args_t));
+  args->population = p;
+  args->node = population_strdup (node);
+  args->service = population_strdup (service);
+
+  pthread_create (&p->listen_thread_id, /* attr = */ NULL,
+      listen_thread, (void *) args);
+
+  pthread_mutex_unlock (&p->lock);
+  return (0);
+} /* }}} int population_start_listen_thread */
+
 void *population_get_random (population_t *p) /* {{{ */
 {
   void *ret = NULL;
@@ -456,7 +681,7 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
 {
   void *pi;
   int pi_rating;
-  int num_tries;
+  int sent_to_peer;
   int i;
 
   if (p == NULL)
@@ -472,25 +697,31 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
     return (-1);
   }
 
-  pi_rating = p->rate (pi);
-
-  pthread_mutex_lock (&p->lock);
-
+  /*
+   * With a small chance, send this individual to somewhere else.
+   * `sent_to_peer = -1' is used to signal the following code that this
+   * individual has been sent to somewhere else and doesn't go into the local
+   * population.
+   */
+  sent_to_peer = 0;
   if (p->peers_num > 0)
   {
     double prob;
 
     prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0);
-    if (prob < 0.01)
+    if (prob <= 0.001)
     {
-      pthread_mutex_unlock (&p->lock);
       population_send_to_peer (p, pi);
-      pthread_mutex_lock (&p->lock);
+      sent_to_peer = 1;
     }
   }
 
+  pi_rating = p->rate (pi);
+
+  pthread_mutex_lock (&p->lock);
+
   /* Keep track of the all time best. */
-  if ((p->fittest.ptr == NULL) || (p->fittest.rating > pi_rating))
+  if ((p->fittest.ptr == NULL) || (p->fittest.rating >= pi_rating))
   {
     void *temp;
 
@@ -504,18 +735,21 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
     }
   }
 
-  if (p->individuals_num <= 0)
+  if ((sent_to_peer != 0) || (p->individuals_num <= 0))
   {
     pthread_mutex_unlock (&p->lock);
     p->free (pi);
-    return (-1);
+    return (0);
   }
 
-  num_tries = (int) ceil (log (p->individuals_num) / log (2.0));
-  for (i = 0; i < num_tries; i++)
+  do
   {
     size_t j;
 
+    int chance_j;
+    int chance_pi;
+    int chance;
+
     j = (size_t) (((double) p->individuals_num) * (rand() / (RAND_MAX + 1.0)));
 
     if (p->individuals[j].ptr == NULL)
@@ -526,7 +760,20 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
       break;
     }
 
-    if (pi_rating < p->individuals[j].rating)
+    /* large distance from fittest => high probability of losing. */
+    chance_j = 1 + p->individuals[j].rating - p->fittest.rating;
+    chance_pi = 1 + pi_rating - p->fittest.rating;
+
+    chance_j = chance_j * chance_j;
+    chance_pi = chance_pi * chance_pi;
+
+    chance = (int) (((double) (chance_j + chance_pi))
+        * (rand() / (RAND_MAX + 1.0)));
+
+    if (p->flags & POPULATION_FLAG_EXPLORE)
+      chance *= .5;
+
+    if (chance < chance_j) /* j looses ;) */
     {
       void *temp0;
       int temp1;
@@ -539,15 +786,12 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
       p->individuals[j].rating = pi_rating;
       pi_rating = temp1;
     }
-  }
+  } while (0);
 
   pthread_mutex_unlock (&p->lock);
 
   if (pi != NULL)
-  {
     p->free (pi);
-    pi = NULL;
-  }
 
   return (0);
 } /* }}} int population_insert */