package collectd;
import "types.proto";
+import "google/protobuf/timestamp.proto";
service Collectd {
// Dispatch collected values to collectd.
rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
+
+ // Retrieve a list of all values available in collectd's value cache.
+ rpc ListValues(ListValuesRequest) returns (ListValuesReply);
}
// The arguments to DispatchValues.
// The response from DispatchValues.
message DispatchValuesReply {
}
+
+// The arguments to ListValues.
+message ListValuesRequest {
+}
+
+// The response from ListValues.
+message ListValuesReply {
+ message Value {
+ string name = 1;
+ google.protobuf.Timestamp time = 2;
+ }
+
+ repeated Value value = 1;
+}
=head2 Plugin C<grpc>
-The I<grpc> plugin provides an RPC interface to submit values to collectd
-based on the open source gRPC framework. It exposes and end-point for
-dispatching values to the daemon.
+The I<grpc> plugin provides an RPC interface to submit values to or query
+values from collectd based on the open source gRPC framework. It exposes and
+end-point for dispatching values to the daemon.
The B<gRPC> homepage can be found at L<https://grpc.io/>.
#include "configfile.h"
#include "plugin.h"
+#include "daemon/utils_cache.h"
+
typedef struct {
char *addr;
char *port;
collectd::DispatchValuesReply reply_;
grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-};
+}; /* class DispatchValuesCall */
+
+class ListValuesCall : public Call
+{
+public:
+ ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+ : Call(service, cq), responder_(&ctx_)
+ {
+ Handle();
+ } /* ListValuesCall() */
+
+ virtual ~ListValuesCall()
+ { }
+
+private:
+ void Create()
+ {
+ service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+ } /* Create() */
+
+ void Process()
+ {
+ new ListValuesCall(service_, cq_);
+
+ char **names = NULL;
+ cdtime_t *times = NULL;
+ size_t i, n = 0;
+
+ auto status = grpc::Status::OK;
+ if (uc_get_names(&names, ×, &n)) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ }
+
+ for (i = 0; i < n; i++) {
+ auto v = reply_.add_value();
+ auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
+ v->set_name(names[i]);
+ v->set_allocated_time(new google::protobuf::Timestamp(t));
+ sfree(names[i]);
+ }
+ sfree(names);
+ sfree(times);
+
+ responder_.Finish(reply_, status, this);
+ } /* Process() */
+
+ void Finish()
+ {
+ delete this;
+ } /* Finish() */
+
+ collectd::ListValuesRequest request_;
+ collectd::ListValuesReply reply_;
+
+ grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
+}; /* class ListValuesCall */
/*
* gRPC server implementation
{
// Register request types.
new DispatchValuesCall(&service_, cq_.get());
+ new ListValuesCall(&service_, cq_.get());
while (true) {
void *req = NULL;