Merge branch 'ff/java' of /var/lib/git/collectd into ff/java
authorFlorian Forster <octo@huhu.verplant.org>
Fri, 20 Feb 2009 10:13:38 +0000 (11:13 +0100)
committerFlorian Forster <octo@huhu.verplant.org>
Fri, 20 Feb 2009 10:13:38 +0000 (11:13 +0100)
bindings/java/org/collectd/api/CollectdAPI.java
src/java.c
src/plugin.c
src/plugin.h

index bd8d950..67c8e6a 100644 (file)
 
 package org.collectd.api;
 
+import java.util.List;
 import org.collectd.protocol.ValueList;
+import org.collectd.protocol.DataSource;
 
 public class CollectdAPI
 {
   native public static int DispatchValues (ValueList vl);
+
+  native public static List<DataSource> GetDS (String type);
 } /* class CollectdAPI */
 
 /* vim: set sw=2 sts=2 et fdm=marker : */
index a1e4971..74760e4 100644 (file)
@@ -1118,16 +1118,44 @@ static jint JNICALL cjni_api_dispatch_values (JNIEnv *jvm_env, /* {{{ */
     return (-1);
   }
 
-  status = plugin_dispatch_values (&vl);
+  status = plugin_dispatch_values_async (&vl);
 
   sfree (vl.values);
 
   return (status);
 } /* }}} jint cjni_api_dispatch_values */
 
+static jobject JNICALL cjni_api_get_ds (JNIEnv *jvm_env, /* {{{ */
+    jobject this, jobject o_string_type)
+{
+  const char *ds_name;
+  const data_set_t *ds;
+  jobject o_dataset;
+
+  ds_name = (*jvm_env)->GetStringUTFChars (jvm_env, o_string_type, 0);
+  if (ds_name == NULL)
+  {
+    ERROR ("java plugin: cjni_api_get_ds: GetStringUTFChars failed.");
+    return (NULL);
+  }
+
+  ds = plugin_get_ds (ds_name);
+  DEBUG ("java plugin: cjni_api_get_ds: "
+      "plugin_get_ds (%s) = %p;", ds_name, (void *) ds);
+
+  (*jvm_env)->ReleaseStringUTFChars (jvm_env, o_string_type, ds_name);
+
+  if (ds == NULL)
+    return (NULL);
+
+  o_dataset = ctoj_data_set (jvm_env, ds);
+  return (o_dataset);
+} /* }}} jint cjni_api_get_ds */
+
 static JNINativeMethod jni_api_functions[] =
 {
-  { "DispatchValues", "(Lorg/collectd/protocol/ValueList;)I", cjni_api_dispatch_values }
+  { "DispatchValues", "(Lorg/collectd/protocol/ValueList;)I", cjni_api_dispatch_values },
+  { "GetDS",          "(Ljava/lang/String;)Ljava/util/List;", cjni_api_get_ds }
 };
 static size_t jni_api_functions_num = sizeof (jni_api_functions)
   / sizeof (jni_api_functions[0]);
@@ -1312,205 +1340,6 @@ static int cjni_config (oconfig_item_t *ci) /* {{{ */
   return (0);
 } /* }}} int cjni_config */
 
