--- /dev/null
+/**
+ * collectd - src/cloud_pubsub.c
+ * Copyright (C) 2018 Florian Forster
+ *
+ * MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_format_json.h"
+#include "utils_gce.h"
+#include "utils_oauth.h"
+#include "utils_parse_json.h"
+#include "utils_strbuf.h"
+
+#include <curl/curl.h>
+#include <openssl/bio.h>
+#include <openssl/buffer.h>
+#include <openssl/err.h>
+#include <openssl/evp.h> /* for BIO_f_base64() */
+
+#include <yajl/yajl_gen.h>
+#include <yajl/yajl_tree.h>
+
+#ifndef CLOUD_PUBSUB_BUFFER_SIZE
+#define CLOUD_PUBSUB_BUFFER_SIZE 65536
+#endif
+
+#ifndef CLOUD_PUBSUB_URL
+#define CLOUD_PUBSUB_URL "https://pubsub.googleapis.com/v1"
+#endif
+
+#ifndef CLOUD_PUBSUB_SCOPE
+#define CLOUD_PUBSUB_SCOPE "https://www.googleapis.com/auth/pubsub"
+#endif
+
+struct topic_s {
+ char *name;
+
+ char *email;
+ char *project;
+ char *url;
+ _Bool store_rates;
+ size_t max_messages;
+
+ oauth_t *auth;
+ CURL *curl;
+ char curl_errbuf[256];
+ strbuf_t *buffer;
+ pthread_mutex_t lock;
+};
+typedef struct topic_s topic_t;
+
+struct message_s {
+ char *ack_id;
+ char *data;
+ char *message_id;
+};
+typedef struct message_s message_t;
+
+struct pull_response_s {
+ message_t *messages;
+ size_t messages_num;
+};
+typedef struct pull_response_s pull_response_t;
+
+struct blob_s {
+ char *data;
+ size_t size;
+};
+typedef struct blob_s blob_t;
+
+static size_t write_callback(void *contents, size_t size, size_t nmemb,
+ void *ud) {
+ size_t realsize = size * nmemb;
+ blob_t *blob = ud;
+
+ if ((0x7FFFFFF0 < blob->size) || (0x7FFFFFF0 - blob->size < realsize)) {
+ ERROR("cloud_pubsub plugin: write_callback: integer overflow");
+ return 0;
+ }
+
+ blob->data = realloc(blob->data, blob->size + realsize + 1);
+ if (blob->data == NULL) {
+ /* out of memory! */
+ ERROR("cloud_pubsub plugin: write_callback: not enough memory (realloc "
+ "returned NULL)");
+ return 0;
+ }
+
+ memcpy(blob->data + blob->size, contents, realsize);
+ blob->size += realsize;
+ blob->data[blob->size] = 0;
+
+ return realsize;
+} /* size_t write_callback */
+
+static int access_token(topic_t *t, char *buffer, size_t buffer_size) {
+ /* t->auth is NULL only if we're running on GCE. */
+ assert(t->auth || gce_check());
+
+ if (t->auth != NULL)
+ return oauth_access_token(t->auth, buffer, buffer_size);
+
+ return gce_access_token(t->email, buffer, buffer_size);
+}
+
+typedef struct {
+ int code;
+ char *message;
+} api_error_t;
+
+static api_error_t *parse_api_error(char const *body) {
+ char errbuf[1024];
+ yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
+ if (root == NULL) {
+ ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
+ return NULL;
+ }
+
+ api_error_t *err = calloc(1, sizeof(*err));
+ if (err == NULL) {
+ ERROR("cloud_pubsub plugin: calloc failed");
+ yajl_tree_free(root);
+ return NULL;
+ }
+
+ yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
+ yajl_t_number);
+ if (code != NULL) {
+ err->code = YAJL_GET_INTEGER(code);
+ }
+
+ yajl_val message = yajl_tree_get(
+ root, (char const *[]){"error", "message", NULL}, yajl_t_string);
+ if (message != NULL) {
+ err->message = strdup(YAJL_GET_STRING(message));
+ }
+
+ return err;
+}
+
+static char *api_error_string(api_error_t *err, char *buffer,
+ size_t buffer_size) {
+ if (err == NULL) {
+ strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
+ } else if (err->message == NULL) {
+ snprintf(buffer, buffer_size, "API error %d", err->code);
+ } else {
+ snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
+ }
+
+ return buffer;
+}
+#define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
+
+// do_post does a HTTP POST request, assuming a JSON payload and using OAuth
+// authentication. Returns -1 on error and the HTTP status code otherwise.
+// ret_content, if not NULL, will contain the server's response.
+// If ret_content is provided and the server responds with a 4xx or 5xx error,
+// an appropriate message will be logged.
+static int do_post(topic_t *t, char const *url, void const *payload,
+ blob_t *ret_content) {
+ if (t->curl == NULL) {
+ t->curl = curl_easy_init();
+ if (t->curl == NULL) {
+ ERROR("cloud_pubsub plugin: curl_easy_init() failed");
+ return -1;
+ }
+
+ curl_easy_setopt(t->curl, CURLOPT_ERRORBUFFER, t->curl_errbuf);
+ curl_easy_setopt(t->curl, CURLOPT_NOSIGNAL, 1L);
+ }
+
+ curl_easy_setopt(t->curl, CURLOPT_POST, 1L);
+ curl_easy_setopt(t->curl, CURLOPT_URL, url);
+
+ /* header */
+ char tok[256];
+ int status = access_token(t, tok, sizeof(tok));
+ if (status != 0) {
+ ERROR("cloud_pubsub plugin: getting access token failed with status %d",
+ status);
+ return -1;
+ }
+ char auth_header[384];
+ snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", tok);
+
+ struct curl_slist *headers =
+ curl_slist_append(NULL, "Content-Type: application/json");
+ headers = curl_slist_append(headers, auth_header);
+ curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, headers);
+
+ curl_easy_setopt(t->curl, CURLOPT_POSTFIELDS, payload);
+
+ curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION,
+ ret_content ? write_callback : NULL);
+ curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, ret_content);
+
+ status = curl_easy_perform(t->curl);
+
+ /* clean up that has to happen in any case */
+ curl_slist_free_all(headers);
+ curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, NULL);
+ curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, NULL);
+ curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, NULL);
+
+ if (status != CURLE_OK) {
+ ERROR("cloud_pubsub plugin: POST %s failed: %s", url, t->curl_errbuf);
+ sfree(ret_content->data);
+ ret_content->size = 0;
+ return -1;
+ }
+
+ long http_code = 0;
+ curl_easy_getinfo(t->curl, CURLINFO_RESPONSE_CODE, &http_code);
+
+ if (ret_content != NULL) {
+ if ((status >= 400) && (status < 500)) {
+ ERROR("write_stackdriver plugin: POST %s: %s", url,
+ API_ERROR_STRING(parse_api_error(ret_content->data)));
+ } else if (status >= 500) {
+ WARNING("write_stackdriver plugin: POST %s: %s", url, ret_content->data);
+ }
+ }
+
+ return (int)http_code;
+}
+
+static void topic_free(topic_t *t) {
+ if (t == NULL)
+ return;
+
+ sfree(t->name);
+ oauth_destroy(t->auth);
+ sfree(t->email);
+ sfree(t->project);
+ if (t->curl != NULL)
+ curl_easy_cleanup(t->curl);
+ strbuf_destroy(t->buffer);
+ sfree(t);
+} /* void topic_free */
+
+static topic_t *topic_alloc(void) {
+ topic_t *t = calloc(1, sizeof(*t));
+ if (t == NULL)
+ return NULL;
+
+ t->url = strdup(CLOUD_PUBSUB_URL);
+ t->max_messages = 1000;
+ pthread_mutex_init(&t->lock, /* attr = */ NULL);
+
+ return t;
+} /* topic_t *topic_alloc */
+
+/* pubsub_topic_publish calls a topic's "publish" method. */
+static int pubsub_topic_publish(topic_t *t) {
+ char url[1024];
+ snprintf(url, sizeof(url), "%s/projects/%s/topics/%s:publish", t->url,
+ t->project, t->name);
+
+ char *payload = t->buffer->buffer;
+
+ blob_t response = {0};
+
+ int status = do_post(t, url, payload, &response);
+ if (status == -1) {
+ ERROR("cloud_pubsub plugin: POST %s failed", url);
+ return -1;
+ }
+ sfree(response.data);
+
+ if (status != 200) {
+ ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+ "want 200",
+ url, status);
+ return -1;
+ }
+ return 0;
+} /* int pubsub_topic_publish */
+
+static int base64_encode(char *buffer, size_t buffer_size) {
+ /* Set up the memory-base64 chain */
+ BIO *b64 = BIO_new(BIO_f_base64());
+ BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+ b64 = BIO_push(b64, BIO_new(BIO_s_mem()));
+
+ /* Write data to the chain */
+ BIO_write(b64, buffer, strlen(buffer));
+ if (BIO_flush(b64) != 1) {
+ BIO_free_all(b64);
+ return (-1);
+ }
+
+ /* Never fails */
+ BUF_MEM *ptr;
+ BIO_get_mem_ptr(b64, &ptr);
+
+ if (buffer_size <= ptr->length) {
+ BIO_free_all(b64);
+ return (ENOMEM);
+ }
+
+ /* Copy data to buffer. */
+ memcpy(buffer, ptr->data, ptr->length);
+ buffer[ptr->length] = 0;
+
+ BIO_free_all(b64);
+ return 0;
+} /* int base64_encode */
+
+static int base64_decode(char const *in, char *buffer, size_t buffer_size) {
+ /* Set up the memory-base64 chain */
+ BIO *b64 = BIO_new(BIO_f_base64());
+ BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+ b64 = BIO_push(b64, BIO_new_mem_buf(in, -1));
+
+ int status = BIO_read(b64, buffer, buffer_size);
+ if (status < 0) {
+ BIO_free_all(b64);
+ return status;
+ }
+ if (status >= buffer_size) {
+ BIO_free_all(b64);
+ return ENOMEM;
+ }
+
+ buffer[status] = 0;
+
+ BIO_free_all(b64);
+ return 0;
+}
+
+/* parse_pull parses the JSON returned by a "pull" request and returns the
+ * messages in a pull_response_t*. */
+static pull_response_t *parse_pull(char const *body) {
+ char const *path[] = {"receivedMessages", NULL};
+
+ char errbuf[1024];
+ yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
+ if (root == NULL) {
+ ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
+ return NULL;
+ }
+
+ yajl_val messages = yajl_tree_get(root, path, yajl_t_array);
+ if ((messages == NULL) || !YAJL_IS_ARRAY(messages)) {
+ yajl_tree_free(root);
+ return NULL;
+ }
+
+ pull_response_t *res = calloc(1, sizeof(*res));
+ if (res == NULL) {
+ yajl_tree_free(root);
+ return NULL;
+ }
+
+ for (size_t i = 0; i < YAJL_GET_ARRAY(messages)->len; i++) {
+ yajl_val msg_var = YAJL_GET_ARRAY(messages)->values[i];
+ char const *ackid_path[] = {"ackId", NULL};
+ char const *data_path[] = {"message", "data", NULL};
+ char const *msg_id_path[] = {"message", "messageId", NULL};
+
+ yajl_val ackid_var = yajl_tree_get(msg_var, ackid_path, yajl_t_string);
+ yajl_val data_var = yajl_tree_get(msg_var, data_path, yajl_t_string);
+ yajl_val msg_id_var = yajl_tree_get(msg_var, msg_id_path, yajl_t_string);
+
+ if ((ackid_var == NULL) || (data_var == NULL) || (msg_id_var == NULL))
+ continue;
+
+ message_t *msg = realloc(res->messages,
+ sizeof(*res->messages) * (res->messages_num + 1));
+ if (msg == NULL)
+ continue;
+ res->messages = msg;
+ msg = res->messages + res->messages_num;
+
+ msg->ack_id = strdup(YAJL_GET_STRING(ackid_var));
+ msg->message_id = strdup(YAJL_GET_STRING(msg_id_var));
+ msg->data = strdup(YAJL_GET_STRING(data_var));
+
+ res->messages_num++;
+ }
+
+ yajl_tree_free(root);
+ return res;
+} /* pull_response_t *parse_pull */
+
+/* pubsub_topic_pull calls a subscription's "pull" method. */
+static int pubsub_topic_pull(topic_t *t, blob_t *ret_content) {
+ char url[1024];
+ snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:pull", t->url,
+ t->project, t->name);
+
+ char payload[128];
+ snprintf(payload, sizeof(payload),
+ "{\"returnImmediately\":\"false\",\"maxMessages\":\"%zu\"}",
+ t->max_messages);
+
+ int status = do_post(t, url, payload, ret_content);
+ if (status == -1)
+ return -1;
+
+ if (status != 200) {
+ ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+ "want 200",
+ url, status);
+ sfree(ret_content->data);
+ ret_content->size = 0;
+ return -1;
+ }
+ return 0;
+} /* char *pubsub_topic_pull */
+
+static char *format_ack(char **ack_ids, size_t ack_ids_num) {
+ yajl_gen gen;
+ unsigned char const *out;
+ size_t out_size;
+ char *ret;
+ char key[] = "ackIds";
+ size_t i;
+
+ gen = yajl_gen_alloc(NULL);
+ if (gen == NULL)
+ return NULL;
+
+ yajl_gen_map_open(gen);
+ yajl_gen_string(gen, (unsigned char *)&key[0], (unsigned int)strlen(key));
+ yajl_gen_array_open(gen);
+ for (i = 0; i < ack_ids_num; i++)
+ yajl_gen_string(gen, (unsigned char *)ack_ids[i],
+ (unsigned int)strlen(ack_ids[i]));
+ yajl_gen_array_close(gen);
+ yajl_gen_map_close(gen);
+
+ if (yajl_gen_get_buf(gen, &out, &out_size) != yajl_gen_status_ok) {
+ yajl_gen_free(gen);
+ return NULL;
+ }
+
+ ret = strdup((char const *)out);
+ yajl_gen_free(gen);
+ return ret;
+} /* char *format_ack */
+
+/* pubsub_topic_ack calls a subscription's "ack" method. */
+static int pubsub_topic_ack(topic_t *t, char **ack_ids, size_t ack_ids_num) {
+ char url[1024];
+ snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:acknowledge",
+ t->url, t->project, t->name);
+
+ char *payload = format_ack(ack_ids, ack_ids_num);
+
+ blob_t response = {0};
+
+ int status = do_post(t, url, payload, &response);
+ sfree(payload);
+ if (status == -1) {
+ ERROR("cloud_pubsub plugin: POST %s failed", url);
+ return -1;
+ }
+ sfree(response.data);
+
+ if (status != 200) {
+ ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+ "want 200",
+ url, status);
+ return -1;
+ }
+ return 0;
+} /* int pubsub_topic_ack */
+
+/* topic_tryadd tries to add a valud list to the topic's buffer. If
+ * this fails, for example because there is not enough space, it may leave the
+ * buffer in a modified / inconsistent state. */
+static int topic_tryadd(topic_t *t, data_set_t const *ds,
+ value_list_t const *vl) {
+ char json[1024];
+ size_t json_fill = 0;
+ size_t json_free = sizeof(json);
+ int status;
+
+ status = format_json_initialize(json, &json_fill, &json_free);
+ if (status != 0)
+ return status;
+
+ status = format_json_value_list(json, &json_fill, &json_free, ds, vl,
+ t->store_rates);
+ if (status != 0)
+ return status;
+
+ status = format_json_finalize(json, &json_fill, &json_free);
+ if (status != 0)
+ return status;
+
+ status = base64_encode(json, sizeof(json));
+ if (status != 0)
+ return -1;
+
+ if (t->buffer->used == 0) /* initialize */
+ status = strbuf_add(t->buffer, "{\"messages\":[{\"data\":\"");
+ else
+ status = strbuf_add(t->buffer, "\"},{\"data\":\"");
+ if (status != 0)
+ return status;
+
+ return strbuf_add(t->buffer, json);
+} /* int topic_tryadd */
+
+/* topic_add adds a value list to the topic's buffer in an
+ * all-or-nothing fashion, i.e. if there is not enough space, it will not
+ * modify
+ * the buffer and return ENOMEM. */
+static int topic_add(topic_t *t, data_set_t const *ds, value_list_t const *vl) {
+ size_t saved_used = t->buffer->used;
+ size_t saved_free = t->buffer->free;
+ int status;
+
+ status = topic_tryadd(t, ds, vl);
+ if ((status == 0) && (t->buffer->free < 5))
+ status = ENOMEM;
+
+ if (status == 0)
+ return 0;
+
+ t->buffer->used = saved_used;
+ t->buffer->free = saved_free;
+ t->buffer->buffer[t->buffer->used] = 0;
+
+ return status;
+} /* int topic_add */
+
+static int handle_message(message_t msg) {
+ size_t json_size = ((strlen(msg.data) * 3) / 4) + 1;
+ char json[json_size];
+
+ int status = base64_decode(msg.data, json, json_size);
+ if (status != 0) {
+ ERROR("cloud_pubsub plugin: base64_decode failed: %d", status);
+ return status;
+ }
+
+ value_list_t **value_lists = NULL;
+ size_t value_lists_num = 0;
+ status = parse_json(json, &value_lists, &value_lists_num);
+ if (status != 0) {
+ ERROR("cloud_pubsub plugin: parse_json() failed: %d", status);
+ return status;
+ }
+
+ for (size_t i = 0; i < value_lists_num; i++) {
+ value_list_t *vl = value_lists[i];
+
+ int status = plugin_dispatch_values(vl);
+ if (status != 0) {
+ NOTICE("cloud_pubsub plugin: plugin_dispatch_values() failed: %d",
+ status);
+ }
+
+ sfree(vl->values);
+ sfree(vl);
+ }
+
+ sfree(value_lists);
+ return 0;
+} /* int handle_message */
+
+static int topic_pull(topic_t *t) {
+ blob_t json = {0};
+ int status = pubsub_topic_pull(t, &json);
+ if (status != 0) {
+ return status;
+ }
+
+ pull_response_t *res = parse_pull(json.data);
+ sfree(json.data);
+ if (res == NULL)
+ return -1;
+
+ char **ack_ids = NULL;
+ size_t ack_ids_num = 0;
+ for (size_t i = 0; i < res->messages_num; i++) {
+ message_t msg = res->messages[i];
+
+ int status = handle_message(msg);
+ if (status == 0)
+ strarray_add(&ack_ids, &ack_ids_num, msg.ack_id);
+
+ sfree(msg.ack_id);
+ sfree(msg.data);
+ sfree(msg.message_id);
+ }
+
+ sfree(res->messages);
+ sfree(res);
+
+ if (ack_ids_num > 0) {
+ int status = pubsub_topic_ack(t, ack_ids, ack_ids_num);
+ strarray_free(ack_ids, ack_ids_num);
+ return status;
+ }
+
+ return 0;
+} /* int topic_pull */
+
+static void *receive_thread(void *arg) {
+ topic_t *t = arg;
+ cdtime_t delay = 0;
+
+ while (42) {
+ int status = topic_pull(t);
+ if (status == 0) {
+ delay = 0;
+ continue;
+ }
+
+ /* failure: use exponential backoff. */
+ if (delay == 0) {
+ delay = MS_TO_CDTIME_T(64);
+ } else {
+ delay = 2 * delay;
+ }
+ if (delay > TIME_T_TO_CDTIME_T(10)) {
+ delay = TIME_T_TO_CDTIME_T(10);
+ }
+ NOTICE(
+ "cloud_pubsub plugin: topic_pull() failed, sleeping for %.3f seconds.",
+ CDTIME_T_TO_DOUBLE(delay));
+
+ struct timespec ts = CDTIME_T_TO_TIMESPEC(delay);
+ while (ts.tv_sec != 0 || ts.tv_nsec != 0) {
+ int status = nanosleep(&ts, &ts);
+ if (status == 0) {
+ break;
+ }
+ }
+ }
+
+ return (NULL);
+} /* void *receive_thread */
+
+/* topic_flush_locked flushes a topic's internal buffer. You must hold t->lock
+ * when calling this function. */
+static int topic_flush_locked(topic_t *t) {
+ int status;
+
+ if (t->buffer->used == 0)
+ return 0;
+
+ /* finalize the buffer */
+ status = strbuf_add(t->buffer, "\"}]}");
+ if (status != 0) {
+ strbuf_reset(t->buffer);
+ return status;
+ }
+
+ status = pubsub_topic_publish(t);
+ strbuf_reset(t->buffer);
+
+ return status;
+} /* int topic_flush_locked */
+
+static int cps_flush(__attribute__((unused)) cdtime_t timeout,
+ __attribute__((unused)) char const *id, user_data_t *ud) {
+ topic_t *t = ud->data;
+
+ pthread_mutex_lock(&t->lock);
+ int status = topic_flush_locked(t);
+ pthread_mutex_unlock(&t->lock);
+
+ return status;
+} /* int cps_flush */
+
+static int cps_write(data_set_t const *ds, value_list_t const *vl,
+ user_data_t *ud) {
+ topic_t *t = ud->data;
+
+ pthread_mutex_lock(&t->lock);
+
+ int status = topic_add(t, ds, vl);
+ if (status != ENOMEM) {
+ if (status != 0)
+ ERROR("cloud_pubsub plugin: topic_add (\"%s\") failed with status %d.",
+ t->name, status);
+ pthread_mutex_unlock(&t->lock);
+ return status;
+ }
+
+ status = topic_flush_locked(t);
+ if (status != 0) {
+ ERROR("cloud_pubsub plugin: topic_flush_locked (\"%s\") failed with status "
+ "%d.",
+ t->name, status);
+ pthread_mutex_unlock(&t->lock);
+ return status;
+ }
+
+ status = topic_add(t, ds, vl);
+ if (status != 0)
+ ERROR("cloud_pubsub plugin: topic_add[retry] (\"%s\") failed with status "
+ "%d.",
+ t->name, status);
+
+ pthread_mutex_unlock(&t->lock);
+ return status;
+} /* int cps_write */
+
+static int cps_init(void) {
+ /* Call this while collectd is still single-threaded to avoid
+ * initialization issues in libgcrypt. */
+ curl_global_init(CURL_GLOBAL_SSL);
+
+ ERR_load_crypto_strings();
+
+ return 0;
+} /* int cps_init */
+
+static void check_scope(char const *email) /* {{{ */
+{
+ char *scope = gce_scope(email);
+ if (scope == NULL) {
+ WARNING("cloud_pubsub plugin: Unable to determine scope of this "
+ "instance.");
+ return;
+ }
+
+ if (strstr(scope, CLOUD_PUBSUB_SCOPE) == NULL) {
+ size_t scope_len;
+
+ /* Strip trailing newline characers for printing. */
+ scope_len = strlen(scope);
+ while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
+ scope[--scope_len] = 0;
+
+ WARNING("cloud_pubsub plugin: The determined scope of this instance "
+ "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
+ "to add this scope to the list of scopes passed to gcutil with "
+ "--service_account_scopes when creating the instance. "
+ "Alternatively, to use this plugin on an instance which does not "
+ "have this scope, use a Service Account.",
+ scope, CLOUD_PUBSUB_SCOPE);
+ }
+
+ sfree(scope);
+} /* }}} void check_scope */
+
+static int cps_config_topic(oconfig_item_t *ci) {
+ topic_t *t = topic_alloc();
+ if (t == NULL)
+ return ENOMEM;
+
+ if (cf_util_get_string(ci, &t->name) != 0) {
+ topic_free(t);
+ return -1;
+ }
+
+ char *credential_file = NULL;
+ int conf_buffer_size = CLOUD_PUBSUB_BUFFER_SIZE;
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Project", child->key) == 0)
+ cf_util_get_string(child, &t->project);
+ else if (strcasecmp("Email", child->key) == 0)
+ cf_util_get_string(child, &t->email);
+ else if (strcasecmp("Url", child->key) == 0)
+ cf_util_get_string(child, &t->url);
+ else if (strcasecmp("CredentialFile", child->key) == 0)
+ cf_util_get_string(child, &credential_file);
+ else if (strcasecmp("StoreRates", child->key) == 0)
+ cf_util_get_boolean(child, &t->store_rates);
+ else if (strcasecmp("BufferSize", child->key) == 0) {
+ cf_util_get_int(child, &conf_buffer_size);
+ if (conf_buffer_size < 1024) {
+ ERROR("cloud_pubsub plugin: BufferSize %d is too small. Using 1024 "
+ "byte.",
+ conf_buffer_size);
+ conf_buffer_size = 1024;
+ }
+ } else if (strcasecmp("MaxMessages", child->key) == 0) {
+ int max_messages = 0;
+ if (cf_util_get_int(child, &max_messages) != 0) {
+ continue;
+ }
+ if (max_messages < 1) {
+ ERROR("cloud_pubsub plugin: MaxMessages %d is too small.",
+ max_messages);
+ continue;
+ }
+ t->max_messages = (size_t)max_messages;
+ } else
+ ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
+ child->key);
+ }
+
+ /* Set up authentication */
+ /* Option 1: Credentials file given => use service account */
+ if (credential_file != NULL) {
+ oauth_google_t cfg =
+ oauth_create_google_file(credential_file, CLOUD_PUBSUB_SCOPE);
+ if (cfg.oauth == NULL) {
+ ERROR("cloud_pubsub plugin: oauth_create_google_file failed");
+ topic_free(t);
+ return EINVAL;
+ }
+ t->auth = cfg.oauth;
+
+ if (t->project == NULL) {
+ t->project = cfg.project_id;
+ INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
+ t->project);
+ } else {
+ sfree(cfg.project_id);
+ }
+ }
+
+ /* Option 2: Look for credentials in well-known places */
+ if (t->auth == NULL) {
+ oauth_google_t cfg = oauth_create_google_default(CLOUD_PUBSUB_SCOPE);
+ t->auth = cfg.oauth;
+
+ if (t->project == NULL) {
+ t->project = cfg.project_id;
+ INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
+ t->project);
+ } else {
+ sfree(cfg.project_id);
+ }
+ }
+
+ if ((t->auth != NULL) && (t->email != NULL)) {
+ NOTICE("cloud_pubsub plugin: A service account email was configured "
+ "but is "
+ "not used for authentication because %s used instead.",
+ (credential_file != NULL) ? "a credential file was"
+ : "application default credentials were");
+ }
+
+ /* Option 3: Running on GCE => use metadata service */
+ if ((t->auth == NULL) && gce_check()) {
+ check_scope(t->email);
+ } else if (t->auth == NULL) {
+ ERROR("cloud_pubsub plugin: Unable to determine credentials. Please "
+ "either "
+ "specify the \"Credentials\" option or set up Application Default "
+ "Credentials.");
+ topic_free(t);
+ return EINVAL;
+ }
+
+ if ((t->project == NULL) && gce_check()) {
+ t->project = gce_project_id();
+ }
+ if (t->project == NULL) {
+ ERROR("cloud_pubsub plugin: Unable to determine the project number. "
+ "Please specify the \"Project\" option manually.");
+ topic_free(t);
+ return EINVAL;
+ }
+
+ if (strcasecmp("Publish", ci->key) == 0) {
+ t->buffer = strbuf_create((size_t)conf_buffer_size);
+ if (t->buffer == NULL) {
+ ERROR("cloud_pubsub plugin: strbuf_create failed.");
+ topic_free(t);
+ return -1;
+ }
+
+ assert(t->name != NULL);
+ assert(t->project != NULL);
+
+ char cbname[128];
+ snprintf(cbname, sizeof(cbname), "cloud_pubsub/%s", t->name);
+
+ plugin_register_write(cbname, cps_write,
+ &(user_data_t){
+ .data = t, .free_func = (void *)topic_free,
+ });
+ plugin_register_flush(cbname, cps_flush, &(user_data_t){.data = t});
+ } else { /* if (strcasecmp ("Subscribe", ci->key) == 0) */
+ pthread_t tid;
+
+ /* TODO(octo): Store thread_id and kill threads in shutdown. */
+ int status = plugin_thread_create(&tid,
+ /* attrs = */ NULL, receive_thread,
+ /* arg = */ t,
+ /* name = */ "cloud_pubsub recv");
+ if (status != 0) {
+ char errbuf[1024];
+ ERROR("cloud_pubsub plugin: pthread_create failed: %s",
+ sstrerror(errno, errbuf, sizeof(errbuf)));
+ topic_free(t);
+ return -1;
+ }
+ }
+
+ return 0;
+} /* int cps_config_topic */
+
+static int cps_config(oconfig_item_t *ci) {
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ int status = -1;
+
+ if (strcasecmp("Publish", child->key) == 0)
+ status = cps_config_topic(child);
+ else if (strcasecmp("Subscribe", child->key) == 0)
+ status = cps_config_topic(child);
+ else
+ ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
+ child->key);
+
+ if (status != 0)
+ return status;
+ }
+
+ return 0;
+} /* int cps_config */
+
+void module_register(void) {
+ plugin_register_complex_config("cloud_pubsub", cps_config);
+ plugin_register_init("cloud_pubsub", cps_init);
+}