#include "plugin.h"
#include "common.h"
+#include "filter_chain.h"
+
#include <pthread.h>
#if !defined(USE_ITHREADS)
#define PLUGIN_CONFIG 254
#define PLUGIN_DATASET 255
+#define FC_MATCH 0
+#define FC_TARGET 1
+
+#define FC_TYPES 2
+
+#define FC_CB_CREATE 0
+#define FC_CB_DESTROY 1
+#define FC_CB_EXEC 2
+
+#define FC_CB_TYPES 3
+
#define log_debug(...) DEBUG ("perl: " __VA_ARGS__)
#define log_info(...) INFO ("perl: " __VA_ARGS__)
#define log_warn(...) WARNING ("perl: " __VA_ARGS__)
static XS (Collectd__plugin_flush);
static XS (Collectd_plugin_dispatch_notification);
static XS (Collectd_plugin_log);
+static XS (Collectd__fc_register);
static XS (Collectd_call_by_name);
/*
pthread_mutex_t mutex;
} c_ithread_list_t;
+/* name / user_data for Perl matches / targets */
+typedef struct {
+ char *name;
+ SV *user_data;
+} pfc_user_data_t;
+
+#define PFC_USER_DATA_FREE(data) \
+ do { \
+ sfree ((data)->name); \
+ if (NULL != (data)->user_data) \
+ sv_free ((data)->user_data); \
+ sfree (data); \
+ } while (0)
+
/*
* private variables
*/
{ "Collectd::plugin_dispatch_notification",
Collectd_plugin_dispatch_notification },
{ "Collectd::plugin_log", Collectd_plugin_log },
+ { "Collectd::_fc_register", Collectd__fc_register },
{ "Collectd::call_by_name", Collectd_call_by_name },
{ "", NULL }
};
int value;
} constants[] =
{
- { "Collectd::TYPE_INIT", PLUGIN_INIT },
- { "Collectd::TYPE_READ", PLUGIN_READ },
- { "Collectd::TYPE_WRITE", PLUGIN_WRITE },
- { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN },
- { "Collectd::TYPE_LOG", PLUGIN_LOG },
- { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF },
- { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH },
- { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG },
- { "Collectd::TYPE_DATASET", PLUGIN_DATASET },
- { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER },
- { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE },
- { "Collectd::LOG_ERR", LOG_ERR },
- { "Collectd::LOG_WARNING", LOG_WARNING },
- { "Collectd::LOG_NOTICE", LOG_NOTICE },
- { "Collectd::LOG_INFO", LOG_INFO },
- { "Collectd::LOG_DEBUG", LOG_DEBUG },
- { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE },
- { "Collectd::NOTIF_WARNING", NOTIF_WARNING },
- { "Collectd::NOTIF_OKAY", NOTIF_OKAY },
+ { "Collectd::TYPE_INIT", PLUGIN_INIT },
+ { "Collectd::TYPE_READ", PLUGIN_READ },
+ { "Collectd::TYPE_WRITE", PLUGIN_WRITE },
+ { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN },
+ { "Collectd::TYPE_LOG", PLUGIN_LOG },
+ { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF },
+ { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH },
+ { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG },
+ { "Collectd::TYPE_DATASET", PLUGIN_DATASET },
+ { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER },
+ { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE },
+ { "Collectd::LOG_ERR", LOG_ERR },
+ { "Collectd::LOG_WARNING", LOG_WARNING },
+ { "Collectd::LOG_NOTICE", LOG_NOTICE },
+ { "Collectd::LOG_INFO", LOG_INFO },
+ { "Collectd::LOG_DEBUG", LOG_DEBUG },
+ { "Collectd::FC_MATCH", FC_MATCH },
+ { "Collectd::FC_TARGET", FC_TARGET },
+ { "Collectd::FC_CB_CREATE", FC_CB_CREATE },
+ { "Collectd::FC_CB_DESTROY", FC_CB_DESTROY },
+ { "Collectd::FC_CB_EXEC", FC_CB_EXEC },
+ { "Collectd::FC_MATCH_NO_MATCH", FC_MATCH_NO_MATCH },
+ { "Collectd::FC_MATCH_MATCHES", FC_MATCH_MATCHES },
+ { "Collectd::FC_TARGET_CONTINUE", FC_TARGET_CONTINUE },
+ { "Collectd::FC_TARGET_STOP", FC_TARGET_STOP },
+ { "Collectd::FC_TARGET_RETURN", FC_TARGET_RETURN },
+ { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE },
+ { "Collectd::NOTIF_WARNING", NOTIF_WARNING },
+ { "Collectd::NOTIF_OKAY", NOTIF_OKAY },
{ "", 0 }
};
} /* static int pplugin_call_all (int, ...) */
/*
+ * collectd's perl interpreter based thread implementation.
+ *
+ * This has been inspired by Perl's ithreads introduced in version 5.6.0.
+ */
+
+/* must be called with perl_threads->mutex locked */
+static void c_ithread_destroy (c_ithread_t *ithread)
+{
+ dTHXa (ithread->interp);
+
+ assert (NULL != perl_threads);
+
+ PERL_SET_CONTEXT (aTHX);
+ log_debug ("Shutting down Perl interpreter %p...", aTHX);
+
+#if COLLECT_DEBUG
+ sv_report_used ();
+
+ --perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ perl_destruct (aTHX);
+ perl_free (aTHX);
+
+ if (NULL == ithread->prev)
+ perl_threads->head = ithread->next;
+ else
+ ithread->prev->next = ithread->next;
+
+ if (NULL == ithread->next)
+ perl_threads->tail = ithread->prev;
+ else
+ ithread->next->prev = ithread->prev;
+
+ sfree (ithread);
+ return;
+} /* static void c_ithread_destroy (c_ithread_t *) */
+
+static void c_ithread_destructor (void *arg)
+{
+ c_ithread_t *ithread = (c_ithread_t *)arg;
+ c_ithread_t *t = NULL;
+
+ if (NULL == perl_threads)
+ return;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+
+ for (t = perl_threads->head; NULL != t; t = t->next)
+ if (t == ithread)
+ break;
+
+ /* the ithread no longer exists */
+ if (NULL == t)
+ return;
+
+ c_ithread_destroy (ithread);
+
+ pthread_mutex_unlock (&perl_threads->mutex);
+ return;
+} /* static void c_ithread_destructor (void *) */
+
+/* must be called with perl_threads->mutex locked */
+static c_ithread_t *c_ithread_create (PerlInterpreter *base)
+{
+ c_ithread_t *t = NULL;
+ dTHXa (NULL);
+
+ assert (NULL != perl_threads);
+
+ t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
+ memset (t, 0, sizeof (c_ithread_t));
+
+ t->interp = (NULL == base)
+ ? NULL
+ : perl_clone (base, CLONEf_KEEP_PTR_TABLE);
+
+ aTHX = t->interp;
+
+ if ((NULL != base) && (NULL != PL_endav)) {
+ av_clear (PL_endav);
+ av_undef (PL_endav);
+ PL_endav = Nullav;
+ }
+
+#if COLLECT_DEBUG
+ ++perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ t->next = NULL;
+
+ if (NULL == perl_threads->tail) {
+ perl_threads->head = t;
+ t->prev = NULL;
+ }
+ else {
+ perl_threads->tail->next = t;
+ t->prev = perl_threads->tail;
+ }
+
+ perl_threads->tail = t;
+
+ pthread_setspecific (perl_thr_key, (const void *)t);
+ return t;
+} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
+
+/*
+ * Filter chains implementation.
+ */
+
+static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
+{
+ int retvals = 0;
+
+ va_list ap;
+ int ret = 0;
+
+ notification_meta_t **meta = NULL;
+ AV *pmeta = NULL;
+
+ dSP;
+
+ if ((type < 0) || (type >= FC_TYPES))
+ return -1;
+
+ if ((cb_type < 0) || (cb_type >= FC_CB_TYPES))
+ return -1;
+
+ va_start (ap, data);
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+
+ XPUSHs (sv_2mortal (newSViv ((IV)type)));
+ XPUSHs (sv_2mortal (newSVpv (data->name, 0)));
+ XPUSHs (sv_2mortal (newSViv ((IV)cb_type)));
+
+ if (FC_CB_CREATE == cb_type) {
+ /*
+ * $_[0] = $ci;
+ * $_[1] = $user_data;
+ */
+ oconfig_item_t *ci;
+ HV *config = newHV ();
+
+ ci = va_arg (ap, oconfig_item_t *);
+
+ if (0 != oconfig_item2hv (aTHX_ ci, config)) {
+ hv_clear (config);
+ hv_undef (config);
+ config = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)config)));
+ }
+ else if (FC_CB_DESTROY == cb_type) {
+ /*
+ * $_[1] = $user_data;
+ */
+
+ /* nothing to be done - the user data pointer
+ * is pushed onto the stack later */
+ }
+ else if (FC_CB_EXEC == cb_type) {
+ /*
+ * $_[0] = $ds;
+ * $_[1] = $vl;
+ * $_[2] = $meta;
+ * $_[3] = $user_data;
+ */
+ data_set_t *ds;
+ value_list_t *vl;
+
+ AV *pds = newAV ();
+ HV *pvl = newHV ();
+
+ ds = va_arg (ap, data_set_t *);
+ vl = va_arg (ap, value_list_t *);
+ meta = va_arg (ap, notification_meta_t **);
+
+ if (0 != data_set2av (aTHX_ ds, pds)) {
+ av_clear (pds);
+ av_undef (pds);
+ pds = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (0 != value_list2hv (aTHX_ vl, ds, pvl)) {
+ hv_clear (pvl);
+ hv_undef (pvl);
+ pvl = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (NULL != meta) {
+ pmeta = newAV ();
+
+ if (0 != notification_meta2av (aTHX_ *meta, pmeta)) {
+ av_clear (pmeta);
+ av_undef (pmeta);
+ pmeta = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+ }
+ else {
+ pmeta = (AV *)&PL_sv_undef;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pmeta)));
+ }
+
+ XPUSHs (sv_2mortal (newRV_inc (data->user_data)));
+
+ PUTBACK;
+
+ retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+
+ if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
+ assert (pmeta != NULL);
+
+ plugin_notification_meta_free (*meta);
+ av2notification_meta (aTHX_ pmeta, meta);
+ }
+
+ SPAGAIN;
+ if (0 < retvals) {
+ SV *tmp = POPs;
+
+ /* the exec callbacks return a status, while
+ * the others return a boolean value */
+ if (FC_CB_EXEC == cb_type)
+ ret = SvIV (tmp);
+ else if (! SvTRUE (tmp))
+ ret = -1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ va_end (ap);
+ return ret;
+} /* static int fc_call (int, int, pfc_user_data_t *, ...) */
+
+static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
+{
+ pfc_user_data_t *data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_create: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ if ((1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_warn ("A \"%s\" block expects a single string argument.",
+ (FC_MATCH == type) ? "Match" : "Target");
+ return -1;
+ }
+
+ data = (pfc_user_data_t *)smalloc (sizeof (*data));
+ data->name = sstrdup (ci->values[0].value.string);
+ data->user_data = newSV (0);
+
+ ret = fc_call (aTHX_ type, FC_CB_CREATE, data, ci);
+
+ if (0 != ret)
+ PFC_USER_DATA_FREE (data);
+ else
+ *user_data = data;
+ return ret;
+} /* static int fc_create (int, const oconfig_item_t *, void **) */
+
+static int fc_destroy (int type, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if ((NULL == perl_threads) || (NULL == data))
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_destroy: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ ret = fc_call (aTHX_ type, FC_CB_DESTROY, data);
+
+ PFC_USER_DATA_FREE (data);
+ *user_data = NULL;
+ return ret;
+} /* static int fc_destroy (int, void **) */
+
+static int fc_exec (int type, const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ assert (NULL != data);
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_exec: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ return fc_call (aTHX_ type, FC_CB_EXEC, data, ds, vl, meta);
+} /* static int fc_exec (int, const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static int pmatch_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_MATCH, ci, user_data);
+} /* static int pmatch_create (const oconfig_item_t *, void **) */
+
+static int pmatch_destroy (void **user_data)
+{
+ return fc_destroy (FC_MATCH, user_data);
+} /* static int pmatch_destroy (void **) */
+
+static int pmatch_match (const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_MATCH, ds, vl, meta, user_data);
+} /* static int pmatch_match (const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static match_proc_t pmatch = {
+ pmatch_create, pmatch_destroy, pmatch_match
+};
+
+static int ptarget_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_TARGET, ci, user_data);
+} /* static int ptarget_create (const oconfig_item_t *, void **) */
+
+static int ptarget_destroy (void **user_data)
+{
+ return fc_destroy (FC_TARGET, user_data);
+} /* static int ptarget_destroy (void **) */
+
+static int ptarget_invoke (const data_set_t *ds, value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_TARGET, ds, vl, meta, user_data);
+} /* static int ptarget_invoke (const data_set_t *, value_list_t *,
+ notification_meta_t **, void **) */
+
+static target_proc_t ptarget = {
+ ptarget_create, ptarget_destroy, ptarget_invoke
+};
+
+/*
* Exported Perl API.
*/
} /* static XS (Collectd_plugin_log) */
/*
+ * Collectd::_fc_register (type, name)
+ *
+ * type:
+ * match | target
+ *
+ * name:
+ * name of the match
+ */
+static XS (Collectd__fc_register)
+{
+ int type;
+ char *name;
+
+ int ret = 0;
+
+ dXSARGS;
+
+ if (2 != items) {
+ log_err ("Usage: Collectd::_fc_register(type, name)");
+ XSRETURN_EMPTY;
+ }
+
+ type = SvIV (ST (0));
+ name = SvPV_nolen (ST (1));
+
+ if (FC_MATCH == type)
+ ret = fc_register_match (name, pmatch);
+ else if (FC_TARGET == type)
+ ret = fc_register_target (name, ptarget);
+
+ if (0 == ret)
+ XSRETURN_YES;
+ else
+ XSRETURN_EMPTY;
+} /* static XS (Collectd_fc_register) */
+
+/*
* Collectd::call_by_name (...).
*
* Call a Perl sub identified by its name passed through $Collectd::cb_name.
} /* static XS (Collectd_call_by_name) */
/*
- * collectd's perl interpreter based thread implementation.
- *
- * This has been inspired by Perl's ithreads introduced in version 5.6.0.
- */
-
-/* must be called with perl_threads->mutex locked */
-static void c_ithread_destroy (c_ithread_t *ithread)
-{
- dTHXa (ithread->interp);
-
- assert (NULL != perl_threads);
-
- PERL_SET_CONTEXT (aTHX);
- log_debug ("Shutting down Perl interpreter %p...", aTHX);
-
-#if COLLECT_DEBUG
- sv_report_used ();
-
- --perl_threads->number_of_threads;
-#endif /* COLLECT_DEBUG */
-
- perl_destruct (aTHX);
- perl_free (aTHX);
-
- if (NULL == ithread->prev)
- perl_threads->head = ithread->next;
- else
- ithread->prev->next = ithread->next;
-
- if (NULL == ithread->next)
- perl_threads->tail = ithread->prev;
- else
- ithread->next->prev = ithread->prev;
-
- sfree (ithread);
- return;
-} /* static void c_ithread_destroy (c_ithread_t *) */
-
-static void c_ithread_destructor (void *arg)
-{
- c_ithread_t *ithread = (c_ithread_t *)arg;
- c_ithread_t *t = NULL;
-
- if (NULL == perl_threads)
- return;
-
- pthread_mutex_lock (&perl_threads->mutex);
-
- for (t = perl_threads->head; NULL != t; t = t->next)
- if (t == ithread)
- break;
-
- /* the ithread no longer exists */
- if (NULL == t)
- return;
-
- c_ithread_destroy (ithread);
-
- pthread_mutex_unlock (&perl_threads->mutex);
- return;
-} /* static void c_ithread_destructor (void *) */
-
-/* must be called with perl_threads->mutex locked */
-static c_ithread_t *c_ithread_create (PerlInterpreter *base)
-{
- c_ithread_t *t = NULL;
- dTHXa (NULL);
-
- assert (NULL != perl_threads);
-
- t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
- memset (t, 0, sizeof (c_ithread_t));
-
- t->interp = (NULL == base)
- ? NULL
- : perl_clone (base, CLONEf_KEEP_PTR_TABLE);
-
- aTHX = t->interp;
-
- if ((NULL != base) && (NULL != PL_endav)) {
- av_clear (PL_endav);
- av_undef (PL_endav);
- PL_endav = Nullav;
- }
-
-#if COLLECT_DEBUG
- ++perl_threads->number_of_threads;
-#endif /* COLLECT_DEBUG */
-
- t->next = NULL;
-
- if (NULL == perl_threads->tail) {
- perl_threads->head = t;
- t->prev = NULL;
- }
- else {
- perl_threads->tail->next = t;
- t->prev = perl_threads->tail;
- }
-
- perl_threads->tail = t;
-
- pthread_setspecific (perl_thr_key, (const void *)t);
- return t;
-} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
-
-/*
* Interface to collectd.
*/