From cbc3671e75d323482c15ac5176474ad7ec1d67f9 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Mon, 16 Mar 2009 00:02:55 +0100 Subject: [PATCH] src/plugin.c: Use a heap to schedule reads. With this change basically only timeout checks are triggered from the global "read loop". This will allow each read function to be queried at a different interval. This is currently not used, but will come in handy in the future. --- src/Makefile.am | 1 + src/common.h | 7 ++ src/plugin.c | 322 +++++++++++++++++++++++++++++++++++-------------------- src/utils_heap.c | 223 ++++++++++++++++++++++++++++++++++++++ src/utils_heap.h | 96 +++++++++++++++++ 5 files changed, 534 insertions(+), 115 deletions(-) create mode 100644 src/utils_heap.c create mode 100644 src/utils_heap.h diff --git a/src/Makefile.am b/src/Makefile.am index 5cc1045f..40c2c388 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,6 +35,7 @@ collectd_SOURCES = collectd.c collectd.h \ utils_avltree.c utils_avltree.h \ utils_cache.c utils_cache.h \ utils_complain.c utils_complain.h \ + utils_heap.c utils_heap.h \ utils_ignorelist.c utils_ignorelist.h \ utils_llist.c utils_llist.h \ utils_parse_option.c utils_parse_option.h \ diff --git a/src/common.h b/src/common.h index 113f0440..d0cc4e82 100644 --- a/src/common.h +++ b/src/common.h @@ -222,6 +222,13 @@ int timeval_cmp (struct timeval tv0, struct timeval tv1, struct timeval *delta); (tv).tv_usec = (tv).tv_usec % 1000000; \ } while (0) +/* make sure tv_sec stores less than a second */ +#define NORMALIZE_TIMESPEC(tv) \ + do { \ + (tv).tv_sec += (tv).tv_nsec / 1000000000; \ + (tv).tv_nsec = (tv).tv_nsec % 1000000000; \ + } while (0) + int check_create_dir (const char *file_orig); #ifdef HAVE_LIBKSTAT diff --git a/src/plugin.c b/src/plugin.c index a5c6c527..14126cd5 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -34,6 +34,7 @@ #include "configfile.h" #include "utils_avltree.h" #include "utils_llist.h" +#include "utils_heap.h" #include "utils_cache.h" #include "utils_threshold.h" #include "filter_chain.h" @@ -57,10 +58,11 @@ struct read_func_s #define rf_callback rf_super.cf_callback #define rf_udata rf_super.cf_udata callback_func_t rf_super; + char rf_name[DATA_MAX_NAME_LEN]; int rf_type; - int rf_wait_time; - int rf_wait_left; - enum { DONE = 0, TODO = 1, ACTIVE = 2 } rf_needs_read; + struct timespec rf_interval; + struct timespec rf_effective_interval; + struct timespec rf_next_read; }; typedef struct read_func_s read_func_t; @@ -68,7 +70,6 @@ typedef struct read_func_s read_func_t; * Private variables */ static llist_t *list_init; -static llist_t *list_read; static llist_t *list_write; static llist_t *list_flush; static llist_t *list_shutdown; @@ -82,6 +83,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; +static c_heap_t *read_heap = NULL; static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; @@ -138,6 +140,26 @@ static void destroy_all_callbacks (llist_t **list) /* {{{ */ *list = NULL; } /* }}} void destroy_all_callbacks */ +static void destroy_read_heap (void) /* {{{ */ +{ + if (read_heap == NULL) + return; + + while (42) + { + callback_func_t *cf; + + cf = c_head_get_root (read_heap); + if (cf == NULL) + break; + + destroy_callback (cf); + } + + c_heap_destroy (read_heap); + read_heap = NULL; +} /* }}} void destroy_read_heap */ + static int register_callback (llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { @@ -280,86 +302,138 @@ static int plugin_load_file (char *file) static void *plugin_read_thread (void __attribute__((unused)) *args) { - llentry_t *le; - read_func_t *rf; - int status; - int done; - - pthread_mutex_lock (&read_lock); - while (read_loop != 0) { - le = llist_head (list_read); - done = 0; + read_func_t *rf; + struct timeval now; + int status; - while ((read_loop != 0) && (le != NULL)) + /* Get the read function that needs to be read next. */ + rf = c_head_get_root (read_heap); + if (rf == NULL) { - rf = (read_func_t *) le->value; + struct timespec abstime; - if (rf->rf_needs_read != TODO) - { - le = le->next; - continue; - } + gettimeofday (&now, /* timezone = */ NULL); - /* We will do this read function */ - rf->rf_needs_read = ACTIVE; + abstime.tv_sec = now.tv_sec + interval_g; + abstime.tv_nsec = 1000 * now.tv_usec; - DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s", - (unsigned long int) pthread_self (), le->key); + pthread_mutex_lock (&read_lock); + pthread_cond_timedwait (&read_cond, &read_lock, + &abstime); pthread_mutex_unlock (&read_lock); + continue; + } - if (rf->rf_type == RF_SIMPLE) - { - int (*callback) (void); + if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) + { + gettimeofday (&now, /* timezone = */ NULL); - callback = rf->rf_callback; - status = (*callback) (); - } - else - { - plugin_read_cb callback; + rf->rf_interval.tv_sec = interval_g; + rf->rf_interval.tv_nsec = 0; - callback = rf->rf_callback; - status = (*callback) (&rf->rf_udata); - } + rf->rf_effective_interval = rf->rf_interval; - done++; + rf->rf_next_read.tv_sec = now.tv_sec; + rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; + } - if (status != 0) - { - if (rf->rf_wait_time < interval_g) - rf->rf_wait_time = interval_g; - rf->rf_wait_left = rf->rf_wait_time; - rf->rf_wait_time = rf->rf_wait_time * 2; - if (rf->rf_wait_time > 86400) - rf->rf_wait_time = 86400; - - NOTICE ("read-function of plugin `%s' " - "failed. Will suspend it for %i " - "seconds.", le->key, rf->rf_wait_left); - } - else + /* sleep until this entry is due, + * using pthread_cond_timedwait */ + pthread_mutex_lock (&read_lock); + pthread_cond_timedwait (&read_cond, &read_lock, + &rf->rf_next_read); + pthread_mutex_unlock (&read_lock); + + /* Check if we're supposed to stop.. This may have interrupted + * the sleep, too. */ + if (read_loop == 0) + { + /* Insert `rf' again, so it can be free'd correctly */ + c_heap_insert (read_heap, rf); + break; + } + + DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); + + if (rf->rf_type == RF_SIMPLE) + { + int (*callback) (void); + + callback = rf->rf_callback; + status = (*callback) (); + } + else + { + plugin_read_cb callback; + + callback = rf->rf_callback; + status = (*callback) (&rf->rf_udata); + } + + /* If the function signals failure, we will increase the + * intervals in which it will be called. */ + if (status != 0) + { + rf->rf_effective_interval.tv_sec *= 2; + rf->rf_effective_interval.tv_nsec *= 2; + NORMALIZE_TIMESPEC (rf->rf_effective_interval); + + if (rf->rf_effective_interval.tv_sec >= 86400) { - rf->rf_wait_left = 0; - rf->rf_wait_time = interval_g; + rf->rf_effective_interval.tv_sec = 86400; + rf->rf_effective_interval.tv_nsec = 0; } - pthread_mutex_lock (&read_lock); - - rf->rf_needs_read = DONE; - le = le->next; - } /* while (le != NULL) */ + NOTICE ("read-function of plugin `%s' failed. " + "Will suspend it for %i seconds.", + rf->rf_name, + (int) rf->rf_effective_interval.tv_sec); + } + else + { + /* Success: Restore the interval, if it was changed. */ + rf->rf_effective_interval = rf->rf_interval; + } - if ((read_loop != 0) && (done == 0)) + /* update the ``next read due'' field */ + gettimeofday (&now, /* timezone = */ NULL); + + DEBUG ("plugin_read_thread: Effective interval of the " + "%s plugin is %i.%09i.", + rf->rf_name, + (int) rf->rf_effective_interval.tv_sec, + (int) rf->rf_effective_interval.tv_nsec); + + /* Calculate the next (absolute) time at which this function + * should be called. */ + rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec + + rf->rf_effective_interval.tv_sec; + rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec + + rf->rf_effective_interval.tv_nsec; + NORMALIZE_TIMESPEC (rf->rf_next_read); + + /* Check, if `rf_next_read' is in the past. */ + if ((rf->rf_next_read.tv_sec < now.tv_sec) + || ((rf->rf_next_read.tv_sec == now.tv_sec) + && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec)))) { - DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Waiting on read_cond.", - (unsigned long int) pthread_self ()); - pthread_cond_wait (&read_cond, &read_lock); + /* `rf_next_read' is in the past. Insert `now' + * so this value doesn't trail off into the + * past too much. */ + rf->rf_next_read.tv_sec = now.tv_sec; + rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; } - } /* while (read_loop) */ - pthread_mutex_unlock (&read_lock); + DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", + rf->rf_name, + (int) rf->rf_next_read.tv_sec, + (int) rf->rf_next_read.tv_nsec); + + /* Re-insert this read function into the heap again. */ + c_heap_insert (read_heap, rf); + } /* while (read_loop) */ pthread_exit (NULL); return ((void *) 0); @@ -546,11 +620,42 @@ int plugin_register_init (const char *name, /* user_data = */ NULL)); } /* plugin_register_init */ +static int plugin_compare_read_func (const void *arg0, const void *arg1) +{ + const read_func_t *rf0; + const read_func_t *rf1; + + rf0 = arg0; + rf1 = arg1; + + if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec) + return (-1); + else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec) + return (1); + else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec) + return (-1); + else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec) + return (1); + else + return (0); +} /* int plugin_compare_read_func */ + int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; + if (read_heap == NULL) + { + read_heap = c_heap_create (plugin_compare_read_func); + if (read_heap == NULL) + { + ERROR ("plugin_register_complex_read: " + "c_heap_create failed."); + return (-1); + } + } + rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -564,12 +669,13 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; - rf->rf_wait_time = interval_g; - rf->rf_wait_left = 0; + sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; - rf->rf_needs_read = DONE; + rf->rf_interval.tv_sec = 0; + rf->rf_interval.tv_nsec = 0; + rf->rf_effective_interval = rf->rf_interval; - return (register_callback (&list_read, name, (callback_func_t *) rf)); + return (c_heap_insert (read_heap, rf)); } /* int plugin_register_read */ int plugin_register_complex_read (const char *name, @@ -577,6 +683,17 @@ int plugin_register_complex_read (const char *name, { read_func_t *rf; + if (read_heap == NULL) + { + read_heap = c_heap_create (plugin_compare_read_func); + if (read_heap == NULL) + { + ERROR ("plugin_register_complex_read: " + "c_heap_create failed."); + return (-1); + } + } + rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -586,10 +703,11 @@ int plugin_register_complex_read (const char *name, memset (rf, 0, sizeof (read_func_t)); rf->rf_callback = (void *) callback; - rf->rf_wait_time = interval_g; - rf->rf_wait_left = 0; + sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_COMPLEX; - rf->rf_needs_read = DONE; + rf->rf_interval.tv_sec = 0; + rf->rf_interval.tv_nsec = 0; + rf->rf_effective_interval = rf->rf_interval; /* Set user data */ if (user_data == NULL) @@ -602,7 +720,7 @@ int plugin_register_complex_read (const char *name, rf->rf_udata = *user_data; } - return (register_callback (&list_read, name, (callback_func_t *) rf)); + return (c_heap_insert (read_heap, rf)); } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, @@ -696,7 +814,9 @@ int plugin_unregister_init (const char *name) int plugin_unregister_read (const char *name) { - return (plugin_unregister (list_read, name)); + /* TODO: Implement removal of a specific key from the heap. */ + assert (0); + return (-1); } int plugin_unregister_write (const char *name) @@ -756,7 +876,7 @@ void plugin_init_all (void) post_cache_chain = fc_chain_get_by_name (chain_name); - if ((list_init == NULL) && (list_read == NULL)) + if ((list_init == NULL) && (read_heap == NULL)) return; /* Calling all init callbacks before checking if read callbacks @@ -789,7 +909,7 @@ void plugin_init_all (void) } /* Start read-threads */ - if (list_read != NULL) + if (read_heap != NULL) { const char *rt; int num; @@ -800,64 +920,34 @@ void plugin_init_all (void) } } /* void plugin_init_all */ +/* TODO: Rename this function. */ void plugin_read_all (void) { - llentry_t *le; - read_func_t *rf; - uc_check_timeout (); - if (list_read == NULL) - return; - - pthread_mutex_lock (&read_lock); - - le = llist_head (list_read); - while (le != NULL) - { - rf = (read_func_t *) le->value; - - if (rf->rf_needs_read != DONE) - { - le = le->next; - continue; - } - - if (rf->rf_wait_left > 0) - rf->rf_wait_left -= interval_g; - - if (rf->rf_wait_left <= 0) - { - rf->rf_needs_read = TODO; - } - - le = le->next; - } - - DEBUG ("plugin: plugin_read_all: Signalling `read_cond'"); - pthread_cond_broadcast (&read_cond); - pthread_mutex_unlock (&read_lock); + return; } /* void plugin_read_all */ /* Read function called when the `-T' command line argument is given. */ int plugin_read_all_once (void) { llentry_t *le; - read_func_t *rf; int status; int return_status = 0; - if (list_read == NULL) + if (read_heap == NULL) { NOTICE ("No read-functions are registered."); return (0); } - for (le = llist_head (list_read); - le != NULL; - le = le->next) + while (42) { - rf = (read_func_t *) le->value; + read_func_t *rf; + + rf = c_head_get_root (read_heap); + if (rf == NULL) + break; if (rf->rf_type == RF_SIMPLE) { @@ -880,6 +970,8 @@ int plugin_read_all_once (void) le->key); return_status = -1; } + + destroy_callback ((void *) rf); } return (return_status); @@ -998,7 +1090,7 @@ void plugin_shutdown_all (void) stop_read_threads (); destroy_all_callbacks (&list_init); - destroy_all_callbacks (&list_read); + destroy_read_heap (); plugin_flush (/* plugin = */ NULL, /* timeout = */ -1, /* identifier = */ NULL); diff --git a/src/utils_heap.c b/src/utils_heap.c new file mode 100644 index 00000000..1ecd07e8 --- /dev/null +++ b/src/utils_heap.c @@ -0,0 +1,223 @@ +/** + * collectd - src/utils_heap.c + * Copyright (C) 2009 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include +#include +#include +#include +#include + +#include "utils_heap.h" + +struct c_heap_s +{ + pthread_mutex_t lock; + int (*compare) (const void *, const void *); + + void **list; + size_t list_len; /* # entries used */ + size_t list_size; /* # entries allocated */ +}; + +enum reheap_direction +{ + DIR_UP, + DIR_DOWN +}; + +static void reheap (c_heap_t *h, size_t root, enum reheap_direction dir) +{ + size_t left; + size_t right; + size_t min; + int status; + + /* Calculate the positions of the children */ + left = (2 * root) + 1; + if (left >= h->list_len) + left = 0; + + right = (2 * root) + 2; + if (right >= h->list_len) + right = 0; + + /* Check which one of the children is smaller. */ + if ((left == 0) && (right == 0)) + return; + else if (left == 0) + min = right; + else if (right == 0) + min = left; + else + { + status = h->compare (h->list[left], h->list[right]); + if (status > 0) + min = right; + else + min = left; + } + + status = h->compare (h->list[root], h->list[min]); + if (status <= 0) + { + /* We didn't need to change anything, so the rest of the tree should be + * okay now. */ + return; + } + else /* if (status > 0) */ + { + void *tmp; + + tmp = h->list[root]; + h->list[root] = h->list[min]; + h->list[min] = tmp; + } + + if ((dir == DIR_UP) && (root == 0)) + return; + + if (dir == DIR_UP) + reheap (h, root / 2, dir); + else if (dir == DIR_DOWN) + reheap (h, min, dir); +} /* void reheap */ + +c_heap_t *c_heap_create (int (*compare) (const void *, const void *)) +{ + c_heap_t *h; + + if (compare == NULL) + return (NULL); + + h = malloc (sizeof (*h)); + if (h == NULL) + return (NULL); + + memset (h, 0, sizeof (*h)); + pthread_mutex_init (&h->lock, /* attr = */ NULL); + h->compare = compare; + + h->list = NULL; + h->list_len = 0; + h->list_size = 0; + + return (h); +} /* c_heap_t *c_heap_create */ + +void c_heap_destroy (c_heap_t *h) +{ + if (h == NULL) + return; + + h->list_len = 0; + h->list_size = 0; + free (h->list); + h->list = NULL; + + pthread_mutex_destroy (&h->lock); + + free (h); +} /* void c_heap_destroy */ + +int c_heap_insert (c_heap_t *h, void *ptr) +{ + if ((h == NULL) || (ptr == NULL)) + return (-EINVAL); + + pthread_mutex_lock (&h->lock); + + assert (h->list_len <= h->list_size); + if (h->list_len == h->list_size) + { + void **tmp; + + tmp = realloc (h->list, (h->list_size + 16) * sizeof (*h->list)); + if (tmp == NULL) + { + pthread_mutex_unlock (&h->lock); + return (-ENOMEM); + } + + h->list = tmp; + h->list_size += 16; + } + + /* Insert the new node as a leaf. */ + h->list[h->list_len] = ptr; + h->list_len++; + + /* Reorganize the heap from bottom up. */ + reheap (h, /* parent of this node = */ (h->list_len - 1) / 2, DIR_UP); + + pthread_mutex_unlock (&h->lock); + return (0); +} /* int c_heap_insert */ + +void *c_head_get_root (c_heap_t *h) +{ + void *ret = NULL; + + if (h == NULL) + return (NULL); + + pthread_mutex_lock (&h->lock); + + if (h->list_len == 0) + { + pthread_mutex_unlock (&h->lock); + return (NULL); + } + else if (h->list_len == 1) + { + ret = h->list[0]; + h->list[0] = NULL; + h->list_len = 0; + } + else /* if (h->list_len > 1) */ + { + ret = h->list[0]; + h->list[0] = h->list[h->list_len - 1]; + h->list[h->list_len - 1] = NULL; + h->list_len--; + + reheap (h, /* root = */ 0, DIR_DOWN); + } + + /* free some memory */ + if ((h->list_len + 32) < h->list_size) + { + void **tmp; + + tmp = realloc (h->list, (h->list_len + 16) * sizeof (*h->list)); + if (tmp != NULL) + { + h->list = tmp; + h->list_size = h->list_len + 16; + } + } + + pthread_mutex_unlock (&h->lock); + + return (ret); +} /* void *c_head_get_root */ + +/* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_heap.h b/src/utils_heap.h new file mode 100644 index 00000000..dd0f4866 --- /dev/null +++ b/src/utils_heap.h @@ -0,0 +1,96 @@ +/** + * collectd - src/utils_heap.h + * Copyright (C) 2009 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#ifndef UTILS_HEAP_H +#define UTILS_HEAP_H 1 + +struct c_heap_s; +typedef struct c_heap_s c_heap_t; + +/* + * NAME + * c_heap_create + * + * DESCRIPTION + * Allocates a new heap. + * + * PARAMETERS + * `compare' The function-pointer `compare' is used to compare two keys. It + * has to return less than zero if it's first argument is smaller + * then the second argument, more than zero if the first argument + * is bigger than the second argument and zero if they are equal. + * If your keys are char-pointers, you can use the `strcmp' + * function from the libc here. + * + * RETURN VALUE + * A c_heap_t-pointer upon success or NULL upon failure. + */ +c_heap_t *c_heap_create (int (*compare) (const void *, const void *)); + +/* + * NAME + * c_heap_destroy + * + * DESCRIPTION + * Deallocates a heap. Stored value- and key-pointer are lost, but of course + * not freed. + */ +void c_heap_destroy (c_heap_t *h); + +/* + * NAME + * c_heap_insert + * + * DESCRIPTION + * Stores the key-value-pair in the heap pointed to by `h'. + * + * PARAMETERS + * `h' Heap to store the data in. + * `ptr' Value to be stored. This is typically a pointer to a data + * structure. The data structure is of course *not* copied and may + * not be free'd before the pointer has been removed from the heap + * again. + * + * RETURN VALUE + * Zero upon success, non-zero otherwise. It's less than zero if an error + * occurred or greater than zero if the key is already stored in the tree. + */ +int c_heap_insert (c_heap_t *h, void *ptr); + +/* + * NAME + * c_head_get_root + * + * DESCRIPTION + * Removes the value at the root of the heap and returns both, key and value. + * + * PARAMETERS + * `h' Heap to remove key-value-pair from. + * + * RETURN VALUE + * The pointer passed to `c_heap_insert' or NULL if there are no more + * elements in the heap (or an error occurred). + */ +void *c_head_get_root (c_heap_t *h); + +#endif /* UTILS_HEAP_H */ +/* vim: set sw=2 sts=2 et : */ -- 2.11.0