Merge pull request #1875 from rubenk/remove-configfile-h-from-plugins
authorRuben Kerkhof <ruben@rubenkerkhof.com>
Fri, 12 Aug 2016 08:58:59 +0000 (10:58 +0200)
committerGitHub <noreply@github.com>
Fri, 12 Aug 2016 08:58:59 +0000 (10:58 +0200)
Remove configfile.h from plugins

23 files changed:
.gitignore
configure.ac
proto/collectd.proto
proto/types.proto
src/ceph.c
src/chrony.c
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/common.c
src/daemon/common.h
src/daemon/utils_match.c
src/dns.c
src/exec.c
src/grpc.cc
src/iptables.c
src/md.c
src/mysql.c
src/ping.c
src/swap.c
src/turbostat.c
src/utils_db_query.c
src/utils_latency.c
src/write_sensu.c

index b85dc48..8154d73 100644 (file)
@@ -84,6 +84,9 @@ bindings/java/org/collectd/java/*.class
 #ide stuff
 .vscode
 
+# cscope stuff
+cscope.*
+
 # Unit tests
 src/daemon/test-suite.log
 src/tests/
index 1bacddd..8dda710 100644 (file)
@@ -486,6 +486,7 @@ then
 #include <linux/major.h>
 #include <linux/types.h>
 ])
+       AC_CHECK_HEADERS([sys/sysmacros.h])
 else
        have_linux_raid_md_u_h="no"
 fi
@@ -2376,7 +2377,7 @@ if test "x$with_libgrpcpp" = "xyes"
 then
   AC_LANG_PUSH(C++)
   SAVE_CPPFLAGS="$CPPFLAGS"
-  CPPFLAGS="$with_libgrpcpp_cppflags $GRPCPP_CFLAGS $CPPFLAGS -std=c++11"
+  CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS $CPPFLAGS"
   AC_CHECK_HEADERS([grpc++/grpc++.h], [],
     [with_libgrpcpp="no (<grpc++/grpc++.h> not found)"]
   )
@@ -2386,8 +2387,10 @@ fi
 if test "x$with_libgrpcpp" = "xyes"
 then
   AC_LANG_PUSH(C++)
+  SAVE_CPPFLAGS="$CPPFLAGS"
   SAVE_LDFLAGS="$LDFLAGS"
   SAVE_LIBS="$LIBS"
+  CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS $CPPFLAGS"
   LDFLAGS="$with_libgrpcpp_ldflags"
   if test "x$GRPCPP_LIBS" = "x"
   then
@@ -2409,6 +2412,7 @@ then
     ],
     [with_libgrpcpp="no (libgrpc++ not found)"]
   )
+  CPPFLAGS="$SAVE_CPPFLAGS"
   LDFLAGS="$SAVE_LDFLAGS"
   LIBS="$SAVE_LIBS"
   AC_LANG_POP(C++)
index 5134dbf..917c5de 100644 (file)
 syntax = "proto3";
 
 package collectd;
+option go_package = "collectd.org/rpc/proto";
 
 import "types.proto";
 
 service Collectd {
-       // Dispatch collected values to collectd.
-       rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
-
-       // Query a list of values available from collectd's value cache.
-       rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply);
+  // DispatchValues reads the value lists from the DispatchValuesRequest stream.
+  // The gRPC server embedded into collectd will inject them into the system
+  // just like the network plugin.
+  rpc DispatchValues(stream DispatchValuesRequest)
+      returns (DispatchValuesResponse);
+
+  // QueryValues returns a stream of matching value lists from collectd's
+  // internal cache.
+  rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse);
 }
 
 // The arguments to DispatchValues.
 message DispatchValuesRequest {
-       collectd.types.ValueList values = 1;
+  // value_list is the metric to be sent to the server.
+  collectd.types.ValueList value_list = 1;
 }
 
 // The response from DispatchValues.
-message DispatchValuesReply {
-}
+message DispatchValuesResponse {}
 
 // The arguments to QueryValues.
 message QueryValuesRequest {
-       // Query by the fields of the identifier. Only return values matching the
-       // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
-       // any value.
-       collectd.types.Identifier identifier = 1;
+  // Query by the fields of the identifier. Only return values matching the
+  // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
+  // any value.
+  collectd.types.Identifier identifier = 1;
 }
 
 // The response from QueryValues.
-message QueryValuesReply {
-       repeated collectd.types.ValueList values = 1;
-}
+message QueryValuesResponse { collectd.types.ValueList value_list = 1; }
index 4a852e4..952c541 100644 (file)
 syntax = "proto3";
 
 package collectd.types;
+option go_package = "collectd.org/rpc/proto/types";
 
 import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
 
 message Identifier {
-       string host = 1;
-       string plugin = 2;
-       string plugin_instance = 3;
-       string type = 4;
-       string type_instance = 5;
+  string host = 1;
+  string plugin = 2;
+  string plugin_instance = 3;
+  string type = 4;
+  string type_instance = 5;
 }
 
 message Value {
-       oneof value {
-               uint64 counter = 1;
-               double gauge = 2;
-               int64 derive = 3;
-               uint64 absolute = 4;
-       };
+  oneof value {
+    uint64 counter = 1;
+    double gauge = 2;
+    int64 derive = 3;
+    uint64 absolute = 4;
+  };
 }
 
 message ValueList {
-       repeated Value value = 1;
+  repeated Value values = 1;
 
-       google.protobuf.Timestamp time = 2;
-       google.protobuf.Duration interval = 3;
+  google.protobuf.Timestamp time = 2;
+  google.protobuf.Duration interval = 3;
 
-       Identifier identifier = 4;
+  Identifier identifier = 4;
+
+  repeated string ds_names = 5;
 }
index cbfdd22..5248a1a 100644 (file)
@@ -38,6 +38,9 @@
 #if HAVE_YAJL_YAJL_VERSION_H
 #include <yajl/yajl_version.h>
 #endif
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
 
 #include <limits.h>
 #include <poll.h>
@@ -1573,6 +1576,22 @@ static int ceph_read(void)
 static int ceph_init(void)
 {
     int ret;
+
+#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_DAC_OVERRIDE)
+  if (check_capability (CAP_DAC_OVERRIDE) != 0)
+  {
+    if (getuid () == 0)
+      WARNING ("ceph plugin: Running collectd as root, but the "
+          "CAP_DAC_OVERRIDE capability is missing. The plugin's read "
+          "function will probably fail. Is your init system dropping "
+          "capabilities?");
+    else
+      WARNING ("ceph plugin: collectd doesn't have the CAP_DAC_OVERRIDE "
+          "capability. If you don't want to run collectd as root, try running "
+          "\"setcap cap_dac_override=ep\" on the collectd binary.");
+  }
+#endif
+
     ceph_daemons_print();
 
     ret = cconn_main_loop(ASOK_REQ_VERSION);
index 0485036..f6294e4 100644 (file)
@@ -452,8 +452,8 @@ chrony_connect(void)
 
   if (chrony_set_timeout())
   {
-    ERROR(PLUGIN_NAME ": Error setting timeout to %lds. Errno = %d",
-          g_chrony_timeout, errno);
+    ERROR(PLUGIN_NAME ": Error setting timeout to %llds. Errno = %d",
+          (long long)g_chrony_timeout, errno);
     return CHRONY_RC_FAIL;
   }
   return CHRONY_RC_OK;
index e3f2aa3..e06465b 100644 (file)
 #</Plugin>
 
 #<Plugin grpc>
-#      WorkerThreads 5
 #      <Listen "0.0.0.0" "50051">
 #              EnableSSL true
 #              SSLRootCerts "/path/to/root.pem"
 #              User "db_user"
 #              Password "secret"
 #              Database "db_name"
+#              SSLKey "/path/to/key.pem"
+#              SSLCert "/path/to/cert.pem"
+#              SSLCA "/path/to/ca.pem"
+#              SSLCAPath "/path/to/cas/"
+#              SSLCipher "DHE-RSA-AES256-SHA"
 #              MasterStats true
 #              ConnectTimeout 10
 #              InnodbStats true
index 6ada5f1..9ae7310 100644 (file)
@@ -2751,11 +2751,6 @@ connections.
 
 =back
 
-=item B<WorkerThreads> I<Num>
-
-Number of threads to start for handling incoming connections. The default
-value is B<5>.
-
 =back
 
 =head2 Plugin C<hddtemp>
@@ -3734,6 +3729,11 @@ Synopsis:
       Port "3306"
       MasterStats true
       ConnectTimeout 10
+      SSLKey "/path/to/key.pem"
+      SSLCert "/path/to/cert.pem"
+      SSLCA "/path/to/ca.pem"
+      SSLCAPath "/path/to/cas/"
+      SSLCipher "DHE-RSA-AES256-SHA"
     </Database>
 
     <Database bar>
@@ -3755,7 +3755,8 @@ Synopsis:
 A B<Database> block defines one connection to a MySQL database. It accepts a
 single argument which specifies the name of the database. None of the other
 options are required. MySQL will use default values as documented in the
-section "mysql_real_connect()" in the B<MySQL reference manual>.
+"mysql_real_connect()" and "mysql_ssl_set()" sections in the
+B<MySQL reference manual>.
 
 =over 4
 
@@ -3830,6 +3831,26 @@ or SQL threads are not running. Defaults to B<false>.
 
 Sets the connect timeout for the MySQL client.
 
+=item B<SSLKey> I<Path>
+
+If provided, the X509 key in PEM format.
+
+=item B<SSLCert> I<Path>
+
+If provided, the X509 cert in PEM format.
+
+=item B<SSLCA> I<Path>
+
+If provided, the CA file in PEM format (check OpenSSL docs).
+
+=item B<SSLCAPath> I<Path>
+
+If provided, the CA directory (check OpenSSL docs).
+
+=item B<SSLCipher> I<String>
+
+If provided, the SSL cipher to use.
+
 =back
 
 =head2 Plugin C<netapp>
index c4dbecb..05b1199 100644 (file)
 # include <arpa/inet.h>
 #endif
 
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
+
 #ifdef HAVE_LIBKSTAT
 extern kstat_ctl_t *kc;
 #endif
@@ -1568,8 +1572,6 @@ void set_sock_opts (int sockfd) /* {{{ */
 
        socklen_t socklen = sizeof (socklen_t);
        int so_keepalive = 1;