-static int cjni_init_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
-{
-  jmethodID constructor_id;
-  int status;
-
-  jp->class_ptr = (*jvm_env)->FindClass (jvm_env, jp->class_name);
-  if (jp->class_ptr == NULL)
-  {
-    ERROR ("cjni_init_one_plugin: FindClass (%s) failed.",
-        jp->class_name);
-    return (-1);
-  }
-
-  constructor_id = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "<init>", "()V");
-  if (constructor_id == NULL)
-  {
-    ERROR ("cjni_init_one_plugin: Could not find the constructor for `%s'.",
-        jp->class_name);
-    return (-1);
-  }
-
-  jp->object_ptr = (*jvm_env)->NewObject (jvm_env, jp->class_ptr,
-      constructor_id);
-  if (jp->object_ptr == NULL)
-  {
-    ERROR ("cjni_init_one_plugin: Could create a new `%s' object.",
-        jp->class_name);
-    return (-1);
-  }
-
-  jp->m_config = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "Config", "(Lorg/collectd/api/OConfigItem;)I");
-  DEBUG ("java plugin: cjni_init_one_plugin: "
-      "jp->class_name = %s; jp->m_config = %p;",
-      jp->class_name, (void *) jp->m_config);
-
-  jp->m_init = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "Init", "()I");
-  DEBUG ("java plugin: cjni_init_one_plugin: "
-      "jp->class_name = %s; jp->m_init = %p;",
-      jp->class_name, (void *) jp->m_init);
-
-  jp->m_read = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "Read", "()I");
-  DEBUG ("java plugin: cjni_init_one_plugin: "
-      "jp->class_name = %s; jp->m_read = %p;",
-      jp->class_name, (void *) jp->m_read);
-
-  jp->m_write = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "Write", "(Lorg/collectd/protocol/ValueList;)I");
-  DEBUG ("java plugin: cjni_init_one_plugin: "
-      "jp->class_name = %s; jp->m_write = %p;",
-      jp->class_name, (void *) jp->m_write);
-
-  jp->m_shutdown = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
-      "Shutdown", "()I");
-  DEBUG ("java plugin: cjni_init_one_plugin: "
-      "jp->class_name = %s; jp->m_shutdown = %p;",
-      jp->class_name, (void *) jp->m_shutdown);
-
-  if (jp->ci != NULL)
-  {
-    if (jp->m_config == NULL)
-    {
-      WARNING ("java plugin: Configuration for the `%s' plugin is present, "
-          "but plugin doesn't provide a configuration method.",
-          jp->class_name);
-    }
-    else /* if (jp->m_config != NULL) */
-    {
-      jobject o_ocitem;
-
-      o_ocitem = ctoj_oconfig_item (jvm_env, jp->ci);
-      if (o_ocitem == NULL)
-      {
-        ERROR ("java plugin: Creating an OConfigItem object failed. "
-            "Can't pass configuration information to the `%s' plugin!",
-            jp->class_name);
-      }
-      else /* if (o_ocitem != NULL) */
-      {
-        status = (*jvm_env)->CallIntMethod (jvm_env,
-            jp->object_ptr, jp->m_config, o_ocitem);
-        if (status != 0)
-        {
-          ERROR ("java plugin: cjni_init_one_plugin: "
-              "Configuring the `%s' object failed with status %i.",
-              jp->class_name, status);
-          (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
-          return (-1);
-        }
-        (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
-      } /* if (o_ocitem != NULL) */
-    } /* if (jp->m_config != NULL) */
-  } /* if (jp->ci != NULL) */
-
-  if (jp->m_init != NULL)
-  {
-    status = (*jvm_env)->CallIntMethod (jvm_env, jp->object_ptr,
-        jp->m_init);
-    if (status != 0)
-    {
-      ERROR ("java plugin: cjni_init_one_plugin: "
-        "Initializing `%s' object failed with status %i.",
-        jp->class_name, status);
-      return (-1);
-    }
-  }
-  jp->flags |= CJNI_FLAG_ENABLED;
-
-  return (0);
-} /* }}} int cjni_init_one_plugin */
-
-static int cjni_init_plugins (JNIEnv *jvm_env) /* {{{ */
-{
-  size_t j;
-
-  for (j = 0; j < java_plugins_num; j++)
-    cjni_init_one_plugin (jvm_env, &java_plugins[j]);
-
-  return (0);
-} /* }}} int cjni_init_plugins */
-
-static int cjni_init_native (JNIEnv *jvm_env) /* {{{ */
-{
-  jclass api_class_ptr;
-  int status;
-
-  api_class_ptr = (*jvm_env)->FindClass (jvm_env, "org.collectd.api.CollectdAPI");
-  if (api_class_ptr == NULL)
-  {
-    ERROR ("cjni_init_native: Cannot find API class `org.collectd.api.CollectdAPI'.");
-    return (-1);
-  }
-
-  status = (*jvm_env)->RegisterNatives (jvm_env, api_class_ptr,
-      jni_api_functions, (jint) jni_api_functions_num);
-  if (status != 0)
-  {
-    ERROR ("cjni_init_native: RegisterNatives failed with status %i.", status);
-    return (-1);
-  }
-
-  return (0);
-} /* }}} int cjni_init_native */
-
-static int cjni_init (void) /* {{{ */
-{
-  JNIEnv *jvm_env;
-  JavaVMInitArgs vm_args;
-  JavaVMOption vm_options[jvm_argc];
-
-  int status;
-  size_t i;
-
-  if (jvm != NULL)
-    return (0);
-
-  jvm_env = NULL;
-
-  memset (&vm_args, 0, sizeof (vm_args));
-  vm_args.version = JNI_VERSION_1_2;
-  vm_args.options = vm_options;
-  vm_args.nOptions = (jint) jvm_argc;
-
-  for (i = 0; i < jvm_argc; i++)
-  {
-    DEBUG ("java plugin: cjni_init: jvm_argv[%zu] = %s", i, jvm_argv[i]);
-    vm_args.options[i].optionString = jvm_argv[i];
-  }
-  /*
-  vm_args.options[0].optionString = "-verbose:jni";
-  vm_args.options[1].optionString = "-Djava.class.path=/home/octo/collectd/bindings/java";
-  */
-
-  status = JNI_CreateJavaVM (&jvm, (void **) &jvm_env, (void **) &vm_args);
-  if (status != 0)
-  {
-    ERROR ("cjni_init: JNI_CreateJavaVM failed with status %i.",
-       status);
-    return (-1);
-  }
-  assert (jvm != NULL);
-  assert (jvm_env != NULL);
-
-  /* Call RegisterNatives */
-  status = cjni_init_native (jvm_env);
-  if (status != 0)
-  {
-    ERROR ("cjni_init: cjni_init_native failed.");
-    return (-1);
-  }
-
-  cjni_init_plugins (jvm_env);
-
-  return (0);
-} /* }}} int cjni_init */
-
 static int cjni_read_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
 {
   int status;
@@ -1745,13 +1574,234 @@ static int cjni_shutdown (void) /* {{{ */
   return (0);
 } /* }}} int cjni_shutdown */
 
+static int cjni_init_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
+{
+  jmethodID constructor_id;
+  int status;
+
+  jp->class_ptr = (*jvm_env)->FindClass (jvm_env, jp->class_name);
+  if (jp->class_ptr == NULL)
+  {
+    ERROR ("cjni_init_one_plugin: FindClass (%s) failed.",
+        jp->class_name);
+    return (-1);
+  }
+
+  constructor_id = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "<init>", "()V");
+  if (constructor_id == NULL)
+  {
+    ERROR ("cjni_init_one_plugin: Could not find the constructor for `%s'.",
+        jp->class_name);
+    return (-1);
+  }
+
+  jp->object_ptr = (*jvm_env)->NewObject (jvm_env, jp->class_ptr,
+      constructor_id);
+  if (jp->object_ptr == NULL)
+  {
+    ERROR ("cjni_init_one_plugin: Could create a new `%s' object.",
+        jp->class_name);
+    return (-1);
+  }
+
+  jp->m_config = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "Config", "(Lorg/collectd/api/OConfigItem;)I");
+  DEBUG ("java plugin: cjni_init_one_plugin: "
+      "jp->class_name = %s; jp->m_config = %p;",
+      jp->class_name, (void *) jp->m_config);
+
+  jp->m_init = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "Init", "()I");
+  DEBUG ("java plugin: cjni_init_one_plugin: "
+      "jp->class_name = %s; jp->m_init = %p;",
+      jp->class_name, (void *) jp->m_init);
+
+  jp->m_read = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "Read", "()I");
+  DEBUG ("java plugin: cjni_init_one_plugin: "
+      "jp->class_name = %s; jp->m_read = %p;",
+      jp->class_name, (void *) jp->m_read);
+
+  jp->m_write = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "Write", "(Lorg/collectd/protocol/ValueList;)I");
+  DEBUG ("java plugin: cjni_init_one_plugin: "
+      "jp->class_name = %s; jp->m_write = %p;",
+      jp->class_name, (void *) jp->m_write);
+
+  jp->m_shutdown = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+      "Shutdown", "()I");
+  DEBUG ("java plugin: cjni_init_one_plugin: "
+      "jp->class_name = %s; jp->m_shutdown = %p;",
+      jp->class_name, (void *) jp->m_shutdown);
+
+  if (jp->ci != NULL)
+  {
+    if (jp->m_config == NULL)
+    {
+      WARNING ("java plugin: Configuration for the `%s' plugin is present, "
+          "but plugin doesn't provide a configuration method.",
+          jp->class_name);
+    }
+    else /* if (jp->m_config != NULL) */
+    {
+      jobject o_ocitem;
+
+      o_ocitem = ctoj_oconfig_item (jvm_env, jp->ci);
+      if (o_ocitem == NULL)
+      {
+        ERROR ("java plugin: Creating an OConfigItem object failed. "
+            "Can't pass configuration information to the `%s' plugin!",
+            jp->class_name);
+      }
+      else /* if (o_ocitem != NULL) */
+      {
+        status = (*jvm_env)->CallIntMethod (jvm_env,
+            jp->object_ptr, jp->m_config, o_ocitem);
+        if (status != 0)
+        {
+          ERROR ("java plugin: cjni_init_one_plugin: "
+              "Configuring the `%s' object failed with status %i.",
+              jp->class_name, status);
+          (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
+          return (-1);
+        }
+        (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
+      } /* if (o_ocitem != NULL) */
+    } /* if (jp->m_config != NULL) */
+  } /* if (jp->ci != NULL) */
+
+  if (jp->m_init != NULL)
+  {
+    status = (*jvm_env)->CallIntMethod (jvm_env, jp->object_ptr,
+        jp->m_init);
+    if (status != 0)
+    {
+      ERROR ("java plugin: cjni_init_one_plugin: "
+        "Initializing `%s' object failed with status %i.",
+        jp->class_name, status);
+      return (-1);
+    }
+  }
+  jp->flags |= CJNI_FLAG_ENABLED;
+
+  return (0);
+} /* }}} int cjni_init_one_plugin */
+
+static int cjni_init_plugins (JNIEnv *jvm_env) /* {{{ */
+{
+  size_t j;
+
+  int have_read;
+  int have_write;
+  int have_shutdown;
+
+  have_read = 0;
+  have_write = 0;
+  have_shutdown = 0;
+
+  for (j = 0; j < java_plugins_num; j++)
+  {
+    cjni_init_one_plugin (jvm_env, &java_plugins[j]);
+
+    if (java_plugins[j].m_read != NULL)
+      have_read++;
+    if (java_plugins[j].m_write != NULL)
+      have_write++;
+    if (java_plugins[j].m_shutdown != NULL)
+      have_shutdown++;
+  }
+
+  if (have_read > 0)
+    plugin_register_read ("java", cjni_read);
+  if (have_write > 0)
+    plugin_register_write ("java", cjni_write);
+  if (have_shutdown > 0)
+    plugin_register_shutdown ("java", cjni_shutdown);
+
+
+  return (0);
+} /* }}} int cjni_init_plugins */
+
+static int cjni_init_native (JNIEnv *jvm_env) /* {{{ */
+{
+  jclass api_class_ptr;
+  int status;
+
+  api_class_ptr = (*jvm_env)->FindClass (jvm_env, "org.collectd.api.CollectdAPI");
+  if (api_class_ptr == NULL)
+  {
+    ERROR ("cjni_init_native: Cannot find API class `org.collectd.api.CollectdAPI'.");
+    return (-1);
+  }
+
+  status = (*jvm_env)->RegisterNatives (jvm_env, api_class_ptr,
+      jni_api_functions, (jint) jni_api_functions_num);
+  if (status != 0)
+  {
+    ERROR ("cjni_init_native: RegisterNatives failed with status %i.", status);
+    return (-1);
+  }
+
+  return (0);
+} /* }}} int cjni_init_native */
+
+static int cjni_init (void) /* {{{ */
+{
+  JNIEnv *jvm_env;
+  JavaVMInitArgs vm_args;
+  JavaVMOption vm_options[jvm_argc];
+
+  int status;
+  size_t i;
+
+  if (jvm != NULL)
+    return (0);
+
+  jvm_env = NULL;
+
+  memset (&vm_args, 0, sizeof (vm_args));
+  vm_args.version = JNI_VERSION_1_2;
+  vm_args.options = vm_options;
+  vm_args.nOptions = (jint) jvm_argc;
+
+  for (i = 0; i < jvm_argc; i++)
+  {
+    DEBUG ("java plugin: cjni_init: jvm_argv[%zu] = %s", i, jvm_argv[i]);
+    vm_args.options[i].optionString = jvm_argv[i];
+  }
+  /*
+  vm_args.options[0].optionString = "-verbose:jni";
+  vm_args.options[1].optionString = "-Djava.class.path=/home/octo/collectd/bindings/java";
+  */
+
+  status = JNI_CreateJavaVM (&jvm, (void **) &jvm_env, (void **) &vm_args);
+  if (status != 0)
+  {
+    ERROR ("cjni_init: JNI_CreateJavaVM failed with status %i.",
+       status);
+    return (-1);
+  }
+  assert (jvm != NULL);
+  assert (jvm_env != NULL);
+
+  /* Call RegisterNatives */
+  status = cjni_init_native (jvm_env);
+  if (status != 0)
+  {
+    ERROR ("cjni_init: cjni_init_native failed.");
+    return (-1);
+  }
+
+  cjni_init_plugins (jvm_env);
+
+  return (0);
+} /* }}} int cjni_init */
+
 void module_register (void)
 {
   plugin_register_complex_config ("java", cjni_config);
   plugin_register_init ("java", cjni_init);
-  plugin_register_read ("java", cjni_read);
-  plugin_register_write ("java", cjni_write);
-  plugin_register_shutdown ("java", cjni_shutdown);
 } /* void module_register (void) */
 
 /* vim: set sw=2 sts=2 et fdm=marker : */
index 9f42f2e..6fd74e2 100644 (file)
@@ -50,6 +50,14 @@ struct read_func_s
 };
 typedef struct read_func_s read_func_t;
 
