}
using collectd::Collectd;
-using collectd::Dispatch;
using collectd::DispatchValuesRequest;
using collectd::DispatchValuesResponse;
}
std::queue<value_list_t> value_lists;
- err = this->read(&match, &value_lists);
+ err = this->queryValuesRead(&match, &value_lists);
if (err.ok()) {
- err = this->write(ctx, writer, &value_lists);
+ err = this->queryValuesWrite(ctx, writer, &value_lists);
}
while (!value_lists.empty()) {
return err;
}
+ grpc::Status DispatchValues(grpc::ServerContext *ctx,
+ grpc::ServerReader<DispatchValuesRequest> *reader,
+ DispatchValuesResponse *res) override {
+ DispatchValuesRequest req;
+
+ while (reader->Read(&req)) {
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(req.value_list(), &vl);
+ if (!status.ok())
+ return status;
+
+ if (plugin_dispatch_values(&vl))
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
+ }
+
+ res->Clear();
+ return grpc::Status::OK;
+ }
+
private:
- grpc::Status read(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+ grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
uc_iter_t *iter;
if ((iter = uc_get_iterator()) == NULL) {
return grpc::Status(grpc::StatusCode::INTERNAL,
return status;
}
- grpc::Status write(grpc::ServerContext *ctx,
+ grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
grpc::ServerWriter<QueryValuesResponse> *writer,
std::queue<value_list_t> *value_lists) {
while (!value_lists->empty()) {
};
/*
- * Dispatch service
- */
-class DispatchImpl : public collectd::Dispatch::Service {
-public:
- grpc::Status DispatchValues(grpc::ServerContext *ctx,
- grpc::ServerReader<DispatchValuesRequest> *reader,
- DispatchValuesResponse *res) override {
- DispatchValuesRequest req;
-
- while (reader->Read(&req)) {
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(req.value_list(), &vl);
- if (!status.ok())
- return status;
-
- if (plugin_dispatch_values(&vl))
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
- }
-
- res->Clear();
- return grpc::Status::OK;
- }
-};
-
-/*
* gRPC server implementation
*/
class CollectdServer final
}
builder.RegisterService(&collectd_service_);
- builder.RegisterService(&dispatch_service_);
server_ = builder.BuildAndStart();
} /* Start() */
private:
CollectdImpl collectd_service_;
- DispatchImpl dispatch_service_;
std::unique_ptr<grpc::Server> server_;
}; /* class CollectdServer */