-       int tcp_keepidle = ((CDTIME_T_TO_MS(plugin_get_interval()) - 1) / 100 + 1);
-       int tcp_keepintvl = ((CDTIME_T_TO_MS(plugin_get_interval()) - 1) / 1000 + 1);
 
        status = getsockopt (sockfd, SOL_SOCKET, SO_TYPE, &socktype, &socklen);
        if (status != 0)
@@ -1586,6 +1588,7 @@ void set_sock_opts (int sockfd) /* {{{ */
                        WARNING ("set_sock_opts: failed to set socket keepalive flag");
 
 #ifdef TCP_KEEPIDLE
+               int tcp_keepidle = ((CDTIME_T_TO_MS(plugin_get_interval()) - 1) / 100 + 1);
                status = setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPIDLE,
                                &tcp_keepidle, sizeof (tcp_keepidle));
                if (status != 0)
@@ -1593,6 +1596,7 @@ void set_sock_opts (int sockfd) /* {{{ */
 #endif
 
 #ifdef TCP_KEEPINTVL
+               int tcp_keepintvl = ((CDTIME_T_TO_MS(plugin_get_interval()) - 1) / 1000 + 1);
                status = setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPINTVL,
                                &tcp_keepintvl, sizeof (tcp_keepintvl));
                if (status != 0)
