lua plugin: Create a new thread for each callback.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 28 Nov 2010 15:21:14 +0000 (16:21 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 28 Nov 2010 15:21:14 +0000 (16:21 +0100)
This allows callbacks to call one another and multiple read callbacks to
be handled correctly.

src/lua.c

index 5108054..fbbf105 100644 (file)
--- a/src/lua.c
+++ b/src/lua.c
@@ -137,37 +137,77 @@ static int clua_load_callback (lua_State *l, int callback_id) /* {{{ */
   return (0);
 } /* }}} int clua_load_callback */
 
+/* Store the threads in a global variable so they are not cleaned up by the
+ * garbage collector. */
+static int clua_store_thread (lua_State *l, int idx, int callback_id) /* {{{ */
+{
+  if (idx < 0)
+    idx += lua_gettop (l) + 1;
+
+  lua_getfield (l, LUA_REGISTRYINDEX, "collectd_threads"); /* +1 = 1 */
+  if (!lua_istable (l, /* idx = */ -1))
+  {
+    lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
+    lua_newtable (l); /* +1 = 1 */
+    lua_pushvalue (l, -1); /* +1 = 2 */
+    lua_setfield (l, LUA_REGISTRYINDEX, "collectd_threads"); /* -1 = 1 */
+  }
+
+  /* The table is now on top of the stack */
+  lua_pushinteger (l, (lua_Integer) callback_id); /* +1 = 2 */
+
+  /* Copy the thread pointer */
+  lua_pushvalue (l, idx); /* +1 = 3 */
+
+  if (!lua_isthread (l, /* idx = */ -1))
+  {
+    lua_pop (l, /* nelems = */ 3); /* -3 = 0 */
+    return (-1);
+  }
+
+  lua_settable (l, /* idx = */ -3); /* -2 = 1 */
+  lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
+  return (0);
+} /* }}} int clua_store_thread */
+
 static int clua_read (user_data_t *ud) /* {{{ */
 {
   clua_callback_data_t *cb = ud->data;
   int status;
+  lua_State *l;
+
+  pthread_mutex_lock (&cb->lock);
+
+  l = cb->lua_state;
 
-  status = clua_load_callback (cb->lua_state, cb->callback_id);
+  status = clua_load_callback (l, cb->callback_id);
   if (status != 0)
   {
     ERROR ("lua plugin: Unable to load callback \"%s\" (id %i).",
         cb->lua_function_name, cb->callback_id);
+    pthread_mutex_unlock (&cb->lock);
     return (-1);
   }
   /* +1 = 1 */
 
-  status = lua_pcall (cb->lua_state,
+  status = lua_pcall (l,
       /* nargs    = */ 0,
       /* nresults = */ 1,
       /* errfunc  = */ 0); /* -1+1 = 1 */
   if (status != 0)
   {
-    const char *errmsg = lua_tostring (cb->lua_state, /* idx = */ -1);
+    const char *errmsg = lua_tostring (l, /* idx = */ -1);
     if (errmsg == NULL)
       ERROR ("lua plugin: Calling a read callback failed. "
           "In addition, retrieving the error message failed.");
     else
       ERROR ("lua plugin: Calling a read callback failed: %s", errmsg);
-    lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */
+    lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
+    pthread_mutex_unlock (&cb->lock);
     return (-1);
   }
 
-  if (!lua_isnumber (cb->lua_state, /* idx = */ -1))
+  if (!lua_isnumber (l, /* idx = */ -1))
   {
     ERROR ("lua plugin: Read function \"%s\" (id %i) did not return a numeric status.",
         cb->lua_function_name, cb->callback_id);
@@ -175,12 +215,13 @@ static int clua_read (user_data_t *ud) /* {{{ */
   }
   else
   {
-    status = (int) lua_tointeger (cb->lua_state, /* idx = */ -1);
+    status = (int) lua_tointeger (l, /* idx = */ -1);
   }
 
   /* pop return value and function */
-  lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */
+  lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
 
+  pthread_mutex_unlock (&cb->lock);
   return (status);
 } /* }}} int clua_read */
 
@@ -189,10 +230,13 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 {
   clua_callback_data_t *cb = ud->data;
   int status;
+  lua_State *l;
 
   pthread_mutex_lock (&cb->lock);
 
-  status = clua_load_callback (cb->lua_state, cb->callback_id);
+  l = cb->lua_state;
+
+  status = clua_load_callback (l, cb->callback_id);
   if (status != 0)
   {
     ERROR ("lua plugin: Unable to load callback \"%s\" (id %i).",
@@ -202,34 +246,34 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
   /* +1 = 1 */
 
-  status = luaC_pushvaluelist (cb->lua_state, ds, vl);
+  status = luaC_pushvaluelist (l, ds, vl);
   if (status != 0)
   {
-    lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */
+    lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
     pthread_mutex_unlock (&cb->lock);
     ERROR ("lua plugin: luaC_pushvaluelist failed.");
     return (-1);
   }
   /* +1 = 2 */
 
-  status = lua_pcall (cb->lua_state,
+  status = lua_pcall (l,
       /* nargs    = */ 1,
       /* nresults = */ 1,
       /* errfunc  = */ 0); /* -2+1 = 1 */
   if (status != 0)
   {
-    const char *errmsg = lua_tostring (cb->lua_state, /* idx = */ -1);
+    const char *errmsg = lua_tostring (l, /* idx = */ -1);
     if (errmsg == NULL)
       ERROR ("lua plugin: Calling the write callback failed. "
           "In addition, retrieving the error message failed.");
     else
       ERROR ("lua plugin: Calling the write callback failed:\n%s", errmsg);
-    lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */
+    lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
     pthread_mutex_unlock (&cb->lock);
     return (-1);
   }
 
-  if (!lua_isnumber (cb->lua_state, /* idx = */ -1))
+  if (!lua_isnumber (l, /* idx = */ -1))
   {
     ERROR ("lua plugin: Write function \"%s\" (id %i) did not return a numeric value.",
         cb->lua_function_name, cb->callback_id);
@@ -237,10 +281,10 @@ static int clua_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
   else
   {
-    status = (int) lua_tointeger (cb->lua_state, /* idx = */ -1);
+    status = (int) lua_tointeger (l, /* idx = */ -1);
   }
 
-  lua_pop (cb->lua_state, /* nelems = */ 1); /* -1 = 0 */
+  lua_pop (l, /* nelems = */ 1); /* -1 = 0 */
   pthread_mutex_unlock (&cb->lock);
   return (status);
 } /* }}} int clua_write */
@@ -344,6 +388,7 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */
   int nargs = lua_gettop (l); /* number of arguments */
   clua_callback_data_t *cb;
   user_data_t ud;
+  lua_State *thread;
 
   int callback_id;
   char function_name[DATA_MAX_NAME_LEN] = "";
@@ -368,6 +413,15 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */
     RETURN_LUA (l, -1);
   }
 
+  thread = lua_newthread (l);
+  if (thread == NULL)
+  {
+    ERROR ("lua plugin: lua_newthread failed.");
+    RETURN_LUA (l, -1);
+  }
+  clua_store_thread (l, /* idx = */ -1, callback_id);
+  lua_pop (l, /* nelems = */ 1);
+
   if (function_name[0] == 0)
     ssnprintf (function_name, sizeof (function_name), "lua/callback_%i", callback_id);
 
@@ -379,9 +433,10 @@ static int lua_cb_register_read (lua_State *l) /* {{{ */
   }
 
   memset (cb, 0, sizeof (*cb));
-  cb->lua_state = l;
+  cb->lua_state = thread;
   cb->callback_id = callback_id;
   cb->lua_function_name = strdup (function_name);
+  pthread_mutex_init (&cb->lock, /* attr = */ NULL);
 
   ud.data = cb;
   ud.free_func = NULL; /* FIXME */
@@ -402,6 +457,7 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */
   int nargs = lua_gettop (l); /* number of arguments */
   clua_callback_data_t *cb;
   user_data_t ud;
+  lua_State *thread;
 
   int callback_id;
   char function_name[DATA_MAX_NAME_LEN] = "";
@@ -426,6 +482,15 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */
     RETURN_LUA (l, -1);
   }
 
+  thread = lua_newthread (l);
+  if (thread == NULL)
+  {
+    ERROR ("lua plugin: lua_newthread failed.");
+    RETURN_LUA (l, -1);
+  }
+  clua_store_thread (l, /* idx = */ -1, callback_id);
+  lua_pop (l, /* nelems = */ 1);
+
   if (function_name[0] == 0)
     ssnprintf (function_name, sizeof (function_name), "lua/callback_%i", callback_id);
 
@@ -437,7 +502,7 @@ static int lua_cb_register_write (lua_State *l) /* {{{ */
   }
 
   memset (cb, 0, sizeof (*cb));
-  cb->lua_state = l;
+  cb->lua_state = thread;
   cb->callback_id = callback_id;
   cb->lua_function_name = strdup (function_name);
   pthread_mutex_init (&cb->lock, /* attr = */ NULL);