From: Florian Forster Date: Wed, 23 Jan 2008 19:30:06 +0000 (+0100) Subject: network plugin: Have two different threads for handling incoming packets. X-Git-Tag: collectd-4.3.0beta0~10^2^2 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=5abaf1661442bfa9b7deac7e8e5a549d04a194d8;p=collectd.git network plugin: Have two different threads for handling incoming packets. One that only receives and enqueues packets and one which parses the packets and dispatches them to the daemon. This should solve problems with (too) short socket buffers and (very) heavy load. --- diff --git a/src/network.c b/src/network.c index 19344312..17038a45 100644 --- a/src/network.c +++ b/src/network.c @@ -136,6 +136,14 @@ struct part_values_s }; typedef struct part_values_s part_values_t; +struct receive_list_entry_s +{ + char data[BUFF_SIZE]; + int data_len; + struct receive_list_entry_s *next; +}; +typedef struct receive_list_entry_s receive_list_entry_t; + /* * Private variables */ @@ -154,10 +162,17 @@ static int network_config_forward = 0; static sockent_t *sending_sockets = NULL; +static receive_list_entry_t *receive_list_head = NULL; +static receive_list_entry_t *receive_list_tail = NULL; +static pthread_mutex_t receive_list_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t receive_list_cond = PTHREAD_COND_INITIALIZER; + static struct pollfd *listen_sockets = NULL; static int listen_sockets_num = 0; -static pthread_t listen_thread = 0; + static int listen_loop = 0; +static pthread_t receive_thread_id = 0; +static pthread_t dispatch_thread_id = 0; static char send_buffer[BUFF_SIZE]; static char *send_buffer_ptr; @@ -989,6 +1004,37 @@ static int network_add_sending_socket (const char *node, const char *service) return (0); } /* int network_get_listen_socket */ +static void *dispatch_thread (void *arg) +{ + while (42) + { + receive_list_entry_t *ent; + + /* Lock and wait for more data to come in */ + pthread_mutex_lock (&receive_list_lock); + while ((listen_loop == 0) + && (receive_list_head == NULL)) + pthread_cond_wait (&receive_list_cond, &receive_list_lock); + + /* Remove the head entry and unlock */ + ent = receive_list_head; + if (ent != NULL) + receive_list_head = ent->next; + pthread_mutex_unlock (&receive_list_lock); + + /* Check whether we are supposed to exit. We do NOT check `listen_loop' + * because we dispatch all missing packets before shutting down. */ + if (ent == NULL) + break; + + parse_packet (ent->data, ent->data_len); + + sfree (ent); + } /* while (42) */ + + return (NULL); +} /* void *receive_thread */ + static int network_receive (void) { char buffer[BUFF_SIZE]; @@ -1022,6 +1068,8 @@ static int network_receive (void) for (i = 0; (i < listen_sockets_num) && (status > 0); i++) { + receive_list_entry_t *ent; + if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0) continue; status--; @@ -1038,7 +1086,35 @@ static int network_receive (void) return (-1); } - parse_packet (buffer, buffer_len); + ent = malloc (sizeof (receive_list_entry_t)); + if (ent == NULL) + { + ERROR ("network plugin: malloc failed."); + return (-1); + } + memset (ent, '\0', sizeof (receive_list_entry_t)); + + /* Hopefully this be optimized out by the compiler. It + * might help prevent stupid bugs in the future though. + */ + assert (sizeof (ent->data) == sizeof (buffer)); + + memcpy (ent->data, buffer, buffer_len); + ent->data_len = buffer_len; + + pthread_mutex_lock (&receive_list_lock); + if (receive_list_head == NULL) + { + receive_list_head = ent; + receive_list_tail = ent; + } + else + { + receive_list_tail->next = ent; + receive_list_tail = ent; + } + pthread_cond_signal (&receive_list_cond); + pthread_mutex_unlock (&receive_list_lock); } /* for (listen_sockets) */ } /* while (listen_loop == 0) */ @@ -1284,13 +1360,18 @@ static int network_shutdown (void) { listen_loop++; - if (listen_thread != (pthread_t) 0) + /* Kill the listening thread */ + if (receive_thread_id != (pthread_t) 0) { - pthread_kill (listen_thread, SIGTERM); - pthread_join (listen_thread, NULL /* no return value */); - listen_thread = (pthread_t) 0; + pthread_kill (receive_thread_id, SIGTERM); + pthread_join (receive_thread_id, NULL /* no return value */); + receive_thread_id = (pthread_t) 0; } + /* Shutdown the dispatching thread */ + if (dispatch_thread_id != (pthread_t) 0) + pthread_cond_broadcast (&receive_list_cond); + if (send_buffer_fill > 0) flush_buffer (); @@ -1334,13 +1415,26 @@ static int network_init (void) if (sending_sockets != NULL) plugin_register_write ("network", network_write); - if ((listen_sockets_num != 0) && (listen_thread == 0)) + if ((listen_sockets_num != 0) && (receive_thread_id == 0)) { int status; - status = pthread_create (&listen_thread, NULL /* no attributes */, - receive_thread, NULL /* no argument */); + status = pthread_create (&dispatch_thread_id, + NULL /* no attributes */, + dispatch_thread, + NULL /* no argument */); + if (status != 0) + { + char errbuf[1024]; + ERROR ("network: pthread_create failed: %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + } + status = pthread_create (&receive_thread_id, + NULL /* no attributes */, + receive_thread, + NULL /* no argument */); if (status != 0) { char errbuf[1024];