@@ -1668,3 +1672,52 @@ void strarray_free (char **array, size_t array_len) /* {{{ */
                sfree (array[i]);
        sfree (array);
 } /* }}} void strarray_free */
+
+#ifdef HAVE_SYS_CAPABILITY_H
+int check_capability (int capability) /* {{{ */
+{
+#ifdef _LINUX_CAPABILITY_VERSION_3
+       cap_user_header_t cap_header = calloc(1, sizeof (*cap_header));
+       if (cap_header == NULL)
+       {
+               ERROR("check_capability: calloc failed");
+               return (-1);
+       }
+
+       cap_user_data_t cap_data = calloc(1, sizeof (*cap_data));
+       if (cap_data == NULL)
+       {
+               ERROR("check_capability: calloc failed");
+               sfree(cap_header);
+               return (-1);
+       }
+
+       cap_header->pid = getpid();
+       cap_header->version = _LINUX_CAPABILITY_VERSION;
+       if (capget(cap_header, cap_data) < 0)
+       {
+               ERROR("check_capability: capget failed");
+               sfree(cap_header);
+               sfree(cap_data);
+               return (-1);
+       }
+
+       if ((cap_data->effective & (1 << capability)) == 0)
+       {
+               sfree(cap_header);
+               sfree(cap_data);
+               return (-1);
+       }
+       else
+       {
+               sfree(cap_header);
+               sfree(cap_data);
+               return (0);
+       }
+#else
+       WARNING ("check_capability: unsupported capability implementation. "
+           "Some plugin(s) may require elevated privileges to work properly.");
+       return (0);
+#endif /* _LINUX_CAPABILITY_VERSION_3 */
+} /* }}} int check_capability */
+#endif /* HAVE_SYS_CAPABILITY_H */
index 5ad2b50..720e5f1 100644 (file)
@@ -375,4 +375,12 @@ int strtogauge (const char *string, gauge_t *ret_value);
 int strarray_add (char ***ret_array, size_t *ret_array_len, char const *str);
 void strarray_free (char **array, size_t array_len);
 
+#ifdef HAVE_SYS_CAPABILITY_H
+/** Check if the current process benefits from the capability passed in
+ * argument. Returns zero if it does, less than zero if it doesn't or on error.
+ * See capabilities(7) for the list of possible capabilities.
+ * */
+int check_capability (int capability);
+#endif /* HAVE_SYS_CAPABILITY_H */
+
 #endif /* COMMON_H */
index 5273c90..914b6e2 100644 (file)
@@ -170,7 +170,7 @@ static int default_callback (const char __attribute__((unused)) *str,
 
     if (data->ds_type & UTILS_MATCH_CF_DERIVE_INC)
     {
-      data->value.counter++;
+      data->value.derive++;
       data->values_num++;
       return (0);
     }
index e23a2bb..0494b4b 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
 
 #include <pcap.h>
 
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
+
 /*
  * Private data types
  */
@@ -346,6 +350,20 @@ static int dns_init (void)
 
        listen_thread_init = 1;
 
+#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_NET_RAW)
+       if (check_capability (CAP_NET_RAW) != 0)
+       {
+               if (getuid () == 0)
+                       WARNING ("dns plugin: Running collectd as root, but the CAP_NET_RAW "
+                                       "capability is missing. The plugin's read function will probably "
+                                       "fail. Is your init system dropping capabilities?");
+               else
+                       WARNING ("dns plugin: collectd doesn't have the CAP_NET_RAW capability. "
+                                       "If you don't want to run collectd as root, try running \"setcap "
+                                       "cap_net_raw=ep\" on the collectd binary.");
+       }
+#endif
+
        return (0);
 } /* int dns_init */
 
