2 * collectd - src/write_stackdriver.c
5 * Copyright (C) 2017 Florian Forster
7 * Permission to use, copy, modify, and/or distribute this software for any
8 * purpose with or without fee is hereby granted, provided that the above
9 * copyright notice and this permission notice appear in all copies.
11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Florian Forster <octo at collectd.org>
26 #include "configfile.h"
28 #include "utils_format_stackdriver.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
32 #include <curl/curl.h>
/* Default base URL of the monitoring API; wg_config copies it into cb->url
 * before the "Url" option gets a chance to override it. */
39 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
/* Scope passed to the oauth_create_google_*() helpers and looked for in the
 * instance scope by wg_check_scope. The #ifndef keeps a definition made
 * earlier (for example on the compiler command line) in effect. */
42 #ifndef MONITORING_SCOPE
43 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
/* State of one configured plugin instance. A pointer to it is handed to the
 * write and flush callbacks through user_data_t. */
46 struct wg_callback_s {
/* Resource description; passed to sd_output_create() in wg_callback_init. */
51 sd_resource_t *resource;
/* Created by sd_output_create(); values are added with sd_output_add() and
 * the pending payload is taken out with sd_output_reset(). */
55 sd_output_t *formatter;
/* Registered with cb->curl as CURLOPT_ERRORBUFFER and printed when
 * curl_easy_perform() fails. */
57 char curl_errbuf[CURL_ERROR_SIZE];
/* Incremented by wg_write, zeroed by wg_reset_buffer; a value of zero makes
 * wg_flush_nolock treat the buffer as empty. */
59 size_t timeseries_count;
/* Set from cdtime(); wg_flush_nolock compares it (plus the flush timeout)
 * against the current time. */
60 cdtime_t send_buffer_init_time;
64 typedef struct wg_callback_s wg_callback_t;
/* Growable buffer (members `memory` and `size`) that wg_write_memory_cb
 * appends HTTP response data to. */
70 typedef struct wg_memory_s wg_memory_t;
72 static size_t wg_write_memory_cb(void *contents, size_t size,
73 size_t nmemb, /* {{{ */
75 size_t realsize = size * nmemb;
76 wg_memory_t *mem = (wg_memory_t *)userp;
78 if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
79 ERROR("integer overflow");
83 mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
84 if (mem->memory == NULL) {
86 ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
90 memcpy(&(mem->memory[mem->size]), contents, realsize);
91 mem->size += realsize;
92 mem->memory[mem->size] = 0;
94 } /* }}} size_t wg_write_memory_cb */
/* Builds the "Authorization: Bearer <token>" HTTP header line for `cb`.
 *
 * The token is requested with oauth_access_token() when cb->auth is set;
 * gce_access_token() with cb->email is the alternative source. On success the
 * header is returned as a strdup()'d string, which the callers in this file
 * release with sfree(). */
96 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
/* NOTE(review): both buffers are fixed at 256 bytes, so a token that does not
 * fit together with the "Authorization: Bearer " prefix is rejected by the
 * length check after snprintf() below. */
98 char access_token[256];
99 char authorization_header[256];
/* Either OAuth credentials are configured or we must be running on GCE. */
101 assert((cb->auth != NULL) || gce_check());
102 if (cb->auth != NULL)
103 status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
105 status = gce_access_token(cb->email, access_token, sizeof(access_token));
107 ERROR("write_stackdriver plugin: Failed to get access token");
111 status = snprintf(authorization_header, sizeof(authorization_header),
112 "Authorization: Bearer %s", access_token);
/* Catches both an snprintf() error and a truncated header. */
113 if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
116 return strdup(authorization_header);
117 } /* }}} char *wg_get_authorization_header */
119 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
120 char const *payload) {
122 char final_url[1024];
124 snprintf(final_url, sizeof(final_url), "%s/projects/%s/metricDescriptors",
125 cb->url, cb->project);
126 if ((status < 1) || ((size_t)status >= sizeof(final_url)))
129 char *authorization_header = wg_get_authorization_header(cb);
130 if (authorization_header == NULL)
133 struct curl_slist *headers = NULL;
134 headers = curl_slist_append(headers, "Content-Type: application/json");
135 headers = curl_slist_append(headers, authorization_header);
137 CURL *curl = curl_easy_init();
139 ERROR("write_stackdriver plugin: curl_easy_init failed.");
140 curl_slist_free_all(headers);
141 sfree(authorization_header);
145 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
146 curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
147 PACKAGE_NAME "/" PACKAGE_VERSION);
148 char curl_errbuf[CURL_ERROR_SIZE];
149 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_errbuf);
150 curl_easy_setopt(curl, CURLOPT_URL, final_url);
151 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
152 curl_easy_setopt(curl, CURLOPT_POST, 1L);
153 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);
156 .memory = NULL, .size = 0,
158 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
159 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
161 status = curl_easy_perform(curl);
162 if (status != CURLE_OK) {
164 "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
165 status, curl_errbuf);
167 curl_easy_cleanup(curl);
168 curl_slist_free_all(headers);
169 sfree(authorization_header);
174 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
175 if ((http_code < 200) || (http_code >= 300)) {
176 ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
177 final_url, http_code);
178 INFO("write_stackdriver plugin: Server replied: %s", res.memory);
180 curl_easy_cleanup(curl);
181 curl_slist_free_all(headers);
182 sfree(authorization_header);
187 curl_easy_cleanup(curl);
188 curl_slist_free_all(headers);
189 sfree(authorization_header);
191 } /* }}} int wg_call_metricdescriptor_create */
193 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
195 cb->timeseries_count = 0;
196 cb->send_buffer_init_time = cdtime();
197 } /* }}} wg_reset_buffer */
/* POSTs `payload` (JSON) to "<cb->url>/projects/<cb->project>/timeSeries"
 * using the curl handle stored in cb->curl.
 *
 * The authorization string and the header list are built for each call and
 * released again on every exit path shown below. A transport error is logged
 * with the text from cb->curl_errbuf; an HTTP status outside 200..299 is
 * logged together with the server's reply. */
