X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fgrpc.cc;h=ca3314ecf10cf743828d076c79b4e2217968fc91;hb=d26dd5c1db5c3310c9e69b2b910c6547469c99ce;hp=7517bb720bd72c553c0a250801a5599f49917ba7;hpb=38909dcf1dcb596c5f525c6bb25cf89c046013b5;p=collectd.git diff --git a/src/grpc.cc b/src/grpc.cc index 7517bb72..ca3314ec 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -36,7 +36,6 @@ extern "C" { #include #include -#include #include "collectd.h" #include "common.h" @@ -47,6 +46,7 @@ extern "C" { } using collectd::Collectd; +using collectd::Dispatch; using collectd::DispatchValuesRequest; using collectd::DispatchValuesReply; @@ -173,7 +173,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types:: msg->set_allocated_interval(new google::protobuf::Duration(d)); for (size_t i = 0; i < vl->values_len; ++i) { - auto v = msg->add_value(); + auto v = msg->add_values(); switch (ds->ds[i].type) { case DS_TYPE_COUNTER: v->set_counter(vl->values[i].counter); @@ -191,6 +191,9 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types:: return grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("unknown value type")); } + + auto name = msg->add_ds_names(); + name->assign(ds->ds[i].name); } return grpc::Status::OK; @@ -209,7 +212,7 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, size_t values_len = 0; status = grpc::Status::OK; - for (auto v : msg.value()) { + for (auto v : msg.values()) { value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values)); if (!val) { status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, @@ -257,29 +260,28 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, /* * request call-backs and call objects */ - -static grpc::Status Process(grpc::ServerContext *ctx, - DispatchValuesRequest request, DispatchValuesReply *reply) +static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesReply *reply) { value_list_t vl = VALUE_LIST_INIT; - auto status = unmarshal_value_list(request.values(), &vl); + auto status = unmarshal_value_list(request.value_list(), &vl); if (!status.ok()) return status; if (plugin_dispatch_values(&vl)) status = grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("failed to enqueue values for writing")); + + reply->Clear(); return status; -} /* Process(): DispatchValues */ +} /* grpc::Status DispatchValue */ -static grpc::Status Process(grpc::ServerContext *ctx, - QueryValuesRequest request, QueryValuesReply *reply) +static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesReply *res) { uc_iter_t *iter; char *name = NULL; value_list_t matcher; - auto status = unmarshal_ident(request.identifier(), &matcher, false); + auto status = unmarshal_ident(req.identifier(), &matcher, false); if (!status.ok()) return status; @@ -288,126 +290,162 @@ static grpc::Status Process(grpc::ServerContext *ctx, grpc::string("failed to query values: cannot create iterator")); } + status = grpc::Status::OK; while (uc_iterator_next(iter, &name) == 0) { - value_list_t res; - if (parse_identifier_vl(name, &res) != 0) - return grpc::Status(grpc::StatusCode::INTERNAL, + value_list_t vl; + if (parse_identifier_vl(name, &vl) != 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("failed to parse identifier")); + break; + } - if (!ident_matches(&res, &matcher)) + if (!ident_matches(&vl, &matcher)) continue; - if (uc_iterator_get_time(iter, &res.time) < 0) - return grpc::Status(grpc::StatusCode::INTERNAL, + if (uc_iterator_get_time(iter, &vl.time) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("failed to retrieve value timestamp")); - if (uc_iterator_get_interval(iter, &res.interval) < 0) - return grpc::Status(grpc::StatusCode::INTERNAL, + break; + } + if (uc_iterator_get_interval(iter, &vl.interval) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("failed to retrieve value interval")); - if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) - return grpc::Status(grpc::StatusCode::INTERNAL, + break; + } + if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("failed to retrieve values")); + break; + } - auto vl = reply->add_values(); - status = marshal_value_list(&res, vl); - free(res.values); + auto pb_vl = res->add_value_lists(); + status = marshal_value_list(&vl, pb_vl); + free(vl.values); if (!status.ok()) - return status; + break; } uc_iterator_destroy(iter); - return grpc::Status::OK; -} /* Process(): QueryValues */ + return status; +} /* grpc::Status QueryValues */ -class Call -{ +// CallData is the abstract base class for asynchronous calls. +class CallData { public: - Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) - : service_(service), cq_(cq), status_(CREATE) - { } - - virtual ~Call() - { } - - void Handle() - { - if (status_ == CREATE) { - Create(); - status_ = PROCESS; - } - else if (status_ == PROCESS) { - Process(); - status_ = FINISH; - } - else { - GPR_ASSERT(status_ == FINISH); - Finish(); - } - } /* Handle() */ + virtual ~CallData() {} + virtual void process(bool ok) = 0; protected: - virtual void Create() = 0; - virtual void Process() = 0; - virtual void Finish() = 0; - - Collectd::AsyncService *service_; - grpc::ServerCompletionQueue *cq_; - grpc::ServerContext ctx_; + CallData() {} private: - enum CallStatus { CREATE, PROCESS, FINISH }; - CallStatus status_; -}; /* class Call */ - -template -class RpcCall final : public Call -{ - typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *, - RequestT *, grpc::ServerAsyncResponseWriter *, - grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *); + CallData(const CallData&) = delete; + CallData& operator=(const CallData&) = delete; +}; +/* + * Collectd service + */ +// QueryValuesCallData holds the state and implements the logic for QueryValues calls. +class QueryValuesCallData : public CallData { public: - RpcCall(Collectd::AsyncService *service, - CreatorT creator, grpc::ServerCompletionQueue *cq) - : Call(service, cq), creator_(creator), responder_(&ctx_) - { - Handle(); - } /* RpcCall() */ - - virtual ~RpcCall() - { } + QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq) + : cq_(cq), service_(service), writer_(&context_) { + // As part of the initialization, we *request* that the system start + // processing QueryValues requests. In this request, "this" acts as + // the tag uniquely identifying the request (so that different + // QueryValuesCallData instances can serve different requests + // concurrently), in this case the memory address of this + // QueryValuesCallData instance. + service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this); + } -private: - void Create() - { - (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this); - } /* Create() */ + void process(bool ok) final { + if (done_) { + delete this; + } else { + // Spawn a new QueryValuesCallData instance to serve new clients + // while we process the one for this QueryValuesCallData. The + // instance will deallocate itself as part of its FINISH state. + new QueryValuesCallData(service_, cq_); + + auto status = QueryValues(&context_, request_, &response_); + if (!status.ok()) { + writer_.FinishWithError(status, this); + } else { + writer_.Finish(response_, grpc::Status::OK, this); + } - void Process() - { - // Add a new request object to the queue. - new RpcCall(service_, creator_, cq_); - grpc::Status status = ::Process(&ctx_, request_, &reply_); - responder_.Finish(reply_, status, this); - } /* Process() */ + done_ = true; + } + } - void Finish() - { - delete this; - } /* Finish() */ +private: + bool done_ = false; + grpc::ServerContext context_; + grpc::ServerCompletionQueue* cq_; + Collectd::AsyncService* service_; + QueryValuesRequest request_; + QueryValuesReply response_; + grpc::ServerAsyncResponseWriter writer_; +}; - CreatorT creator_; +/* + * Dispatch service + */ +// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls. +class DispatchValuesCallData : public CallData { +public: + DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq) + : cq_(cq), service_(service), reader_(&context_) { + process(true); + } - RequestT request_; - ReplyT reply_; + void process(bool ok) final { + if (status == Status::INIT) { + service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this); + status = Status::CALL; + } else if (status == Status::CALL) { + reader_.Read(&request_, this); + status = Status::READ; + } else if (status == Status::READ && ok) { + (void) DispatchValue(&context_, request_, &response_); + + reader_.Read(&request_, this); + } else if (status == Status::READ) { + response_.Clear(); + + status = Status::DONE; + } else if (status == Status::DONE) { + new DispatchValuesCallData(service_, cq_); + delete this; + } else { + ERROR("grpc: DispatchValuesCallData: invalid state"); + } + } - grpc::ServerAsyncResponseWriter responder_; -}; /* class RpcCall */ +private: + enum class Status { + INIT, + CALL, + READ, + DONE, + }; + Status status = Status::INIT; + + grpc::ServerContext context_; + grpc::ServerCompletionQueue* cq_; + Dispatch::AsyncService* service_; + + DispatchValuesRequest request_; + DispatchValuesReply response_; + grpc::ServerAsyncReader reader_; +}; /* * gRPC server implementation */ - class CollectdServer final { public: @@ -437,9 +475,14 @@ public: } } - builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); + + builder.RegisterService(&collectd_service_); + builder.RegisterService(&dispatch_service_); + server_ = builder.BuildAndStart(); + new QueryValuesCallData(&collectd_service_, cq_.get()); + new DispatchValuesCallData(&dispatch_service_, cq_.get()); } /* Start() */ void Shutdown() @@ -450,29 +493,23 @@ public: void Mainloop() { - // Register request types. - new RpcCall(&service_, - &Collectd::AsyncService::RequestDispatchValues, cq_.get()); - new RpcCall(&service_, - &Collectd::AsyncService::RequestQueryValues, cq_.get()); - while (true) { - void *req = NULL; + void *tag = NULL; bool ok = false; - if (!cq_->Next(&req, &ok)) + // Block waiting to read the next event from the completion queue. + // The event is uniquely identified by its tag, which in this case + // is the memory address of a CallData instance. + if (!cq_->Next(&tag, &ok)) break; // Queue shut down. - if (!ok) { - ERROR("grpc: Failed to read from queue"); - break; - } - static_cast(req)->Handle(); + static_cast(tag)->process(ok); } } /* Mainloop() */ private: - Collectd::AsyncService service_; + Collectd::AsyncService collectd_service_; + Dispatch::AsyncService dispatch_service_; std::unique_ptr server_; std::unique_ptr cq_; @@ -483,7 +520,6 @@ static CollectdServer *server = nullptr; /* * collectd plugin interface */ - extern "C" { static pthread_t *workers; static size_t workers_num = 5;