index e90f83c..dfd4b05 100644 (file)
 #include <grp.h>
 #include <signal.h>
 
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
+
 #define PL_NORMAL        0x01
 #define PL_NOTIF_ACTION  0x02
 
@@ -806,6 +810,22 @@ static int exec_init (void) /* {{{ */
 
   sigaction (SIGCHLD, &sa, NULL);
 
+#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_SETUID) && defined(CAP_SETGID)
+  if ((check_capability (CAP_SETUID) != 0) ||
+      (check_capability (CAP_SETGID) != 0))
+  {
+    if (getuid () == 0)
+      WARNING ("exec plugin: Running collectd as root, but the CAP_SETUID "
+          "or CAP_SETGID capabilities are missing. The plugin's read function "
+          "will probably fail. Is your init system dropping capabilities?");
+    else
+      WARNING ("exec plugin: collectd doesn't have the CAP_SETUID or "
+          "CAP_SETGID capabilities. If you don't want to run collectd as root, "
+          "try running \"setcap 'cap_setuid=ep cap_setgid=ep'\" on the "
+          "collectd binary.");
+  }
+#endif
+
   return (0);
 } /* int exec_init }}} */
 
index 06f5615..a38abc1 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <fstream>
 #include <iostream>
+#include <queue>
 #include <vector>
 
 #include "collectd.grpc.pb.h"
