2 * collectd - src/grpc.cc
3 * Copyright (C) 2015-2016 Sebastian Harl
4 * Copyright (C) 2016 Florian octo Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastian Harl <sh at tokkee.org>
26 * Florian octo Forster <octo at collectd.org>
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
37 #include "collectd.grpc.pb.h"
47 #include "daemon/utils_cache.h"
50 using collectd::Collectd;
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
57 using google::protobuf::util::TimeUtil;
/* SSL server credential options of a listener. c_grpc_config_listen() sets
 * this to nullptr first and may later point it at heap-allocated options;
 * the server start-up code checks it against nullptr to decide whether SSL
 * credentials are used.
 * NOTE(review): the enclosing struct header is not visible in this excerpt;
 * presumably this is a member of Listener -- confirm. */
67 grpc::SslServerCredentialsOptions *ssl;
/* Listeners appended by c_grpc_config_listen(). */
69 static std::vector<Listener> listeners;
/* Address the server listens on when `listeners` is empty. */
70 static grpc::string default_addr("0.0.0.0:50051");
/* Checks the identifier of `vl` against `matcher`. Each field of `matcher`
 * (host, plugin, plugin_instance, type, type_instance) is handed to fnmatch()
 * as the pattern and the corresponding field of `vl` as the string to test.
 * fnmatch() returns non-zero when the string does not match, so each `if`
 * below fires on a mismatch.
 * NOTE(review): the statements guarded by these checks and the final return
 * are not visible in this excerpt; presumably a mismatch returns false and
 * falling through returns true -- confirm. */
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
77 if (fnmatch(matcher->host, vl->host, 0))
80 if (fnmatch(matcher->plugin, vl->plugin, 0))
82 if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
85 if (fnmatch(matcher->type, vl->type, 0))
87 if (fnmatch(matcher->type_instance, vl->type_instance, 0))
/* Reads the file `filename` and returns its contents as a string.
 * NOTE(review): the stream declaration, the open check, the statement that
 * appends each line and the return statements are not visible in this
 * excerpt; the description below covers only the visible lines. */
93 static grpc::string read_file(const char *filename) {
/* `s` holds the line currently being read, `content` the accumulated text. */
95 grpc::string s, content;
/* Logged when the file could not be opened. */
99 ERROR("grpc: Failed to open '%s'", filename);
/* std::getline() strips the line terminator, so a newline is pushed back
 * onto `content` for every line read. */
103 while (std::getline(f, s)) {
105 content.push_back('\n');
115 static void marshal_ident(const value_list_t *vl,
116 collectd::types::Identifier *msg) {
117 msg->set_host(vl->host);
118 msg->set_plugin(vl->plugin);
119 if (vl->plugin_instance[0] != '\0')
120 msg->set_plugin_instance(vl->plugin_instance);
121 msg->set_type(vl->type);
122 if (vl->type_instance[0] != '\0')
123 msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
/* Copies the identifier fields of `msg` into `vl`.
 *
 * When `require_fields` is true, an empty host, plugin or type is rejected
 * with an INVALID_ARGUMENT status naming the missing field. plugin_instance
 * and type_instance are copied without any emptiness check. Returns
 * grpc::Status::OK once all fields have been copied.
 * NOTE(review): the declaration of `s` and the assignments that load host,
 * plugin and type into it are not visible in this excerpt. */
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
127 value_list_t *vl, bool require_fields) {
/* Host name. */
131 if (!s.length() && require_fields)
132 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
133 grpc::string("missing host name"));
134 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
/* Plugin name. */
137 if (!s.length() && require_fields)
138 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
139 grpc::string("missing plugin name"));
140 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
/* Type name. */
143 if (!s.length() && require_fields)
144 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
145 grpc::string("missing type name"));
146 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
/* The instance fields are optional and therefore never rejected. */
148 s = msg.plugin_instance();
149 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
151 s = msg.type_instance();
152 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
154 return grpc::Status::OK;
155 } /* unmarshal_ident() */
/* Converts the value list `vl` into the protobuf message `msg`.
 *
 * Fills in the identifier, the time, the interval, and one Value plus one
 * data-source name per entry of vl->values. Returns an INTERNAL status when
 * no data set is registered for vl->type, when the data set's source count
 * differs from vl->values_len, or when a data source has an unhandled type;
 * returns grpc::Status::OK otherwise.
 * NOTE(review): several case labels, break statements and closing braces are
 * not visible in this excerpt. */
