From ac69a17e48a7c8c6e499de74031d6d78ec586d23 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Tue, 2 Aug 2016 08:14:24 +0200 Subject: [PATCH] grpc plugin: Turn QueryValues into a server-side streaming RPC. --- proto/collectd.proto | 4 +- src/grpc.cc | 176 +++++++++++++++++++++++++++++---------------------- 2 files changed, 102 insertions(+), 78 deletions(-) diff --git a/proto/collectd.proto b/proto/collectd.proto index 9ea73b2e..24aa52b7 100644 --- a/proto/collectd.proto +++ b/proto/collectd.proto @@ -30,7 +30,7 @@ import "types.proto"; service Collectd { // Query a list of values available from collectd's value cache. - rpc QueryValues(QueryValuesRequest) returns (QueryValuesResponse); + rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse); } service Dispatch { @@ -57,5 +57,5 @@ message QueryValuesRequest { // The response from QueryValues. message QueryValuesResponse { - repeated collectd.types.ValueList value_lists = 1; + collectd.types.ValueList value_list = 1; } diff --git a/src/grpc.cc b/src/grpc.cc index 9c4f2589..aeb1c6d0 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -29,6 +29,7 @@ #include #include +#include #include #include "collectd.grpc.pb.h" @@ -275,61 +276,6 @@ static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesReques return status; } /* grpc::Status DispatchValue */ -static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesResponse *res) -{ - uc_iter_t *iter; - char *name = NULL; - - value_list_t matcher; - auto status = unmarshal_ident(req.identifier(), &matcher, false); - if (!status.ok()) - return status; - - if ((iter = uc_get_iterator()) == NULL) { - return grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to query values: cannot create iterator")); - } - - status = grpc::Status::OK; - while (uc_iterator_next(iter, &name) == 0) { - value_list_t vl; - if (parse_identifier_vl(name, &vl) != 0) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to parse identifier")); - break; - } - - if (!ident_matches(&vl, &matcher)) - continue; - - if (uc_iterator_get_time(iter, &vl.time) < 0) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to retrieve value timestamp")); - break; - } - if (uc_iterator_get_interval(iter, &vl.interval) < 0) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to retrieve value interval")); - break; - } - if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to retrieve values")); - break; - } - - auto pb_vl = res->add_value_lists(); - status = marshal_value_list(&vl, pb_vl); - free(vl.values); - if (!status.ok()) - break; - } - - uc_iterator_destroy(iter); - - return status; -} /* grpc::Status QueryValues */ - // CallData is the abstract base class for asynchronous calls. class CallData { public: @@ -347,48 +293,126 @@ private: /* * Collectd service */ + // QueryValuesCallData holds the state and implements the logic for QueryValues calls. class QueryValuesCallData : public CallData { public: QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq) : cq_(cq), service_(service), writer_(&context_) { - // As part of the initialization, we *request* that the system start - // processing QueryValues requests. In this request, "this" acts as - // the tag uniquely identifying the request (so that different - // QueryValuesCallData instances can serve different requests - // concurrently), in this case the memory address of this - // QueryValuesCallData instance. - service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this); + process(true); } void process(bool ok) final { - if (done_) { + if (status_ == Status::INIT) { + status_ = Status::READ; + service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this); + } else if (status_ == Status::READ) { + auto err = queryValues(); + if (!err.ok()) { + writer_.Finish(err, this); + status_ = Status::DONE; + return; + } + respond(); + } else if (status_ == Status::WRITE) { + respond(); + } else if (status_ == Status::DONE) { + new QueryValuesCallData(service_, cq_); delete this; } else { - // Spawn a new QueryValuesCallData instance to serve new clients - // while we process the one for this QueryValuesCallData. The - // instance will deallocate itself as part of its FINISH state. - new QueryValuesCallData(service_, cq_); + throw std::logic_error("Unhandled state enum."); + } + } + +private: + enum class Status { + INIT, + READ, + WRITE, + DONE, + }; + + grpc::Status queryValues (void) { + uc_iter_t *iter; + char *name = NULL; + + value_list_t matcher; + auto status = unmarshal_ident(request_.identifier(), &matcher, false); + if (!status.ok()) + return status; + + if ((iter = uc_get_iterator()) == NULL) { + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to query values: cannot create iterator")); + } + + status = grpc::Status::OK; + while (uc_iterator_next(iter, &name) == 0) { + value_list_t vl; + if (parse_identifier_vl(name, &vl) != 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to parse identifier")); + break; + } + + if (!ident_matches(&vl, &matcher)) + continue; - auto status = QueryValues(&context_, request_, &response_); - if (!status.ok()) { - writer_.FinishWithError(status, this); - } else { - writer_.Finish(response_, grpc::Status::OK, this); + if (uc_iterator_get_time(iter, &vl.time) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value timestamp")); + break; + } + if (uc_iterator_get_interval(iter, &vl.interval) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value interval")); + break; + } + if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve values")); + break; } - done_ = true; + value_lists_.push(vl); } + + uc_iterator_destroy(iter); + return status; } -private: - bool done_ = false; + void respond() { + if (value_lists_.empty()) { + writer_.Finish(grpc::Status::OK, this); + status_ = Status::DONE; + return; + } + + auto vl = value_lists_.front(); + + response_.Clear(); + auto err = marshal_value_list(&vl, response_.mutable_value_list()); + if (!err.ok()) { + writer_.Finish(err, this); + status_ = Status::DONE; + return; + } + + value_lists_.pop(); + free(vl.values); + + writer_.Write(response_, this); + status_ = Status::WRITE; + } + + Status status_ = Status::INIT; grpc::ServerContext context_; grpc::ServerCompletionQueue* cq_; Collectd::AsyncService* service_; QueryValuesRequest request_; + std::queue value_lists_; QueryValuesResponse response_; - grpc::ServerAsyncResponseWriter writer_; + grpc::ServerAsyncWriter writer_; }; /* -- 2.11.0