#include "utils_avltree.h"
#include "utils_complain.h"
+ +#include <sys/socket.h>
+ +#include <sys/types.h>
+ +#include <sys/un.h>
+ +
#include <curl/curl.h>
+ +
#include <yajl/yajl_parse.h>
#if HAVE_YAJL_YAJL_VERSION_H
# include <yajl/yajl_version.h>
char *instance;
char *host;
+ + char *sock;
+ +
char *url;
char *user;
char *pass;
c_avl_tree_t *tree;
cj_key_t *key;
};
++ _Bool in_array;
++ int index;
char name[DATA_MAX_NAME_LEN];
} state[YAJL_MAX_DEPTH];
};
#endif
static int cj_read (user_data_t *ud);
- -static int cj_curl_perform (cj_t *db, CURL *curl);
static void cj_submit (cj_t *db, cj_key_t *key, value_t *value);
static size_t cj_curl_callback (void *buf, /* {{{ */
return ds->ds[0].type;
}
++ static int cj_cb_map_key (void *ctx, const unsigned char *val,
++ yajl_len_t len);
++
++ static void cj_cb_inc_array_index (void * ctx, _Bool ignore)
++ {
++ cj_t *db = (cj_t *)ctx;
++
++ if (db->state[db->depth].in_array) {
++ if (ignore)
++ db->state[db->depth].index++;
++ else {
++ char name[DATA_MAX_NAME_LEN];
++ cj_cb_map_key (ctx, (unsigned char *)name,
++ ssnprintf (name, sizeof (name),
++ "%d", db->state[db->depth].index++));
++ }
++ }
++ }
++
/* yajl callbacks */
#define CJ_CB_ABORT 0
#define CJ_CB_CONTINUE 1
++ static int cj_cb_boolean (void * ctx, int boolVal)
++ {
++ cj_cb_inc_array_index (ctx, 1);
++ return (CJ_CB_CONTINUE);
++ }
++
++ static int cj_cb_null (void * ctx)
++ {
++ cj_cb_inc_array_index (ctx, 1);
++ return (CJ_CB_CONTINUE);
++ }
++
/* "number" may not be null terminated, so copy it into a buffer before
* parsing. */
static int cj_cb_number (void *ctx,
int type;
int status;
-- if ((key == NULL) || !CJ_IS_KEY (key))
++ if ((key == NULL) || !CJ_IS_KEY (key)) {
++ if (key != NULL)
++ NOTICE ("curl_json plugin: Found \"%.*s\", but the configuration expects"
++ " a map.", (int)number_len > number_len ? 0 : (int)number_len,
++ number);
++ cj_cb_inc_array_index (ctx, 1);
return (CJ_CB_CONTINUE);
++ } else
++ cj_cb_inc_array_index (ctx, 0);
memcpy (buffer, number, number_len);
buffer[sizeof (buffer) - 1] = 0;
static int cj_cb_string (void *ctx, const unsigned char *val,
yajl_len_t len)
{
-- cj_t *db = (cj_t *)ctx;
-- char str[len + 1];
--
-- /* Create a null-terminated version of the string. */
-- memcpy (str, val, len);
-- str[len] = 0;
--
-- /* No configuration for this string -> simply return. */
-- if (db->state[db->depth].key == NULL)
-- return (CJ_CB_CONTINUE);
--
-- if (!CJ_IS_KEY (db->state[db->depth].key))
-- {
-- NOTICE ("curl_json plugin: Found string \"%s\", but the configuration "
-- "expects a map here.", str);
-- return (CJ_CB_CONTINUE);
-- }
--
/* Handle the string as if it was a number. */
return (cj_cb_number (ctx, (const char *) val, len));
} /* int cj_cb_string */
cj_t *db = (cj_t *)ctx;
if (++db->depth >= YAJL_MAX_DEPTH)
{
- - ERROR ("curl_json plugin: %s depth exceeds max, aborting.", db->url);
+ + ERROR ("curl_json plugin: %s depth exceeds max, aborting.",
+ + db->url ? db->url : db->sock);
return (CJ_CB_ABORT);
}
return (CJ_CB_CONTINUE);
static int cj_cb_start_map (void *ctx)
{
++ cj_cb_inc_array_index (ctx, 0);
return cj_cb_start (ctx);
}
static int cj_cb_start_array (void * ctx)
{
++ cj_t *db = (cj_t *)ctx;
++ cj_cb_inc_array_index (ctx, 0);
++ if (db->depth+1 < YAJL_MAX_DEPTH) {
++ db->state[db->depth+1].in_array = 1;
++ db->state[db->depth+1].index = 0;
++ }
return cj_cb_start (ctx);
}
static int cj_cb_end_array (void * ctx)
{
++ cj_t *db = (cj_t *)ctx;
++ db->state[db->depth].in_array = 0;
return cj_cb_end (ctx);
}
static yajl_callbacks ycallbacks = {
-- NULL, /* null */
-- NULL, /* boolean */
++ cj_cb_null, /* null */
++ cj_cb_boolean, /* boolean */
NULL, /* integer */
NULL, /* double */
cj_cb_number,
sfree (db->instance);
sfree (db->host);
+ + sfree (db->sock);
+ +
sfree (db->url);
sfree (db->user);
sfree (db->pass);
memset (db, 0, sizeof (*db));
if (strcasecmp ("URL", ci->key) == 0)
- - {
status = cf_util_get_string (ci, &db->url);
- - if (status != 0)
- - {
- - sfree (db);
- - return (status);
- - }
- - }
+ + else if (strcasecmp ("Sock", ci->key) == 0)
+ + status = cf_util_get_string (ci, &db->sock);
else
{
ERROR ("curl_json plugin: cj_config: "
"Invalid key: %s", ci->key);
return (-1);
}
+ + if (status != 0)
+ + {
+ + sfree (db);
+ + return (status);
+ + }
/* Fill the `cj_t' structure.. */
for (i = 0; i < ci->children_num; i++)
status = cf_util_get_string (child, &db->instance);
else if (strcasecmp ("Host", child->key) == 0)
status = cf_util_get_string (child, &db->host);
- - else if (strcasecmp ("User", child->key) == 0)
+ + else if (db->url && strcasecmp ("User", child->key) == 0)
status = cf_util_get_string (child, &db->user);
- - else if (strcasecmp ("Password", child->key) == 0)
+ + else if (db->url && strcasecmp ("Password", child->key) == 0)
status = cf_util_get_string (child, &db->pass);
- - else if (strcasecmp ("VerifyPeer", child->key) == 0)
+ + else if (db->url && strcasecmp ("VerifyPeer", child->key) == 0)
status = cf_util_get_boolean (child, &db->verify_peer);
- - else if (strcasecmp ("VerifyHost", child->key) == 0)
+ + else if (db->url && strcasecmp ("VerifyHost", child->key) == 0)
status = cf_util_get_boolean (child, &db->verify_host);
- - else if (strcasecmp ("CACert", child->key) == 0)
+ + else if (db->url && strcasecmp ("CACert", child->key) == 0)
status = cf_util_get_string (child, &db->cacert);
- - else if (strcasecmp ("Header", child->key) == 0)
+ + else if (db->url && strcasecmp ("Header", child->key) == 0)
status = cj_config_append_string ("Header", &db->headers, child);
- - else if (strcasecmp ("Post", child->key) == 0)
+ + else if (db->url && strcasecmp ("Post", child->key) == 0)
status = cf_util_get_string (child, &db->post_body);
else if (strcasecmp ("Key", child->key) == 0)
status = cj_config_add_key (db, child);
{
if (db->tree == NULL)
{
- - WARNING ("curl_json plugin: No (valid) `Key' block "
- - "within `URL' block `%s'.", db->url);
+ + WARNING ("curl_json plugin: No (valid) `Key' block within `%s' \"`%s'\".",
+ + db->url ? "URL" : "Sock", db->url ? db->url : db->sock);
status = -1;
}
- - if (status == 0)
+ + if (status == 0 && db->url)
status = cj_init_curl (db);
}
ud.free_func = cj_free;
ssnprintf (cb_name, sizeof (cb_name), "curl_json-%s-%s",
- - db->instance, db->url);
+ + db->instance, db->url ? db->url : db->sock);
plugin_register_complex_read (/* group = */ NULL, cb_name, cj_read,
/* interval = */ NULL, &ud);
{
oconfig_item_t *child = ci->children + i;
- - if (strcasecmp ("URL", child->key) == 0)
+ + if (strcasecmp ("Sock", child->key) == 0
+ + || strcasecmp ("URL", child->key) == 0)
{
status = cj_config_add_url (child);
if (status == 0)
plugin_dispatch_values (&vl);
} /* }}} int cj_submit */
- -static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */
+ +static int cj_sock_perform (cj_t *db) /* {{{ */
{
- - int status;
- - long rc;
- - char *url;
- - yajl_handle yprev = db->yajl;
+ + char errbuf[1024];
+ + struct sockaddr_un sa_unix = {};
+ + sa_unix.sun_family = AF_UNIX;
+ + sstrncpy (sa_unix.sun_path, db->sock, sizeof (sa_unix.sun_path));
- - db->yajl = yajl_alloc (&ycallbacks,
- -#if HAVE_YAJL_V2
- - /* alloc funcs = */ NULL,
- -#else
- - /* alloc funcs = */ NULL, NULL,
- -#endif
- - /* context = */ (void *)db);
- - if (db->yajl == NULL)
+ + int fd = socket (AF_UNIX, SOCK_STREAM, 0);
+ + if (fd < 0)
+ + return (-1);
+ + if (connect (fd, (struct sockaddr *)&sa_unix, sizeof(sa_unix)) < 0)
{
- - ERROR ("curl_json plugin: yajl_alloc failed.");
- - db->yajl = yprev;
+ + ERROR ("curl_json plugin: connect(%s) failed: %s",
+ + (db->sock != NULL) ? db->sock : "<null>",
+ + sstrerror(errno, errbuf, sizeof (errbuf)));
+ + close (fd);
return (-1);
}
+ + ssize_t red;
+ + do {
+ + unsigned char buffer[4096];
+ + red = read (fd, buffer, sizeof(buffer));
+ + if (red < 0) {
+ + ERROR ("curl_json plugin: read(%s) failed: %s",
+ + (db->sock != NULL) ? db->sock : "<null>",
+ + sstrerror(errno, errbuf, sizeof (errbuf)));
+ + close (fd);
+ + return (-1);
+ + }
+ + if (!cj_curl_callback (buffer, red, 1, db))
+ + break;
+ + } while (red > 0);
+ + close (fd);
+ + return (0);
+ +} /* }}} int cj_sock_perform */
+ +
+ +
+ +static int cj_curl_perform(cj_t *db) /* {{{ */
+ +{
+ + int status;
+ + long rc;
+ + char *url;
url = NULL;
- - curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &url);
+ + curl_easy_getinfo(db->curl, CURLINFO_EFFECTIVE_URL, &url);
- - status = curl_easy_perform (curl);
+ + status = curl_easy_perform (db->curl);
if (status != CURLE_OK)
{
ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)",
status, db->curl_errbuf, (url != NULL) ? url : "<null>");
- - yajl_free (db->yajl);
- - db->yajl = yprev;
return (-1);
}
- - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
+ + curl_easy_getinfo(db->curl, CURLINFO_RESPONSE_CODE, &rc);
/* The response code is zero if a non-HTTP transport was used. */
if ((rc != 0) && (rc != 200))
{
ERROR ("curl_json plugin: curl_easy_perform failed with "
"response code %ld (%s)", rc, url);
+ + return (-1);
+ + }
+ + return (0);
+ +} /* }}} int cj_curl_perform */
+ +
+ +static int cj_perform (cj_t *db) /* {{{ */
+ +{
+ + int status;
+ + yajl_handle yprev = db->yajl;
+ +
+ + db->yajl = yajl_alloc (&ycallbacks,
+ +#if HAVE_YAJL_V2
+ + /* alloc funcs = */ NULL,
+ +#else
+ + /* alloc funcs = */ NULL, NULL,
+ +#endif
+ + /* context = */ (void *)db);
+ + if (db->yajl == NULL)
+ + {
+ + ERROR ("curl_json plugin: yajl_alloc failed.");
+ + db->yajl = yprev;
+ + return (-1);
+ + }
+ +
+ + if (db->url)
+ + status = cj_curl_perform (db);
+ + else
+ + status = cj_sock_perform (db);
+ + if (status < 0)
+ + {
yajl_free (db->yajl);
db->yajl = yprev;
return (-1);
yajl_free (db->yajl);
db->yajl = yprev;
return (0);
- -} /* }}} int cj_curl_perform */
+ +} /* }}} int cj_perform */
static int cj_read (user_data_t *ud) /* {{{ */
{
db->state[db->depth].tree = db->tree;
db->key = NULL;
- - return cj_curl_perform (db, db->curl);
+ + return cj_perform (db);
} /* }}} int cj_read */
void module_register (void)