157 static grpc::Status marshal_value_list(const value_list_t *vl,
158 collectd::types::ValueList *msg) {
159 auto id = msg->mutable_identifier();
160 marshal_ident(vl, id);
/* The data set supplies the type and name of each value. */
162 auto ds = plugin_get_ds(vl->type);
163 if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
164 return grpc::Status(grpc::StatusCode::INTERNAL,
165 grpc::string("failed to retrieve data-set for values"));
/* Time and interval go through nanoseconds to reach the protobuf types.
 * Per protobuf's generated-code contract, set_allocated_*() hands ownership
 * of the heap object to the message. */
168 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
169 auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
170 msg->set_allocated_time(new google::protobuf::Timestamp(t));
171 msg->set_allocated_interval(new google::protobuf::Duration(d));
/* One Value per entry; the data-source type selects which field is set. */
173 for (size_t i = 0; i < vl->values_len; ++i) {
174 auto v = msg->add_values();
175 switch (ds->ds[i].type) {
176 case DS_TYPE_COUNTER:
177 v->set_counter(vl->values[i].counter);
180 v->set_gauge(vl->values[i].gauge);
183 v->set_derive(vl->values[i].derive);
185 case DS_TYPE_ABSOLUTE:
186 v->set_absolute(vl->values[i].absolute);
189 return grpc::Status(grpc::StatusCode::INTERNAL,
190 grpc::string("unknown value type"));
/* The data-source name is recorded alongside the value. */
193 auto name = msg->add_ds_names();
194 name->assign(ds->ds[i].name);
197 return grpc::Status::OK;
198 } /* marshal_value_list */
/* Converts the protobuf message `msg` into the value list `vl`.
 *
 * Sets the time, the interval (its assignment target is not visible), the
 * identifier (all of host, plugin and type are required) and the values.
 * Reports RESOURCE_EXHAUSTED when the values array cannot be grown and
 * INVALID_ARGUMENT when a value carries none of the four known kinds.
 * NOTE(review): the second line of the signature, the error-path cleanup,
 * the assignment of the values pointer and the final return are not visible
 * in this excerpt. */
200 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
202 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
204 NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
206 auto status = unmarshal_ident(msg.identifier(), vl, true);
/* The array grows by one element per value in the message. */
210 value_t *values = NULL;
211 size_t values_len = 0;
213 status = grpc::Status::OK;
/* NOTE(review): `auto v` copies every Value message; `const auto &v` would
 * iterate without the copy. */
214 for (auto v : msg.values()) {
216 (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
218 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
219 grpc::string("failed to allocate values array"));
/* `val` addresses the slot that was just appended. */
224 val = values + values_len;
227 switch (v.value_case()) {
228 case collectd::types::Value::ValueCase::kCounter:
229 val->counter = counter_t(v.counter());
231 case collectd::types::Value::ValueCase::kGauge:
232 val->gauge = gauge_t(v.gauge());
234 case collectd::types::Value::ValueCase::kDerive:
235 val->derive = derive_t(v.derive());
237 case collectd::types::Value::ValueCase::kAbsolute:
238 val->absolute = absolute_t(v.absolute());
241 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
242 grpc::string("unknown value type"));
251 vl->values_len = values_len;
257 } /* unmarshal_value_list() */
/* Implementation of the Collectd gRPC service: QueryValues streams value
 * lists to the client, PutValues receives value lists from the client.
 * NOTE(review): access specifiers, several return statements and closing
 * braces are not visible in this excerpt. */
262 class CollectdImpl : public collectd::Collectd::Service {
/* Server-streaming RPC. The request's identifier is unmarshalled into
 * `match` with no field required, matching value lists are collected into a
 * queue by queryValuesRead() and sent by queryValuesWrite(). */
265 QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
266 grpc::ServerWriter<QueryValuesResponse> *writer) override {
268 auto status = unmarshal_ident(req->identifier(), &match, false);
273 std::queue<value_list_t> value_lists;
274 status = this->queryValuesRead(&match, &value_lists);
276 status = this->queryValuesWrite(ctx, writer, &value_lists);
/* Drains whatever is still queued.
 * NOTE(review): the loop body is not visible; presumably it releases the
 * values of each entry and pops it -- confirm. */
279 while (!value_lists.empty()) {
280 auto vl = value_lists.front();
/* Client-streaming RPC. Every request read from the stream is converted to
 * a value_list_t and passed to plugin_dispatch_values(); a non-zero result
 * from that call produces an INTERNAL status. Returns OK once the stream is
 * exhausted. */
288 grpc::Status PutValues(grpc::ServerContext *ctx,
289 grpc::ServerReader<PutValuesRequest> *reader,
290 PutValuesResponse *res) override {
291 PutValuesRequest req;
293 while (reader->Read(&req)) {
294 value_list_t vl = {0};
295 auto status = unmarshal_value_list(req.value_list(), &vl);
299 if (plugin_dispatch_values(&vl))
301 grpc::StatusCode::INTERNAL,
302 grpc::string("failed to enqueue values for writing"));
306 return grpc::Status::OK;
/* Walks the value cache through a uc iterator and pushes every entry whose
 * identifier matches `match` onto `value_lists`, filled with the entry's
 * time, interval and values. Any failure is reported as an INTERNAL status.
 * The iterator is destroyed after the loop. */
310 grpc::Status queryValuesRead(value_list_t const *match,
311 std::queue<value_list_t> *value_lists) {
313 if ((iter = uc_get_iterator()) == NULL) {
315 grpc::StatusCode::INTERNAL,
316 grpc::string("failed to query values: cannot create iterator"));
319 grpc::Status status = grpc::Status::OK;
321 while (uc_iterator_next(iter, &name) == 0) {
/* The cache entry name is parsed back into identifier fields. */
323 if (parse_identifier_vl(name, &vl) != 0) {
324 status = grpc::Status(grpc::StatusCode::INTERNAL,
325 grpc::string("failed to parse identifier"));
/* NOTE(review): the statement guarded by this check is not visible;
 * presumably non-matching entries are skipped -- confirm. */
329 if (!ident_matches(&vl, match))
332 if (uc_iterator_get_time(iter, &vl.time) < 0) {
334 grpc::Status(grpc::StatusCode::INTERNAL,
335 grpc::string("failed to retrieve value timestamp"));
338 if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
340 grpc::Status(grpc::StatusCode::INTERNAL,
341 grpc::string("failed to retrieve value interval"));
344 if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
345 status = grpc::Status(grpc::StatusCode::INTERNAL,
346 grpc::string("failed to retrieve values"));
350 value_lists->push(vl);
351 } // while (uc_iterator_next(iter, &name) == 0)
353 uc_iterator_destroy(iter);
/* Marshals queued value lists into QueryValuesResponse messages and writes
 * them to `writer`. A failed write returns CANCELLED; an emptied queue
 * returns OK.
 * NOTE(review): the statements that pop the queue and handle a failed
 * marshal are not visible in this excerpt. */
357 grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
358 grpc::ServerWriter<QueryValuesResponse> *writer,
359 std::queue<value_list_t> *value_lists) {
360 while (!value_lists->empty()) {
361 auto vl = value_lists->front();
362 QueryValuesResponse res;
365 auto status = marshal_value_list(&vl, res.mutable_value_list());
370 if (!writer->Write(res)) {
371 return grpc::Status::CANCELLED;
378 return grpc::Status::OK;
383 * gRPC server implementation
/* Owns the gRPC server object and the service implementation it serves.
 * NOTE(review): the header of the start-up method and several closing braces
 * are not visible in this excerpt. */
385 class CollectdServer final {
/* Insecure credentials are the default for every listening port. */
388 auto auth = grpc::InsecureServerCredentials();
390 grpc::ServerBuilder builder;
/* With no configured listener the server listens on default_addr only. */
392 if (listeners.empty()) {
393 builder.AddListeningPort(default_addr, auth);
394 INFO("grpc: Listening on %s", default_addr.c_str());
/* Otherwise one port per configured listener is added as "addr:port".
 * NOTE(review): `auto l` copies each Listener; `const auto &l` would
 * iterate without the copy. */
396 for (auto l : listeners) {
397 grpc::string addr = l.addr + ":" + l.port;
399 auto use_ssl = grpc::string("");
/* A listener with SSL options gets SSL credentials; `a` is declared on a
 * line not visible here. */
401 if (l.ssl != nullptr) {
402 use_ssl = grpc::string(" (SSL enabled)");
403 a = grpc::SslServerCredentials(*l.ssl);
406 builder.AddListeningPort(addr, a);
407 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
411 builder.RegisterService(&collectd_service_);
413 server_ = builder.BuildAndStart();
/* Forwards to grpc::Server::Shutdown() on the owned server. */
416 void Shutdown() { server_->Shutdown(); } /* Shutdown() */
/* Service implementation registered with the builder. */
419 CollectdImpl collectd_service_;
/* Server instance produced by BuildAndStart(). */
421 std::unique_ptr<grpc::Server> server_;
422 }; /* class CollectdServer */
/* gRPC client that sends value lists to a remote Collectd service.
 * NOTE(review): access specifiers, return statements and closing braces are
 * not visible in this excerpt. */
424 class CollectdClient final {
/* Creates the service stub on top of the given channel. */
426 CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
427 : stub_(Collectd::NewStub(channel)) {}
/* Sends one value list: marshals `vl` into a PutValuesRequest, opens a
 * PutValues stream, writes the single request, signals the end of writes and
 * finishes the call. Failures are logged; the returned integer values are
 * not visible in this excerpt. */
429 int PutValues(value_list_t const *vl) {
430 grpc::ClientContext ctx;
432 PutValuesRequest req;
433 auto status = marshal_value_list(vl, req.mutable_value_list());
435 ERROR("grpc: Marshalling value_list_t failed.");
439 PutValuesResponse res;
440 auto stream = stub_->PutValues(&ctx, &res);
/* A failed write is only logged so that the stream is still closed below. */
441 if (!stream->Write(req)) {
442 NOTICE("grpc: Broken stream.");
443 /* intentionally not returning. */
446 stream->WritesDone();
447 status = stream->Finish();
449 ERROR("grpc: Error while closing stream.");
454 } /* int PutValues */
/* Stub used to issue RPCs on the channel passed to the constructor. */
457 std::unique_ptr<Collectd::Stub> stub_;
/* The plugin's server instance; nullptr until c_grpc_init() allocates it. */
460 static CollectdServer *server = nullptr;
463 * collectd plugin interface
/* Deletes the CollectdClient passed as `ptr`. c_grpc_config_server() installs
 * this function as the free_func of the write callback's user data. */
466 static void c_grpc_destroy_write_callback(void *ptr) {
467 delete (CollectdClient *)ptr;
/* Write callback: forwards `vl` to the CollectdClient stored in ud->data and
 * returns the result of CollectdClient::PutValues(). The data set argument is
 * unused. */
470 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
471 value_list_t const *vl, user_data_t *ud) {
472 CollectdClient *c = (CollectdClient *)ud->data;
473 return c->PutValues(vl);
/* Handles one listener configuration block. The item must carry exactly two
 * string values (address and port). Child options EnableSSL,
 * SSLCACertificateFile, SSLCertificateKeyFile and SSLCertificateFile populate
 * the SSL options; any other child key only triggers a warning. The finished
 * listener is appended to `listeners`.
 * NOTE(review): return statements, the declarations of the string variables
 * and the condition guarding the SSL assignment are not visible in this
 * excerpt. */
476 static int c_grpc_config_listen(oconfig_item_t *ci) {
477 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
478 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
479 ERROR("grpc: The `%s` config option needs exactly "
480 "two string argument (address and port).",
485 auto listener = Listener();
486 listener.addr = grpc::string(ci->values[0].value.string);
487 listener.port = grpc::string(ci->values[1].value.string);
488 listener.ssl = nullptr;
/* NOTE(review): the options object is allocated unconditionally with `new`;
 * no matching delete is visible for the error paths or for the case where
 * SSL stays disabled -- possible leak, confirm against the full file. */
490 auto ssl_opts = new (grpc::SslServerCredentialsOptions);
491 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
492 bool use_ssl = false;
494 for (int i = 0; i < ci->children_num; i++) {
495 oconfig_item_t *child = ci->children + i;
497 if (!strcasecmp("EnableSSL", child->key)) {
498 if (cf_util_get_boolean(child, &use_ssl)) {
499 ERROR("grpc: Option `%s` expects a boolean value", child->key);
502 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
504 if (cf_util_get_string(child, &certs)) {
505 ERROR("grpc: Option `%s` expects a string value", child->key);
/* The option value is a path; the file's contents become the option. */
508 ssl_opts->pem_root_certs = read_file(certs);
509 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
511 if (cf_util_get_string(child, &key)) {
512 ERROR("grpc: Option `%s` expects a string value", child->key);
515 pkcp.private_key = read_file(key);
516 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
518 if (cf_util_get_string(child, &cert)) {
519 ERROR("grpc: Option `%s` expects a string value", child->key);
522 pkcp.cert_chain = read_file(cert);
524 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
/* The key/certificate pair gathered above is added to the SSL options. */
529 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
531 listener.ssl = ssl_opts;
535 listeners.push_back(listener);
537 } /* c_grpc_config_listen() */
/* Handles one remote-server configuration block. The item must carry exactly
 * two string values (address and port). The child options mirror those of
 * the listener block and fill grpc::SslCredentialsOptions. A CollectdClient
 * is then created on a channel to "address:port" and registered as a write
 * callback named "grpc/<address>:<port>".
 * NOTE(review): return statements, error messages of the child checks and
 * the condition selecting between the SSL and insecure channel are not
 * visible in this excerpt. */
539 static int c_grpc_config_server(oconfig_item_t *ci) {
540 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
541 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
542 ERROR("grpc: The `%s` config option needs exactly "
543 "two string argument (address and port).",
548 grpc::SslCredentialsOptions ssl_opts;
549 bool use_ssl = false;
551 for (int i = 0; i < ci->children_num; i++) {
552 oconfig_item_t *child = ci->children + i;
554 if (!strcasecmp("EnableSSL", child->key)) {
555 if (cf_util_get_boolean(child, &use_ssl)) {
558 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
560 if (cf_util_get_string(child, &certs)) {
/* The option value is a path; the file's contents become the option. */
563 ssl_opts.pem_root_certs = read_file(certs);
564 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
566 if (cf_util_get_string(child, &key)) {
569 ssl_opts.pem_private_key = read_file(key);
570 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
572 if (cf_util_get_string(child, &cert)) {
575 ssl_opts.pem_cert_chain = read_file(cert);
577 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
/* Target address in "node:service" form. */
582 auto node = grpc::string(ci->values[0].value.string);
583 auto service = grpc::string(ci->values[1].value.string);
584 auto addr = node + ":" + service;
/* The client is built on either an SSL channel or an insecure channel. */
586 CollectdClient *client;
588 auto channel_creds = grpc::SslCredentials(ssl_opts);
589 auto channel = grpc::CreateChannel(addr, channel_creds);
590 client = new CollectdClient(channel);
593 grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
594 client = new CollectdClient(channel);
/* The client travels as the callback's user data; its free_func deletes it. */
597 auto callback_name = grpc::string("grpc/") + addr;
599 .data = client, .free_func = c_grpc_destroy_write_callback,
602 plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
604 } /* c_grpc_config_server() */
/* Top-level configuration callback of the plugin. "Listen" children go to
 * c_grpc_config_listen(), "Server" children to c_grpc_config_server(); any
 * other key triggers a warning. Keys are compared case-insensitively.
 * NOTE(review): the handling of a failing sub-handler and the return
 * statements are not visible in this excerpt. */
606 static int c_grpc_config(oconfig_item_t *ci) {
609 for (i = 0; i < ci->children_num; i++) {
610 oconfig_item_t *child = ci->children + i;
612 if (!strcasecmp("Listen", child->key)) {
613 if (c_grpc_config_listen(child))
615 } else if (!strcasecmp("Server", child->key)) {
616 if (c_grpc_config_server(child))
621 WARNING("grpc: Option `%s` not allowed here.", child->key);
626 } /* c_grpc_config() */
/* Init callback: allocates the plugin's CollectdServer.
 * NOTE(review): the condition guarding the error message and the rest of the
 * body are not visible in this excerpt. If the guard is a null check on
 * `server`, it cannot fire, because a plain `new` expression throws on
 * allocation failure instead of returning nullptr -- confirm. */
628 static int c_grpc_init(void) {
629 server = new CollectdServer();
631 ERROR("grpc: Failed to create server");
637 } /* c_grpc_init() */
/* Shutdown callback registered by module_register().
 * NOTE(review): the body is not visible in this excerpt; presumably it shuts
 * down and releases `server` -- confirm. */
639 static int c_grpc_shutdown(void) {
649 } /* c_grpc_shutdown() */
651 void module_register(void) {
652 plugin_register_complex_config("grpc", c_grpc_config);
653 plugin_register_init("grpc", c_grpc_init);
654 plugin_register_shutdown("grpc", c_grpc_shutdown);
655 } /* module_register() */