return (-1);
}
- status = plugin_dispatch_values (&vl);
+ status = plugin_dispatch_values_async (&vl);
sfree (vl.values);
return (status);
} /* }}} jint cjni_api_dispatch_values */
+static jobject JNICALL cjni_api_get_ds (JNIEnv *jvm_env, /* {{{ */
+ jobject this, jobject o_string_type)
+{
+ const char *ds_name;
+ const data_set_t *ds;
+ jobject o_dataset;
+
+ ds_name = (*jvm_env)->GetStringUTFChars (jvm_env, o_string_type, 0);
+ if (ds_name == NULL)
+ {
+ ERROR ("java plugin: cjni_api_get_ds: GetStringUTFChars failed.");
+ return (NULL);
+ }
+
+ ds = plugin_get_ds (ds_name);
+ DEBUG ("java plugin: cjni_api_get_ds: "
+ "plugin_get_ds (%s) = %p;", ds_name, (void *) ds);
+
+ (*jvm_env)->ReleaseStringUTFChars (jvm_env, o_string_type, ds_name);
+
+ if (ds == NULL)
+ return (NULL);
+
+ o_dataset = ctoj_data_set (jvm_env, ds);
+ return (o_dataset);
+} /* }}} jint cjni_api_get_ds */
+
static JNINativeMethod jni_api_functions[] =
{
- { "DispatchValues", "(Lorg/collectd/protocol/ValueList;)I", cjni_api_dispatch_values }
+ { "DispatchValues", "(Lorg/collectd/protocol/ValueList;)I", cjni_api_dispatch_values },
+ { "GetDS", "(Ljava/lang/String;)Ljava/util/List;", cjni_api_get_ds }
};
static size_t jni_api_functions_num = sizeof (jni_api_functions)
/ sizeof (jni_api_functions[0]);
return (0);
} /* }}} int cjni_config */
-static int cjni_init_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
-{
- jmethodID constructor_id;
- int status;
-
- jp->class_ptr = (*jvm_env)->FindClass (jvm_env, jp->class_name);
- if (jp->class_ptr == NULL)
- {
- ERROR ("cjni_init_one_plugin: FindClass (%s) failed.",
- jp->class_name);
- return (-1);
- }
-
- constructor_id = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "<init>", "()V");
- if (constructor_id == NULL)
- {
- ERROR ("cjni_init_one_plugin: Could not find the constructor for `%s'.",
- jp->class_name);
- return (-1);
- }
-
- jp->object_ptr = (*jvm_env)->NewObject (jvm_env, jp->class_ptr,
- constructor_id);
- if (jp->object_ptr == NULL)
- {
- ERROR ("cjni_init_one_plugin: Could create a new `%s' object.",
- jp->class_name);
- return (-1);
- }
-
- jp->m_config = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "Config", "(Lorg/collectd/api/OConfigItem;)I");
- DEBUG ("java plugin: cjni_init_one_plugin: "
- "jp->class_name = %s; jp->m_config = %p;",
- jp->class_name, (void *) jp->m_config);
-
- jp->m_init = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "Init", "()I");
- DEBUG ("java plugin: cjni_init_one_plugin: "
- "jp->class_name = %s; jp->m_init = %p;",
- jp->class_name, (void *) jp->m_init);
-
- jp->m_read = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "Read", "()I");
- DEBUG ("java plugin: cjni_init_one_plugin: "
- "jp->class_name = %s; jp->m_read = %p;",
- jp->class_name, (void *) jp->m_read);
-
- jp->m_write = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "Write", "(Lorg/collectd/protocol/ValueList;)I");
- DEBUG ("java plugin: cjni_init_one_plugin: "
- "jp->class_name = %s; jp->m_write = %p;",
- jp->class_name, (void *) jp->m_write);
-
- jp->m_shutdown = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
- "Shutdown", "()I");
- DEBUG ("java plugin: cjni_init_one_plugin: "
- "jp->class_name = %s; jp->m_shutdown = %p;",
- jp->class_name, (void *) jp->m_shutdown);
-
- if (jp->ci != NULL)
- {
- if (jp->m_config == NULL)
- {
- WARNING ("java plugin: Configuration for the `%s' plugin is present, "
- "but plugin doesn't provide a configuration method.",
- jp->class_name);
- }
- else /* if (jp->m_config != NULL) */
- {
- jobject o_ocitem;
-
- o_ocitem = ctoj_oconfig_item (jvm_env, jp->ci);
- if (o_ocitem == NULL)
- {
- ERROR ("java plugin: Creating an OConfigItem object failed. "
- "Can't pass configuration information to the `%s' plugin!",
- jp->class_name);
- }
- else /* if (o_ocitem != NULL) */
- {
- status = (*jvm_env)->CallIntMethod (jvm_env,
- jp->object_ptr, jp->m_config, o_ocitem);
- if (status != 0)
- {
- ERROR ("java plugin: cjni_init_one_plugin: "
- "Configuring the `%s' object failed with status %i.",
- jp->class_name, status);
- (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
- return (-1);
- }
- (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
- } /* if (o_ocitem != NULL) */
- } /* if (jp->m_config != NULL) */
- } /* if (jp->ci != NULL) */
-
- if (jp->m_init != NULL)
- {
- status = (*jvm_env)->CallIntMethod (jvm_env, jp->object_ptr,
- jp->m_init);
- if (status != 0)
- {
- ERROR ("java plugin: cjni_init_one_plugin: "
- "Initializing `%s' object failed with status %i.",
- jp->class_name, status);
- return (-1);
- }
- }
- jp->flags |= CJNI_FLAG_ENABLED;
-
- return (0);
-} /* }}} int cjni_init_one_plugin */
-
-static int cjni_init_plugins (JNIEnv *jvm_env) /* {{{ */
-{
- size_t j;
-
- for (j = 0; j < java_plugins_num; j++)
- cjni_init_one_plugin (jvm_env, &java_plugins[j]);
-
- return (0);
-} /* }}} int cjni_init_plugins */
-
-static int cjni_init_native (JNIEnv *jvm_env) /* {{{ */
-{
- jclass api_class_ptr;
- int status;
-
- api_class_ptr = (*jvm_env)->FindClass (jvm_env, "org.collectd.api.CollectdAPI");
- if (api_class_ptr == NULL)
- {
- ERROR ("cjni_init_native: Cannot find API class `org.collectd.api.CollectdAPI'.");
- return (-1);
- }
-
- status = (*jvm_env)->RegisterNatives (jvm_env, api_class_ptr,
- jni_api_functions, (jint) jni_api_functions_num);
- if (status != 0)
- {
- ERROR ("cjni_init_native: RegisterNatives failed with status %i.", status);
- return (-1);
- }
-
- return (0);
-} /* }}} int cjni_init_native */
-
-static int cjni_init (void) /* {{{ */
-{
- JNIEnv *jvm_env;
- JavaVMInitArgs vm_args;
- JavaVMOption vm_options[jvm_argc];
-
- int status;
- size_t i;
-
- if (jvm != NULL)
- return (0);
-
- jvm_env = NULL;
-
- memset (&vm_args, 0, sizeof (vm_args));
- vm_args.version = JNI_VERSION_1_2;
- vm_args.options = vm_options;
- vm_args.nOptions = (jint) jvm_argc;
-
- for (i = 0; i < jvm_argc; i++)
- {
- DEBUG ("java plugin: cjni_init: jvm_argv[%zu] = %s", i, jvm_argv[i]);
- vm_args.options[i].optionString = jvm_argv[i];
- }
- /*
- vm_args.options[0].optionString = "-verbose:jni";
- vm_args.options[1].optionString = "-Djava.class.path=/home/octo/collectd/bindings/java";
- */
-
- status = JNI_CreateJavaVM (&jvm, (void **) &jvm_env, (void **) &vm_args);
- if (status != 0)
- {
- ERROR ("cjni_init: JNI_CreateJavaVM failed with status %i.",
- status);
- return (-1);
- }
- assert (jvm != NULL);
- assert (jvm_env != NULL);
-
- /* Call RegisterNatives */
- status = cjni_init_native (jvm_env);
- if (status != 0)
- {
- ERROR ("cjni_init: cjni_init_native failed.");
- return (-1);
- }
-
- cjni_init_plugins (jvm_env);
-
- return (0);
-} /* }}} int cjni_init */
-
static int cjni_read_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
{
int status;
return (0);
} /* }}} int cjni_shutdown */
+static int cjni_init_one_plugin (JNIEnv *jvm_env, java_plugin_t *jp) /* {{{ */
+{
+ jmethodID constructor_id;
+ int status;
+
+ jp->class_ptr = (*jvm_env)->FindClass (jvm_env, jp->class_name);
+ if (jp->class_ptr == NULL)
+ {
+ ERROR ("cjni_init_one_plugin: FindClass (%s) failed.",
+ jp->class_name);
+ return (-1);
+ }
+
+ constructor_id = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "<init>", "()V");
+ if (constructor_id == NULL)
+ {
+ ERROR ("cjni_init_one_plugin: Could not find the constructor for `%s'.",
+ jp->class_name);
+ return (-1);
+ }
+
+ jp->object_ptr = (*jvm_env)->NewObject (jvm_env, jp->class_ptr,
+ constructor_id);
+ if (jp->object_ptr == NULL)
+ {
+ ERROR ("cjni_init_one_plugin: Could create a new `%s' object.",
+ jp->class_name);
+ return (-1);
+ }
+
+ jp->m_config = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "Config", "(Lorg/collectd/api/OConfigItem;)I");
+ DEBUG ("java plugin: cjni_init_one_plugin: "
+ "jp->class_name = %s; jp->m_config = %p;",
+ jp->class_name, (void *) jp->m_config);
+
+ jp->m_init = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "Init", "()I");
+ DEBUG ("java plugin: cjni_init_one_plugin: "
+ "jp->class_name = %s; jp->m_init = %p;",
+ jp->class_name, (void *) jp->m_init);
+
+ jp->m_read = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "Read", "()I");
+ DEBUG ("java plugin: cjni_init_one_plugin: "
+ "jp->class_name = %s; jp->m_read = %p;",
+ jp->class_name, (void *) jp->m_read);
+
+ jp->m_write = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "Write", "(Lorg/collectd/protocol/ValueList;)I");
+ DEBUG ("java plugin: cjni_init_one_plugin: "
+ "jp->class_name = %s; jp->m_write = %p;",
+ jp->class_name, (void *) jp->m_write);
+
+ jp->m_shutdown = (*jvm_env)->GetMethodID (jvm_env, jp->class_ptr,
+ "Shutdown", "()I");
+ DEBUG ("java plugin: cjni_init_one_plugin: "
+ "jp->class_name = %s; jp->m_shutdown = %p;",
+ jp->class_name, (void *) jp->m_shutdown);
+
+ if (jp->ci != NULL)
+ {
+ if (jp->m_config == NULL)
+ {
+ WARNING ("java plugin: Configuration for the `%s' plugin is present, "
+ "but plugin doesn't provide a configuration method.",
+ jp->class_name);
+ }
+ else /* if (jp->m_config != NULL) */
+ {
+ jobject o_ocitem;
+
+ o_ocitem = ctoj_oconfig_item (jvm_env, jp->ci);
+ if (o_ocitem == NULL)
+ {
+ ERROR ("java plugin: Creating an OConfigItem object failed. "
+ "Can't pass configuration information to the `%s' plugin!",
+ jp->class_name);
+ }
+ else /* if (o_ocitem != NULL) */
+ {
+ status = (*jvm_env)->CallIntMethod (jvm_env,
+ jp->object_ptr, jp->m_config, o_ocitem);
+ if (status != 0)
+ {
+ ERROR ("java plugin: cjni_init_one_plugin: "
+ "Configuring the `%s' object failed with status %i.",
+ jp->class_name, status);
+ (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
+ return (-1);
+ }
+ (*jvm_env)->DeleteLocalRef (jvm_env, o_ocitem);
+ } /* if (o_ocitem != NULL) */
+ } /* if (jp->m_config != NULL) */
+ } /* if (jp->ci != NULL) */
+
+ if (jp->m_init != NULL)
+ {
+ status = (*jvm_env)->CallIntMethod (jvm_env, jp->object_ptr,
+ jp->m_init);
+ if (status != 0)
+ {
+ ERROR ("java plugin: cjni_init_one_plugin: "
+ "Initializing `%s' object failed with status %i.",
+ jp->class_name, status);
+ return (-1);
+ }
+ }
+ jp->flags |= CJNI_FLAG_ENABLED;
+
+ return (0);
+} /* }}} int cjni_init_one_plugin */
+
+static int cjni_init_plugins (JNIEnv *jvm_env) /* {{{ */
+{
+ size_t j;
+
+ int have_read;
+ int have_write;
+ int have_shutdown;
+
+ have_read = 0;
+ have_write = 0;
+ have_shutdown = 0;
+
+ for (j = 0; j < java_plugins_num; j++)
+ {
+ cjni_init_one_plugin (jvm_env, &java_plugins[j]);
+
+ if (java_plugins[j].m_read != NULL)
+ have_read++;
+ if (java_plugins[j].m_write != NULL)
+ have_write++;
+ if (java_plugins[j].m_shutdown != NULL)
+ have_shutdown++;
+ }
+
+ if (have_read > 0)
+ plugin_register_read ("java", cjni_read);
+ if (have_write > 0)
+ plugin_register_write ("java", cjni_write);
+ if (have_shutdown > 0)
+ plugin_register_shutdown ("java", cjni_shutdown);
+
+
+ return (0);
+} /* }}} int cjni_init_plugins */
+
+static int cjni_init_native (JNIEnv *jvm_env) /* {{{ */
+{
+ jclass api_class_ptr;
+ int status;
+
+ api_class_ptr = (*jvm_env)->FindClass (jvm_env, "org.collectd.api.CollectdAPI");
+ if (api_class_ptr == NULL)
+ {
+ ERROR ("cjni_init_native: Cannot find API class `org.collectd.api.CollectdAPI'.");
+ return (-1);
+ }
+
+ status = (*jvm_env)->RegisterNatives (jvm_env, api_class_ptr,
+ jni_api_functions, (jint) jni_api_functions_num);
+ if (status != 0)
+ {
+ ERROR ("cjni_init_native: RegisterNatives failed with status %i.", status);
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int cjni_init_native */
+
+static int cjni_init (void) /* {{{ */
+{
+ JNIEnv *jvm_env;
+ JavaVMInitArgs vm_args;
+ JavaVMOption vm_options[jvm_argc];
+
+ int status;
+ size_t i;
+
+ if (jvm != NULL)
+ return (0);
+
+ jvm_env = NULL;
+
+ memset (&vm_args, 0, sizeof (vm_args));
+ vm_args.version = JNI_VERSION_1_2;
+ vm_args.options = vm_options;
+ vm_args.nOptions = (jint) jvm_argc;
+
+ for (i = 0; i < jvm_argc; i++)
+ {
+ DEBUG ("java plugin: cjni_init: jvm_argv[%zu] = %s", i, jvm_argv[i]);
+ vm_args.options[i].optionString = jvm_argv[i];
+ }
+ /*
+ vm_args.options[0].optionString = "-verbose:jni";
+ vm_args.options[1].optionString = "-Djava.class.path=/home/octo/collectd/bindings/java";
+ */
+
+ status = JNI_CreateJavaVM (&jvm, (void **) &jvm_env, (void **) &vm_args);
+ if (status != 0)
+ {
+ ERROR ("cjni_init: JNI_CreateJavaVM failed with status %i.",
+ status);
+ return (-1);
+ }
+ assert (jvm != NULL);
+ assert (jvm_env != NULL);
+
+ /* Call RegisterNatives */
+ status = cjni_init_native (jvm_env);
+ if (status != 0)
+ {
+ ERROR ("cjni_init: cjni_init_native failed.");
+ return (-1);
+ }
+
+ cjni_init_plugins (jvm_env);
+
+ return (0);
+} /* }}} int cjni_init */
+
void module_register (void)
{
plugin_register_complex_config ("java", cjni_config);
plugin_register_init ("java", cjni_init);
- plugin_register_read ("java", cjni_read);
- plugin_register_write ("java", cjni_write);
- plugin_register_shutdown ("java", cjni_shutdown);
} /* void module_register (void) */
/* vim: set sw=2 sts=2 et fdm=marker : */
};
typedef struct read_func_s read_func_t;
+struct dispatch_queue_s;
+typedef struct dispatch_queue_s dispatch_queue_t;
+struct dispatch_queue_s
+{
+ value_list_t *vl;
+ dispatch_queue_t *next;
+};
+
/*
* Private variables
*/
static char *plugindir = NULL;
-static int read_loop = 1;
+static int read_loop = 0;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
-static int read_threads_num = 0;
+static size_t read_threads_num = 0;
+
+static int dispatch_loop = 0;
+static pthread_mutex_t dispatch_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t dispatch_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t *dispatch_threads = NULL;
+static size_t dispatch_threads_num = 0;
+static dispatch_queue_t *dispatch_head;
+static dispatch_queue_t *dispatch_tail;
/*
* Static functions
return ((void *) 0);
} /* void *plugin_read_thread */
-static void start_threads (int num)
+static void *plugin_dispatch_thread (void *arg)
{
- int i;
+ pthread_mutex_lock (&dispatch_lock);
- if (read_threads != NULL)
- return;
+ while ((dispatch_loop != 0) || (dispatch_head != NULL))
+ {
+ dispatch_queue_t *qi;
+
+ if (dispatch_head == NULL)
+ pthread_cond_wait (&dispatch_cond, &dispatch_lock);
+
+ if (dispatch_head == NULL)
+ continue;
+
+ qi = dispatch_head;
+
+ if (dispatch_head == dispatch_tail)
+ {
+ dispatch_head = NULL;
+ dispatch_tail = NULL;
+ }
+ else
+ {
+ dispatch_head = qi->next;
+ }
+
+ pthread_mutex_unlock (&dispatch_lock);
- read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
- if (read_threads == NULL)
+ plugin_dispatch_values (qi->vl);
+ sfree (qi->vl->values);
+ sfree (qi->vl);
+ sfree (qi);
+
+ pthread_mutex_lock (&dispatch_lock);
+ } /* while */
+
+ pthread_mutex_unlock (&dispatch_lock);
+
+ return ((void *) 0);
+} /* void *plugin_dispatch_thread */
+
+static int plugin_start_threads (size_t num,
+ pthread_t **tlist, size_t *tlist_num,
+ void *(*thread_main) (void *))
+{
+ size_t i;
+ pthread_t *new_tlist;
+ size_t new_tlist_num;
+
+ if (*tlist != NULL)
+ return (-1);
+
+ new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist));
+ if (new_tlist == NULL)
{
- ERROR ("plugin: start_threads: calloc failed.");
- return;
+ ERROR ("plugin: plugin_start_threads: calloc failed.");
+ return (-1);
}
- read_threads_num = 0;
+ new_tlist_num = 0;
for (i = 0; i < num; i++)
{
- if (pthread_create (read_threads + read_threads_num, NULL,
- plugin_read_thread, NULL) == 0)
- {
- read_threads_num++;
- }
- else
+ int status;
+
+ status = pthread_create (new_tlist + new_tlist_num,
+ /* attr = */ NULL, thread_main,
+ /* arg = */ (void *) 0);
+ if (status != 0)
{
- ERROR ("plugin: start_threads: pthread_create failed.");
- return;
+ ERROR ("plugin: plugin_start_threads: "
+ "pthread_create failed.");
+ continue;
}
+
+ new_tlist_num++;
} /* for (i) */
-} /* void start_threads */
-static void stop_threads (void)
+ if (new_tlist_num < 1)
+ {
+ ERROR ("plugin: plugin_start_threads: "
+ "Creating threads failed.");
+ sfree (new_tlist);
+ return (-1);
+ }
+
+ *tlist = new_tlist;
+ *tlist_num = new_tlist_num;
+
+ return (0);
+} /* int plugin_start_threads */
+
+static int plugin_stop_threads (int *loop,
+ pthread_mutex_t *lock, pthread_cond_t *cond,
+ pthread_t **ret_tlist, size_t *ret_tlist_len)
{
- int i;
+ pthread_t *tlist;
+ size_t tlist_len;
+ size_t i;
- if (read_threads == NULL)
- return;
+ tlist = *ret_tlist;
+ tlist_len = *ret_tlist_len;
- pthread_mutex_lock (&read_lock);
- read_loop = 0;
- DEBUG ("plugin: stop_threads: Signalling `read_cond'");
- pthread_cond_broadcast (&read_cond);
- pthread_mutex_unlock (&read_lock);
+ if (tlist == NULL)
+ return (0);
+
+ DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len);
+
+ pthread_mutex_lock (lock);
+
+ *loop = 0;
+
+ pthread_cond_broadcast (cond);
+ pthread_mutex_unlock (lock);
- for (i = 0; i < read_threads_num; i++)
+ for (i = 0; i < tlist_len; i++)
{
- if (pthread_join (read_threads[i], NULL) != 0)
+ int status;
+
+ status = pthread_join (tlist[i], NULL);
+ if (status != 0)
{
- ERROR ("plugin: stop_threads: pthread_join failed.");
+ ERROR ("plugin_stop_threads: pthread_join failed.");
}
- read_threads[i] = (pthread_t) 0;
}
- sfree (read_threads);
- read_threads_num = 0;
-} /* void stop_threads */
+
+ sfree (tlist);
+
+ *ret_tlist = NULL;
+ *ret_tlist_len = 0;
+
+ return (0);
+} /* int plugin_stop_threads */
/*
* Public functions
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
- start_threads ((num > 0) ? num : 5);
+ {
+ read_loop = 1;
+ plugin_start_threads ((num > 0) ? ((size_t ) num) : 5,
+ &read_threads, &read_threads_num,
+ plugin_read_thread);
+ }
}
+
+ dispatch_loop = 1;
+ plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */
+ &dispatch_threads, &dispatch_threads_num,
+ plugin_dispatch_thread);
} /* void plugin_init_all */
void plugin_read_all (void)
int (*callback) (void);
llentry_t *le;
- stop_threads ();
+ plugin_stop_threads (&read_loop, &read_lock, &read_cond,
+ &read_threads, &read_threads_num);
+
+ plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond,
+ &dispatch_threads, &dispatch_threads_num);
if (list_shutdown == NULL)
return;
return (0);
} /* int plugin_dispatch_values */
+int plugin_dispatch_values_async (const value_list_t *vl)
+{
+ dispatch_queue_t *qi;
+ int i;
+
+ if (vl == NULL)
+ return (-EINVAL);
+
+ if (dispatch_threads_num < 1)
+ {
+ ERROR ("plugin_dispatch_values_async: "
+ "No dispatch threads have been started!");
+#ifdef ENOTCONN
+ return (-ENOTCONN);
+#else
+ return (-1);
+#endif
+ }
+
+ qi = (dispatch_queue_t *) malloc (sizeof (*qi));
+ if (qi == NULL)
+ {
+ ERROR ("plugin_dispatch_values_async: malloc failed.");
+ return (-ENOMEM);
+ }
+ memset (qi, 0, sizeof (*qi));
+ qi->next = NULL;
+
+ qi->vl = (value_list_t *) malloc (sizeof (value_list_t));
+ if (qi->vl == NULL)
+ {
+ ERROR ("plugin_dispatch_values_async: malloc failed.");
+ sfree (qi);
+ return (-ENOMEM);
+ }
+ memcpy (qi->vl, vl, sizeof (value_list_t));
+ qi->vl->values = NULL;
+
+ qi->vl->values = (value_t *) calloc (qi->vl->values_len,
+ sizeof (value_t));
+ if (qi->vl->values == NULL)
+ {
+ ERROR ("plugin_dispatch_values_async: malloc failed.");
+ sfree (qi->vl);
+ sfree (qi);
+ return (-ENOMEM);
+ }
+
+ for (i = 0; i < vl->values_len; i++)
+ qi->vl->values[i] = vl->values[i];
+
+ pthread_mutex_lock (&dispatch_lock);
+
+ if (dispatch_tail == NULL)
+ {
+ dispatch_head = qi;
+ dispatch_tail = qi;
+ }
+ else
+ {
+ dispatch_tail->next = qi;
+ dispatch_tail = qi;
+ }
+
+ pthread_cond_signal (&dispatch_cond);
+ pthread_mutex_unlock (&dispatch_lock);
+
+ return (0);
+} /* int plugin_dispatch_values_async */
+
int plugin_dispatch_notification (const notification_t *notif)
{
int (*callback) (const notification_t *);