#include "configfile.h"
#include "utils_avltree.h"
#include "utils_llist.h"
+#include "utils_heap.h"
#include "utils_cache.h"
#include "utils_threshold.h"
#include "filter_chain.h"
#define rf_callback rf_super.cf_callback
#define rf_udata rf_super.cf_udata
callback_func_t rf_super;
+ char rf_name[DATA_MAX_NAME_LEN];
int rf_type;
- int rf_wait_time;
- int rf_wait_left;
- enum { DONE = 0, TODO = 1, ACTIVE = 2 } rf_needs_read;
+ struct timespec rf_interval;
+ struct timespec rf_effective_interval;
+ struct timespec rf_next_read;
};
typedef struct read_func_s read_func_t;
* Private variables
*/
static llist_t *list_init;
-static llist_t *list_read;
static llist_t *list_write;
static llist_t *list_flush;
static llist_t *list_shutdown;
static char *plugindir = NULL;
+static c_heap_t *read_heap = NULL;
static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
*list = NULL;
} /* }}} void destroy_all_callbacks */
+static void destroy_read_heap (void) /* {{{ */
+{
+ if (read_heap == NULL)
+ return;
+
+ while (42)
+ {
+ callback_func_t *cf;
+
+ cf = c_head_get_root (read_heap);
+ if (cf == NULL)
+ break;
+
+ destroy_callback (cf);
+ }
+
+ c_heap_destroy (read_heap);
+ read_heap = NULL;
+} /* }}} void destroy_read_heap */
+
static int register_callback (llist_t **list, /* {{{ */
const char *name, callback_func_t *cf)
{
static void *plugin_read_thread (void __attribute__((unused)) *args)
{
- llentry_t *le;
- read_func_t *rf;
- int status;
- int done;
-
- pthread_mutex_lock (&read_lock);
-
while (read_loop != 0)
{
- le = llist_head (list_read);
- done = 0;
+ read_func_t *rf;
+ struct timeval now;
+ int status;
- while ((read_loop != 0) && (le != NULL))
+ /* Get the read function that needs to be read next. */
+ rf = c_head_get_root (read_heap);
+ if (rf == NULL)
{
- rf = (read_func_t *) le->value;
+ struct timespec abstime;
- if (rf->rf_needs_read != TODO)
- {
- le = le->next;
- continue;
- }
+ gettimeofday (&now, /* timezone = */ NULL);
- /* We will do this read function */
- rf->rf_needs_read = ACTIVE;
+ abstime.tv_sec = now.tv_sec + interval_g;
+ abstime.tv_nsec = 1000 * now.tv_usec;
- DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s",
- (unsigned long int) pthread_self (), le->key);
+ pthread_mutex_lock (&read_lock);
+ pthread_cond_timedwait (&read_cond, &read_lock,
+ &abstime);
pthread_mutex_unlock (&read_lock);
+ continue;
+ }
- if (rf->rf_type == RF_SIMPLE)
- {
- int (*callback) (void);
+ if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
+ {
+ gettimeofday (&now, /* timezone = */ NULL);
- callback = rf->rf_callback;
- status = (*callback) ();
- }
- else
- {
- plugin_read_cb callback;
+ rf->rf_interval.tv_sec = interval_g;
+ rf->rf_interval.tv_nsec = 0;
- callback = rf->rf_callback;
- status = (*callback) (&rf->rf_udata);
- }
+ rf->rf_effective_interval = rf->rf_interval;
- done++;
+ rf->rf_next_read.tv_sec = now.tv_sec;
+ rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+ }
- if (status != 0)
- {
- if (rf->rf_wait_time < interval_g)
- rf->rf_wait_time = interval_g;
- rf->rf_wait_left = rf->rf_wait_time;
- rf->rf_wait_time = rf->rf_wait_time * 2;
- if (rf->rf_wait_time > 86400)
- rf->rf_wait_time = 86400;
-
- NOTICE ("read-function of plugin `%s' "
- "failed. Will suspend it for %i "
- "seconds.", le->key, rf->rf_wait_left);
- }
- else
+ /* sleep until this entry is due,
+ * using pthread_cond_timedwait */
+ pthread_mutex_lock (&read_lock);
+ pthread_cond_timedwait (&read_cond, &read_lock,
+ &rf->rf_next_read);
+ pthread_mutex_unlock (&read_lock);
+
+ /* Check if we're supposed to stop.. This may have interrupted
+ * the sleep, too. */
+ if (read_loop == 0)
+ {
+ /* Insert `rf' again, so it can be free'd correctly */
+ c_heap_insert (read_heap, rf);
+ break;
+ }
+
+ DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
+
+ if (rf->rf_type == RF_SIMPLE)
+ {
+ int (*callback) (void);
+
+ callback = rf->rf_callback;
+ status = (*callback) ();
+ }
+ else
+ {
+ plugin_read_cb callback;
+
+ callback = rf->rf_callback;
+ status = (*callback) (&rf->rf_udata);
+ }
+
+ /* If the function signals failure, we will increase the
+ * intervals in which it will be called. */
+ if (status != 0)
+ {
+ rf->rf_effective_interval.tv_sec *= 2;
+ rf->rf_effective_interval.tv_nsec *= 2;
+ NORMALIZE_TIMESPEC (rf->rf_effective_interval);
+
+ if (rf->rf_effective_interval.tv_sec >= 86400)
{
- rf->rf_wait_left = 0;
- rf->rf_wait_time = interval_g;
+ rf->rf_effective_interval.tv_sec = 86400;
+ rf->rf_effective_interval.tv_nsec = 0;
}
- pthread_mutex_lock (&read_lock);
-
- rf->rf_needs_read = DONE;
- le = le->next;
- } /* while (le != NULL) */
+ NOTICE ("read-function of plugin `%s' failed. "
+ "Will suspend it for %i seconds.",
+ rf->rf_name,
+ (int) rf->rf_effective_interval.tv_sec);
+ }
+ else
+ {
+ /* Success: Restore the interval, if it was changed. */
+ rf->rf_effective_interval = rf->rf_interval;
+ }
- if ((read_loop != 0) && (done == 0))
+ /* update the ``next read due'' field */
+ gettimeofday (&now, /* timezone = */ NULL);
+
+ DEBUG ("plugin_read_thread: Effective interval of the "
+ "%s plugin is %i.%09i.",
+ rf->rf_name,
+ (int) rf->rf_effective_interval.tv_sec,
+ (int) rf->rf_effective_interval.tv_nsec);
+
+ /* Calculate the next (absolute) time at which this function
+ * should be called. */
+ rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec
+ + rf->rf_effective_interval.tv_sec;
+ rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec
+ + rf->rf_effective_interval.tv_nsec;
+ NORMALIZE_TIMESPEC (rf->rf_next_read);
+
+ /* Check, if `rf_next_read' is in the past. */
+ if ((rf->rf_next_read.tv_sec < now.tv_sec)
+ || ((rf->rf_next_read.tv_sec == now.tv_sec)
+ && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec))))
{
- DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Waiting on read_cond.",
- (unsigned long int) pthread_self ());
- pthread_cond_wait (&read_cond, &read_lock);
+ /* `rf_next_read' is in the past. Insert `now'
+ * so this value doesn't trail off into the
+ * past too much. */
+ rf->rf_next_read.tv_sec = now.tv_sec;
+ rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
}
- } /* while (read_loop) */
- pthread_mutex_unlock (&read_lock);
+ DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
+ rf->rf_name,
+ (int) rf->rf_next_read.tv_sec,
+ (int) rf->rf_next_read.tv_nsec);
+
+ /* Re-insert this read function into the heap again. */
+ c_heap_insert (read_heap, rf);
+ } /* while (read_loop) */
pthread_exit (NULL);
return ((void *) 0);
/* user_data = */ NULL));
} /* plugin_register_init */
+static int plugin_compare_read_func (const void *arg0, const void *arg1)
+{
+ const read_func_t *rf0;
+ const read_func_t *rf1;
+
+ rf0 = arg0;
+ rf1 = arg1;
+
+ if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec)
+ return (-1);
+ else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec)
+ return (1);
+ else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec)
+ return (-1);
+ else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec)
+ return (1);
+ else
+ return (0);
+} /* int plugin_compare_read_func */
+
int plugin_register_read (const char *name,
int (*callback) (void))
{
read_func_t *rf;
+ if (read_heap == NULL)
+ {
+ read_heap = c_heap_create (plugin_compare_read_func);
+ if (read_heap == NULL)
+ {
+ ERROR ("plugin_register_complex_read: "
+ "c_heap_create failed.");
+ return (-1);
+ }
+ }
+
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
rf->rf_callback = (void *) callback;
rf->rf_udata.data = NULL;
rf->rf_udata.free_func = NULL;
- rf->rf_wait_time = interval_g;
- rf->rf_wait_left = 0;
+ sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_SIMPLE;
- rf->rf_needs_read = DONE;
+ rf->rf_interval.tv_sec = 0;
+ rf->rf_interval.tv_nsec = 0;
+ rf->rf_effective_interval = rf->rf_interval;
- return (register_callback (&list_read, name, (callback_func_t *) rf));
+ return (c_heap_insert (read_heap, rf));
} /* int plugin_register_read */
int plugin_register_complex_read (const char *name,
{
read_func_t *rf;
+ if (read_heap == NULL)
+ {
+ read_heap = c_heap_create (plugin_compare_read_func);
+ if (read_heap == NULL)
+ {
+ ERROR ("plugin_register_complex_read: "
+ "c_heap_create failed.");
+ return (-1);
+ }
+ }
+
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
memset (rf, 0, sizeof (read_func_t));
rf->rf_callback = (void *) callback;
- rf->rf_wait_time = interval_g;
- rf->rf_wait_left = 0;
+ sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_COMPLEX;
- rf->rf_needs_read = DONE;
+ rf->rf_interval.tv_sec = 0;
+ rf->rf_interval.tv_nsec = 0;
+ rf->rf_effective_interval = rf->rf_interval;
/* Set user data */
if (user_data == NULL)
rf->rf_udata = *user_data;
}
- return (register_callback (&list_read, name, (callback_func_t *) rf));
+ return (c_heap_insert (read_heap, rf));
} /* int plugin_register_complex_read */
int plugin_register_write (const char *name,
int plugin_unregister_read (const char *name)
{
- return (plugin_unregister (list_read, name));
+ /* TODO: Implement removal of a specific key from the heap. */
+ assert (0);
+ return (-1);
}
int plugin_unregister_write (const char *name)
post_cache_chain = fc_chain_get_by_name (chain_name);
- if ((list_init == NULL) && (list_read == NULL))
+ if ((list_init == NULL) && (read_heap == NULL))
return;
/* Calling all init callbacks before checking if read callbacks
}
/* Start read-threads */
- if (list_read != NULL)
+ if (read_heap != NULL)
{
const char *rt;
int num;
}
} /* void plugin_init_all */
+/* TODO: Rename this function. */
void plugin_read_all (void)
{
- llentry_t *le;
- read_func_t *rf;
-
uc_check_timeout ();
- if (list_read == NULL)
- return;
-
- pthread_mutex_lock (&read_lock);
-
- le = llist_head (list_read);
- while (le != NULL)
- {
- rf = (read_func_t *) le->value;
-
- if (rf->rf_needs_read != DONE)
- {
- le = le->next;
- continue;
- }
-
- if (rf->rf_wait_left > 0)
- rf->rf_wait_left -= interval_g;
-
- if (rf->rf_wait_left <= 0)
- {
- rf->rf_needs_read = TODO;
- }
-
- le = le->next;
- }
-
- DEBUG ("plugin: plugin_read_all: Signalling `read_cond'");
- pthread_cond_broadcast (&read_cond);
- pthread_mutex_unlock (&read_lock);
+ return;
} /* void plugin_read_all */
/* Read function called when the `-T' command line argument is given. */
int plugin_read_all_once (void)
{
llentry_t *le;
- read_func_t *rf;
int status;
int return_status = 0;
- if (list_read == NULL)
+ if (read_heap == NULL)
{
NOTICE ("No read-functions are registered.");
return (0);
}
- for (le = llist_head (list_read);
- le != NULL;
- le = le->next)
+ while (42)
{
- rf = (read_func_t *) le->value;
+ read_func_t *rf;
+
+ rf = c_head_get_root (read_heap);
+ if (rf == NULL)
+ break;
if (rf->rf_type == RF_SIMPLE)
{
le->key);
return_status = -1;
}
+
+ destroy_callback ((void *) rf);
}
return (return_status);
stop_read_threads ();
destroy_all_callbacks (&list_init);
- destroy_all_callbacks (&list_read);
+ destroy_read_heap ();
plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
/* identifier = */ NULL);
--- /dev/null
+/**
+ * collectd - src/utils_heap.c
+ * Copyright (C) 2009 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at verplant.org>
+ **/
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "utils_heap.h"
+
+struct c_heap_s
+{
+ pthread_mutex_t lock;
+ int (*compare) (const void *, const void *);
+
+ void **list;
+ size_t list_len; /* # entries used */
+ size_t list_size; /* # entries allocated */
+};
+
+enum reheap_direction
+{
+ DIR_UP,
+ DIR_DOWN
+};
+
+static void reheap (c_heap_t *h, size_t root, enum reheap_direction dir)
+{
+ size_t left;
+ size_t right;
+ size_t min;
+ int status;
+
+ /* Calculate the positions of the children */
+ left = (2 * root) + 1;
+ if (left >= h->list_len)
+ left = 0;
+
+ right = (2 * root) + 2;
+ if (right >= h->list_len)
+ right = 0;
+
+ /* Check which one of the children is smaller. */
+ if ((left == 0) && (right == 0))
+ return;
+ else if (left == 0)
+ min = right;
+ else if (right == 0)
+ min = left;
+ else
+ {
+ status = h->compare (h->list[left], h->list[right]);
+ if (status > 0)
+ min = right;
+ else
+ min = left;
+ }
+
+ status = h->compare (h->list[root], h->list[min]);
+ if (status <= 0)
+ {
+ /* We didn't need to change anything, so the rest of the tree should be
+ * okay now. */
+ return;
+ }
+ else /* if (status > 0) */
+ {
+ void *tmp;
+
+ tmp = h->list[root];
+ h->list[root] = h->list[min];
+ h->list[min] = tmp;
+ }
+
+ if ((dir == DIR_UP) && (root == 0))
+ return;
+
+ if (dir == DIR_UP)
+ reheap (h, root / 2, dir);
+ else if (dir == DIR_DOWN)
+ reheap (h, min, dir);
+} /* void reheap */
+
+c_heap_t *c_heap_create (int (*compare) (const void *, const void *))
+{
+ c_heap_t *h;
+
+ if (compare == NULL)
+ return (NULL);
+
+ h = malloc (sizeof (*h));
+ if (h == NULL)
+ return (NULL);
+
+ memset (h, 0, sizeof (*h));
+ pthread_mutex_init (&h->lock, /* attr = */ NULL);
+ h->compare = compare;
+
+ h->list = NULL;
+ h->list_len = 0;
+ h->list_size = 0;
+
+ return (h);
+} /* c_heap_t *c_heap_create */
+
+void c_heap_destroy (c_heap_t *h)
+{
+ if (h == NULL)
+ return;
+
+ h->list_len = 0;
+ h->list_size = 0;
+ free (h->list);
+ h->list = NULL;
+
+ pthread_mutex_destroy (&h->lock);
+
+ free (h);
+} /* void c_heap_destroy */
+
+int c_heap_insert (c_heap_t *h, void *ptr)
+{
+ if ((h == NULL) || (ptr == NULL))
+ return (-EINVAL);
+
+ pthread_mutex_lock (&h->lock);
+
+ assert (h->list_len <= h->list_size);
+ if (h->list_len == h->list_size)
+ {
+ void **tmp;
+
+ tmp = realloc (h->list, (h->list_size + 16) * sizeof (*h->list));
+ if (tmp == NULL)
+ {
+ pthread_mutex_unlock (&h->lock);
+ return (-ENOMEM);
+ }
+
+ h->list = tmp;
+ h->list_size += 16;
+ }
+
+ /* Insert the new node as a leaf. */
+ h->list[h->list_len] = ptr;
+ h->list_len++;
+
+ /* Reorganize the heap from bottom up. */
+ reheap (h, /* parent of this node = */ (h->list_len - 1) / 2, DIR_UP);
+
+ pthread_mutex_unlock (&h->lock);
+ return (0);
+} /* int c_heap_insert */
+
+void *c_head_get_root (c_heap_t *h)
+{
+ void *ret = NULL;
+
+ if (h == NULL)
+ return (NULL);
+
+ pthread_mutex_lock (&h->lock);
+
+ if (h->list_len == 0)
+ {
+ pthread_mutex_unlock (&h->lock);
+ return (NULL);
+ }
+ else if (h->list_len == 1)
+ {
+ ret = h->list[0];
+ h->list[0] = NULL;
+ h->list_len = 0;
+ }
+ else /* if (h->list_len > 1) */
+ {
+ ret = h->list[0];
+ h->list[0] = h->list[h->list_len - 1];
+ h->list[h->list_len - 1] = NULL;
+ h->list_len--;
+
+ reheap (h, /* root = */ 0, DIR_DOWN);
+ }
+
+ /* free some memory */
+ if ((h->list_len + 32) < h->list_size)
+ {
+ void **tmp;
+
+ tmp = realloc (h->list, (h->list_len + 16) * sizeof (*h->list));
+ if (tmp != NULL)
+ {
+ h->list = tmp;
+ h->list_size = h->list_len + 16;
+ }
+ }
+
+ pthread_mutex_unlock (&h->lock);
+
+ return (ret);
+} /* void *c_head_get_root */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
--- /dev/null
+/**
+ * collectd - src/utils_heap.h
+ * Copyright (C) 2009 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at verplant.org>
+ **/
+
+#ifndef UTILS_HEAP_H
+#define UTILS_HEAP_H 1
+
+struct c_heap_s;
+typedef struct c_heap_s c_heap_t;
+
+/*
+ * NAME
+ * c_heap_create
+ *
+ * DESCRIPTION
+ * Allocates a new heap.
+ *
+ * PARAMETERS
+ * `compare' The function-pointer `compare' is used to compare two keys. It
+ * has to return less than zero if it's first argument is smaller
+ * then the second argument, more than zero if the first argument
+ * is bigger than the second argument and zero if they are equal.
+ * If your keys are char-pointers, you can use the `strcmp'
+ * function from the libc here.
+ *
+ * RETURN VALUE
+ * A c_heap_t-pointer upon success or NULL upon failure.
+ */
+c_heap_t *c_heap_create (int (*compare) (const void *, const void *));
+
+/*
+ * NAME
+ * c_heap_destroy
+ *
+ * DESCRIPTION
+ * Deallocates a heap. Stored value- and key-pointer are lost, but of course
+ * not freed.
+ */
+void c_heap_destroy (c_heap_t *h);
+
+/*
+ * NAME
+ * c_heap_insert
+ *
+ * DESCRIPTION
+ * Stores the key-value-pair in the heap pointed to by `h'.
+ *
+ * PARAMETERS
+ * `h' Heap to store the data in.
+ * `ptr' Value to be stored. This is typically a pointer to a data
+ * structure. The data structure is of course *not* copied and may
+ * not be free'd before the pointer has been removed from the heap
+ * again.
+ *
+ * RETURN VALUE
+ * Zero upon success, non-zero otherwise. It's less than zero if an error
+ * occurred or greater than zero if the key is already stored in the tree.
+ */
+int c_heap_insert (c_heap_t *h, void *ptr);
+
+/*
+ * NAME
+ * c_head_get_root
+ *
+ * DESCRIPTION
+ * Removes the value at the root of the heap and returns both, key and value.
+ *
+ * PARAMETERS
+ * `h' Heap to remove key-value-pair from.
+ *
+ * RETURN VALUE
+ * The pointer passed to `c_heap_insert' or NULL if there are no more
+ * elements in the heap (or an error occurred).
+ */
+void *c_head_get_root (c_heap_t *h);
+
+#endif /* UTILS_HEAP_H */
+/* vim: set sw=2 sts=2 et : */