@@ -47,9 +48,9 @@ extern "C" {
 using collectd::Collectd;
 
 using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesReply;
+using collectd::DispatchValuesResponse;
 using collectd::QueryValuesRequest;
-using collectd::QueryValuesReply;
+using collectd::QueryValuesResponse;
 
 using google::protobuf::util::TimeUtil;
 
@@ -171,7 +172,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::
        msg->set_allocated_interval(new google::protobuf::Duration(d));
 
        for (size_t i = 0; i < vl->values_len; ++i) {
-               auto v = msg->add_value();
+               auto v = msg->add_values();
                switch (ds->ds[i].type) {
                        case DS_TYPE_COUNTER:
                                v->set_counter(vl->values[i].counter);
@@ -189,6 +190,9 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::
                                return grpc::Status(grpc::StatusCode::INTERNAL,
                                                grpc::string("unknown value type"));
                }
+
+               auto name = msg->add_ds_names();
+               name->assign(ds->ds[i].name);
        }
 
        return grpc::Status::OK;
@@ -207,7 +211,7 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
        size_t values_len = 0;
 
        status = grpc::Status::OK;
-       for (auto v : msg.value()) {
+       for (auto v : msg.values()) {
                value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
                if (!val) {
                        status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
@@ -253,168 +257,124 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call-backs and call objects
+ * Collectd service
  */
-
-static grpc::Status Process(grpc::ServerContext *ctx,
-               DispatchValuesRequest request, DispatchValuesReply *reply)
-{
-       value_list_t vl = VALUE_LIST_INIT;
-       auto status = unmarshal_value_list(request.values(), &vl);
-       if (!status.ok())
-               return status;
-
-       if (plugin_dispatch_values(&vl))
-               status = grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to enqueue values for writing"));
-       return status;
-} /* Process(): DispatchValues */
-
-static grpc::Status Process(grpc::ServerContext *ctx,
-               QueryValuesRequest request, QueryValuesReply *reply)
-{
-       uc_iter_t *iter;
-       char *name = NULL;
-
-       value_list_t matcher;
-       auto status = unmarshal_ident(request.identifier(), &matcher, false);
-       if (!status.ok())
-               return status;
-
-       if ((iter = uc_get_iterator()) == NULL) {
-               return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to query values: cannot create iterator"));
-       }
-
-       status = grpc::Status::OK;
-       while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t res;
-               if (parse_identifier_vl(name, &res) != 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to parse identifier"));
-                       break;
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+       grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+               value_list_t match;
+               auto status = unmarshal_ident(req->identifier(), &match, false);
+               if (!status.ok()) {
+                       return status;
                }
 
-               if (!ident_matches(&res, &matcher))
-                       continue;
-
-               if (uc_iterator_get_time(iter, &res.time) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value timestamp"));
-                       break;
+               std::queue<value_list_t> value_lists;
+               status = this->queryValuesRead(&match, &value_lists);
+               if (status.ok()) {
+                       status = this->queryValuesWrite(ctx, writer, &value_lists);
                }
-               if (uc_iterator_get_interval(iter, &res.interval) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value interval"));
-                       break;
-               }
-               if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-                       break;
+
+               while (!value_lists.empty()) {
+                       auto vl = value_lists.front();
+                       value_lists.pop();
+                       sfree(vl.values);
                }
 
-               auto vl = reply->add_values();
-               status = marshal_value_list(&res, vl);
-               free(res.values);
-               if (!status.ok())
-                       break;
+               return status;
        }
 
-       uc_iterator_destroy(iter);
-
-       return status;
-} /* Process(): QueryValues */
-
-class Call
-{
-public:
-       Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : service_(service), cq_(cq), status_(CREATE)
-       { }
+       grpc::Status DispatchValues(grpc::ServerContext *ctx,
+                                                               grpc::ServerReader<DispatchValuesRequest> *reader,
+                                                               DispatchValuesResponse *res) override {
+               DispatchValuesRequest req;
 
-       virtual ~Call()
-       { }
+               while (reader->Read(&req)) {
+                       value_list_t vl = VALUE_LIST_INIT;
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
 
-       void Handle()
-       {
-               if (status_ == CREATE) {
-                       Create();
-                       status_ = PROCESS;
-               }
-               else if (status_ == PROCESS) {
-                       Process();
-                       status_ = FINISH;
-               }
-               else {
-                       GPR_ASSERT(status_ == FINISH);
-                       Finish();
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
                }
-       } /* Handle() */
 
-protected:
-       virtual void Create() = 0;
-       virtual void Process() = 0;
-       virtual void Finish() = 0;
-
-       Collectd::AsyncService *service_;
-       grpc::ServerCompletionQueue *cq_;
-       grpc::ServerContext ctx_;
+               res->Clear();
+               return grpc::Status::OK;
+       }
 
 private:
-       enum CallStatus { CREATE, PROCESS, FINISH };
-       CallStatus status_;
-}; /* class Call */
+       grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+               uc_iter_t *iter;
+               if ((iter = uc_get_iterator()) == NULL) {
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                               grpc::string("failed to query values: cannot create iterator"));
+               }
 
-template<typename RequestT, typename ReplyT>
-class RpcCall final : public Call
-{
-       typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
-                       RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
-                       grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+               grpc::Status status = grpc::Status::OK;
+               char *name = NULL;
+               while (uc_iterator_next(iter, &name) == 0) {
+                       value_list_t vl;
+                       if (parse_identifier_vl(name, &vl) != 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to parse identifier"));
+                               break;
+                       }
 
-public:
-       RpcCall(Collectd::AsyncService *service,
-                       CreatorT creator, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), creator_(creator), responder_(&ctx_)
-       {
-               Handle();
-       } /* RpcCall() */
+                       if (!ident_matches(&vl, match))
+                               continue;
 
-       virtual ~RpcCall()
-       { }
+                       if (uc_iterator_get_time(iter, &vl.time) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value timestamp"));
+                               break;
+                       }
+                       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value interval"));
+                               break;
+                       }
+                       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve values"));
+                               break;
+                       }
 
-private:
-       void Create()
-       {
-               (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
-       } /* Create() */
+                       value_lists->push(vl);
+               } // while (uc_iterator_next(iter, &name) == 0)
 
-       void Process()
-       {
-               // Add a new request object to the queue.
-               new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
-               grpc::Status status = ::Process(&ctx_, request_, &reply_);
-               responder_.Finish(reply_, status, this);
-       } /* Process() */
+               uc_iterator_destroy(iter);
+               return status;
+       }
 
-       void Finish()
-       {
-               delete this;
-       } /* Finish() */
+       grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+                                          grpc::ServerWriter<QueryValuesResponse> *writer,
+                                          std::queue<value_list_t> *value_lists) {
+               while (!value_lists->empty()) {
+                       auto vl = value_lists->front();
+                       QueryValuesResponse res;
+                       res.Clear();
+
+                       auto status = marshal_value_list(&vl, res.mutable_value_list());
+                       if (!status.ok()) {
+                               return status;
+                       }
 
-       CreatorT creator_;
+                       if (!writer->Write(res)) {
+                               return grpc::Status::CANCELLED;
+                       }
 
-       RequestT request_;
-       ReplyT reply_;
+                       value_lists->pop();
+                       sfree(vl.values);
+               }
 
-       grpc::ServerAsyncResponseWriter<ReplyT> responder_;
-}; /* class RpcCall */
+               return grpc::Status::OK;
+       }
+};
 
 /*
  * gRPC server implementation
  */
-
 class CollectdServer final
 {
 public:
@@ -444,45 +404,20 @@ public:
                        }
                }
 
-               builder.RegisterService(&service_);
-               cq_ = builder.AddCompletionQueue();
+               builder.RegisterService(&collectd_service_);
+
                server_ = builder.BuildAndStart();
        } /* Start() */
 
        void Shutdown()
        {
                server_->Shutdown();
-               cq_->Shutdown();
        } /* Shutdown() */
 
-       void Mainloop()
-       {
-               // Register request types.
-               new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestDispatchValues, cq_.get());
-               new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestQueryValues, cq_.get());
-
-               while (true) {
-                       void *req = NULL;
-                       bool ok = false;
-
-                       if (!cq_->Next(&req, &ok))
-                               break; // Queue shut down.
-                       if (!ok) {
-                               ERROR("grpc: Failed to read from queue");
-                               break;
-                       }
-
-                       static_cast<Call *>(req)->Handle();
-               }
-       } /* Mainloop() */
-
 private:
-       Collectd::AsyncService service_;
+       CollectdImpl collectd_service_;
 
        std::unique_ptr<grpc::Server> server_;
