#define RF_SIMPLE 0
#define RF_COMPLEX 1
+#define RF_REMOVE 65535
struct read_func_s
{
/* `read_func_t' "inherits" from `callback_func_t'.
static char *plugindir = NULL;
static c_heap_t *read_heap = NULL;
+static llist_t *read_list;
static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
read_func_t *rf;
struct timeval now;
int status;
+ int rf_type;
/* Get the read function that needs to be read next. */
rf = c_head_get_root (read_heap);
pthread_mutex_lock (&read_lock);
pthread_cond_timedwait (&read_cond, &read_lock,
&rf->rf_next_read);
+ /* Must hold `real_lock' when accessing `rf->rf_type'. */
+ rf_type = rf->rf_type;
pthread_mutex_unlock (&read_lock);
/* Check if we're supposed to stop.. This may have interrupted
break;
}
+ /* The entry has been marked for deletion. The linked list
+ * entry has already been removed by `plugin_unregister_read'.
+ * All we have to do here is free the `read_func_t' and
+ * continue. */
+ if (rf_type == RF_REMOVE)
+ {
+ DEBUG ("plugin_read_thread: Destroying the `%s' "
+ "callback.", rf->rf_name);
+ destroy_callback ((callback_func_t *) rf);
+ rf = NULL;
+ continue;
+ }
+
DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
- if (rf->rf_type == RF_SIMPLE)
+ if (rf_type == RF_SIMPLE)
{
int (*callback) (void);
{
plugin_read_cb callback;
+ assert (rf_type == RF_COMPLEX);
+
callback = rf->rf_callback;
status = (*callback) (&rf->rf_udata);
}
return (0);
} /* int plugin_compare_read_func */
-int plugin_register_read (const char *name,
- int (*callback) (void))
+/* Add a read function to both, the heap and a linked list. The linked list if
+ * used to look-up read functions, especially for the remove function. The heap
+ * is used to determine which plugin to read next. */
+static int plugin_insert_read (read_func_t *rf)
{
- read_func_t *rf;
+ int status;
+ llentry_t *le;
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ read_list = llist_create ();
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: read_list failed.");
+ return (-1);
+ }
+ }
if (read_heap == NULL)
{
read_heap = c_heap_create (plugin_compare_read_func);
if (read_heap == NULL)
{
- ERROR ("plugin_register_read: "
- "c_heap_create failed.");
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: c_heap_create failed.");
return (-1);
}
}
+ le = llentry_create (rf->rf_name, rf);
+ if (le == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: llentry_create failed.");
+ return (-1);
+ }
+
+ status = c_heap_insert (read_heap, rf);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: c_heap_insert failed.");
+ llentry_destroy (le);
+ return (-1);
+ }
+
+ /* This does not fail. */
+ llist_append (read_list, le);
+
+ pthread_mutex_unlock (&read_lock);
+ return (0);
+} /* int plugin_insert_read */
+
+int plugin_register_read (const char *name,
+ int (*callback) (void))
+{
+ read_func_t *rf;
+
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
rf->rf_interval.tv_nsec = 0;
rf->rf_effective_interval = rf->rf_interval;
- return (c_heap_insert (read_heap, rf));
+ return (plugin_insert_read (rf));
} /* int plugin_register_read */
int plugin_register_complex_read (const char *name,
{
read_func_t *rf;
- if (read_heap == NULL)
- {
- read_heap = c_heap_create (plugin_compare_read_func);
- if (read_heap == NULL)
- {
- ERROR ("plugin_register_read: c_heap_create failed.");
- return (-1);
- }
- }
-
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
rf->rf_udata = *user_data;
}
- return (c_heap_insert (read_heap, rf));
+ return (plugin_insert_read (rf));
} /* int plugin_register_complex_read */
int plugin_register_write (const char *name,
return (plugin_unregister (list_init, name));
}
-int plugin_unregister_read (const char *name)
+int plugin_unregister_read (const char *name) /* {{{ */
{
- /* TODO: Implement removal of a specific key from the heap. */
- assert (0);
- return (-1);
-}
+ llentry_t *le;
+ read_func_t *rf;
+
+ if (name == NULL)
+ return (-ENOENT);
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ return (-ENOENT);
+ }
+
+ le = llist_search (read_list, name);
+ llist_remove (read_list, le);
+
+ rf = le->value;
+ assert (rf != NULL);
+ rf->rf_type = RF_REMOVE;
+
+ pthread_mutex_unlock (&read_lock);
+
+ llentry_destroy (le);
+
+ DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
+
+ return (0);
+} /* }}} int plugin_unregister_read */
int plugin_unregister_write (const char *name)
{
stop_read_threads ();
destroy_all_callbacks (&list_init);
+
+ pthread_mutex_lock (&read_lock);
+ llist_destroy (read_list);
+ read_list = NULL;
+ pthread_mutex_unlock (&read_lock);
+
destroy_read_heap ();
plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,