+struct dispatch_queue_s;
+typedef struct dispatch_queue_s dispatch_queue_t;
+struct dispatch_queue_s
+{
+       value_list_t *vl;
+       dispatch_queue_t *next;
+};
+
 /*
  * Private variables
  */
@@ -68,11 +76,19 @@ static c_avl_tree_t *data_sets;
 
 static char *plugindir = NULL;
 
-static int             read_loop = 1;
+static int             read_loop = 0;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
-static int             read_threads_num = 0;
+static size_t          read_threads_num = 0;
+
+static int               dispatch_loop = 0;
+static pthread_mutex_t   dispatch_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t    dispatch_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t        *dispatch_threads = NULL;
+static size_t            dispatch_threads_num = 0;
+static dispatch_queue_t *dispatch_head;
+static dispatch_queue_t *dispatch_tail;
 
 /*
  * Static functions
@@ -244,60 +260,138 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        return ((void *) 0);
 } /* void *plugin_read_thread */
 
-static void start_threads (int num)
+static void *plugin_dispatch_thread (void *arg)
 {
-       int i;
+       pthread_mutex_lock (&dispatch_lock);
 
-       if (read_threads != NULL)
-               return;
+       while ((dispatch_loop != 0) || (dispatch_head != NULL))
+       {
+               dispatch_queue_t *qi;
+
+               if (dispatch_head == NULL)
+                       pthread_cond_wait (&dispatch_cond, &dispatch_lock);
+
+               if (dispatch_head == NULL)
+                       continue;
+
+               qi = dispatch_head;
+
+               if (dispatch_head == dispatch_tail)
+               {
+                       dispatch_head = NULL;
+                       dispatch_tail = NULL;
+               }
+               else
+               {
+                       dispatch_head = qi->next;
+               }
+
+               pthread_mutex_unlock (&dispatch_lock);
 
-       read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
-       if (read_threads == NULL)
+               plugin_dispatch_values (qi->vl);
+               sfree (qi->vl->values);
+               sfree (qi->vl);
+               sfree (qi);
+
+               pthread_mutex_lock (&dispatch_lock);
+       } /* while */
+
+       pthread_mutex_unlock (&dispatch_lock);
+
+       return ((void *) 0);
+} /* void *plugin_dispatch_thread */
+
+static int plugin_start_threads (size_t num,
+               pthread_t **tlist, size_t *tlist_num,
+               void *(*thread_main) (void *))
+{
+       size_t i;
+       pthread_t *new_tlist;
+       size_t new_tlist_num;
+
+       if (*tlist != NULL)
+               return (-1);
+
+       new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist));
+       if (new_tlist == NULL)
        {
-               ERROR ("plugin: start_threads: calloc failed.");
-               return;
+               ERROR ("plugin: plugin_start_threads: calloc failed.");
+               return (-1);
        }
 
