From: Florian Forster Date: Sun, 1 Mar 2009 08:43:54 +0000 (+0100) Subject: src/plugin.c, network, rrdtool: Improved thread shutdown. X-Git-Tag: collectd-4.7.0~122 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=a261da68f819bec318f819238f7fe5c428d50af1;p=collectd.git src/plugin.c, network, rrdtool: Improved thread shutdown. Just some comments, info messages and not assigning zero to pthread_t. --- diff --git a/src/network.c b/src/network.c index c9a92ac5..7e559867 100644 --- a/src/network.c +++ b/src/network.c @@ -168,19 +168,27 @@ static pthread_mutex_t receive_list_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t receive_list_cond = PTHREAD_COND_INITIALIZER; static struct pollfd *listen_sockets = NULL; -static int listen_sockets_num = 0; - -static int listen_loop = 0; -static pthread_t receive_thread_id = 0; -static pthread_t dispatch_thread_id = 0; - -static char send_buffer[BUFF_SIZE]; -static char *send_buffer_ptr; -static int send_buffer_fill; -static value_list_t send_buffer_vl = VALUE_LIST_STATIC; -static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; - -static c_avl_tree_t *cache_tree = NULL; +static int listen_sockets_num = 0; + +/* The receive and dispatch threads will run as long as `listen_loop' is set to + * zero. */ +static int listen_loop = 0; +static int receive_thread_running = 0; +static pthread_t receive_thread_id; +static int dispatch_thread_running = 0; +static pthread_t dispatch_thread_id; + +/* Buffer in which to-be-sent network packets are constructed. */ +static char send_buffer[BUFF_SIZE]; +static char *send_buffer_ptr; +static int send_buffer_fill; +static value_list_t send_buffer_vl = VALUE_LIST_STATIC; +static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; + +/* In this cache we store all the values we received, so we can send out only + * those values which were *not* received via the network plugin, too. This is + * used for the `Forward false' option. */ +static c_avl_tree_t *cache_tree = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static time_t cache_flush_last = 0; static int cache_flush_interval = 1800; @@ -1329,6 +1337,10 @@ static int network_receive (void) return (-1); } + /* TODO: Possible performance enhancement: Do not free + * these entries in the dispatch thread but put them in + * another list, so we don't have to allocate more and + * more of these structures. */ ent = malloc (sizeof (receive_list_entry_t)); if (ent == NULL) { @@ -1709,16 +1721,23 @@ static int network_shutdown (void) listen_loop++; /* Kill the listening thread */ - if (receive_thread_id != (pthread_t) 0) + if (receive_thread_running != 0) { + INFO ("network plugin: Stopping receive thread."); pthread_kill (receive_thread_id, SIGTERM); pthread_join (receive_thread_id, NULL /* no return value */); - receive_thread_id = (pthread_t) 0; + memset (&receive_thread_id, 0, sizeof (receive_thread_id)); + receive_thread_running = 0; } /* Shutdown the dispatching thread */ - if (dispatch_thread_id != (pthread_t) 0) + if (dispatch_thread_running != 0) + { + INFO ("network plugin: Stopping dispatch thread."); + pthread_join (dispatch_thread_id, /* ret = */ NULL); pthread_cond_broadcast (&receive_list_cond); + dispatch_thread_running = 0; + } if (send_buffer_fill > 0) flush_buffer (); @@ -1775,10 +1794,15 @@ static int network_init (void) /* user_data = */ NULL); } - if ((listen_sockets_num != 0) && (receive_thread_id == 0)) + /* If no threads need to be started, return here. */ + if ((listen_sockets_num == 0) + || ((dispatch_thread_running != 0) + && (receive_thread_running != 0))) + return (0); + + if (dispatch_thread_running == 0) { int status; - status = pthread_create (&dispatch_thread_id, NULL /* no attributes */, dispatch_thread, @@ -1790,7 +1814,15 @@ static int network_init (void) sstrerror (errno, errbuf, sizeof (errbuf))); } + else + { + dispatch_thread_running = 1; + } + } + if (receive_thread_running == 0) + { + int status; status = pthread_create (&receive_thread_id, NULL /* no attributes */, receive_thread, @@ -1802,7 +1834,12 @@ static int network_init (void) sstrerror (errno, errbuf, sizeof (errbuf))); } + else + { + receive_thread_running = 1; + } } + return (0); } /* int network_init */ diff --git a/src/plugin.c b/src/plugin.c index d185ef2f..265bf46e 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -404,6 +404,8 @@ static void stop_read_threads (void) if (read_threads == NULL) return; + INFO ("collectd: Stopping %i read threads.", read_threads_num); + pthread_mutex_lock (&read_lock); read_loop = 0; DEBUG ("plugin: stop_read_threads: Signalling `read_cond'"); diff --git a/src/rrdtool.c b/src/rrdtool.c index 6ddbfc9e..b8885e8e 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -111,7 +111,8 @@ static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; static rrd_queue_t *flushq_head = NULL; static rrd_queue_t *flushq_tail = NULL; -static pthread_t queue_thread = 0; +static pthread_t queue_thread; +static int queue_thread_running = 1; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -973,14 +974,28 @@ static int rrd_shutdown (void) pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); + if ((queue_thread_running != 0) + && ((queue_head != NULL) || (flushq_head != NULL))) + { + INFO ("rrdtool plugin: Shutting down the queue thread. " + "This may take a while."); + } + else if (queue_thread_running != 0) + { + INFO ("rrdtool plugin: Shutting down the queue thread."); + } + /* Wait for all the values to be written to disk before returning. */ - if (queue_thread != 0) + if (queue_thread_running != 0) { pthread_join (queue_thread, NULL); - queue_thread = 0; + memset (&queue_thread, 0, sizeof (queue_thread)); + queue_thread_running = 0; DEBUG ("rrdtool plugin: queue_thread exited."); } + /* TODO: Maybe it'd be a good idea to free the cache here.. */ + return (0); } /* int rrd_shutdown */ @@ -1025,12 +1040,14 @@ static int rrd_init (void) pthread_mutex_unlock (&cache_lock); - status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL); + status = pthread_create (&queue_thread, /* attr = */ NULL, + rrd_queue_thread, /* args = */ NULL); if (status != 0) { ERROR ("rrdtool plugin: Cannot create queue-thread."); return (-1); } + queue_thread_running = 1; DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;",