2 * collectd - src/grpc.cc
3 * Copyright (C) 2015-2016 Sebastian Harl
4 * Copyright (C) 2016 Florian octo Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastian Harl <sh at tokkee.org>
26 * Florian octo Forster <octo at collectd.org>
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
37 #include "collectd.grpc.pb.h"
47 #include "daemon/utils_cache.h"
50 using collectd::Collectd;
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
57 using google::protobuf::util::TimeUtil;
67 grpc::SslServerCredentialsOptions *ssl;
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
77 if (fnmatch(matcher->host, vl->host, 0))
80 if (fnmatch(matcher->plugin, vl->plugin, 0))
82 if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
85 if (fnmatch(matcher->type, vl->type, 0))
87 if (fnmatch(matcher->type_instance, vl->type_instance, 0))
93 static grpc::string read_file(const char *filename) {
95 grpc::string s, content;
99 ERROR("grpc: Failed to open '%s'", filename);
103 while (std::getline(f, s)) {
105 content.push_back('\n');
115 static void marshal_ident(const value_list_t *vl,
116 collectd::types::Identifier *msg) {
117 msg->set_host(vl->host);
118 msg->set_plugin(vl->plugin);
119 if (vl->plugin_instance[0] != '\0')
120 msg->set_plugin_instance(vl->plugin_instance);
121 msg->set_type(vl->type);
122 if (vl->type_instance[0] != '\0')
123 msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
127 value_list_t *vl, bool require_fields) {
131 if (!s.length() && require_fields)
132 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
133 grpc::string("missing host name"));
134 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
137 if (!s.length() && require_fields)
138 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
139 grpc::string("missing plugin name"));
140 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
143 if (!s.length() && require_fields)
144 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
145 grpc::string("missing type name"));
146 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
148 s = msg.plugin_instance();
149 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
151 s = msg.type_instance();
152 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
154 return grpc::Status::OK;
155 } /* unmarshal_ident() */
157 static grpc::Status marshal_value_list(const value_list_t *vl,
158 collectd::types::ValueList *msg) {
159 auto id = msg->mutable_identifier();
160 marshal_ident(vl, id);
162 auto ds = plugin_get_ds(vl->type);
163 if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
164 return grpc::Status(grpc::StatusCode::INTERNAL,
165 grpc::string("failed to retrieve data-set for values"));
168 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
169 auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
170 msg->set_allocated_time(new google::protobuf::Timestamp(t));
171 msg->set_allocated_interval(new google::protobuf::Duration(d));
173 msg->clear_meta_data();
174 if (vl->meta != nullptr) {
175 char **meta_data_keys = nullptr;
176 int meta_data_keys_len = meta_data_toc(vl->meta, &meta_data_keys);
177 if (meta_data_keys_len < 0) {
178 return grpc::Status(grpc::StatusCode::INTERNAL,
179 grpc::string("error getting metadata keys"));
182 for (int i = 0; i < meta_data_keys_len; i++) {
183 char *key = meta_data_keys[i];
184 int md_type = meta_data_type(vl->meta, key);
186 collectd::types::MetadataValue md_value;
192 if (meta_data_get_string(vl->meta, key, &md_string) != 0 || md_string == nullptr) {
193 strarray_free(meta_data_keys, meta_data_keys_len);
194 return grpc::Status(grpc::StatusCode::INTERNAL,
195 grpc::string("missing metadata"));
197 md_value.set_string_value(md_string);
200 case MD_TYPE_SIGNED_INT:
202 if (meta_data_get_signed_int(vl->meta, key, &int64_value) != 0) {
203 strarray_free(meta_data_keys, meta_data_keys_len);
204 return grpc::Status(grpc::StatusCode::INTERNAL,
205 grpc::string("missing metadata"));
207 md_value.set_int64_value(int64_value);
209 case MD_TYPE_UNSIGNED_INT:
210 uint64_t uint64_value;
211 if (meta_data_get_unsigned_int(vl->meta, key, &uint64_value) != 0) {
212 strarray_free(meta_data_keys, meta_data_keys_len);
213 return grpc::Status(grpc::StatusCode::INTERNAL,
214 grpc::string("missing metadata"));
216 md_value.set_uint64_value(uint64_value);
220 if (meta_data_get_double(vl->meta, key, &double_value) != 0) {
221 strarray_free(meta_data_keys, meta_data_keys_len);
222 return grpc::Status(grpc::StatusCode::INTERNAL,
223 grpc::string("missing metadata"));
225 md_value.set_double_value(double_value);
227 case MD_TYPE_BOOLEAN:
229 if (meta_data_get_boolean(vl->meta, key, &bool_value) != 0) {
230 strarray_free(meta_data_keys, meta_data_keys_len);
231 return grpc::Status(grpc::StatusCode::INTERNAL,
232 grpc::string("missing metadata"));
234 md_value.set_bool_value(bool_value);
237 strarray_free(meta_data_keys, meta_data_keys_len);
238 return grpc::Status(grpc::StatusCode::INTERNAL,
239 grpc::string("unknown metadata type"));
242 (*msg->mutable_meta_data())[grpc::string(key)] = md_value;
244 if (meta_data_keys != nullptr) {
245 strarray_free(meta_data_keys, meta_data_keys_len);
250 for (size_t i = 0; i < vl->values_len; ++i) {
251 auto v = msg->add_values();
252 int value_type = ds->ds[i].type;
253 switch (value_type) {
254 case DS_TYPE_COUNTER:
255 v->set_counter(vl->values[i].counter);
258 v->set_gauge(vl->values[i].gauge);
261 v->set_derive(vl->values[i].derive);
263 case DS_TYPE_ABSOLUTE:
264 v->set_absolute(vl->values[i].absolute);
267 ERROR("grpc: invalid value type (%d)", value_type);
268 return grpc::Status(grpc::StatusCode::INTERNAL,
269 grpc::string("unknown value type"));
272 auto name = msg->add_ds_names();
273 name->assign(ds->ds[i].name);
276 return grpc::Status::OK;
277 } /* marshal_value_list */
279 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
281 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
283 NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
285 auto status = unmarshal_ident(msg.identifier(), vl, true);
289 vl->meta = meta_data_create();
290 if (vl->meta == nullptr) {
291 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
292 grpc::string("failed to metadata list"));
294 for (auto kv: msg.meta_data()) {
295 auto k = kv.first.c_str();
298 // The meta_data collection individually allocates copies of the keys and
299 // string values for each entry, so it's safe for us to pass a reference
300 // to our short-lived strings.
302 switch (v.value_case()) {
303 case collectd::types::MetadataValue::ValueCase::kStringValue:
304 meta_data_add_string(vl->meta, k, v.string_value().c_str());
306 case collectd::types::MetadataValue::ValueCase::kInt64Value:
307 meta_data_add_signed_int(vl->meta, k, v.int64_value());
309 case collectd::types::MetadataValue::ValueCase::kUint64Value:
310 meta_data_add_unsigned_int(vl->meta, k, v.uint64_value());
312 case collectd::types::MetadataValue::ValueCase::kDoubleValue:
313 meta_data_add_double(vl->meta, k, v.double_value());
315 case collectd::types::MetadataValue::ValueCase::kBoolValue:
316 meta_data_add_boolean(vl->meta, k, v.bool_value());
319 meta_data_destroy(vl->meta);
320 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
321 grpc::string("Metadata of unknown type"));
325 value_t *values = NULL;
326 size_t values_len = 0;
328 status = grpc::Status::OK;
329 for (auto v : msg.values()) {
331 (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
333 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
334 grpc::string("failed to allocate values array"));
339 val = values + values_len;
342 switch (v.value_case()) {
343 case collectd::types::Value::ValueCase::kCounter:
344 val->counter = counter_t(v.counter());
346 case collectd::types::Value::ValueCase::kGauge:
347 val->gauge = gauge_t(v.gauge());
349 case collectd::types::Value::ValueCase::kDerive:
350 val->derive = derive_t(v.derive());
352 case collectd::types::Value::ValueCase::kAbsolute:
353 val->absolute = absolute_t(v.absolute());
356 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
357 grpc::string("unknown value type"));
366 vl->values_len = values_len;
368 meta_data_destroy(vl->meta);
375 } /* unmarshal_value_list() */
380 class CollectdImpl : public collectd::Collectd::Service {
383 QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
384 grpc::ServerWriter<QueryValuesResponse> *writer) override {
386 auto status = unmarshal_ident(req->identifier(), &match, false);
391 std::queue<value_list_t> value_lists;
392 status = this->queryValuesRead(&match, &value_lists);
394 status = this->queryValuesWrite(ctx, writer, &value_lists);
397 while (!value_lists.empty()) {
398 auto vl = value_lists.front();
401 meta_data_destroy(vl.meta);
407 grpc::Status PutValues(grpc::ServerContext *ctx,
408 grpc::ServerReader<PutValuesRequest> *reader,
409 PutValuesResponse *res) override {
410 PutValuesRequest req;
412 while (reader->Read(&req)) {
413 value_list_t vl = {0};
414 auto status = unmarshal_value_list(req.value_list(), &vl);
418 if (plugin_dispatch_values(&vl))
420 grpc::StatusCode::INTERNAL,
421 grpc::string("failed to enqueue values for writing"));
425 return grpc::Status::OK;
429 grpc::Status queryValuesRead(value_list_t const *match,
430 std::queue<value_list_t> *value_lists) {
432 if ((iter = uc_get_iterator()) == NULL) {
434 grpc::StatusCode::INTERNAL,
435 grpc::string("failed to query values: cannot create iterator"));
438 grpc::Status status = grpc::Status::OK;
440 while (uc_iterator_next(iter, &name) == 0) {
442 if (parse_identifier_vl(name, &vl) != 0) {
443 status = grpc::Status(grpc::StatusCode::INTERNAL,
444 grpc::string("failed to parse identifier"));
448 if (!ident_matches(&vl, match))
450 if (uc_iterator_get_time(iter, &vl.time) < 0) {
452 grpc::Status(grpc::StatusCode::INTERNAL,
453 grpc::string("failed to retrieve value timestamp"));
456 if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
458 grpc::Status(grpc::StatusCode::INTERNAL,
459 grpc::string("failed to retrieve value interval"));
462 if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
463 status = grpc::Status(grpc::StatusCode::INTERNAL,
464 grpc::string("failed to retrieve values"));
467 if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
468 status = grpc::Status(grpc::StatusCode::INTERNAL,
469 grpc::string("failed to retrieve value metadata"));
472 value_lists->push(vl);
473 } // while (uc_iterator_next(iter, &name) == 0)
475 uc_iterator_destroy(iter);
479 grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
480 grpc::ServerWriter<QueryValuesResponse> *writer,
481 std::queue<value_list_t> *value_lists) {
482 while (!value_lists->empty()) {
483 auto vl = value_lists->front();
484 QueryValuesResponse res;
487 auto status = marshal_value_list(&vl, res.mutable_value_list());
492 if (!writer->Write(res)) {
493 return grpc::Status::CANCELLED;
500 return grpc::Status::OK;
505 * gRPC server implementation
507 class CollectdServer final {
510 auto auth = grpc::InsecureServerCredentials();
512 grpc::ServerBuilder builder;
514 if (listeners.empty()) {
515 builder.AddListeningPort(default_addr, auth);
516 INFO("grpc: Listening on %s", default_addr.c_str());
518 for (auto l : listeners) {
519 grpc::string addr = l.addr + ":" + l.port;
521 auto use_ssl = grpc::string("");
523 if (l.ssl != nullptr) {
524 use_ssl = grpc::string(" (SSL enabled)");
525 a = grpc::SslServerCredentials(*l.ssl);
528 builder.AddListeningPort(addr, a);
529 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
533 builder.RegisterService(&collectd_service_);
535 server_ = builder.BuildAndStart();
538 void Shutdown() { server_->Shutdown(); } /* Shutdown() */
541 CollectdImpl collectd_service_;
543 std::unique_ptr<grpc::Server> server_;
544 }; /* class CollectdServer */
546 class CollectdClient final {
548 CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
549 : stub_(Collectd::NewStub(channel)) {}
551 int PutValues(value_list_t const *vl) {
552 grpc::ClientContext ctx;
554 PutValuesRequest req;
555 auto status = marshal_value_list(vl, req.mutable_value_list());
557 ERROR("grpc: Marshalling value_list_t failed.");
561 PutValuesResponse res;
562 auto stream = stub_->PutValues(&ctx, &res);
563 if (!stream->Write(req)) {
564 NOTICE("grpc: Broken stream.");
565 /* intentionally not returning. */
568 stream->WritesDone();
569 status = stream->Finish();
571 ERROR("grpc: Error while closing stream.");
576 } /* int PutValues */
579 std::unique_ptr<Collectd::Stub> stub_;
582 static CollectdServer *server = nullptr;
585 * collectd plugin interface
588 static void c_grpc_destroy_write_callback(void *ptr) {
589 delete (CollectdClient *)ptr;
592 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
593 value_list_t const *vl, user_data_t *ud) {
594 CollectdClient *c = (CollectdClient *)ud->data;
595 return c->PutValues(vl);
598 static int c_grpc_config_listen(oconfig_item_t *ci) {
599 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
600 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
601 ERROR("grpc: The `%s` config option needs exactly "
602 "two string argument (address and port).",
607 auto listener = Listener();
608 listener.addr = grpc::string(ci->values[0].value.string);
609 listener.port = grpc::string(ci->values[1].value.string);
610 listener.ssl = nullptr;
612 auto ssl_opts = new (grpc::SslServerCredentialsOptions);
613 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
614 bool use_ssl = false;
616 for (int i = 0; i < ci->children_num; i++) {
617 oconfig_item_t *child = ci->children + i;
619 if (!strcasecmp("EnableSSL", child->key)) {
620 if (cf_util_get_boolean(child, &use_ssl)) {
621 ERROR("grpc: Option `%s` expects a boolean value", child->key);
624 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
626 if (cf_util_get_string(child, &certs)) {
627 ERROR("grpc: Option `%s` expects a string value", child->key);
630 ssl_opts->pem_root_certs = read_file(certs);
631 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
633 if (cf_util_get_string(child, &key)) {
634 ERROR("grpc: Option `%s` expects a string value", child->key);
637 pkcp.private_key = read_file(key);
638 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
640 if (cf_util_get_string(child, &cert)) {
641 ERROR("grpc: Option `%s` expects a string value", child->key);
644 pkcp.cert_chain = read_file(cert);
646 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
651 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
653 listener.ssl = ssl_opts;
657 listeners.push_back(listener);
659 } /* c_grpc_config_listen() */
661 static int c_grpc_config_server(oconfig_item_t *ci) {
662 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
663 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
664 ERROR("grpc: The `%s` config option needs exactly "
665 "two string argument (address and port).",
670 grpc::SslCredentialsOptions ssl_opts;
671 bool use_ssl = false;
673 for (int i = 0; i < ci->children_num; i++) {
674 oconfig_item_t *child = ci->children + i;
676 if (!strcasecmp("EnableSSL", child->key)) {
677 if (cf_util_get_boolean(child, &use_ssl)) {
680 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
682 if (cf_util_get_string(child, &certs)) {
685 ssl_opts.pem_root_certs = read_file(certs);
686 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
688 if (cf_util_get_string(child, &key)) {
691 ssl_opts.pem_private_key = read_file(key);
692 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
694 if (cf_util_get_string(child, &cert)) {
697 ssl_opts.pem_cert_chain = read_file(cert);
699 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
704 auto node = grpc::string(ci->values[0].value.string);
705 auto service = grpc::string(ci->values[1].value.string);
706 auto addr = node + ":" + service;
708 CollectdClient *client;
710 auto channel_creds = grpc::SslCredentials(ssl_opts);
711 auto channel = grpc::CreateChannel(addr, channel_creds);
712 client = new CollectdClient(channel);
715 grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
716 client = new CollectdClient(channel);
719 auto callback_name = grpc::string("grpc/") + addr;
721 .data = client, .free_func = c_grpc_destroy_write_callback,
724 plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
726 } /* c_grpc_config_server() */
728 static int c_grpc_config(oconfig_item_t *ci) {
731 for (i = 0; i < ci->children_num; i++) {
732 oconfig_item_t *child = ci->children + i;
734 if (!strcasecmp("Listen", child->key)) {
735 if (c_grpc_config_listen(child))
737 } else if (!strcasecmp("Server", child->key)) {
738 if (c_grpc_config_server(child))
743 WARNING("grpc: Option `%s` not allowed here.", child->key);
748 } /* c_grpc_config() */
750 static int c_grpc_init(void) {
751 server = new CollectdServer();
753 ERROR("grpc: Failed to create server");
759 } /* c_grpc_init() */
761 static int c_grpc_shutdown(void) {
771 } /* c_grpc_shutdown() */
773 void module_register(void) {
774 plugin_register_complex_config("grpc", c_grpc_config);
775 plugin_register_init("grpc", c_grpc_init);
776 plugin_register_shutdown("grpc", c_grpc_shutdown);
777 } /* module_register() */