199 static int wg_call_timeseries_write(wg_callback_t *cb,
200 char const *payload) /* {{{ */
202 char final_url[1024];
203 int status = snprintf(final_url, sizeof(final_url),
204 "%s/projects/%s/timeSeries", cb->url, cb->project);
/* Rejects an snprintf() error as well as a URL that did not fit. */
205 if ((status < 1) || ((size_t)status >= sizeof(final_url)))
208 char *authorization_header = wg_get_authorization_header(cb);
209 if (authorization_header == NULL)
212 struct curl_slist *headers = NULL;
213 headers = curl_slist_append(headers, authorization_header);
214 headers = curl_slist_append(headers, "Content-Type: application/json");
/* Per-request options; cb->curl is reused, so these are set again on every
 * call. */
216 curl_easy_setopt(cb->curl, CURLOPT_URL, final_url);
217 curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
218 curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
219 curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
222 .memory = NULL, .size = 0,
/* Collect the response body in `res` so it can be logged on failure. */
224 curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
225 curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, &res);
227 status = curl_easy_perform(cb->curl);
228 if (status != CURLE_OK) {
230 "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
231 status, cb->curl_errbuf);
233 curl_slist_free_all(headers);
234 sfree(authorization_header);
239 curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
/* Anything but a 2xx status counts as a failed write. */
240 if ((http_code < 200) || (http_code >= 300)) {
241 ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
242 final_url, http_code);
243 INFO("write_stackdriver plugin: Server replied: %s", res.memory);
245 curl_slist_free_all(headers);
246 sfree(authorization_header);
251 curl_slist_free_all(headers);
252 sfree(authorization_header);
254 } /* }}} wg_call_timeseries_write */
/* One-time setup of the per-instance state: creates the formatter from
 * cb->resource and the curl handle stored in cb->curl, and sets the handle
 * options that never change (no signals, user agent, error buffer).
 * A non-NULL cb->curl is taken to mean that setup already happened. */
256 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
258 if (cb->curl != NULL)
261 cb->formatter = sd_output_create(cb->resource);
262 if (cb->formatter == NULL) {
263 ERROR("write_stackdriver plugin: sd_output_create failed.");
/* NOTE(review): if curl_easy_init() fails here, cb->curl stays NULL while
 * cb->formatter was already assigned above; a later call would then pass the
 * guard at the top and assign a new formatter. Confirm the failure path
 * releases the formatter. */
267 cb->curl = curl_easy_init();
268 if (cb->curl == NULL) {
269 ERROR("write_stackdriver plugin: curl_easy_init failed.");
273 curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
274 curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
275 PACKAGE_NAME "/" PACKAGE_VERSION);
/* Errors from curl_easy_perform() on this handle are reported through
 * cb->curl_errbuf. */
276 curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
280 } /* }}} int wg_callback_init */
/* Sends the buffered time series, if any. The name indicates that the caller
 * is expected to hold cb->lock already; wg_flush and wg_write both call this
 * function between locking and unlocking the mutex.
 *
 * With nothing buffered, only the buffer start time is refreshed. Otherwise
 * the buffer age is checked against `timeout`, the payload is taken from the
 * formatter with sd_output_reset() and handed to wg_call_timeseries_write(). */