-       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
 }; /* class CollectdServer */
 
 static CollectdServer *server = nullptr;
@@ -490,18 +425,7 @@ static CollectdServer *server = nullptr;
 /*
  * collectd plugin interface
  */
-
 extern "C" {
-       static pthread_t *workers;
-       static size_t workers_num = 5;
-
-       static void *worker_thread(void *arg)
-       {
-               CollectdServer *s = (CollectdServer *)arg;
-               s->Mainloop();
-               return NULL;
-       } /* worker_thread() */
-
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -585,12 +509,6 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
-                       else if (!strcasecmp("WorkerThreads", child->key)) {
-                               int n;
-                               if (cf_util_get_int(child, &n))
-                                       return -1;
-                               workers_num = (size_t)n;
-                       }
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }
@@ -602,47 +520,22 @@ extern "C" {
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
-               size_t i;
-
-               if (! server) {
+               if (!server) {
                        ERROR("grpc: Failed to create server");
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
-               if (! workers) {
-                       delete server;
-                       server = nullptr;
-
-                       ERROR("grpc: Failed to allocate worker threads");
-                       return -1;
-               }
-
                server->Start();
-               for (i = 0; i < workers_num; i++) {
-                       plugin_thread_create(&workers[i], /* attr = */ NULL,
-                                       worker_thread, server);
-               }
-               INFO("grpc: Started %zu workers", workers_num);
                return 0;
        } /* c_grpc_init() */
 
        static int c_grpc_shutdown(void)
        {
-               size_t i;
-
                if (!server)
-                       return -1;
+                       return 0;
 
                server->Shutdown();
 
-               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
-               for (i = 0; i < workers_num; i++)
-                       pthread_join(workers[i], NULL);
-               free(workers);
-               workers = NULL;
-               workers_num = 0;
-
                delete server;
                server = nullptr;
 
index 11849f2..f691122 100644 (file)
 #include <libiptc/libiptc.h>
 #include <libiptc/libip6tc.h>
 
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
+
 /*
  * iptc_handle_t was available before libiptc was officially available as a
  * shared library. Note, that when the shared lib was introduced, the API and
@@ -498,10 +502,30 @@ static int iptables_shutdown (void)
     return (0);
 } /* int iptables_shutdown */
 
+static int iptables_init (void)
+{
+#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_NET_ADMIN)
+    if (check_capability (CAP_NET_ADMIN) != 0)
+    {
+        if (getuid () == 0)
+            WARNING ("iptables plugin: Running collectd as root, but the "
+                  "CAP_NET_ADMIN capability is missing. The plugin's read "
+                  "function will probably fail. Is your init system dropping "
+                  "capabilities?");
+        else
+            WARNING ("iptables plugin: collectd doesn't have the CAP_NET_ADMIN "
+                  "capability. If you don't want to run collectd as root, try "
+                  "running \"setcap cap_net_admin=ep\" on the collectd binary.");
+    }
+#endif
+    return (0);
+} /* int iptables_init */
+
 void module_register (void)
 {
     plugin_register_config ("iptables", iptables_config,
                              config_keys, config_keys_num);
+    plugin_register_init ("iptables", iptables_init);
     plugin_register_read ("iptables", iptables_read);
     plugin_register_shutdown ("iptables", iptables_shutdown);
 } /* void module_register */
index 6a96a90..44cad2e 100644 (file)
--- a/src/md.c
+++ b/src/md.c
 #include <linux/major.h>
 #include <linux/raid/md_u.h>
 
+#ifdef HAVE_SYS_SYSMACROS_H
+#include <sys/sysmacros.h>
+#endif
+
 #define PROC_DISKSTATS "/proc/diskstats"
 #define DEV_DIR "/dev"
 
index 9167c6b..eea3df8 100644 (file)
@@ -46,6 +46,14 @@ struct mysql_database_s /* {{{ */
        char *user;
        char *pass;
        char *database;
+
+       /* mysql_ssl_set params */
+       char *key;
+       char *cert;
+       char *ca;
+       char *capath;
+       char *cipher;
+
        char *socket;
        int   port;
        int   timeout;
@@ -87,6 +95,11 @@ static void mysql_database_free (void *arg) /* {{{ */
        sfree (db->socket);
        sfree (db->instance);
        sfree (db->database);
+       sfree (db->key);
+       sfree (db->cert);
+       sfree (db->ca);
+       sfree (db->capath);
+       sfree (db->cipher);
        sfree (db);
 } /* }}} void mysql_database_free */
 
@@ -126,6 +139,12 @@ static int mysql_config_database (oconfig_item_t *ci) /* {{{ */
        db->user     = NULL;
        db->pass     = NULL;
        db->database = NULL;
+       db->key      = NULL;
+       db->cert     = NULL;
+       db->ca       = NULL;
+       db->capath   = NULL;
+       db->cipher   = NULL;
+
        db->socket   = NULL;
        db->con      = NULL;
        db->timeout  = 0;
@@ -168,6 +187,16 @@ static int mysql_config_database (oconfig_item_t *ci) /* {{{ */
                        status = cf_util_get_string (child, &db->socket);
                else if (strcasecmp ("Database", child->key) == 0)
                        status = cf_util_get_string (child, &db->database);
+               else if (strcasecmp ("SSLKey", child->key) == 0)
+                       status = cf_util_get_string (child, &db->key);
+               else if (strcasecmp ("SSLCert", child->key) == 0)
+                       status = cf_util_get_string (child, &db->cert);
+               else if (strcasecmp ("SSLCA", child->key) == 0)
+                       status = cf_util_get_string (child, &db->ca);
+               else if (strcasecmp ("SSLCAPath", child->key) == 0)
+                       status = cf_util_get_string (child, &db->capath);
+               else if (strcasecmp ("SSLCipher", child->key) == 0)
+                       status = cf_util_get_string (child, &db->cipher);
                else if (strcasecmp ("ConnectTimeout", child->key) == 0)
                        status = cf_util_get_int (child, &db->timeout);
                else if (strcasecmp ("MasterStats", child->key) == 0)
@@ -245,6 +274,8 @@ static int mysql_config (oconfig_item_t *ci) /* {{{ */
 
 static MYSQL *getconnection (mysql_database_t *db)
 {
+       const char *cipher;
+
        if (db->is_connected)
        {
                int status;
@@ -272,6 +303,8 @@ static MYSQL *getconnection (mysql_database_t *db)
        /* Configure TCP connect timeout (default: 0) */
        db->con->options.connect_timeout = db->timeout;
 
+       mysql_ssl_set (db->con, db->key, db->cert, db->ca, db->capath, db->cipher);
+
        if (mysql_real_connect (db->con, db->host, db->user, db->pass,
                                db->database, db->port, db->socket, 0) == NULL)
        {
@@ -283,10 +316,14 @@ static MYSQL *getconnection (mysql_database_t *db)
                return (NULL);
        }
 
+       cipher = mysql_get_ssl_cipher (db->con);
+
        INFO ("mysql plugin: Successfully connected to database %s "
-                       "at server %s (server version: %s, protocol version: %d)",
+                       "at server %s with cipher %s "
+                       "(server version: %s, protocol version: %d) ",
                        (db->database != NULL) ? db->database : "<none>",
                        mysql_get_host_info (db->con),
+                       (cipher != NULL) ?  cipher : "<none>",
                        mysql_get_server_info (db->con),
                        mysql_get_proto_info (db->con));
 
@@ -963,7 +1000,7 @@ static int mysql_read (user_data_t *ud)
                                counter_submit ("mysql_sort", "scan", val, db);
 
                }
-               else if (strncmp (key, "Slow_queries", strlen ("Slow_queries")) == 0) 
+               else if (strncmp (key, "Slow_queries", strlen ("Slow_queries")) == 0)
                {
                        counter_submit ("mysql_slow_queries", NULL , val, db);
                }
index 59ddbc9..5f66aab 100644 (file)
 # include <netdb.h> /* NI_MAXHOST */
 #endif
 
+#ifdef HAVE_SYS_CAPABILITY_H
+# include <sys/capability.h>
+#endif
+
 #include <oping.h>
 
 #ifndef NI_MAXHOST
@@ -447,6 +451,20 @@ static int ping_init (void) /* {{{ */
         "Will use a timeout of %gs.", ping_timeout);
   }
 
+#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_NET_RAW)
+  if (check_capability (CAP_NET_RAW) != 0)
+  {
+    if (getuid () == 0)
+      WARNING ("ping plugin: Running collectd as root, but the CAP_NET_RAW "
+          "capability is missing. The plugin's read function will probably "
+          "fail. Is your init system dropping capabilities?");
+    else
+      WARNING ("ping plugin: collectd doesn't have the CAP_NET_RAW capability. "
+          "If you don't want to run collectd as root, try running \"setcap "
+          "cap_net_raw=ep\" on the collectd binary.");
+  }
+#endif
+
   return (start_thread ());
 } /* }}} int ping_init */
 