-       read_threads_num = 0;
+       new_tlist_num = 0;
        for (i = 0; i < num; i++)
        {
-               if (pthread_create (read_threads + read_threads_num, NULL,
-                                       plugin_read_thread, NULL) == 0)
-               {
-                       read_threads_num++;
-               }
-               else
+               int status;
+
+               status = pthread_create (new_tlist + new_tlist_num,
+                               /* attr = */ NULL, thread_main,
+                               /* arg = */ (void *) 0);
+               if (status != 0)
                {
-                       ERROR ("plugin: start_threads: pthread_create failed.");
-                       return;
+                       ERROR ("plugin: plugin_start_threads: "
+                                       "pthread_create failed.");
+                       continue;
                }
+
+               new_tlist_num++;
        } /* for (i) */
-} /* void start_threads */
 
-static void stop_threads (void)
+       if (new_tlist_num < 1)
+       {
+               ERROR ("plugin: plugin_start_threads: "
+                               "Creating threads failed.");
+               sfree (new_tlist);
+               return (-1);
+       }
+
+       *tlist     = new_tlist;
+       *tlist_num = new_tlist_num;
+
+       return (0);
+} /* int plugin_start_threads */
+
+static int plugin_stop_threads (int *loop,
+               pthread_mutex_t *lock, pthread_cond_t *cond,
+               pthread_t **ret_tlist, size_t *ret_tlist_len)
 {
-       int i;
+       pthread_t *tlist;
+       size_t tlist_len;
+       size_t i;
 
-       if (read_threads == NULL)
-               return;
+       tlist     = *ret_tlist;
+       tlist_len = *ret_tlist_len;
 
-       pthread_mutex_lock (&read_lock);
-       read_loop = 0;
-       DEBUG ("plugin: stop_threads: Signalling `read_cond'");
-       pthread_cond_broadcast (&read_cond);
-       pthread_mutex_unlock (&read_lock);
+       if (tlist == NULL)
+               return (0);
+
+       DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len);
+
+       pthread_mutex_lock (lock);
+
+       *loop = 0;
+
+       pthread_cond_broadcast (cond);
+       pthread_mutex_unlock (lock);
 