282 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
284 if (cb->timeseries_count == 0) {
285 cb->send_buffer_init_time = cdtime();
289 /* timeout == 0 => flush unconditionally */
291 cdtime_t now = cdtime();
/* True while the buffer is still younger than `timeout`. */
293 if ((cb->send_buffer_init_time + timeout) > now)
297 char *payload = sd_output_reset(cb->formatter);
298 int status = wg_call_timeseries_write(cb, payload);
300 ERROR("write_stackdriver plugin: Sending buffer failed with status %d.",
306 } /* }}} wg_flush_nolock */
/* Flush callback registered with plugin_register_flush(). `identifier` is
 * unused. The per-instance state comes from user_data->data; it is
 * initialized on first use and flushed with cb->lock held. */
308 static int wg_flush(cdtime_t timeout, /* {{{ */
309 const char *identifier __attribute__((unused)),
310 user_data_t *user_data) {
314 if (user_data == NULL)
317 cb = user_data->data;
319 pthread_mutex_lock(&cb->lock);
/* Lazy initialization: cb->curl is only created by wg_callback_init(). */
321 if (cb->curl == NULL) {
322 status = wg_callback_init(cb);
324 ERROR("write_stackdriver plugin: wg_callback_init failed.");
/* Release the lock before leaving on the error path. */
325 pthread_mutex_unlock(&cb->lock);
330 status = wg_flush_nolock(timeout, cb);
331 pthread_mutex_unlock(&cb->lock);
334 } /* }}} int wg_flush */
/* Destructor for a wg_callback_t. It is installed as user_data.free_func for
 * the write callback and also called directly on wg_config error paths.
 * `data` is the wg_callback_t to release; the lines shown destroy the
 * formatter, the OAuth state and the curl handle. */
336 static void wg_callback_free(void *data) /* {{{ */
338 wg_callback_t *cb = data;
342 sd_output_destroy(cb->formatter);
343 cb->formatter = NULL;
349 oauth_destroy(cb->auth);
351 curl_easy_cleanup(cb->curl);
355 } /* }}} void wg_callback_free */
/* Creates the metric descriptors for a value list: one descriptor per data
 * source of `ds` is formatted into a local buffer and sent with
 * wg_call_metricdescriptor_create(). Afterwards the metric is registered with
 * the formatter, and the result of that registration is the return value
 * reached at the end of the function. */
357 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
358 const value_list_t *vl) {
/* `i` selects the data source within `ds`. */
360 for (size_t i = 0; i < ds->ds_num; i++) {
363 int status = sd_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
365 ERROR("write_stackdriver plugin: sd_format_metric_descriptor failed "
372 status = wg_call_metricdescriptor_create(cb, buffer);
374 ERROR("write_stackdriver plugin: wg_call_metricdescriptor_create failed "
382 return sd_output_register_metric(cb->formatter, ds, vl);
383 } /* }}} int wg_metric_descriptors_create */
/* Write callback registered with plugin_register_write(). Adds the value list
 * `vl` (described by `ds`) to the formatter while holding cb->lock, and
 * reacts to the status that sd_output_add() reports:
 *   0        value was buffered
 *   ENOBUFS  value was buffered and the buffer should be flushed now
 *   EEXIST   the metric is already in the buffer, so flush first
 *   ENOENT   the metric is unknown, so create its descriptors first */
385 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
386 user_data_t *user_data) {
387 wg_callback_t *cb = user_data->data;
391 pthread_mutex_lock(&cb->lock);
/* Lazy initialization on the first write. */
393 if (cb->curl == NULL) {
394 int status = wg_callback_init(cb);
396 ERROR("write_stackdriver plugin: wg_callback_init failed.");
397 pthread_mutex_unlock(&cb->lock);
404 status = sd_output_add(cb->formatter, ds, vl);
405 if (status == 0) { /* success */
407 } else if (status == ENOBUFS) { /* success, flush */
/* A timeout of 0 forces the flush regardless of the buffer's age. */
408 wg_flush_nolock(0, cb);
411 } else if (status == EEXIST) {
412 /* metric already in the buffer; flush and retry */
413 wg_flush_nolock(0, cb);
415 } else if (status == ENOENT) {
416 /* new metric, create metric descriptor first */
417 status = wg_metric_descriptors_create(cb, ds, vl);
/* Feeds the "is anything buffered" check in wg_flush_nolock(). */
428 cb->timeseries_count++;
431 pthread_mutex_unlock(&cb->lock);
433 } /* }}} int wg_write */
/* Warns if the scope reported by gce_scope() for `email` does not contain
 * MONITORING_SCOPE. The function only logs; it returns nothing, so a missing
 * scope does not stop configuration here. */
435 static void wg_check_scope(char const *email) /* {{{ */
437 char *scope = gce_scope(email);
439 WARNING("write_stackdriver plugin: Unable to determine scope of this "
/* Plain substring search within the scope text. */
444 if (strstr(scope, MONITORING_SCOPE) == NULL) {
447 /* Strip trailing newline characters for printing. */
448 scope_len = strlen(scope);
/* NOTE(review): iscntrl() requires a value representable as unsigned char (or
 * EOF). Casting a plain char straight to int yields a negative argument for
 * bytes >= 0x80 where char is signed; casting through unsigned char would
 * avoid that. */
449 while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
450 scope[--scope_len] = 0;
452 WARNING("write_stackdriver plugin: The determined scope of this instance "
453 "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
454 "to add this scope to the list of scopes passed to gcutil with "
455 "--service_account_scopes when creating the instance. "
456 "Alternatively, to use this plugin on an instance which does not "
457 "have this scope, use a Service Account.",
458 scope, MONITORING_SCOPE);
462 } /* }}} void wg_check_scope */
/* Handles a "Resource" configuration block. The block takes exactly one
 * string argument, the resource type, and may contain "Label" children with
 * two string arguments each (label name and label value). A resource that
 * was configured earlier is destroyed and replaced. */
464 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
466 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
467 ERROR("write_stackdriver plugin: The \"%s\" option requires exactly one "
473 char *resource_type = ci->values[0].value.string;
/* A repeated "Resource" block replaces the previous one. */
475 if (cb->resource != NULL) {
476 sd_resource_destroy(cb->resource);
479 cb->resource = sd_resource_create(resource_type);
480 if (cb->resource == NULL) {
481 ERROR("write_stackdriver plugin: sd_resource_create(\"%s\") failed.",
486 for (int i = 0; i < ci->children_num; i++) {
487 oconfig_item_t *child = ci->children + i;
489 if (strcasecmp("Label", child->key) == 0) {
490 if ((child->values_num != 2) ||
491 (child->values[0].type != OCONFIG_TYPE_STRING) ||
492 (child->values[1].type != OCONFIG_TYPE_STRING)) {
493 ERROR("write_stackdriver plugin: The \"Label\" option needs exactly "
494 "two string arguments.");
/* values[0] is the label name, values[1] the label value. */
498 sd_resource_add_label(cb->resource, child->values[0].value.string,
499 child->values[1].value.string);
504 } /* }}} int wg_config_resource */
/* Configuration callback registered with plugin_register_complex_config().
 *
 * Allocates a wg_callback_t, applies the options "Project", "Email", "Url",
 * "CredentialFile" and "Resource", selects an authentication method (see the
 * "Option 1..3" comments below), fills in a missing project and resource, and
 * finally registers the flush and write callbacks with the new state. */
506 static int wg_config(oconfig_item_t *ci) /* {{{ */
512 wg_callback_t *cb = calloc(1, sizeof(*cb));
514 ERROR("write_stackdriver plugin: calloc failed.");
/* Default endpoint; a "Url" option replaces it in the loop below. */
517 cb->url = strdup(GCM_API_URL);
518 pthread_mutex_init(&cb->lock, /* attr = */ NULL);
520 char *credential_file = NULL;
/* Option names are matched case-insensitively. */
522 for (int i = 0; i < ci->children_num; i++) {
523 oconfig_item_t *child = ci->children + i;
524 if (strcasecmp("Project", child->key) == 0)
525 cf_util_get_string(child, &cb->project);
526 else if (strcasecmp("Email", child->key) == 0)
527 cf_util_get_string(child, &cb->email);
528 else if (strcasecmp("Url", child->key) == 0)
529 cf_util_get_string(child, &cb->url);
530 else if (strcasecmp("CredentialFile", child->key) == 0)
531 cf_util_get_string(child, &credential_file);
532 else if (strcasecmp("Resource", child->key) == 0)
533 wg_config_resource(child, cb);
535 ERROR("write_stackdriver plugin: Invalid configuration option: %s.",
537 wg_callback_free(cb);
542 /* Set up authentication */
543 /* Option 1: Credentials file given => use service account */
544 if (credential_file != NULL) {
546 oauth_create_google_file(credential_file, MONITORING_SCOPE);
547 if (cfg.oauth == NULL) {
548 ERROR("write_stackdriver plugin: oauth_create_google_file failed");
549 wg_callback_free(cb);
552 cb->auth = cfg.oauth;
/* An explicitly configured project takes precedence over the project ID
 * found in the credentials. */
554 if (cb->project == NULL) {
555 cb->project = cfg.project_id;
557 "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
560 sfree(cfg.project_id);
563 /* Option 2: Look for credentials in well-known places */
564 if (cb->auth == NULL) {
565 oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
566 cb->auth = cfg.oauth;
568 if (cb->project == NULL) {
569 cb->project = cfg.project_id;
571 "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
574 sfree(cfg.project_id);
/* With OAuth credentials in place the "Email" setting plays no part in
 * authentication; tell the user. */
578 if ((cb->auth != NULL) && (cb->email != NULL)) {
579 NOTICE("write_stackdriver plugin: A service account email was configured "
581 "not used for authentication because %s used instead.",
582 (credential_file != NULL) ? "a credential file was"
583 : "application default credentials were");
586 /* Option 3: Running on GCE => use metadata service */
587 if ((cb->auth == NULL) && gce_check()) {
588 wg_check_scope(cb->email);
589 } else if (cb->auth == NULL) {
/* NOTE(review): the message below names a "Credentials" option, while the
 * option accepted by the loop above is "CredentialFile". */
590 ERROR("write_stackdriver plugin: Unable to determine credentials. Please "
592 "specify the \"Credentials\" option or set up Application Default "
594 wg_callback_free(cb);
/* Last resort for the project: ask the GCE helpers. */
598 if ((cb->project == NULL) && gce_check()) {
599 cb->project = gce_project_id();
601 if (cb->project == NULL) {
602 ERROR("write_stackdriver plugin: Unable to determine the project number. "
603 "Please specify the \"Project\" option manually.");
604 wg_callback_free(cb);
/* No "Resource" block configured: describe this machine as a GCE instance
 * when running on GCE. */
608 if ((cb->resource == NULL) && gce_check()) {
609 /* TODO(octo): add error handling */
610 cb->resource = sd_resource_create("gce_instance");
/* NOTE(review): cb->project stores the result of gce_project_id() above,
 * which suggests the gce_*() helpers return allocated strings. If so, the
 * three temporaries passed here are never freed. Confirm against utils_gce. */
611 sd_resource_add_label(cb->resource, "project_id", gce_project_id());
612 sd_resource_add_label(cb->resource, "instance_id", gce_instance_id());
613 sd_resource_add_label(cb->resource, "zone", gce_zone());
/* Otherwise fall back to the "global" resource type. */
615 if (cb->resource == NULL) {
616 /* TODO(octo): add error handling */
617 cb->resource = sd_resource_create("global");
618 sd_resource_add_label(cb->resource, "project_id", cb->project);
621 DEBUG("write_stackdriver plugin: Registering write callback with URL %s",
623 assert((cb->auth != NULL) || gce_check());
625 user_data_t user_data = {
/* The same state is registered twice. free_func is assigned only after the
 * flush registration, so only the write registration carries the
 * destructor. */
628 plugin_register_flush("write_stackdriver", wg_flush, &user_data);
630 user_data.free_func = wg_callback_free;
631 plugin_register_write("write_stackdriver", wg_write, &user_data);
634 } /* }}} int wg_config */
/* Init callback registered with plugin_register_init(); performs libcurl's
 * global initialization. */
636 static int wg_init(void) {
638 /* Call this while collectd is still single-threaded to avoid
639 * initialization issues in libgcrypt. */
640 curl_global_init(CURL_GLOBAL_SSL);
643 } /* }}} int wg_init */
/* Plugin entry point: registers the configuration callback (wg_config) and
 * the init callback (wg_init) under the name "write_stackdriver". The write
 * and flush callbacks are registered later, from wg_config. */
645 void module_register(void) /* {{{ */
647 plugin_register_complex_config("write_stackdriver", wg_config);
648 plugin_register_init("write_stackdriver", wg_init);
649 } /* }}} void module_register */
651 /* vim: set sw=2 sts=2 et fdm=marker : */