#include "common.h"
#include "configfile.h"
#include "plugin.h"
+
+ typedef struct {
+ char *addr;
+ char *port;
+ } listener_t;
+
+ static listener_t *listeners;
+ static size_t listeners_num;
}
using google::protobuf::util::TimeUtil;
void Start()
{
// TODO: make configurable
- std::string addr("0.0.0.0:50051");
-
- // TODO: make configurable
auto auth = grpc::InsecureServerCredentials();
grpc::ServerBuilder builder;
- builder.AddListeningPort(addr, auth);
+
+ if (!listeners_num) {
+ std::string default_addr("0.0.0.0:50051");
+ builder.AddListeningPort(default_addr, auth);
+ INFO("grpc: Listening on %s", default_addr.c_str());
+ }
+ else {
+ size_t i;
+ for (i = 0; i < listeners_num; i++) {
+ auto l = listeners[i];
+ std::string addr(l.addr);
+ addr += std::string(":") + std::string(l.port);
+ builder.AddListeningPort(addr, auth);
+ INFO("grpc: Listening on %s", addr.c_str());
+ }
+ }
+
builder.RegisterAsyncService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
-
- INFO("grpc: Listening on %s", addr.c_str());
} /* Start() */
void Shutdown()
extern "C" {
static pthread_t *workers;
- static size_t workers_num;
+ static size_t workers_num = 5;
static void *worker_thread(void *arg)
{
return NULL;
} /* worker_thread() */
+ static int c_grpc_config_listen(oconfig_item_t *ci)
+ {
+ listener_t *listener;
+ int i;
+
+ if ((ci->values_num != 2)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)
+ || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+ ERROR("grpc: The `%s` config option needs exactly "
+ "two string argument (address and port).", ci->key);
+ return -1;
+ }
+
+ listener = (listener_t *)realloc(listeners,
+ (listeners_num + 1) * sizeof(*listeners));
+ if (!listener) {
+ ERROR("grpc: Failed to allocate listeners");
+ return -1;
+ }
+ listeners = listener;
+ listener = listeners + listeners_num;
+ listeners_num++;
+
+ listener->addr = strdup(ci->values[0].value.string);
+ listener->port = strdup(ci->values[1].value.string);
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
+
+ return 0;
+ } /* c_grpc_config_listen() */
+
+ static int c_grpc_config(oconfig_item_t *ci)
+ {
+ int i;
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (!strcasecmp("Listen", child->key)) {
+ if (c_grpc_config_listen(child))
+ return -1;
+ }
+ else if (!strcasecmp("WorkerThreads", child->key)) {
+ int n;
+ if (cf_util_get_int(child, &n))
+ return -1;
+ workers_num = (size_t)n;
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed here.", child->key);
+ }
+ }
+
+ return 0;
+ } /* c_grpc_config() */
+
static int c_grpc_init(void)
{
server = new CollectdServer();
return -1;
}
- workers = (pthread_t *)calloc(5, sizeof(*workers));
+ workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
if (! workers) {
delete server;
server = nullptr;
ERROR("grpc: Failed to allocate worker threads");
return -1;
}
- workers_num = 5;
server->Start();
for (i = 0; i < workers_num; i++) {
void module_register(void)
{
+ plugin_register_complex_config("grpc", c_grpc_config);
plugin_register_init("grpc", c_grpc_init);
plugin_register_shutdown("grpc", c_grpc_shutdown);
} /* module_register() */