From: Florian Forster Date: Sun, 28 Nov 2010 15:21:14 +0000 (+0100) Subject: lua plugin: Create a new thread for each callback. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=f275893bc80efcd062b76bcf9a5720827b5619ad;p=collectd.git lua plugin: Create a new thread for each callback. This allows callbacks to call one another and multiple read callbacks to be handled correctly. --- diff --git a/src/lua.c b/src/lua.c index 51080540..fbbf1054 100644 --- a/src/lua.c +++ b/src/lua.c @@ -137,37 +137,77 @@ static int clua_load_callback (lua_State *l, int callback_id) /* {{{ */ return (0); } /* }}} int clua_load_callback */ +/* Store the threads in a global variable so they are not cleaned up by the + * garbage collector. */ +static int clua_store_thread (lua_State *l, int idx, int callback_id) /* {{{ */ +{ + if (idx < 0) + idx += lua_gettop (l) + 1; + + lua_getfield (l, LUA_REGISTRYINDEX, "collectd_threads"); /* +1 = 1 */ + if (!lua_istable (l, /* idx = */ -1)) + { + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ + lua_newtable (l); /* +1 = 1 */ + lua_pushvalue (l, -1); /* +1 = 2 */ + lua_setfield (l, LUA_REGISTRYINDEX, "collectd_threads"); /* -1 = 1 */ + } + + /* The table is now on top of the stack */ + lua_pushinteger (l, (lua_Integer) callback_id); /* +1 = 2 */ + + /* Copy the thread pointer */ + lua_pushvalue (l, idx); /* +1 = 3 */ + + if (!lua_isthread (l, /* idx = */ -1)) + { + lua_pop (l, /* nelems = */ 3); /* -3 = 0 */ + return (-1); + } + + lua_settable (l, /* idx = */ -3); /* -2 = 1 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ + return (0); +} /* }}} int clua_store_thread */ + static int clua_read (user_data_t *ud) /* {{{ */ { clua_callback_data_t *cb = ud->data; int status; + lua_State *l; + + pthread_mutex_lock (&cb->lock); + + l = cb->lua_state; - status = clua_load_callback (cb->lua_state, cb->callback_id); + status = clua_load_callback (l, cb->callback_id); if (status != 0) { ERROR ("lua plugin: Unable to load callback \"%s\" (id %i).", cb->lua_function_name, cb->callback_id); + pthread_mutex_unlock (&cb->lock); return (-1); } /* +1 = 1 */ - status = lua_pcall (cb->lua_state, + status = lua_pcall (l, /* nargs = */ 0, /* nresults = */ 1, /* errfunc = */ 0); /* -1+1 = 1 */ if (status != 0) { - const char *errmsg = lua_tostring (cb->lua_state, /* idx = */ -1); + const char *errmsg = lua_tostring (l, /* idx = */ -1); if (errmsg == NULL) ERROR ("lua plugin: Calling a read callback failed. " "In addition, retrieving the error message failed."); else ERROR ("lua plugin: Calling a read callback failed: %s", errmsg); - lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ + pthread_mutex_unlock (&cb->lock); return (-1); } - if (!lua_isnumber (cb->lua_state, /* idx = */ -1)) + if (!lua_isnumber (l, /* idx = */ -1)) { ERROR ("lua plugin: Read function \"%s\" (id %i) did not return a numeric status.", cb->lua_function_name, cb->callback_id); @@ -175,12 +215,13 @@ static int clua_read (user_data_t *ud) /* {{{ */ } else { - status = (int) lua_tointeger (cb->lua_state, /* idx = */ -1); + status = (int) lua_tointeger (l, /* idx = */ -1); } /* pop return value and function */ - lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ + pthread_mutex_unlock (&cb->lock); return (status); } /* }}} int clua_read */ @@ -189,10 +230,13 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { clua_callback_data_t *cb = ud->data; int status; + lua_State *l; pthread_mutex_lock (&cb->lock); - status = clua_load_callback (cb->lua_state, cb->callback_id); + l = cb->lua_state; + + status = clua_load_callback (l, cb->callback_id); if (status != 0) { ERROR ("lua plugin: Unable to load callback \"%s\" (id %i).", @@ -202,34 +246,34 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ } /* +1 = 1 */ - status = luaC_pushvaluelist (cb->lua_state, ds, vl); + status = luaC_pushvaluelist (l, ds, vl); if (status != 0) { - lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ pthread_mutex_unlock (&cb->lock); ERROR ("lua plugin: luaC_pushvaluelist failed."); return (-1); } /* +1 = 2 */ - status = lua_pcall (cb->lua_state, + status = lua_pcall (l, /* nargs = */ 1, /* nresults = */ 1, /* errfunc = */ 0); /* -2+1 = 1 */ if (status != 0) { - const char *errmsg = lua_tostring (cb->lua_state, /* idx = */ -1); + const char *errmsg = lua_tostring (l, /* idx = */ -1); if (errmsg == NULL) ERROR ("lua plugin: Calling the write callback failed. " "In addition, retrieving the error message failed."); else ERROR ("lua plugin: Calling the write callback failed:\n%s", errmsg); - lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ pthread_mutex_unlock (&cb->lock); return (-1); } - if (!lua_isnumber (cb->lua_state, /* idx = */ -1)) + if (!lua_isnumber (l, /* idx = */ -1)) { ERROR ("lua plugin: Write function \"%s\" (id %i) did not return a numeric value.", cb->lua_function_name, cb->callback_id); @@ -237,10 +281,10 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ } else { - status = (int) lua_tointeger (cb->lua_state, /* idx = */ -1); + status = (int) lua_tointeger (l, /* idx = */ -1); } - lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */ + lua_pop (l, /* nelems = */ 1); /* -1 = 0 */ pthread_mutex_unlock (&cb->lock); return (status); } /* }}} int clua_write */ @@ -344,6 +388,7 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */ int nargs = lua_gettop (l); /* number of arguments */ clua_callback_data_t *cb; user_data_t ud; + lua_State *thread; int callback_id; char function_name[DATA_MAX_NAME_LEN] = ""; @@ -368,6 +413,15 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */ RETURN_LUA (l, -1); } + thread = lua_newthread (l); + if (thread == NULL) + { + ERROR ("lua plugin: lua_newthread failed."); + RETURN_LUA (l, -1); + } + clua_store_thread (l, /* idx = */ -1, callback_id); + lua_pop (l, /* nelems = */ 1); + if (function_name[0] == 0) ssnprintf (function_name, sizeof (function_name), "lua/callback_%i", callback_id); @@ -379,9 +433,10 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */ } memset (cb, 0, sizeof (*cb)); - cb->lua_state = l; + cb->lua_state = thread; cb->callback_id = callback_id; cb->lua_function_name = strdup (function_name); + pthread_mutex_init (&cb->lock, /* attr = */ NULL); ud.data = cb; ud.free_func = NULL; /* FIXME */ @@ -402,6 +457,7 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */ int nargs = lua_gettop (l); /* number of arguments */ clua_callback_data_t *cb; user_data_t ud; + lua_State *thread; int callback_id; char function_name[DATA_MAX_NAME_LEN] = ""; @@ -426,6 +482,15 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */ RETURN_LUA (l, -1); } + thread = lua_newthread (l); + if (thread == NULL) + { + ERROR ("lua plugin: lua_newthread failed."); + RETURN_LUA (l, -1); + } + clua_store_thread (l, /* idx = */ -1, callback_id); + lua_pop (l, /* nelems = */ 1); + if (function_name[0] == 0) ssnprintf (function_name, sizeof (function_name), "lua/callback_%i", callback_id); @@ -437,7 +502,7 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */ } memset (cb, 0, sizeof (*cb)); - cb->lua_state = l; + cb->lua_state = thread; cb->callback_id = callback_id; cb->lua_function_name = strdup (function_name); pthread_mutex_init (&cb->lock, /* attr = */ NULL);