X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fdaemon%2Fplugin.c;h=d54abeeae083fb32c4d53704acf475a115d66a02;hb=e81567f2a645fd15f4384ca9569119bc66c1412a;hp=d4b962fb5647cc7bdff4a0139554619d7b8c7d97;hpb=6f4f918d4d5e70c75471632254ecb9c55fd8d62f;p=collectd.git diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index d4b962fb..d54abeea 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -25,6 +25,9 @@ * Sebastian Harl **/ +/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */ +#define _GNU_SOURCE + #include "collectd.h" #include "common.h" @@ -39,6 +42,10 @@ #include "utils_time.h" #include "utils_random.h" +#if HAVE_PTHREAD_NP_H +# include /* for pthread_set_name_np(3) */ +#endif + #include /* @@ -108,7 +115,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; #ifndef DEFAULT_MAX_READ_INTERVAL -# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400) +# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC (86400) #endif static c_heap_t *read_heap = NULL; static llist_t *read_list; @@ -116,7 +123,7 @@ static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; -static int read_threads_num = 0; +static size_t read_threads_num = 0; static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL; static write_queue_t *write_queue_head; @@ -360,7 +367,7 @@ static void log_list_callbacks (llist_t **list, /* {{{ */ } /* }}} void log_list_callbacks */ static int create_register_callback (llist_t **list, /* {{{ */ - const char *name, void *callback, user_data_t *ud) + const char *name, void *callback, user_data_t const *ud) { callback_func_t *cf; @@ -522,12 +529,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) && (cdtime () < rf->rf_next_read) && rc == 0) { - struct timespec ts = { 0 }; - - CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts); - rc = pthread_cond_timedwait (&read_cond, &read_lock, - &ts); + &CDTIME_T_TO_TIMESPEC (rf->rf_next_read)); } /* Must hold `read_lock' when accessing `rf->rf_type'. */ @@ -648,7 +651,38 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) return ((void *) 0); } /* void *plugin_read_thread */ -static void start_read_threads (int num) +#ifdef PTHREAD_MAX_NAMELEN_NP +# define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP +#else +# define THREAD_NAME_MAX 16 +#endif + +static void set_thread_name(pthread_t tid, char const *name) { +#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP) + + /* glibc limits the length of the name and fails if the passed string + * is too long, so we truncate it here. */ + char n[THREAD_NAME_MAX]; + if (strlen (name) >= THREAD_NAME_MAX) + WARNING("set_thread_name(\"%s\"): name too long", name); + sstrncpy (n, name, sizeof(n)); + +#if defined(HAVE_PTHREAD_SETNAME_NP) + int status = pthread_setname_np (tid, n); + if (status != 0) + { + char errbuf[1024]; + ERROR ("set_thread_name(\"%s\"): %s", n, + sstrerror (status, errbuf, sizeof(errbuf))); + } +#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */ + pthread_set_name_np (tid, n); +#endif + +#endif +} + +static void start_read_threads (size_t num) /* {{{ */ { if (read_threads != NULL) return; @@ -661,27 +695,35 @@ static void start_read_threads (int num) } read_threads_num = 0; - for (int i = 0; i < num; i++) + for (size_t i = 0; i < num; i++) { - if (pthread_create (read_threads + read_threads_num, NULL, - plugin_read_thread, NULL) == 0) - { - read_threads_num++; - } - else + int status = pthread_create (read_threads + read_threads_num, + /* attr = */ NULL, + plugin_read_thread, + /* arg = */ NULL); + if (status != 0) { - ERROR ("plugin: start_read_threads: pthread_create failed."); + char errbuf[1024]; + ERROR ("plugin: start_read_threads: pthread_create failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); return; } + + char name[THREAD_NAME_MAX]; + ssnprintf (name, sizeof (name), "reader#%zu", read_threads_num); + set_thread_name (read_threads[read_threads_num], name); + + read_threads_num++; } /* for (i) */ -} /* void start_read_threads */ +} /* }}} void start_read_threads */ static void stop_read_threads (void) { if (read_threads == NULL) return; - INFO ("collectd: Stopping %i read threads.", read_threads_num); + INFO ("collectd: Stopping %zu read threads.", read_threads_num); pthread_mutex_lock (&read_lock); read_loop = 0; @@ -689,7 +731,7 @@ static void stop_read_threads (void) pthread_cond_broadcast (&read_cond); pthread_mutex_unlock (&read_lock); - for (int i = 0; i < read_threads_num; i++) + for (size_t i = 0; i < read_threads_num; i++) { if (pthread_join (read_threads[i], NULL) != 0) { @@ -723,6 +765,9 @@ static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{ return (NULL); memcpy (vl, vl_orig, sizeof (*vl)); + if (vl->host[0] == 0) + sstrncpy (vl->host, hostname_g, sizeof (vl->host)); + vl->values = calloc (vl_orig->values_len, sizeof (*vl->values)); if (vl->values == NULL) { @@ -874,9 +919,7 @@ static void start_write_threads (size_t num) /* {{{ */ write_threads_num = 0; for (size_t i = 0; i < num; i++) { - int status; - - status = pthread_create (write_threads + write_threads_num, + int status = pthread_create (write_threads + write_threads_num, /* attr = */ NULL, plugin_write_thread, /* arg = */ NULL); @@ -889,6 +932,10 @@ static void start_write_threads (size_t num) /* {{{ */ return; } + char name[THREAD_NAME_MAX]; + ssnprintf (name, sizeof (name), "writer#%zu", write_threads_num); + set_thread_name (write_threads[write_threads_num], name); + write_threads_num++; } /* for (i) */ } /* }}} void start_write_threads */ @@ -1261,7 +1308,7 @@ int plugin_register_read (const char *name, int plugin_register_complex_read (const char *group, const char *name, plugin_read_cb callback, cdtime_t interval, - user_data_t *user_data) + user_data_t const *user_data) { read_func_t *rf; int status; @@ -1305,7 +1352,7 @@ int plugin_register_complex_read (const char *group, const char *name, } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, - plugin_write_cb callback, user_data_t *ud) + plugin_write_cb callback, user_data_t const *ud) { return (create_register_callback (&list_write, name, (void *) callback, ud)); @@ -1352,7 +1399,7 @@ static char *plugin_flush_callback_name (const char *name) } /* static char *plugin_flush_callback_name */ int plugin_register_flush (const char *name, - plugin_flush_cb callback, user_data_t *ud) + plugin_flush_cb callback, user_data_t const *ud) { int status; plugin_ctx_t ctx = plugin_get_ctx (); @@ -1389,15 +1436,15 @@ int plugin_register_flush (const char *name, } cb->timeout = ctx.flush_timeout; - ud->data = cb; - ud->free_func = plugin_flush_timeout_callback_free; - status = plugin_register_complex_read ( /* group = */ "flush", /* name = */ flush_name, /* callback = */ plugin_flush_timeout_callback, /* interval = */ ctx.flush_interval, - /* user data = */ ud); + /* user data = */ &(user_data_t) { + .data = cb, + .free_func = plugin_flush_timeout_callback_free, + }); sfree (flush_name); if (status != 0) @@ -1412,7 +1459,7 @@ int plugin_register_flush (const char *name, } /* int plugin_register_flush */ int plugin_register_missing (const char *name, - plugin_missing_cb callback, user_data_t *ud) + plugin_missing_cb callback, user_data_t const *ud) { return (create_register_callback (&list_missing, name, (void *) callback, ud)); @@ -1483,14 +1530,14 @@ int plugin_register_data_set (const data_set_t *ds) } /* int plugin_register_data_set */ int plugin_register_log (const char *name, - plugin_log_cb callback, user_data_t *ud) + plugin_log_cb callback, user_data_t const *ud) { return (create_register_callback (&list_log, name, (void *) callback, ud)); } /* int plugin_register_log */ int plugin_register_notification (const char *name, - plugin_notification_cb callback, user_data_t *ud) + plugin_notification_cb callback, user_data_t const *ud) { return (create_register_callback (&list_notification, name, (void *) callback, ud)); @@ -1778,7 +1825,7 @@ int plugin_init_all (void) rt = global_option_get ("ReadThreads"); num = atoi (rt); if (num != -1) - start_read_threads ((num > 0) ? num : 5); + start_read_threads ((num > 0) ? ((size_t) num) : 5); } return ret; } /* void plugin_init_all */ @@ -2078,15 +2125,16 @@ static int plugin_dispatch_values_internal (value_list_t *vl) int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; - value_t *saved_values; - int saved_values_len; - data_set_t *ds; - int free_meta_data = 0; + _Bool free_meta_data = 0; - assert(vl); - assert(vl->plugin); + assert (vl != NULL); + + /* These fields are initialized by plugin_value_list_clone() if needed: */ + assert (vl->host[0] != 0); + assert (vl->time != 0); /* The time is determined at _enqueue_ time. */ + assert (vl->interval != 0); if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) { @@ -2126,11 +2174,6 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (-1); } - /* Assured by plugin_value_list_clone(). The time is determined at - * _enqueue_ time. */ - assert (vl->time != 0); - assert (vl->interval != 0); - DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " "plugin = %s; plugin_instance = %s; " @@ -2168,31 +2211,6 @@ static int plugin_dispatch_values_internal (value_list_t *vl) escape_slashes (vl->type, sizeof (vl->type)); escape_slashes (vl->type_instance, sizeof (vl->type_instance)); - /* Copy the values. This way, we can assure `targets' that they get - * dynamically allocated values, which they can free and replace if - * they like. */ - if ((pre_cache_chain != NULL) || (post_cache_chain != NULL)) - { - saved_values = vl->values; - saved_values_len = vl->values_len; - - vl->values = (value_t *) calloc (vl->values_len, - sizeof (*vl->values)); - if (vl->values == NULL) - { - ERROR ("plugin_dispatch_values: calloc failed."); - vl->values = saved_values; - return (-1); - } - memcpy (vl->values, saved_values, - vl->values_len * sizeof (*vl->values)); - } - else /* if ((pre == NULL) && (post == NULL)) */ - { - saved_values = NULL; - saved_values_len = 0; - } - if (pre_cache_chain != NULL) { status = fc_process_chain (ds, vl, pre_cache_chain); @@ -2204,17 +2222,7 @@ static int plugin_dispatch_values_internal (value_list_t *vl) status, status); } else if (status == FC_TARGET_STOP) - { - /* Restore the state of the value_list so that plugins - * don't get confused.. */ - if (saved_values != NULL) - { - sfree (vl->values); - vl->values = saved_values; - vl->values_len = saved_values_len; - } return (0); - } } /* Update the value cache */ @@ -2234,15 +2242,6 @@ static int plugin_dispatch_values_internal (value_list_t *vl) else fc_default_action (ds, vl); - /* Restore the state of the value_list so that plugins don't get - * confused.. */ - if (saved_values != NULL) - { - sfree (vl->values); - vl->values = saved_values; - vl->values_len = saved_values_len; - } - if ((free_meta_data != 0) && (vl->meta != NULL)) { meta_data_destroy (vl->meta); @@ -2838,20 +2837,30 @@ static void *plugin_thread_start (void *arg) } /* void *plugin_thread_start */ int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr, - void *(*start_routine) (void *), void *arg) + void *(*start_routine) (void *), void *arg, char const *name) { plugin_thread_t *plugin_thread; plugin_thread = malloc (sizeof (*plugin_thread)); if (plugin_thread == NULL) - return -1; + return ENOMEM; plugin_thread->ctx = plugin_get_ctx (); plugin_thread->start_routine = start_routine; plugin_thread->arg = arg; - return pthread_create (thread, attr, + int ret = pthread_create (thread, attr, plugin_thread_start, plugin_thread); + if (ret != 0) + { + sfree (plugin_thread); + return ret; + } + + if (name != NULL) + set_thread_name (*thread, name); + + return 0; } /* int plugin_thread_create */ /* vim: set sw=8 ts=8 noet fdm=marker : */