From: Florian Forster Date: Sun, 30 Aug 2009 15:22:28 +0000 (+0200) Subject: write_http plugin: Create one cURL object for each read-thread. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=0b109f8171485c69286a947828ca45847ed9e4b7;p=collectd.git write_http plugin: Create one cURL object for each read-thread. --- diff --git a/src/utils_format_json.c b/src/utils_format_json.c index a9193160..fcd17889 100644 --- a/src/utils_format_json.c +++ b/src/utils_format_json.c @@ -116,8 +116,6 @@ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */ #undef BUFFER_ADD - DEBUG ("format_json: values_to_json: buffer = %s;", buffer); - return (0); } /* }}} int values_to_json */ @@ -171,8 +169,6 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */ #undef BUFFER_ADD_KEYVAL #undef BUFFER_ADD - DEBUG ("format_json: value_list_to_json: buffer = %s;", buffer); - return (0); } /* }}} int value_list_to_json */ diff --git a/src/write_http.c b/src/write_http.c index f14636bd..a11448e5 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -54,9 +54,6 @@ struct wh_callback_s #define WH_FORMAT_JSON 1 int format; - CURL *curl; - char curl_errbuf[CURL_ERROR_SIZE]; - char send_buffer[4096]; size_t send_buffer_free; size_t send_buffer_fill; @@ -66,52 +63,68 @@ struct wh_callback_s }; typedef struct wh_callback_s wh_callback_t; -static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */ +struct wh_curl_s { - memset (cb->send_buffer, 0, sizeof (cb->send_buffer)); - cb->send_buffer_free = sizeof (cb->send_buffer); - cb->send_buffer_fill = 0; - cb->send_buffer_init_time = time (NULL); + CURL *curl; + char errbuf[CURL_ERROR_SIZE]; +}; +typedef struct wh_curl_s wh_curl_t; - if (cb->format == WH_FORMAT_JSON) - { - format_json_initialize (cb->send_buffer, - &cb->send_buffer_fill, - &cb->send_buffer_free); - } -} /* }}} wh_reset_buffer */ +static pthread_once_t curl_key_init = PTHREAD_ONCE_INIT; +static pthread_key_t curl_key; -static int wh_send_buffer (wh_callback_t *cb) /* {{{ */ +static void wh_curl_destroy (void *data) /* {{{ */ { - int status = 0; + wh_curl_t *c = data; - curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer); - status = curl_easy_perform (cb->curl); - if (status != 0) - { - ERROR ("write_http plugin: curl_easy_perform failed with " - "status %i: %s", - status, cb->curl_errbuf); - } - return (status); -} /* }}} wh_send_buffer */ + if (c == NULL) + return; + + DEBUG ("write_http plugin: Destroying a cURL object."); + + curl_easy_cleanup (c->curl); + sfree (c); +} /* }}} void wh_curl_destroy */ + +static void wh_curl_init (void) /* {{{ */ +{ + pthread_key_create(&curl_key, wh_curl_destroy); +} /* }}} void wh_curl_init */ -static int wh_callback_init (wh_callback_t *cb) /* {{{ */ +static wh_curl_t *wh_curl_get (wh_callback_t *cb) /* {{{ */ { struct curl_slist *headers; + wh_curl_t *c; + + pthread_once (&curl_key_init, wh_curl_init); - if (cb->curl != NULL) - return (0); + c = pthread_getspecific (curl_key); + if (c != NULL) + return (c); - cb->curl = curl_easy_init (); - if (cb->curl == NULL) + DEBUG ("write_http plugin: Creating a cURL object."); + + c = malloc (sizeof (*c)); + if (c == NULL) { - ERROR ("curl plugin: curl_easy_init failed."); - return (-1); + ERROR ("write_http plugin: malloc failed."); + return (NULL); + } + memset (c, 0, sizeof (*c)); + + c->curl = curl_easy_init (); + if (c->curl == NULL) + { + ERROR ("write_http plugin: curl_easy_init failed."); + sfree (c); + return (NULL); } - curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION); + curl_easy_setopt (c->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION); + /* The fields from `cb' we read here are only written to at + * configuration time, therefore it's safe to read them without a + * lock. */ headers = NULL; headers = curl_slist_append (headers, "Accept: */*"); if (cb->format == WH_FORMAT_JSON) @@ -119,45 +132,66 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */ else headers = curl_slist_append (headers, "Content-Type: text/plain"); headers = curl_slist_append (headers, "Expect:"); - curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt (c->curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf); - curl_easy_setopt (cb->curl, CURLOPT_URL, cb->location); + curl_easy_setopt (c->curl, CURLOPT_ERRORBUFFER, c->errbuf); + curl_easy_setopt (c->curl, CURLOPT_URL, cb->location); - if (cb->user != NULL) - { - size_t credentials_size; + if (cb->credentials != NULL) + curl_easy_setopt (c->curl, CURLOPT_USERPWD, cb->credentials); - credentials_size = strlen (cb->user) + 2; - if (cb->pass != NULL) - credentials_size += strlen (cb->pass); + curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer); + curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYHOST, + cb->verify_host ? 2 : 0); + if (cb->cacert != NULL) + curl_easy_setopt (c->curl, CURLOPT_CAINFO, cb->cacert); - cb->credentials = (char *) malloc (credentials_size); - if (cb->credentials == NULL) - { - ERROR ("curl plugin: malloc failed."); - return (-1); - } + pthread_setspecific (curl_key, c); - ssnprintf (cb->credentials, credentials_size, "%s:%s", - cb->user, (cb->pass == NULL) ? "" : cb->pass); - curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials); - curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); + return (c); +} /* }}} int wh_curl_get */ + +static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */ +{ + memset (cb->send_buffer, 0, sizeof (cb->send_buffer)); + cb->send_buffer_free = sizeof (cb->send_buffer); + cb->send_buffer_fill = 0; + cb->send_buffer_init_time = time (NULL); + + if (cb->format == WH_FORMAT_JSON) + { + format_json_initialize (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free); } +} /* }}} wh_reset_buffer */ - curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer); - curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYHOST, - cb->verify_host ? 2 : 0); - if (cb->cacert != NULL) - curl_easy_setopt (cb->curl, CURLOPT_CAINFO, cb->cacert); +static int wh_send_buffer (wh_callback_t *cb, /* {{{ */ + const char *buffer) +{ + int status = 0; + wh_curl_t *c; - wh_reset_buffer (cb); + c = wh_curl_get (cb); + if (c == NULL) + return (-1); - return (0); -} /* }}} int wh_callback_init */ + curl_easy_setopt (c->curl, CURLOPT_POSTFIELDS, buffer); + status = curl_easy_perform (c->curl); + if (status != 0) + { + ERROR ("write_http plugin: curl_easy_perform failed with " + "status %i: %s", + status, c->errbuf); + } + + return (status); +} /* }}} wh_send_buffer */ +/* You must hold cb->send_lock when entering `wh_flush_nolock'. */ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ { + char buffer[sizeof (cb->send_buffer)]; int status; DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; " @@ -173,6 +207,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ return (0); } + /* Finalize the send buffer and copy it to `buffer'. */ if (cb->format == WH_FORMAT_COMMAND) { if (cb->send_buffer_fill <= 0) @@ -181,7 +216,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ return (0); } - status = wh_send_buffer (cb); + memcpy (buffer, cb->send_buffer, sizeof (buffer)); wh_reset_buffer (cb); } else if (cb->format == WH_FORMAT_JSON) @@ -203,7 +238,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ return (status); } - status = wh_send_buffer (cb); + memcpy (buffer, cb->send_buffer, sizeof (buffer)); wh_reset_buffer (cb); } else @@ -214,6 +249,14 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ return (-1); } + /* We copied the send buffer to `buffer' and reset it so we can do + * without the `send_lock' here. This allows other read-threads to + * append stuff to the new buffer while we wait for the web-server to + * reply. */ + pthread_mutex_unlock (&cb->send_lock); + status = wh_send_buffer (cb, buffer); + pthread_mutex_lock (&cb->send_lock); + return (status); } /* }}} wh_flush_nolock */ @@ -230,18 +273,6 @@ static int wh_flush (int timeout, /* {{{ */ cb = user_data->data; pthread_mutex_lock (&cb->send_lock); - - if (cb->curl == NULL) - { - status = wh_callback_init (cb); - if (status != 0) - { - ERROR ("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock (&cb->send_lock); - return (-1); - } - } - status = wh_flush_nolock (timeout, cb); pthread_mutex_unlock (&cb->send_lock); @@ -259,7 +290,6 @@ static void wh_callback_free (void *data) /* {{{ */ wh_flush_nolock (/* timeout = */ -1, cb); - curl_easy_cleanup (cb->curl); sfree (cb->location); sfree (cb->user); sfree (cb->pass); @@ -361,17 +391,6 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{ pthread_mutex_lock (&cb->send_lock); - if (cb->curl == NULL) - { - status = wh_callback_init (cb); - if (status != 0) - { - ERROR ("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock (&cb->send_lock); - return (-1); - } - } - if (command_len >= cb->send_buffer_free) { status = wh_flush_nolock (/* timeout = */ -1, cb); @@ -409,17 +428,6 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ * pthread_mutex_lock (&cb->send_lock); - if (cb->curl == NULL) - { - status = wh_callback_init (cb); - if (status != 0) - { - ERROR ("write_http plugin: wh_callback_init failed."); - pthread_mutex_unlock (&cb->send_lock); - return (-1); - } - } - status = format_json_value_list (cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, @@ -565,7 +573,6 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ cb->verify_host = 1; cb->cacert = NULL; cb->format = WH_FORMAT_COMMAND; - cb->curl = NULL; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); @@ -599,6 +606,27 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ DEBUG ("write_http: Registering write callback with URL %s", cb->location); + if (cb->user != NULL) + { + size_t credentials_size; + + credentials_size = strlen (cb->user) + 2; + if (cb->pass != NULL) + credentials_size += strlen (cb->pass); + + cb->credentials = (char *) malloc (credentials_size); + if (cb->credentials == NULL) + { + ERROR ("write_http plugin: malloc failed."); + return (-1); + } + + ssnprintf (cb->credentials, credentials_size, "%s:%s", + cb->user, (cb->pass == NULL) ? "" : cb->pass); + } + + wh_reset_buffer (cb); + memset (&user_data, 0, sizeof (user_data)); user_data.data = cb; user_data.free_func = NULL;