index 15eca9a..9c63e9b 100644 (file)
@@ -674,6 +674,7 @@ static int swap_read (void) /* {{{ */
        {
                ERROR ("swap plugin: Total swap space (%g) is less than used swap space (%g).",
                                total, used);
+               sfree (swap_entries);
                return (-1);
        }
 
index 913511f..2d8a08e 100644 (file)
@@ -1474,35 +1474,22 @@ out:
 static int
 check_permissions(void)
 {
-#ifdef HAVE_SYS_CAPABILITY_H
-       struct __user_cap_header_struct cap_header_data;
-       cap_user_header_t cap_header = &cap_header_data;
-       struct __user_cap_data_struct cap_data_data;
-       cap_user_data_t cap_data = &cap_data_data;
-       int ret = 0;
-#endif /* HAVE_SYS_CAPABILITY_H */
 
        if (getuid() == 0) {
                /* We have everything we need */
                return 0;
-#ifndef HAVE_SYS_CAPABILITY_H
+#if !defined(HAVE_SYS_CAPABILITY_H) && !defined(CAP_SYS_RAWIO)
        } else {
                ERROR("turbostat plugin: Initialization failed: this plugin "
                      "requires collectd to run as root");
                return -1;
        }
-#else /* HAVE_SYS_CAPABILITY_H */
+#else /* HAVE_SYS_CAPABILITY_H && CAP_SYS_RAWIO */
        }
 