-       for (i = 0; i < read_threads_num; i++)
+       for (i = 0; i < tlist_len; i++)
        {
-               if (pthread_join (read_threads[i], NULL) != 0)
+               int status;
+
+               status = pthread_join (tlist[i], NULL);
+               if (status != 0)
                {
-                       ERROR ("pluginstop_threads: pthread_join failed.");
+                       ERROR ("plugin_stop_threads: pthread_join failed.");
                }
-               read_threads[i] = (pthread_t) 0;
        }
-       sfree (read_threads);
-       read_threads_num = 0;
-} /* void stop_threads */
+
+       sfree (tlist);
+
+       *ret_tlist = NULL;
+       *ret_tlist_len = 0;
+
+       return (0);
+} /* int plugin_stop_threads */
 
 /*
  * Public functions
@@ -640,8 +734,18 @@ void plugin_init_all (void)
                rt = global_option_get ("ReadThreads");
                num = atoi (rt);
                if (num != -1)
-                       start_threads ((num > 0) ? num : 5);
+               {
+                       read_loop = 1;
+                       plugin_start_threads ((num > 0) ? ((size_t ) num) : 5,
+                                       &read_threads, &read_threads_num,
+                                       plugin_read_thread);
+               }
        }
+
+       dispatch_loop = 1;
+       plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */
+                       &dispatch_threads, &dispatch_threads_num,
+                       plugin_dispatch_thread);
 } /* void plugin_init_all */
 
 void plugin_read_all (void)
