**/
#include "collectd.h"
-#include "utils_complain.h"
-
-#include <ltdl.h>
-
-#if HAVE_PTHREAD_H
-# include <pthread.h>
-#endif
-
#include "common.h"
#include "plugin.h"
#include "configfile.h"
+#include "filter_chain.h"
#include "utils_avltree.h"
+#include "utils_cache.h"
+#include "utils_complain.h"
#include "utils_llist.h"
#include "utils_heap.h"
-#include "utils_cache.h"
-#include "filter_chain.h"
+#include "utils_time.h"
+
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
+#include <ltdl.h>
/*
* Private structures
char rf_group[DATA_MAX_NAME_LEN];
char rf_name[DATA_MAX_NAME_LEN];
int rf_type;
- struct timespec rf_interval;
- struct timespec rf_effective_interval;
- struct timespec rf_next_read;
+ cdtime_t rf_interval;
+ cdtime_t rf_effective_interval;
+ cdtime_t rf_next_read;
};
typedef struct read_func_s read_func_t;
return (0);
}
-static _Bool timeout_reached(struct timespec timeout)
-{
- struct timeval now;
- gettimeofday(&now, NULL);
- return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
-}
-
static void *plugin_read_thread (void __attribute__((unused)) *args)
{
while (read_loop != 0)
}
pthread_mutex_unlock (&read_lock);
- if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
+ if (rf->rf_interval == 0)
{
/* this should not happen, because the interval is set
* for each plugin when loading it
* XXX: issue a warning? */
- now = cdtime ();
-
- CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
-
+ rf->rf_interval = plugin_get_interval ();
rf->rf_effective_interval = rf->rf_interval;
- CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
+ rf->rf_next_read = cdtime ();
}
/* sleep until this entry is due,
* pthread_cond_timedwait returns. */
rc = 0;
while ((read_loop != 0)
- && !timeout_reached(rf->rf_next_read)
+ && (cdtime () < rf->rf_next_read)
&& rc == 0)
{
+ struct timespec ts = { 0 };
+
+ CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
+
rc = pthread_cond_timedwait (&read_cond, &read_lock,
- &rf->rf_next_read);
+ &ts);
}
/* Must hold `read_lock' when accessing `rf->rf_type'. */
* intervals in which it will be called. */
if (status != 0)
{
- rf->rf_effective_interval.tv_sec *= 2;
- rf->rf_effective_interval.tv_nsec *= 2;
- NORMALIZE_TIMESPEC (rf->rf_effective_interval);
-
- if (rf->rf_effective_interval.tv_sec >= 86400)
- {
- rf->rf_effective_interval.tv_sec = 86400;
- rf->rf_effective_interval.tv_nsec = 0;
- }
+ rf->rf_effective_interval *= 2;
+ if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
+ rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
NOTICE ("read-function of plugin `%s' failed. "
- "Will suspend it for %i seconds.",
+ "Will suspend it for %.3f seconds.",
rf->rf_name,
- (int) rf->rf_effective_interval.tv_sec);
+ CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
}
else
{
now = cdtime ();
DEBUG ("plugin_read_thread: Effective interval of the "
- "%s plugin is %i.%09i.",
+ "%s plugin is %.3f seconds.",
rf->rf_name,
- (int) rf->rf_effective_interval.tv_sec,
- (int) rf->rf_effective_interval.tv_nsec);
+ CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
/* Calculate the next (absolute) time at which this function
* should be called. */
- rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec
- + rf->rf_effective_interval.tv_sec;
- rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec
- + rf->rf_effective_interval.tv_nsec;
- NORMALIZE_TIMESPEC (rf->rf_next_read);
+ rf->rf_next_read += rf->rf_effective_interval;
/* Check, if `rf_next_read' is in the past. */
- if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now)
+ if (rf->rf_next_read < now)
{
/* `rf_next_read' is in the past. Insert `now'
* so this value doesn't trail off into the
* past too much. */
- CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
+ rf->rf_next_read = now;
}
- DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
+ DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.",
rf->rf_name,
- (int) rf->rf_next_read.tv_sec,
- (int) rf->rf_next_read.tv_nsec);
+ CDTIME_T_TO_DOUBLE (rf->rf_next_read));
/* Re-insert this read function into the heap again. */
c_heap_insert (read_heap, rf);
if (vl->time == 0)
vl->time = cdtime ();
- /* Once this gets dequeued by a write thread, we don't have access to
- * the thread context anymore. We therefore fill in the interval here,
- * if required. An alternative would be to copy the context and clone
- * the context in the write thread, but that seems overly complicated
- * for the interval alone. */
+ /* Fill in the interval from the thread context, if it is zero. */
if (vl->interval == 0)
{
plugin_ctx_t ctx = plugin_get_ctx ();
return (vl);
} /* }}} value_list_t *plugin_value_list_clone */
-static void plugin_write_queue_item_free (write_queue_t *q) /* {{{ */
-{
- if (q == NULL)
- return;
-
- plugin_value_list_free (q->vl);
- sfree (q);
-} /* }}} void plugin_write_queue_item_free */
-
static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
{
write_queue_t *q;
return (ENOMEM);
}
- /* store context of caller (read plugin); else, the write plugins
- * won't have the right interval settings available when actually
- * dispatching the value-list later on */
+ /* Store context of caller (read plugin); otherwise, it would not be
+ * available to the write plugins when actually dispatching the
+ * value-list later on. */
q->ctx = plugin_get_ctx ();
pthread_mutex_lock (&write_lock);
return (0);
} /* }}} int plugin_write_enqueue */
-static write_queue_t *plugin_write_dequeue (void) /* {{{ */
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
{
write_queue_t *q;
+ value_list_t *vl;
pthread_mutex_lock (&write_lock);
pthread_mutex_unlock (&write_lock);
- q->next = NULL;
+ (void) plugin_set_ctx (q->ctx);
- return (q);
+ vl = q->vl;
+ sfree (q);
+ return (vl);
} /* }}} value_list_t *plugin_write_dequeue */
static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
{
while (write_loop)
{
- write_queue_t *q = plugin_write_dequeue ();
- plugin_ctx_t old_ctx;
-
- if (q == NULL)
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
continue;
- old_ctx = plugin_set_ctx (q->ctx);
-
- if (q->vl != NULL)
- plugin_dispatch_values_internal (q->vl);
+ plugin_dispatch_values_internal (vl);
- plugin_write_queue_item_free (q);
- plugin_set_ctx (old_ctx);
+ plugin_value_list_free (vl);
}
pthread_exit (NULL);
pthread_mutex_lock (&write_lock);
i = 0;
- for (q = write_queue_head; q != NULL; q = q->next)
+ for (q = write_queue_head; q != NULL; )
{
+ write_queue_t *q1 = q;
plugin_value_list_free (q->vl);
- sfree (q);
+ q = q->next;
+ sfree (q1);
i++;
}
write_queue_head = NULL;
rf0 = arg0;
rf1 = arg1;
- if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec)
- return (-1);
- else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec)
- return (1);
- else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec)
+ if (rf0->rf_next_read < rf1->rf_next_read)
return (-1);
- else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec)
+ else if (rf0->rf_next_read > rf1->rf_next_read)
return (1);
else
return (0);
rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_SIMPLE;
- rf->rf_interval.tv_sec = 0;
- rf->rf_interval.tv_nsec = 0;
+ rf->rf_interval = 0;
rf->rf_effective_interval = rf->rf_interval;
status = plugin_insert_read (rf);
rf->rf_type = RF_COMPLEX;
if (interval != NULL)
{
- rf->rf_interval = *interval;
+ rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
}
else if (ctx.interval != 0)
{
- CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
+ rf->rf_interval = ctx.interval;
}
rf->rf_effective_interval = rf->rf_interval;
- DEBUG ("plugin_register_read: interval = %i.%09i",
- (int) rf->rf_interval.tv_sec,
- (int) rf->rf_interval.tv_nsec);
+ DEBUG ("plugin_register_read: interval = %.3f",
+ CDTIME_T_TO_DOUBLE (rf->rf_interval));
/* Set user data */
if (user_data == NULL)
return (-1);
}
- /* Assured by plugin_value_list_clone(). The write thread doesn't have
- * access to the thread context, so the interval has to be filled in
- * already. The time is determined at _enqueue_ time. */
+ /* Assured by plugin_value_list_clone(). The time is determined at
+ * _enqueue_ time. */
assert (vl->time != 0);
assert (vl->interval != 0);