grpc plugin: Turn QueryValues into a server-side streaming RPC.
authorFlorian Forster <octo@collectd.org>
Tue, 2 Aug 2016 06:14:24 +0000 (08:14 +0200)
committerFlorian Forster <octo@collectd.org>
Wed, 10 Aug 2016 13:02:40 +0000 (15:02 +0200)
proto/collectd.proto
src/grpc.cc

index 9ea73b2..24aa52b 100644 (file)
@@ -30,7 +30,7 @@ import "types.proto";
 
 service Collectd {
        // Query a list of values available from collectd's value cache.
-       rpc QueryValues(QueryValuesRequest) returns (QueryValuesResponse);
+       rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse);
 }
 
 service Dispatch {
@@ -57,5 +57,5 @@ message QueryValuesRequest {
 
 // The response from QueryValues.
 message QueryValuesResponse {
-       repeated collectd.types.ValueList value_lists = 1;
+       collectd.types.ValueList value_list = 1;
 }
index 9c4f258..aeb1c6d 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <fstream>
 #include <iostream>
+#include <queue>
 #include <vector>
 
 #include "collectd.grpc.pb.h"
@@ -275,61 +276,6 @@ static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesReques
        return status;
 } /* grpc::Status DispatchValue */
 
-static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesResponse *res)
-{
-       uc_iter_t *iter;
-       char *name = NULL;
-
-       value_list_t matcher;
-       auto status = unmarshal_ident(req.identifier(), &matcher, false);
-       if (!status.ok())
-               return status;
-
-       if ((iter = uc_get_iterator()) == NULL) {
-               return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to query values: cannot create iterator"));
-       }
-
-       status = grpc::Status::OK;
-       while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t vl;
-               if (parse_identifier_vl(name, &vl) != 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to parse identifier"));
-                       break;
-               }
-
-               if (!ident_matches(&vl, &matcher))
-                       continue;
-
-               if (uc_iterator_get_time(iter, &vl.time) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value timestamp"));
-                       break;
-               }
-               if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value interval"));
-                       break;
-               }
-               if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-                       break;
-               }
-
-               auto pb_vl = res->add_value_lists();
-               status = marshal_value_list(&vl, pb_vl);
-               free(vl.values);
-               if (!status.ok())
-                       break;
-       }
-
-       uc_iterator_destroy(iter);
-
-       return status;
-} /* grpc::Status QueryValues */
-
 // CallData is the abstract base class for asynchronous calls.
 class CallData {
 public:
@@ -347,48 +293,126 @@ private:
 /*
  * Collectd service
  */
+
 // QueryValuesCallData holds the state and implements the logic for QueryValues calls.
 class QueryValuesCallData : public CallData {
 public:
        QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
                        : cq_(cq), service_(service), writer_(&context_) {
-               // As part of the initialization, we *request* that the system start
-               // processing QueryValues requests. In this request, "this" acts as
-               // the tag uniquely identifying the request (so that different
-               // QueryValuesCallData instances can serve different requests
-               // concurrently), in this case the memory address of this
-               // QueryValuesCallData instance.
-               service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+               process(true);
        }
 
        void process(bool ok) final {
-               if (done_) {
+               if (status_ == Status::INIT) {
+                       status_ = Status::READ;
+                       service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+               } else if (status_ == Status::READ) {
+                       auto err = queryValues();
+                       if (!err.ok()) {
+                               writer_.Finish(err, this);
+                               status_ = Status::DONE;
+                               return;
+                       }
+                       respond();
+               } else if (status_ == Status::WRITE) {
+                       respond();
+               } else if (status_ == Status::DONE) {
+                       new QueryValuesCallData(service_, cq_);
                        delete this;
                } else {
-                       // Spawn a new QueryValuesCallData instance to serve new clients
-                       // while we process the one for this QueryValuesCallData. The
-                       // instance will deallocate itself as part of its FINISH state.
-                       new QueryValuesCallData(service_, cq_);
+                       throw std::logic_error("Unhandled state enum.");
+               }
+       }
+
+private:
+       enum class Status {
+               INIT,
+               READ,
+               WRITE,
+               DONE,
+       };
+
+       grpc::Status queryValues (void) {
+               uc_iter_t *iter;
+               char *name = NULL;
+
+               value_list_t matcher;
+               auto status = unmarshal_ident(request_.identifier(), &matcher, false);
+               if (!status.ok())
+                       return status;
+
+               if ((iter = uc_get_iterator()) == NULL) {
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                               grpc::string("failed to query values: cannot create iterator"));
+               }
+
+               status = grpc::Status::OK;
+               while (uc_iterator_next(iter, &name) == 0) {
+                       value_list_t vl;
+                       if (parse_identifier_vl(name, &vl) != 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to parse identifier"));
+                               break;
+                       }
+
+                       if (!ident_matches(&vl, &matcher))
+                               continue;
 
-                       auto status = QueryValues(&context_, request_, &response_);
-                       if (!status.ok()) {
-                               writer_.FinishWithError(status, this);
-                       } else {
-                               writer_.Finish(response_, grpc::Status::OK, this);
+                       if (uc_iterator_get_time(iter, &vl.time) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value timestamp"));
+                               break;
+                       }
+                       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value interval"));
+                               break;
+                       }
+                       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve values"));
+                               break;
                        }
 
-                       done_ = true;
+                       value_lists_.push(vl);
                }
+
+               uc_iterator_destroy(iter);
+               return status;
        }
 
-private:
-       bool done_ = false;
+       void respond() {
+               if (value_lists_.empty()) {
+                       writer_.Finish(grpc::Status::OK, this);
+                       status_ = Status::DONE;
+                       return;
+               }
+
+               auto vl = value_lists_.front();
+
+               response_.Clear();
+               auto err = marshal_value_list(&vl, response_.mutable_value_list());
+               if (!err.ok()) {
+                       writer_.Finish(err, this);
+                       status_ = Status::DONE;
+                       return;
+               }
+
+               value_lists_.pop();
+               free(vl.values);
+
+               writer_.Write(response_, this);
+               status_ = Status::WRITE;
+       }
+
+       Status status_ = Status::INIT;
        grpc::ServerContext context_;
        grpc::ServerCompletionQueue* cq_;
        Collectd::AsyncService* service_;
        QueryValuesRequest request_;
+       std::queue<value_list_t> value_lists_;
        QueryValuesResponse response_;
-       grpc::ServerAsyncResponseWriter<QueryValuesResponse> writer_;
+       grpc::ServerAsyncWriter<QueryValuesResponse> writer_;
 };
 
 /*