@@ -812,7 +916,11 @@ void plugin_shutdown_all (void)
        int (*callback) (void);
        llentry_t *le;
 
-       stop_threads ();
+       plugin_stop_threads (&read_loop, &read_lock, &read_cond,
+                       &read_threads, &read_threads_num);
+
+       plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond,
+                       &dispatch_threads, &dispatch_threads_num);
 
        if (list_shutdown == NULL)
                return;
@@ -937,6 +1045,76 @@ int plugin_dispatch_values (value_list_t *vl)
        return (0);
 } /* int plugin_dispatch_values */
 
+int plugin_dispatch_values_async (const value_list_t *vl)
+{
+       dispatch_queue_t *qi;
+       int i;
+
+       if (vl == NULL)
+               return (-EINVAL);
+
+       if (dispatch_threads_num < 1)
+       {
+               ERROR ("plugin_dispatch_values_async: "
+                               "No dispatch threads have been started!");
+#ifdef ENOTCONN
+               return (-ENOTCONN);
+#else
+               return (-1);
+#endif
+       }
+
+       qi = (dispatch_queue_t *) malloc (sizeof (*qi));
+       if (qi == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               return (-ENOMEM);
+       }
+       memset (qi, 0, sizeof (*qi));
+       qi->next = NULL;
+
+       qi->vl = (value_list_t *) malloc (sizeof (value_list_t));
+       if (qi->vl == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               sfree (qi);
+               return (-ENOMEM);
+       }
+       memcpy (qi->vl, vl, sizeof (value_list_t));
+       qi->vl->values = NULL;
+
+       qi->vl->values = (value_t *) calloc (qi->vl->values_len,
+                       sizeof (value_t));
+       if (qi->vl->values == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               sfree (qi->vl);
+               sfree (qi);
+               return (-ENOMEM);
+       }
+
+       for (i = 0; i < vl->values_len; i++)
+               qi->vl->values[i] = vl->values[i];
+
+       pthread_mutex_lock (&dispatch_lock);
+
+       if (dispatch_tail == NULL)
+       {
+               dispatch_head = qi;
+               dispatch_tail = qi;
+       }
+       else
+       {
+               dispatch_tail->next = qi;
+               dispatch_tail = qi;
+       }
+
+       pthread_cond_signal (&dispatch_cond);
+       pthread_mutex_unlock (&dispatch_lock);
+
+       return (0);
+} /* int plugin_dispatch_values_async */
+
 int plugin_dispatch_notification (const notification_t *notif)
 {
        int (*callback) (const notification_t *);
index 3088e06..6c2c41e 100644 (file)
@@ -264,6 +264,7 @@ int plugin_unregister_notification (const char *name);
  *              function.
  */
 int plugin_dispatch_values (value_list_t *vl);
+int plugin_dispatch_values_async (const value_list_t *vl);
 
 int plugin_dispatch_notification (const notification_t *notif);