=over 4
+=item B<Server> I<Host> I<Port>
+
+The B<Server> statement sets the address of a server to which to send metrics
+via the C<DispatchValues> function.
+
+The argument I<Host> may be a hostname, an IPv4 address, or an IPv6 address.
+
+Optionally, B<Server> may be specified as a configuration block which supports
+the following options:
+
+=over 4
+
+=item B<EnableSSL> B<false>|B<true>
+
+Whether to require SSL for outgoing connections. Default: false.
+
+=item B<SSLCACertificateFile> I<Filename>
+
+=item B<SSLCertificateFile> I<Filename>
+
+=item B<SSLCertificateKeyFile> I<Filename>
+
+Filenames specifying SSL certificate and key material to be used with SSL
+connections.
+
+=back
+
=item B<Listen> I<Host> I<Port>
The B<Listen> statement sets the network address to bind to. When multiple
std::unique_ptr<grpc::Server> server_;
}; /* class CollectdServer */
+class CollectdClient final
+{
+public:
+ CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+ }
+
+ int DispatchValues(value_list_t const *vl) {
+ grpc::ClientContext ctx;
+
+ DispatchValuesRequest req;
+ auto status = marshal_value_list(vl, req.mutable_value_list());
+ if (!status.ok()) {
+ ERROR("grpc: Marshalling value_list_t failed.");
+ return -1;
+ }
+
+ DispatchValuesResponse res;
+ auto stream = stub_->DispatchValues(&ctx, &res);
+ if (!stream->Write(req)) {
+ NOTICE("grpc: Broken stream.");
+ /* intentionally not returning. */
+ }
+
+ stream->WritesDone();
+ status = stream->Finish();
+ if (!status.ok()) {
+ ERROR ("grpc: Error while closing stream.");
+ return -1;
+ }
+
+ return 0;
+ } /* int DispatchValues */
+
+private:
+ std::unique_ptr<Collectd::Stub> stub_;
+};
+
static CollectdServer *server = nullptr;
/*
* collectd plugin interface
*/
extern "C" {
+ static void c_grpc_destroy_write_callback (void *ptr) {
+ delete (CollectdClient *) ptr;
+ }
+
+ static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+ value_list_t const *vl,
+ user_data_t *ud) {
+ CollectdClient *c = (CollectdClient *) ud->data;
+ return c->DispatchValues(vl);
+ }
+
static int c_grpc_config_listen(oconfig_item_t *ci)
{
if ((ci->values_num != 2)
return 0;
} /* c_grpc_config_listen() */
+ static int c_grpc_config_server(oconfig_item_t *ci)
+ {
+ if ((ci->values_num != 2)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)
+ || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+ ERROR("grpc: The `%s` config option needs exactly "
+ "two string argument (address and port).", ci->key);
+ return -1;
+ }
+
+ grpc::SslCredentialsOptions ssl_opts;
+ bool use_ssl = false;
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ return -1;
+ }
+ ssl_opts.pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ return -1;
+ }
+ ssl_opts.pem_private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLCertificateFile", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ return -1;
+ }
+ ssl_opts.pem_cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
+ }
+
+ auto node = grpc::string(ci->values[0].value.string);
+ auto service = grpc::string(ci->values[1].value.string);
+ auto addr = node + ":" + service;
+
+ CollectdClient *client;
+ if (use_ssl) {
+ auto channel_creds = grpc::SslCredentials(ssl_opts);
+ auto channel = grpc::CreateChannel(addr, channel_creds);
+ client = new CollectdClient(channel);
+ } else {
+ auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+ client = new CollectdClient(channel);
+ }
+
+ auto callback_name = grpc::string("grpc/") + addr;
+ user_data_t ud = {
+ .data = client,
+ .free_func = c_grpc_destroy_write_callback,
+ };
+
+ plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+ return 0;
+ } /* c_grpc_config_server() */
+
static int c_grpc_config(oconfig_item_t *ci)
{
int i;
if (c_grpc_config_listen(child))
return -1;
}
+ else if (!strcasecmp("Server", child->key)) {
+ if (c_grpc_config_server(child))
+ return -1;
+ }
+
else {
WARNING("grpc: Option `%s` not allowed here.", child->key);
}