-       /* check for CAP_SYS_RAWIO */
-       cap_header->pid = getpid();
-       cap_header->version = _LINUX_CAPABILITY_VERSION;
-       if (capget(cap_header, cap_data) < 0) {
-               ERROR("turbostat plugin: capget failed");
-               return -1;
-       }
+       int ret = 0;
 
-       if ((cap_data->effective & (1 << CAP_SYS_RAWIO)) == 0) {
+       if (check_capability(CAP_SYS_RAWIO) != 0) {
                WARNING("turbostat plugin: Collectd doesn't have the "
                        "CAP_SYS_RAWIO capability. If you don't want to run "
                        "collectd as root, try running \"setcap "
@@ -1524,7 +1511,7 @@ check_permissions(void)
                      "collectd a special capability (CAP_SYS_RAWIO) and read "
                       "access to /dev/cpu/*/msr (see previous warnings)");
        return ret;
-#endif /* HAVE_SYS_CAPABILITY_H */
+#endif /* HAVE_SYS_CAPABILITY_H && CAP_SYS_RAWIO */
 }
 
 static int
index 8a8bb10..bbee90a 100644 (file)
@@ -256,7 +256,7 @@ static int udb_result_submit (udb_result_t *r, /* {{{ */
     {
       int status = strjoin (vl.type_instance, sizeof (vl.type_instance),
           r_area->instances_buffer, r->instances_num, "-");
-      if (status != 0)
+      if (status < 0)
       {
         ERROR ("udb_result_submit: creating type_instance failed with status %d.",
             status);
@@ -269,7 +269,7 @@ static int udb_result_submit (udb_result_t *r, /* {{{ */
 
       int status = strjoin (tmp, sizeof (tmp), r_area->instances_buffer,
           r->instances_num, "-");
-      if (status != 0)
+      if (status < 0)
       {
         ERROR ("udb_result_submit: creating type_instance failed with status %d.",
             status);
index bfb9292..c67752a 100644 (file)
@@ -125,8 +125,8 @@ latency_counter_t *latency_counter_create (void) /* {{{ */
   if (lc == NULL)
     return (NULL);
 
-  latency_counter_reset (lc);
   lc->bin_width = HISTOGRAM_DEFAULT_BIN_WIDTH;
+  latency_counter_reset (lc);
   return (lc);
 } /* }}} latency_counter_t *latency_counter_create */
 
@@ -175,6 +175,28 @@ void latency_counter_reset (latency_counter_t *lc) /* {{{ */
     return;
 
   cdtime_t bin_width = lc->bin_width;
+  cdtime_t max_bin = (lc->max - 1) / lc->bin_width;
+
+/*
+  If max latency is REDUCE_THRESHOLD times less than histogram's range,
+  then cut it in half. REDUCE_THRESHOLD must be >= 2.
+  Value of 4 is selected to reduce frequent changes of bin width.
+*/
+#define REDUCE_THRESHOLD 4
+  if ((lc->num > 0) && (lc->bin_width >= HISTOGRAM_DEFAULT_BIN_WIDTH * 2)
+     && (max_bin < HISTOGRAM_NUM_BINS / REDUCE_THRESHOLD))
+  {
+    /* new bin width will be the previous power of 2 */
+    bin_width = bin_width / 2;
+
+    DEBUG("utils_latency: latency_counter_reset: max_latency = %.3f; "
+          "max_bin = %"PRIu64"; old_bin_width = %.3f; new_bin_width = %.3f;",
+        CDTIME_T_TO_DOUBLE (lc->max),
+        max_bin,
+        CDTIME_T_TO_DOUBLE (lc->bin_width),
+        CDTIME_T_TO_DOUBLE (bin_width));
+  }
+
   memset (lc, 0, sizeof (*lc));
 
   /* preserve bin width */
index 1f46242..2f1f35a 100644 (file)
@@ -531,7 +531,7 @@ static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
        in_place_replace_sensu_name_reserved(service_buffer);
 
        // finalize the buffer by setting the output and closing curly bracket
-       res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %ld\"}\n", ret_str, service_buffer, value_str, CDTIME_T_TO_TIME_T(vl->time));
+       res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n",ret_str, service_buffer, value_str, (long long)CDTIME_T_TO_TIME_T(vl->time));
        free(ret_str);
        free(value_str);
        if (res == -1) {
@@ -667,7 +667,7 @@ static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
        ret_str = temp_str;
 
        // incorporate the timestamp
-       res = my_asprintf(&temp_str, "%s, \"timestamp\": %ld", ret_str, CDTIME_T_TO_TIME_T(n->time));
+       res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str, (long long)CDTIME_T_TO_TIME_T(n->time));
        free(ret_str);
        if (res == -1) {
                ERROR("write_sensu plugin: Unable to alloc memory");