Merge remote-tracking branches 'github/pr/392' and 'github/pr/399' into jr/json
authorFlorian Forster <octo@collectd.org>
Sun, 18 Aug 2013 07:04:45 +0000 (09:04 +0200)
committerFlorian Forster <octo@collectd.org>
Sun, 18 Aug 2013 07:04:45 +0000 (09:04 +0200)
1  2  3 
src/collectd.conf.pod
src/curl_json.c

diff --combined src/collectd.conf.pod
@@@@ -1072,13 -1072,15 -1072,13 +1072,15 @@@@ is set to B<true>, B<Match> blocks are 
   
   =head2 Plugin C<curl_json>
   
- -The B<curl_json plugin> uses B<libcurl> (L<http://curl.haxx.se/>) and
- -B<libyajl> (L<http://www.lloydforge.org/projects/yajl/>) to retrieve JSON data
- -via cURL. This can be used to collect values from CouchDB documents (which are
- -stored JSON notation), for example.
- -
- -The following example will collect several values from the built-in `_stats'
- -runtime statistics module of CouchDB
+ +The B<curl_json plugin> collects values from JSON data to be parsed by
+ +B<libyajl> (L<http://www.lloydforge.org/projects/yajl/>) retrieved via
+ +either B<libcurl> (L<http://curl.haxx.se/>) or read directly from a
+ +unix socket. The former can be used, for example, to collect values
+ +from CouchDB documents (which are stored JSON notation), and the
+ +latter to collect values from a uWSGI stats socket.
+ +
+ +The following example will collect several values from the built-in
+ +`_stats' runtime statistics module of CouchDB
   (L<http://wiki.apache.org/couchdb/Runtime_Statistics>).
   
     <Plugin curl_json>
       </URL>
     </Plugin>
   
- -In the B<Plugin> block, there may be one or more B<URL> blocks, each defining
- -a URL to be fetched via HTTP (using libcurl) and one or more B<Key> blocks.
+ +This example will collect data directly from a uWSGI Stats Server
+ +socket.
+ +
+ +  <Plugin curl_json>
+ +    <Sock "/var/run/uwsgi.stats.sock">
+ +      Instance "uwsgi"
+ +      <Key "workers/*/requests">
+ +        Type "http_requests"
+ +      </Key>
+ +
+ +      <Key "workers/*/apps/*/requests">
+ +        Type "http_requests"
+ +      </Key>
+ +    </Sock>
+ +  </Plugin>
+ +
+ +In the B<Plugin> block, there may be one or more B<URL> blocks, each
+ +defining a URL to be fetched via HTTP (using libcurl) or B<Sock>
+ +blocks defining a unix socket to read JSON from directly.  Each of
+ +these blocks may have one or more B<Key> blocks.
+ +
   The B<Key> string argument must be in a path format, which is used to collect a
   value from a JSON map object. If a path element of B<Key> is the
   I<*>E<nbsp>wildcard, the values for all keys will be collectd.
   
++ The B<Key> string argument must be in a path format. Each component is
++ used to match the key from a JSON map or the index of an JSON
++ array. If a path component of a B<Key> is a I<*>E<nbsp>wildcard, the
++ values for all map keys or array indices will be collectd.
++ 
   The following options are valid within B<URL> blocks:
   
   =over 4
diff --combined src/curl_json.c
   #include "utils_avltree.h"
   #include "utils_complain.h"
   
+ +#include <sys/socket.h>
+ +#include <sys/types.h>
+ +#include <sys/un.h>
+ +
   #include <curl/curl.h>
+ +
   #include <yajl/yajl_parse.h>
   #if HAVE_YAJL_YAJL_VERSION_H
   # include <yajl/yajl_version.h>
@@@@ -60,6 -65,8 -60,6 +65,8 @@@@ struct cj_s /* {{{ *
     char *instance;
     char *host;
   
+ +  char *sock;
+ +
     char *url;
     char *user;
     char *pass;
         c_avl_tree_t *tree;
         cj_key_t *key;
       };
++     _Bool in_array;
++     int index;
       char name[DATA_MAX_NAME_LEN];
     } state[YAJL_MAX_DEPTH];
   };
@@@@ -94,7 -101,6 -96,7 +103,6 @@@@ typedef unsigned int yajl_len_t
   #endif
   
   static int cj_read (user_data_t *ud);
- -static int cj_curl_perform (cj_t *db, CURL *curl);
   static void cj_submit (cj_t *db, cj_key_t *key, value_t *value);
   
   static size_t cj_curl_callback (void *buf, /* {{{ */
@@@@ -167,10 -173,10 -169,41 +175,41 @@@@ static int cj_get_type (cj_key_t *key
     return ds->ds[0].type;
   }
   
++ static int cj_cb_map_key (void *ctx, const unsigned char *val,
++     yajl_len_t len);
++ 
++ static void cj_cb_inc_array_index (void * ctx, _Bool ignore)
++ {
++   cj_t *db = (cj_t *)ctx;
++ 
++   if (db->state[db->depth].in_array) {
++     if (ignore)
++       db->state[db->depth].index++;
++     else {
++       char name[DATA_MAX_NAME_LEN];
++       cj_cb_map_key (ctx, (unsigned char *)name,
++                      ssnprintf (name, sizeof (name),
++                                 "%d", db->state[db->depth].index++));
++     }
++   }
++ }
++ 
   /* yajl callbacks */
   #define CJ_CB_ABORT    0
   #define CJ_CB_CONTINUE 1
   
++ static int cj_cb_boolean (void * ctx, int boolVal)
++ {
++   cj_cb_inc_array_index (ctx, 1);
++   return (CJ_CB_CONTINUE);
++ }
++ 
++ static int cj_cb_null (void * ctx)
++ {
++   cj_cb_inc_array_index (ctx, 1);
++   return (CJ_CB_CONTINUE);
++ }
++ 
   /* "number" may not be null terminated, so copy it into a buffer before
    * parsing. */
   static int cj_cb_number (void *ctx,
     int type;
     int status;
   
--   if ((key == NULL) || !CJ_IS_KEY (key))
++   if ((key == NULL) || !CJ_IS_KEY (key)) {
++     if (key != NULL)
++       NOTICE ("curl_json plugin: Found \"%.*s\", but the configuration expects"
++               " a map.", (int)number_len > number_len ? 0 : (int)number_len,
++               number);
++     cj_cb_inc_array_index (ctx, 1);
       return (CJ_CB_CONTINUE);
++   } else
++     cj_cb_inc_array_index (ctx, 0);
   
     memcpy (buffer, number, number_len);
     buffer[sizeof (buffer) - 1] = 0;
@@@@ -233,24 -239,24 -273,6 +279,6 @@@@ static int cj_cb_map_key (void *ctx, co
   static int cj_cb_string (void *ctx, const unsigned char *val,
       yajl_len_t len)
   {
--   cj_t *db = (cj_t *)ctx;
--   char str[len + 1];
-- 
--   /* Create a null-terminated version of the string. */
--   memcpy (str, val, len);
--   str[len] = 0;
-- 
--   /* No configuration for this string -> simply return. */
--   if (db->state[db->depth].key == NULL)
--     return (CJ_CB_CONTINUE);
-- 
--   if (!CJ_IS_KEY (db->state[db->depth].key))
--   {
--     NOTICE ("curl_json plugin: Found string \"%s\", but the configuration "
--         "expects a map here.", str);
--     return (CJ_CB_CONTINUE);
--   }
-- 
     /* Handle the string as if it was a number. */
     return (cj_cb_number (ctx, (const char *) val, len));
   } /* int cj_cb_string */
@@@@ -260,7 -266,8 -282,7 +288,8 @@@@ static int cj_cb_start (void *ctx
     cj_t *db = (cj_t *)ctx;
     if (++db->depth >= YAJL_MAX_DEPTH)
     {
- -    ERROR ("curl_json plugin: %s depth exceeds max, aborting.", db->url);
+ +    ERROR ("curl_json plugin: %s depth exceeds max, aborting.",
+ +           db->url ? db->url : db->sock);
       return (CJ_CB_ABORT);
     }
     return (CJ_CB_CONTINUE);
@@@@ -276,6 -283,6 -298,7 +305,7 @@@@ static int cj_cb_end (void *ctx
   
   static int cj_cb_start_map (void *ctx)
   {
++   cj_cb_inc_array_index (ctx, 0);
     return cj_cb_start (ctx);
   }
   
@@@@ -286,17 -293,17 -309,25 +316,25 @@@@ static int cj_cb_end_map (void *ctx
   
   static int cj_cb_start_array (void * ctx)
   {
++   cj_t *db = (cj_t *)ctx;
++   cj_cb_inc_array_index (ctx, 0);
++   if (db->depth+1 < YAJL_MAX_DEPTH) {
++     db->state[db->depth+1].in_array = 1;
++     db->state[db->depth+1].index = 0;
++   }
     return cj_cb_start (ctx);
   }
   
   static int cj_cb_end_array (void * ctx)
   {
++   cj_t *db = (cj_t *)ctx;
++   db->state[db->depth].in_array = 0;
     return cj_cb_end (ctx);
   }
   
   static yajl_callbacks ycallbacks = {
--   NULL, /* null */
--   NULL, /* boolean */
++   cj_cb_null, /* null */
++   cj_cb_boolean, /* boolean */
     NULL, /* integer */
     NULL, /* double */
     cj_cb_number,
@@@@ -364,6 -371,8 -395,6 +402,8 @@@@ static void cj_free (void *arg) /* {{{ 
     sfree (db->instance);
     sfree (db->host);
   
+ +  sfree (db->sock);
+ +
     sfree (db->url);
     sfree (db->user);
     sfree (db->pass);
@@@@ -598,20 -607,20 -629,20 +638,20 @@@@ static int cj_config_add_url (oconfig_i
     memset (db, 0, sizeof (*db));
   
     if (strcasecmp ("URL", ci->key) == 0)
- -  {
       status = cf_util_get_string (ci, &db->url);
- -    if (status != 0)
- -    {
- -      sfree (db);
- -      return (status);
- -    }
- -  }
+ +  else if (strcasecmp ("Sock", ci->key) == 0)
+ +    status = cf_util_get_string (ci, &db->sock);
     else
     {
       ERROR ("curl_json plugin: cj_config: "
              "Invalid key: %s", ci->key);
       return (-1);
     }
+ +  if (status != 0)
+ +  {
+ +    sfree (db);
+ +    return (status);
+ +  }
   
     /* Fill the `cj_t' structure.. */
     for (i = 0; i < ci->children_num; i++)
         status = cf_util_get_string (child, &db->instance);
       else if (strcasecmp ("Host", child->key) == 0)
         status = cf_util_get_string (child, &db->host);
- -    else if (strcasecmp ("User", child->key) == 0)
+ +    else if (db->url && strcasecmp ("User", child->key) == 0)
         status = cf_util_get_string (child, &db->user);
- -    else if (strcasecmp ("Password", child->key) == 0)
+ +    else if (db->url && strcasecmp ("Password", child->key) == 0)
         status = cf_util_get_string (child, &db->pass);
- -    else if (strcasecmp ("VerifyPeer", child->key) == 0)
+ +    else if (db->url && strcasecmp ("VerifyPeer", child->key) == 0)
         status = cf_util_get_boolean (child, &db->verify_peer);
- -    else if (strcasecmp ("VerifyHost", child->key) == 0)
+ +    else if (db->url && strcasecmp ("VerifyHost", child->key) == 0)
         status = cf_util_get_boolean (child, &db->verify_host);
- -    else if (strcasecmp ("CACert", child->key) == 0)
+ +    else if (db->url && strcasecmp ("CACert", child->key) == 0)
         status = cf_util_get_string (child, &db->cacert);
- -    else if (strcasecmp ("Header", child->key) == 0)
+ +    else if (db->url && strcasecmp ("Header", child->key) == 0)
         status = cj_config_append_string ("Header", &db->headers, child);
- -    else if (strcasecmp ("Post", child->key) == 0)
+ +    else if (db->url && strcasecmp ("Post", child->key) == 0)
         status = cf_util_get_string (child, &db->post_body);
       else if (strcasecmp ("Key", child->key) == 0)
         status = cj_config_add_key (db, child);
     {
       if (db->tree == NULL)
       {
- -      WARNING ("curl_json plugin: No (valid) `Key' block "
- -               "within `URL' block `%s'.", db->url);
+ +      WARNING ("curl_json plugin: No (valid) `Key' block within `%s' \"`%s'\".",
+ +               db->url ? "URL" : "Sock", db->url ? db->url : db->sock);
         status = -1;
       }
- -    if (status == 0)
+ +    if (status == 0 && db->url)
         status = cj_init_curl (db);
     }
   
       ud.free_func = cj_free;
   
       ssnprintf (cb_name, sizeof (cb_name), "curl_json-%s-%s",
- -               db->instance, db->url);
+ +               db->instance, db->url ? db->url : db->sock);
   
       plugin_register_complex_read (/* group = */ NULL, cb_name, cj_read,
                                     /* interval = */ NULL, &ud);
@@@@ -706,7 -715,8 -737,7 +746,8 @@@@ static int cj_config (oconfig_item_t *c
     {
       oconfig_item_t *child = ci->children + i;
   
- -    if (strcasecmp ("URL", child->key) == 0)
+ +    if (strcasecmp ("Sock", child->key) == 0
+ +        || strcasecmp ("URL", child->key) == 0)
       {
         status = cj_config_add_url (child);
         if (status == 0)
@@@@ -766,47 -776,97 -797,47 +807,97 @@@@ static void cj_submit (cj_t *db, cj_key
     plugin_dispatch_values (&vl);
   } /* }}} int cj_submit */
   
- -static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */
+ +static int cj_sock_perform (cj_t *db) /* {{{ */
   {
- -  int status;
- -  long rc;
- -  char *url;
- -  yajl_handle yprev = db->yajl;
+ +  char errbuf[1024];
+ +  struct sockaddr_un sa_unix = {};
+ +  sa_unix.sun_family = AF_UNIX;
+ +  sstrncpy (sa_unix.sun_path, db->sock, sizeof (sa_unix.sun_path));
   
- -  db->yajl = yajl_alloc (&ycallbacks,
- -#if HAVE_YAJL_V2
- -      /* alloc funcs = */ NULL,
- -#else
- -      /* alloc funcs = */ NULL, NULL,
- -#endif
- -      /* context = */ (void *)db);
- -  if (db->yajl == NULL)
+ +  int fd = socket (AF_UNIX, SOCK_STREAM, 0);
+ +  if (fd < 0)
+ +    return (-1);
+ +  if (connect (fd, (struct sockaddr *)&sa_unix, sizeof(sa_unix)) < 0)
     {
- -    ERROR ("curl_json plugin: yajl_alloc failed.");
- -    db->yajl = yprev;
+ +    ERROR ("curl_json plugin: connect(%s) failed: %s",
+ +           (db->sock != NULL) ? db->sock : "<null>",
+ +           sstrerror(errno, errbuf, sizeof (errbuf)));
+ +    close (fd);
       return (-1);
     }
   
+ +  ssize_t red;
+ +  do {
+ +    unsigned char buffer[4096];
+ +    red = read (fd, buffer, sizeof(buffer));
+ +    if (red < 0) {
+ +        ERROR ("curl_json plugin: read(%s) failed: %s",
+ +               (db->sock != NULL) ? db->sock : "<null>",
+ +               sstrerror(errno, errbuf, sizeof (errbuf)));
+ +        close (fd);
+ +        return (-1);
+ +    }
+ +    if (!cj_curl_callback (buffer, red, 1, db))
+ +        break;
+ +  } while (red > 0);
+ +  close (fd);
+ +  return (0);
+ +} /* }}} int cj_sock_perform */
+ +
+ +
+ +static int cj_curl_perform(cj_t *db) /* {{{ */
+ +{
+ +  int status;
+ +  long rc;
+ +  char *url;
     url = NULL;
- -  curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &url);
+ +  curl_easy_getinfo(db->curl, CURLINFO_EFFECTIVE_URL, &url);
   
- -  status = curl_easy_perform (curl);
+ +  status = curl_easy_perform (db->curl);
     if (status != CURLE_OK)
     {
       ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)",
              status, db->curl_errbuf, (url != NULL) ? url : "<null>");
- -    yajl_free (db->yajl);
- -    db->yajl = yprev;
       return (-1);
     }
   
- -  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
+ +  curl_easy_getinfo(db->curl, CURLINFO_RESPONSE_CODE, &rc);
   
     /* The response code is zero if a non-HTTP transport was used. */
     if ((rc != 0) && (rc != 200))
     {
       ERROR ("curl_json plugin: curl_easy_perform failed with "
           "response code %ld (%s)", rc, url);
+ +    return (-1);
+ +  }
+ +  return (0);
+ +} /* }}} int cj_curl_perform */
+ +
+ +static int cj_perform (cj_t *db) /* {{{ */
+ +{
+ +  int status;
+ +  yajl_handle yprev = db->yajl;
+ +
+ +  db->yajl = yajl_alloc (&ycallbacks,
+ +#if HAVE_YAJL_V2
+ +      /* alloc funcs = */ NULL,
+ +#else
+ +      /* alloc funcs = */ NULL, NULL,
+ +#endif
+ +      /* context = */ (void *)db);
+ +  if (db->yajl == NULL)
+ +  {
+ +    ERROR ("curl_json plugin: yajl_alloc failed.");
+ +    db->yajl = yprev;
+ +    return (-1);
+ +  }
+ +
+ +  if (db->url)
+ +    status = cj_curl_perform (db);
+ +  else
+ +    status = cj_sock_perform (db);
+ +  if (status < 0)
+ +  {
       yajl_free (db->yajl);
       db->yajl = yprev;
       return (-1);
     yajl_free (db->yajl);
     db->yajl = yprev;
     return (0);
- -} /* }}} int cj_curl_perform */
+ +} /* }}} int cj_perform */
   
   static int cj_read (user_data_t *ud) /* {{{ */
   {
     db->state[db->depth].tree = db->tree;
     db->key = NULL;
   
- -  return cj_curl_perform (db, db->curl);
+ +  return cj_perform (db);
   } /* }}} int cj_read */
   
   void module_register (void)