From: Florian Forster Date: Wed, 11 Mar 2009 08:22:49 +0000 (+0100) Subject: src/libpopulation.c: Add the possibility to receive individuals from peers. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=9cd71de93b412bd24e5e5b512ec0b774f4123b9d;p=libpopulation.git src/libpopulation.c: Add the possibility to receive individuals from peers. --- diff --git a/src/libpopulation.c b/src/libpopulation.c index aa5b9dc..26345f8 100644 --- a/src/libpopulation.c +++ b/src/libpopulation.c @@ -61,6 +61,7 @@ #include "population.h" #include +#include #include #include #include @@ -74,6 +75,9 @@ #include #include #include +#include + +#define NETWORK_BUFFER_SIZE 1450 /* * Data types @@ -101,18 +105,48 @@ struct population_s int *peers; size_t peers_num; +#define POPULATION_FLAG_LISTEN 0x01 +#define POPULATION_FLAG_SHUTDOWN 0x02 + int flags; + pthread_t listen_thread_id; + individual_t fittest; individual_t *individuals; size_t individuals_num; }; +struct listen_thread_args_s +{ + population_t *population; + char *node; + char *service; +}; +typedef struct listen_thread_args_s listen_thread_args_t; + /* * Private functions */ +static char *population_strdup (const char *src) +{ + size_t s; + char *ret; + + if (src == NULL) + return (NULL); + + s = strlen (src) + 1; + ret = (char *) malloc (s); + if (ret == NULL) + return (NULL); + + memcpy (ret, src, s); + return (ret); +} /* char *population_strdup */ + static int population_send_to_peer (population_t *p, void *pi) /* {{{ */ { - char buffer[1450]; + char buffer[NETWORK_BUFFER_SIZE]; size_t buffer_size; size_t buffer_free; char *buffer_ptr; @@ -186,9 +220,132 @@ static int population_send_to_peer (population_t *p, void *pi) /* {{{ */ } pthread_mutex_unlock (&p->lock); + +#if 0 + printf ("population_send_to_peer: Sent individual with rating %i to peer #%i.\n", + p->rate (pi), i); +#endif + return (0); } /* }}} int population_send_to_peer */ +static void *listen_thread (void *data) +{ + listen_thread_args_t *args; + population_t *p; + char *node; + char *service; + int status; + int fd; + + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + + args = (listen_thread_args_t *) data; + p = args->population; + node = args->node; + service = args->service; + + ai_list = NULL; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = AI_PASSIVE; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = 0; + + status = getaddrinfo (node, + (service != NULL) ? service : POPULATION_DEFAULT_PORT, + &ai_hints, &ai_list); + if (status != 0) + { + fprintf (stderr, "listen_thread: getaddrinfo (%s) failed: %s\n", + (node != NULL) ? node : "NULL", gai_strerror (status)); + return ((void *) -1); + } + + fd = -1; + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + continue; + + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + close (fd); + fd = -1; + continue; + } + + break; + } + + freeaddrinfo (ai_list); + + if (fd < 0) + { + fprintf (stderr, "listen_thread: No socket could be opened.\n"); + return ((void *) -1); + } + + pthread_mutex_lock (&p->lock); + p->flags |= POPULATION_FLAG_LISTEN; + while ((p->flags & POPULATION_FLAG_SHUTDOWN) == 0) + { + /* Allocate one extra byte to null-terminate the data. */ + char buffer[NETWORK_BUFFER_SIZE + 1]; + void *pi; + + pthread_mutex_unlock (&p->lock); + + status = recvfrom (fd, buffer, sizeof (buffer) - 1, /* flags = */ 0, + /* from = */ NULL, /* fromlen = */ NULL); + if (status < 1) + { + fprintf (stderr, "listen_thread: recvfrom(2) failed: status = %i; " + "errno = %i;\n", status, errno); + pthread_mutex_lock (&p->lock); + continue; + } + assert (status < sizeof (buffer)); + buffer[sizeof (buffer) - 1] = 0; + + pi = p->unserialize (buffer, (size_t) status); + if (pi == NULL) + { + fprintf (stderr, "listen_thread: p->unserialize returned NULL.\n"); + pthread_mutex_lock (&p->lock); + continue; + } + +#if 0 + printf ("listen_thread: Received individual with rating %i.\n", + p->rate (pi)); +#endif + + population_insert (p, pi); + + p->free (pi); + + pthread_mutex_lock (&p->lock); + } /* while (42) */ + + close (fd); + fd = -1; + + /* clear the listen flag */ + p->flags &= ~(POPULATION_FLAG_LISTEN); + + pthread_mutex_unlock (&p->lock); + return ((void *) 0); +} /* void *listen_thread */ + /* * Constructor and destructor */ @@ -235,6 +392,16 @@ void population_destroy (population_t *p) /* {{{ */ if (p == NULL) return; + pthread_mutex_lock (&p->lock); + p->flags |= POPULATION_FLAG_SHUTDOWN; + if ((p->flags & POPULATION_FLAG_LISTEN) != 0) + { + pthread_kill (p->listen_thread_id, SIGTERM); + pthread_mutex_unlock (&p->lock); + pthread_join (p->listen_thread_id, /* return = */ NULL); + pthread_mutex_lock (&p->lock); + } + if (p->fittest.ptr != NULL) p->free (p->fittest.ptr); p->fittest.ptr = NULL; @@ -307,8 +474,8 @@ int population_set_size (population_t *p, /* {{{ */ return (0); } /* }}} */ -int population_set_serialization (population_t *p, - pi_serialize_f serialize, pi_unserialize_f unserialize) /* {{{ */ +int population_set_serialization (population_t *p, /* {{{ */ + pi_serialize_f serialize, pi_unserialize_f unserialize) { if (p == NULL) return (-1); @@ -394,6 +561,9 @@ int population_add_peer (population_t *p, const char *node, /* {{{ */ } p->peers_num++; + + printf ("population_add_peer: Successfully added peer #%i.\n", + p->peers_num - 1); } pthread_mutex_unlock (&p->lock); @@ -402,6 +572,39 @@ int population_add_peer (population_t *p, const char *node, /* {{{ */ return (0); } /* }}} int population_add_peer */ +int population_start_listen_thread (population_t *p, /* {{{ */ + const char *node, const char *service) +{ + listen_thread_args_t *args; + + pthread_mutex_lock (&p->lock); + if ((p->flags & POPULATION_FLAG_LISTEN) != 0) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_start_listen_thread: " + "Listen thread already started.\n"); + return (-EALREADY); + } + + args = (listen_thread_args_t *) malloc (sizeof (listen_thread_args_t)); + if (args == NULL) + { + fprintf (stderr, "population_start_listen_thread: malloc failed.\n"); + return (-1); + } + + memset (args, 0, sizeof (listen_thread_args_t)); + args->population = p; + args->node = population_strdup (node); + args->service = population_strdup (service); + + pthread_create (&p->listen_thread_id, /* attr = */ NULL, + listen_thread, (void *) args); + + pthread_mutex_unlock (&p->lock); + return (0); +} /* }}} int population_start_listen_thread */ + void *population_get_random (population_t *p) /* {{{ */ { void *ret = NULL; @@ -476,21 +679,8 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ pthread_mutex_lock (&p->lock); - if (p->peers_num > 0) - { - double prob; - - prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0); - if (prob < 0.01) - { - pthread_mutex_unlock (&p->lock); - population_send_to_peer (p, pi); - pthread_mutex_lock (&p->lock); - } - } - /* Keep track of the all time best. */ - if ((p->fittest.ptr == NULL) || (p->fittest.rating > pi_rating)) + if ((p->fittest.ptr == NULL) || (p->fittest.rating >= pi_rating)) { void *temp; @@ -533,6 +723,9 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ chance_j = 1 + p->individuals[j].rating - p->fittest.rating; chance_pi = 1 + pi_rating - p->fittest.rating; + chance_j = chance_j * chance_j; + chance_pi = chance_pi * chance_pi; + chance = (int) (((double) (chance_j + chance_pi)) * (rand() / (RAND_MAX + 1.0))); if (chance < chance_j) /* j looses ;) */ @@ -558,6 +751,29 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ pi = NULL; } + while (p->peers_num > 0) + { + double prob; + size_t j; + void *pi; + + prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0); + if (prob < 0.999) + break; + + pi = population_get_random (p); + if (pi == NULL) + { + fprintf (stderr, "population_insert: population_get_random failed.\n"); + break; + } + + population_send_to_peer (p, pi); + p->free (pi); + + break; + } + return (0); } /* }}} int population_insert */ diff --git a/src/population.h b/src/population.h index 64b3d8f..8dc7c73 100644 --- a/src/population.h +++ b/src/population.h @@ -24,7 +24,8 @@ typedef struct population_s population_t; /* * Constructor and destructor */ -population_t *population_create (pi_rate_f rate, pi_copy_f copy, pi_free_f f); +population_t *population_create (pi_rate_f rate, + pi_copy_f copy, pi_free_f f); void population_destroy (population_t *p); /* @@ -36,6 +37,8 @@ int population_set_serialization (population_t *p, #define POPULATION_DEFAULT_PORT "46835" int population_add_peer (population_t *p, const char *node, const char *port); +int population_start_listen_thread (population_t *p, + const char *node, const char *port); /* * Methods