Merge pull request #2796 from elfiesmelfie/feat_ipmi_SEL_ignore_list
authorPavel Rochnyak <pavel2000@ngs.ru>
Wed, 30 May 2018 12:59:32 +0000 (19:59 +0700)
committerGitHub <noreply@github.com>
Wed, 30 May 2018 12:59:32 +0000 (19:59 +0700)
ipmi plugin: Add SELSensor and SELIgnoreSelected config options.

33 files changed:
AUTHORS
Makefile.am
README
configure.ac
contrib/redhat/collectd.spec
src/amqp1.c [new file with mode: 0644]
src/barometer.c
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/plugin_mock.c
src/dbi.c
src/dpdkevents.c
src/intel_pmu.c
src/intel_rdt.c
src/ipmi.c
src/modbus.c
src/netlink.c
src/oracle.c
src/processes.c
src/redis.c
src/snmp.c
src/statsd.c
src/table.c
src/turbostat.c
src/utils_config_cores.c [new file with mode: 0644]
src/utils_config_cores.h [new file with mode: 0644]
src/utils_config_cores_test.c [new file with mode: 0644]
src/utils_db_query.c
src/utils_deq.h [new file with mode: 0644]
src/utils_ovs.c
src/valgrind.suppress [new file with mode: 0644]
src/virt.c
src/virt_test.c

diff --git a/AUTHORS b/AUTHORS
index 4df743c..409655a 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -59,6 +59,9 @@ Andreas Henriksson <andreas at fatal.se>
 Andy Parkins <andyp at fussylogic.co.uk>
  - battery plugin: sysfs code.
 
+Andy Smith <ansmith at redhat.com>
+ - AMQP 1.0 plugin.
+
 Anthony Dewhurst <dewhurst at gmail.com>
  - zfs_arc plugin.
 
@@ -286,7 +289,7 @@ Scott Sanders <scott at jssjr.com>
  - Write-Graphite plugin.
 
 Sebastien Pahl <sebastien.pahl at dotcloud.com>
- - AMQP plugin.
+ - AMQP 0.9 plugin.
 
 Serhiy Pshyk <serhiyx.pshyk at intel.com>
  - intel_pmu plugin
index b944b41..c345904 100644 (file)
@@ -61,6 +61,7 @@ EXTRA_DIST = \
        src/types.db \
        src/types.db.pod \
        src/valgrind.FreeBSD.suppress \
+       src/valgrind.suppress \
        testwrapper.sh \
        version-gen.sh
 
@@ -141,7 +142,8 @@ check_PROGRAMS = \
        test_utils_subst \
        test_utils_time \
        test_utils_vl_lookup \
-       test_libcollectd_network_parse
+       test_libcollectd_network_parse \
+       test_utils_config_cores
 
 
 TESTS = $(check_PROGRAMS)
@@ -326,6 +328,11 @@ test_utils_subst_SOURCES = \
        src/daemon/utils_subst.h
 test_utils_subst_LDADD = libplugin_mock.la
 
+test_utils_config_cores_SOURCES = \
+       src/utils_config_cores_test.c \
+       src/testing.h
+test_utils_config_cores_LDADD = libplugin_mock.la
+
 libavltree_la_SOURCES = \
        src/daemon/utils_avltree.c \
        src/daemon/utils_avltree.h
@@ -543,6 +550,20 @@ amqp_la_LIBADD = \
        libformat_json.la
 endif
 
+if BUILD_PLUGIN_AMQP1
+pkglib_LTLIBRARIES += amqp1.la
+amqp1_la_SOURCES = \
+       src/amqp1.c \
+       src/utils_deq.h
+amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS)
+amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS)
+amqp1_la_LIBADD = \
+       $(BUILD_WITH_LIBQPIDPROTON_LIBS) \
+       libcmds.la \
+       libformat_graphite.la \
+       libformat_json.la
+endif
+
 if BUILD_PLUGIN_APACHE
 pkglib_LTLIBRARIES += apache.la
 apache_la_SOURCES = src/apache.c
@@ -593,7 +614,7 @@ if BUILD_PLUGIN_BAROMETER
 pkglib_LTLIBRARIES += barometer.la
 barometer_la_SOURCES = src/barometer.c
 barometer_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-barometer_la_LIBADD = -lm
+barometer_la_LIBADD = -lm $(BUILD_WITH_LIBI2C_LIBS)
 endif
 
 if BUILD_PLUGIN_BATTERY
@@ -914,7 +935,10 @@ endif
 
 if BUILD_PLUGIN_INTEL_PMU
 pkglib_LTLIBRARIES += intel_pmu.la
-intel_pmu_la_SOURCES = src/intel_pmu.c
+intel_pmu_la_SOURCES = \
+       src/intel_pmu.c \
+       src/utils_config_cores.h \
+       src/utils_config_cores.c
 intel_pmu_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBJEVENTS_CPPFLAGS)
 intel_pmu_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBJEVENTS_LDFLAGS)
 intel_pmu_la_LIBADD = $(BUILD_WITH_LIBJEVENTS_LIBS)
@@ -922,7 +946,10 @@ endif
 
 if BUILD_PLUGIN_INTEL_RDT
 pkglib_LTLIBRARIES += intel_rdt.la
-intel_rdt_la_SOURCES = src/intel_rdt.c
+intel_rdt_la_SOURCES = \
+       src/intel_rdt.c \
+       src/utils_config_cores.h \
+       src/utils_config_cores.c
 intel_rdt_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBPQOS_CPPFLAGS)
 intel_rdt_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBPQOS_LDFLAGS)
 intel_rdt_la_LIBADD = $(BUILD_WITH_LIBPQOS_LIBS)
@@ -1759,18 +1786,15 @@ virt_la_CFLAGS = $(AM_CFLAGS) \
 virt_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 virt_la_LIBADD = libignorelist.la $(BUILD_WITH_LIBVIRT_LIBS) $(BUILD_WITH_LIBXML2_LIBS)
 
-# TODO: enable once we support only modern libvirts which depends on libnl-3
-# the libvirt on wheezy is linked in libnl v1, and there is a small leak here,
-# triggered by the library initialization. There are no means to avoid it,
-# and libvirt switched to libnl3 anyway
-#test_plugin_virt_SOURCES = src/virt_test.c
-#test_plugin_virt_CPPFLAGS = $(AM_CPPFLAGS) \
-#      $(BUILD_WITH_LIBVIRT_CFLAGS) $(BUILD_WITH_LIBXML2_CFLAGS)
-#test_plugin_virt_LDFLAGS = $(PLUGIN_LDFLAGS)
-#test_plugin_virt_LDADD = libplugin_mock.la \
-#      $(BUILD_WITH_LIBVIRT_LIBS) $(BUILD_WITH_LIBXML2_LIBS)
-#check_PROGRAMS += test_plugin_virt
-#TESTS += test_plugin_virt
+test_plugin_virt_SOURCES = src/virt_test.c
+test_plugin_virt_CPPFLAGS = $(AM_CPPFLAGS) \
+       $(BUILD_WITH_LIBVIRT_CPPFLAGS) $(BUILD_WITH_LIBXML2_CFLAGS)
+test_plugin_virt_LDFLAGS = $(PLUGIN_LDFLAGS) \
+       $(BUILD_WITH_LIBVIRT_LDFLAGS) $(BUILD_WITH_LIBXML2_LDFLAGS)
+test_plugin_virt_LDADD = libplugin_mock.la \
+       $(BUILD_WITH_LIBVIRT_LIBS) $(BUILD_WITH_LIBXML2_LIBS)
+check_PROGRAMS += test_plugin_virt
+TESTS += test_plugin_virt
 endif
 
 if BUILD_PLUGIN_VMEM
diff --git a/README b/README
index 43df03b..2210b2b 100644 (file)
--- a/README
+++ b/README
@@ -465,7 +465,11 @@ Features
 
     - amqp
       Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
-      server, such as RabbitMQ.
+      0.9.1 server, such as RabbitMQ.
+
+    - amqp1
+      Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
+      1.0 server, such as Qpid Dispatch Router or Apache Artemis Broker.
 
     - csv
       Write to comma separated values (CSV) files. This needs lots of
@@ -908,8 +912,14 @@ Prerequisites
     are supported.
     <http://www.python.org/>
 
+  * libqpid-proton (optional)
+    Used by the `amqp1' plugin for AMQP 1.0 connections, for example to
+    Qdrouterd.
+    <http://qpid.apache.org/>
+
   * librabbitmq (optional; also called “rabbitmq-c”)
-    Used by the `amqp' plugin for AMQP connections, for example to RabbitMQ.
+    Used by the `amqp' plugin for AMQP 0.9.1 connections, for example to
+    RabbitMQ.
     <http://hg.rabbitmq.com/rabbitmq-c/>
 
   * librdkafka (optional; also called “rdkafka”)
index 8ef9fc8..1e31e21 100644 (file)
@@ -1875,14 +1875,23 @@ fi
 
 # libi2c-dev
 if test "x$ac_system" = "xLinux"; then
+  with_libi2c_libs=""
+  AC_CHECK_HEADERS([i2c/smbus.h],
+    [with_libi2c_libs="-li2c"]
+  )
   AC_CHECK_DECL([i2c_smbus_read_i2c_block_data],
     [with_libi2c="yes"],
     [with_libi2c="no (symbol i2c_smbus_read_i2c_block_data not found - have you installed libi2c-dev ?)"],
     [[
       #include <stdlib.h>
       #include <linux/i2c-dev.h>
+      #if HAVE_I2C_SMBUS_H
+      # include <i2c/smbus.h>
+      #endif
     ]]
   )
+  BUILD_WITH_LIBI2C_LIBS="$with_libi2c_libs"
+  AC_SUBST([BUILD_WITH_LIBI2C_LIBS])
 else
   with_libi2c="no (Linux only)"
 fi
@@ -4710,6 +4719,56 @@ if test "$with_libpython" != "xno"; then
 fi
 # }}} --with-libpython
 
+# --with-libqpid_proton {{{
+AC_ARG_WITH([libqpid_proton],
+  [AS_HELP_STRING([--with-libqpid_proton@<:@=PREFIX@:>@], [Path to libqpid_proton.])],
+  [
+    if test "x$withval" != "xno" && test "x$withval" != "xyes"; then
+      with_libqpid_proton_cppflags="-I$withval/include"
+      with_libqpid_proton_ldflags="-L$withval/lib"
+      with_libqpid_proton="yes"
+    else
+      with_libqpid_proton="$withval"
+    fi
+  ],
+  [with_libqpid_proton="yes"]
+)
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  SAVE_CPPFLAGS="$CPPFLAGS"
+  CPPFLAGS="$CPPFLAGS $with_libqpid_proton_cppflags"
+
+  AC_CHECK_HEADERS([proton/proactor.h],
+    [with_libqpid_proton="yes"],
+    [with_libqpid_proton="no (proton/proactor.h not found)"]
+  )
+
+  CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  SAVE_LDFLAGS="$LDFLAGS"
+  LDFLAGS="$LDFLAGS $with_libqpid_proton_ldflags"
+
+  AC_CHECK_LIB([qpid-proton], [pn_connection],
+    [with_libqpid_proton="yes"],
+    [with_libqpid_proton="no (Symbol 'pn_connection' not found)"])
+
+  LDFLAGS="$SAVE_LDFLAGS"
+fi
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  BUILD_WITH_LIBQPIDPROTON_CPPFLAGS="$with_libqpid_proton_cppflags"
+  BUILD_WITH_LIBQPIDPROTON_LDFLAGS="$with_libqpid_proton_ldflags"
+  BUILD_WITH_LIBQPIDPROTON_LIBS="-lqpid-proton"
+fi
+
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS)
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LDFLAGS)
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LIBS)
+
+# }}}
+
 # --with-librabbitmq {{{
 AC_ARG_WITH([librabbitmq],
   [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
@@ -6535,6 +6594,7 @@ m4_divert_once([HELP_ENABLE], [])
 
 AC_PLUGIN([aggregation],         [yes],                     [Aggregation plugin])
 AC_PLUGIN([amqp],                [$with_librabbitmq],       [AMQP output plugin])
+AC_PLUGIN([amqp1],               [$with_libqpid_proton],    [AMQP 1.0 output plugin])
 AC_PLUGIN([apache],              [$with_libcurl],           [Apache httpd statistics])
 AC_PLUGIN([apcups],              [yes],                     [Statistics of UPSes by APC])
 AC_PLUGIN([apple_sensors],       [$with_libiokit],          [Apple hardware sensors])
@@ -6923,6 +6983,7 @@ AC_MSG_RESULT([    libpqos . . . . . . . $with_libpqos])
 AC_MSG_RESULT([    libprotobuf . . . . . $with_libprotobuf])
 AC_MSG_RESULT([    libprotobuf-c . . . . $with_libprotobuf_c])
 AC_MSG_RESULT([    libpython . . . . . . $with_libpython])
+AC_MSG_RESULT([    libqpid-proton .  . . $with_libqpid_proton])
 AC_MSG_RESULT([    librabbitmq . . . . . $with_librabbitmq])
 AC_MSG_RESULT([    libriemann-client . . $with_libriemann_client])
 AC_MSG_RESULT([    librdkafka  . . . . . $with_librdkafka])
@@ -6954,6 +7015,7 @@ AC_MSG_RESULT()
 AC_MSG_RESULT([  Modules:])
 AC_MSG_RESULT([    aggregation . . . . . $enable_aggregation])
 AC_MSG_RESULT([    amqp    . . . . . . . $enable_amqp])
+AC_MSG_RESULT([    amqp1   . . . . . . . $enable_amqp1])
 AC_MSG_RESULT([    apache  . . . . . . . $enable_apache])
 AC_MSG_RESULT([    apcups  . . . . . . . $enable_apcups])
 AC_MSG_RESULT([    apple_sensors . . . . $enable_apple_sensors])
index 752b024..6f86b7e 100644 (file)
@@ -44,6 +44,7 @@
 # plugins enabled by default
 %define with_aggregation 0%{!?_without_aggregation:1}
 %define with_amqp 0%{!?_without_amqp:1}
+%define with_amqp1 0%{!?_without_amqp1:1}
 %define with_apache 0%{!?_without_apache:1}
 %define with_apcups 0%{!?_without_apcups:1}
 %define with_ascent 0%{!?_without_ascent:1}
@@ -280,13 +281,24 @@ every 10 seconds by default.
 
 %if %{with_amqp}
 %package amqp
-Summary:       AMQP plugin for collectd
+Summary:       AMQP 0.9 plugin for collectd
 Group:         System Environment/Daemons
 Requires:      %{name}%{?_isa} = %{version}-%{release}
 BuildRequires: librabbitmq-devel
 %description amqp
-The AMQP plugin transmits or receives values collected by collectd via the
-Advanced Message Queuing Protocol (AMQP).
+The AMQP 0.9 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v0.9 (AMQP).
+%endif
+
+%if %{with_amqp1}
+%package amqp1
+Summary:       AMQP 1.0 plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: qpid-proton-c-devel
+%description amqp1
+The AMQP 1.0 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v1.0 (AMQP1).
 %endif
 
 %if %{with_apache}
@@ -1018,6 +1030,12 @@ Collectd utilities
 %define _with_amqp --disable-amqp
 %endif
 
+%if %{with_amqp1}
+%define _with_amqp1 --enable-amqp1
+%else
+%define _with_amqp1 --disable-amqp1
+%endif
+
 %if %{with_apache}
 %define _with_apache --enable-apache
 %else
@@ -1898,6 +1916,7 @@ Collectd utilities
        --enable-target_v5upgrade \
        %{?_with_aggregation} \
        %{?_with_amqp} \
+       %{?_with_amqp1} \
        %{?_with_apache} \
        %{?_with_apcups} \
        %{?_with_apple_sensors} \
@@ -2409,6 +2428,11 @@ fi
 %{_libdir}/%{name}/amqp.so
 %endif
 
+%if %{with_amqp1}
+%files amqp1
+%{_libdir}/%{name}/amqp1.so
+%endif
+
 %if %{with_apache}
 %files apache
 %{_libdir}/%{name}/apache.so
diff --git a/src/amqp1.c b/src/amqp1.c
new file mode 100644 (file)
index 0000000..a6a2d35
--- /dev/null
@@ -0,0 +1,701 @@
+/**
+ * collectd - src/amqp1.c
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Andy Smith <ansmith@redhat.com>
+ */
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_deq.h"
+#include "utils_format_graphite.h"
+#include "utils_format_json.h"
+#include "utils_random.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <errno.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define BUFSIZE 8192
+#define AMQP1_FORMAT_JSON 0
+#define AMQP1_FORMAT_COMMAND 1
+#define AMQP1_FORMAT_GRAPHITE 2
+
+typedef struct amqp1_config_transport_s {
+  DEQ_LINKS(struct amqp1_config_transport_s);
+  char *name;
+  char *host;
+  char *port;
+  char *user;
+  char *password;
+  char *address;
+  int retry_delay;
+} amqp1_config_transport_t;
+
+typedef struct amqp1_config_instance_s {
+  DEQ_LINKS(struct amqp1_config_instance_s);
+  char *name;
+  bool notify;
+  uint8_t format;
+  unsigned int graphite_flags;
+  bool store_rates;
+  char *prefix;
+  char *postfix;
+  char escape_char;
+  bool pre_settle;
+  char send_to[1024];
+} amqp1_config_instance_t;
+
+DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
+
+typedef struct cd_message_s {
+  DEQ_LINKS(struct cd_message_s);
+  pn_rwbytes_t mbuf;
+  amqp1_config_instance_t *instance;
+} cd_message_t;
+
+DEQ_DECLARE(cd_message_t, cd_message_list_t);
+
+/*
+ * Globals
+ */
+static pn_connection_t *conn;
+static pn_link_t *sender;
+static pn_proactor_t *proactor;
+static pthread_mutex_t send_lock;
+static cd_message_list_t out_messages;
+static uint64_t cd_tag = 1;
+static uint64_t acknowledged;
+static amqp1_config_transport_t *transport;
+static bool stopping;
+static bool event_thread_running;
+static pthread_t event_thread_id;
+
+/*
+ * Functions
+ */
+static void cd_message_free(cd_message_t *cdm) {
+  free(cdm->mbuf.start);
+  free(cdm);
+} /* }}} void cd_message_free */
+
+static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
+{
+  uint64_t dtag;
+  cd_message_list_t to_send;
+  cd_message_t *cdm;
+  int link_credit = pn_link_credit(link);
+  int event_count = 0;
+  pn_delivery_t *dlv;
+
+  if (stopping) {
+    return 0;
+  }
+
+  DEQ_INIT(to_send);
+
+  pthread_mutex_lock(&send_lock);
+
+  if (link_credit > 0) {
+    dtag = cd_tag;
+    cdm = DEQ_HEAD(out_messages);
+    while (cdm) {
+      DEQ_REMOVE_HEAD(out_messages);
+      DEQ_INSERT_TAIL(to_send, cdm);
+      if (DEQ_SIZE(to_send) == (size_t)link_credit)
+        break;
+      cdm = DEQ_HEAD(out_messages);
+    }
+    cd_tag += DEQ_SIZE(to_send);
+  }
+
+  pthread_mutex_unlock(&send_lock);
+
+  /* message is already formatted and encoded */
+  cdm = DEQ_HEAD(to_send);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(to_send);
+    dtag++;
+    dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
+    pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
+    pn_link_advance(link);
+    if (cdm->instance->pre_settle == true) {
+      pn_delivery_settle(dlv);
+    }
+    event_count++;
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(to_send);
+  }
+
+  return event_count;
+} /* }}} int amqp1_send_out_messages */
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
+{
+  if (pn_condition_is_set(cond)) {
+    ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    conn = NULL;
+  }
+} /* }}} void check_condition */
+
+static bool handle(pn_event_t *event) /* {{{ */
+{
+
+  switch (pn_event_type(event)) {
+
+  case PN_CONNECTION_INIT: {
+    conn = pn_event_connection(event);
+    pn_connection_set_container(conn, transport->name);
+    pn_connection_open(conn);
+    pn_session_t *ssn = pn_session(conn);
+    pn_session_open(ssn);
+    sender = pn_sender(ssn, "cd-sender");
+    pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
+    pn_link_open(sender);
+    break;
+  }
+
+  case PN_LINK_FLOW: {
+    /* peer has given us credit, send outbound messages */
+    amqp1_send_out_messages(sender);
+    break;
+  }
+
+  case PN_DELIVERY: {
+    /* acknowledgement from peer that a message was delivered */
+    pn_delivery_t *dlv = pn_event_delivery(event);
+    if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
+      pn_delivery_settle(dlv);
+      acknowledged++;
+    }
+    break;
+  }
+
+  case PN_CONNECTION_WAKE: {
+    if (!stopping) {
+      amqp1_send_out_messages(sender);
+    }
+    break;
+  }
+
+  case PN_TRANSPORT_CLOSED: {
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+  }
+
+  case PN_CONNECTION_REMOTE_CLOSE: {
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_SESSION_REMOTE_CLOSE: {
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_LINK_REMOTE_CLOSE:
+  case PN_LINK_REMOTE_DETACH: {
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_PROACTOR_INACTIVE: {
+    return false;
+  }
+
+  default:
+    break;
+  }
+  return true;
+} /* }}} bool handle */
+
+static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
+{
+  char addr[PN_MAX_ADDR];
+  cd_message_t *cdm;
+
+  /* setup proactor */
+  proactor = pn_proactor();
+  pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
+
+  while (!stopping) {
+    /* make connection */
+    conn = pn_connection();
+    if (transport->user != NULL) {
+      pn_connection_set_user(conn, transport->user);
+      pn_connection_set_password(conn, transport->password);
+    }
+    pn_proactor_connect(proactor, conn, addr);
+
+    bool engine_running = true;
+    while (engine_running && !stopping) {
+      pn_event_batch_t *events = pn_proactor_wait(proactor);
+      pn_event_t *e;
+      while ((e = pn_event_batch_next(events))) {
+        engine_running = handle(e);
+        if (!engine_running) {
+          break;
+        }
+      }
+      pn_proactor_done(proactor, events);
+    }
+
+    pn_proactor_release_connection(conn);
+
+    DEBUG("amqp1 plugin: retrying connection");
+    int delay = transport->retry_delay;
+    while (delay-- > 0 && !stopping) {
+      sleep(1.0);
+    }
+  }
+
+  pn_proactor_disconnect(proactor, NULL);
+
+  /* Free the remaining out_messages */
+  cdm = DEQ_HEAD(out_messages);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(out_messages);
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(out_messages);
+  }
+
+  event_thread_running = false;
+
+  return NULL;
+} /* }}} void event_thread */
+
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
+{
+  /* encode message */
+  pn_message_t *message = pn_message();
+  pn_message_set_address(message, instance->send_to);
+  pn_data_t *body = pn_message_body(message);
+  pn_data_clear(body);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
+  pn_data_exit(body);
+
+  /* put_binary copies and stores so ok to use mbuf */
+  cdm->mbuf.size = BUFSIZE;
+
+  int status;
+  while ((status = pn_message_encode(message, cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    cd_message_free(cdm);
+    return -1;
+  }
+
+  pthread_mutex_lock(&send_lock);
+  DEQ_INSERT_TAIL(out_messages, cdm);
+  pthread_mutex_unlock(&send_lock);
+
+  pn_message_free(message);
+
+  /* activate the sender */
+  if (conn) {
+    pn_connection_wake(conn);
+  }
+
+  return 0;
+} /* }}} int encqueue */
+
+static int amqp1_notify(notification_t const *n,
+                        user_data_t *user_data) /* {{{ */
+{
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
+  size_t bufsize = BUFSIZE;
+
+  if (n == NULL || user_data == NULL)
+    return EINVAL;
+
+  amqp1_config_instance_t *instance = user_data->data;
+
+  if (instance->notify != true) {
+    ERROR("amqp1 plugin: write notification failed");
+  }
+
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  DEQ_ITEM_INIT(cdm);
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  cdm->instance = instance;
+
+  switch (instance->format) {
+  case AMQP1_FORMAT_JSON:
+    format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+    int status = format_json_notification(cdm->mbuf.start, bufsize, n);
+    if (status != 0) {
+      ERROR("amqp1 plugin: formatting notification failed");
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  default:
+    ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+    return -1;
+  }
+
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
+
+} /* }}} int amqp1_notify */
+
+static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+                       user_data_t *user_data) {
+  int status = 0;
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
+  size_t bufsize = BUFSIZE;
+
+  if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
+    return EINVAL;
+
+  amqp1_config_instance_t *instance = user_data->data;
+
+  if (instance->notify != false) {
+    ERROR("amqp1 plugin: write failed");
+  }
+
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  DEQ_ITEM_INIT(cdm);
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  cdm->instance = instance;
+
+  switch (instance->format) {
+  case AMQP1_FORMAT_COMMAND:
+    status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
+    if (status != 0) {
+      ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  case AMQP1_FORMAT_JSON:
+    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
+    format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
+                           instance->store_rates);
+    format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  case AMQP1_FORMAT_GRAPHITE:
+    status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
+                             instance->prefix, instance->postfix,
+                             instance->escape_char, instance->graphite_flags);
+    if (status != 0) {
+      ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  default:
+    ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+    return -1;
+  }
+
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
+
+} /* }}} int amqp1_write */
+
+static void amqp1_config_transport_free(void *ptr) /* {{{ */
+{
+  amqp1_config_transport_t *transport = ptr;
+
+  if (transport == NULL)
+    return;
+
+  sfree(transport->name);
+  sfree(transport->host);
+  sfree(transport->user);
+  sfree(transport->password);
+  sfree(transport->address);
+
+  sfree(transport);
+} /* }}} void amqp1_config_transport_free */
+
+static void amqp1_config_instance_free(void *ptr) /* {{{ */
+{
+  amqp1_config_instance_t *instance = ptr;
+
+  if (instance == NULL)
+    return;
+
+  sfree(instance->name);
+  sfree(instance->prefix);
+  sfree(instance->postfix);
+
+  sfree(instance);
+} /* }}} void amqp1_config_instance_free */
+
+static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
+{
+  amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
+  if (instance == NULL) {
+    ERROR("amqp1 plugin: calloc failed.");
+    return ENOMEM;
+  }
+
+  int status = cf_util_get_string(ci, &instance->name);
+  if (status != 0) {
+    sfree(instance);
+    return status;
+  }
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("PreSettle", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->pre_settle);
+    else if (strcasecmp("Notify", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->notify);
+    else if (strcasecmp("Format", child->key) == 0) {
+      char *key = NULL;
+      status = cf_util_get_string(child, &key);
+      if (status != 0)
+        return status;
+      assert(key != NULL);
+      if (strcasecmp(key, "Command") == 0) {
+        instance->format = AMQP1_FORMAT_COMMAND;
+      } else if (strcasecmp(key, "Graphite") == 0) {
+        instance->format = AMQP1_FORMAT_GRAPHITE;
+      } else if (strcasecmp(key, "JSON") == 0) {
+        instance->format = AMQP1_FORMAT_JSON;
+      } else {
+        WARNING("amqp1 plugin: Invalid format string: %s", key);
+      }
+      sfree(key);
+    } else if (strcasecmp("StoreRates", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->store_rates);
+    else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_SEPARATE_INSTANCES);
+    else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_ALWAYS_APPEND_DS);
+    else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_PRESERVE_SEPARATOR);
+    else if (strcasecmp("GraphitePrefix", child->key) == 0)
+      status = cf_util_get_string(child, &instance->prefix);
+    else if (strcasecmp("GraphitePostfix", child->key) == 0)
+      status = cf_util_get_string(child, &instance->postfix);
+    else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
+      char *tmp_buff = NULL;
+      status = cf_util_get_string(child, &tmp_buff);
+      if (status == 0) {
+        if (strlen(tmp_buff) > 1)
+          WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+                  "only one character. Others will be ignored.");
+        instance->escape_char = tmp_buff[0];
+      }
+      sfree(tmp_buff);
+    } else
+      WARNING("amqp1 plugin: Ignoring unknown "
+              "instance configuration option "
+              "\%s\".",
+              child->key);
+    if (status != 0)
+      break;
+  }
+
+  if (status != 0) {
+    amqp1_config_instance_free(instance);
+    return status;
+  } else {
+    char tpname[DATA_MAX_NAME_LEN];
+    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                      transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
+    if (instance->notify) {
+      status = plugin_register_notification(
+          tpname, amqp1_notify,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
+    } else {
+      status = plugin_register_write(
+          tpname, amqp1_write,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
+    }
+
+    if (status != 0) {
+      amqp1_config_instance_free(instance);
+    }
+  }
+
+  return status;
+} /* }}} int amqp1_config_instance */
+
+static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
+{
+  transport = calloc(1, sizeof(*transport));
+  if (transport == NULL) {
+    ERROR("amqp1 plugin: calloc failed.");
+    return ENOMEM;
+  }
+
+  /* Initialize transport configuration {{{ */
+  transport->retry_delay = 1;
+
+  int status = cf_util_get_string(ci, &transport->name);
+  if (status != 0) {
+    sfree(transport);
+    return status;
+  }
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Host", child->key) == 0)
+      status = cf_util_get_string(child, &transport->host);
+    else if (strcasecmp("Port", child->key) == 0)
+      status = cf_util_get_string(child, &transport->port);
+    else if (strcasecmp("User", child->key) == 0)
+      status = cf_util_get_string(child, &transport->user);
+    else if (strcasecmp("Password", child->key) == 0)
+      status = cf_util_get_string(child, &transport->password);
+    else if (strcasecmp("Address", child->key) == 0)
+      status = cf_util_get_string(child, &transport->address);
+    else if (strcasecmp("RetryDelay", child->key) == 0)
+      status = cf_util_get_int(child, &transport->retry_delay);
+    else if (strcasecmp("Instance", child->key) == 0)
+      amqp1_config_instance(child);
+    else
+      WARNING("amqp1 plugin: Ignoring unknown "
+              "transport configuration option "
+              "\%s\".",
+              child->key);
+
+    if (status != 0)
+      break;
+  }
+
+  if (status != 0) {
+    amqp1_config_transport_free(transport);
+  }
+  return status;
+} /* }}} int amqp1_config_transport */
+
+static int amqp1_config(oconfig_item_t *ci) /* {{{ */
+{
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Transport", child->key) == 0)
+      amqp1_config_transport(child);
+    else
+      WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+              child->key);
+  }
+
+  return 0;
+} /* }}} int amqp1_config */
+
+static int amqp1_init(void) /* {{{ */
+{
+  if (transport == NULL) {
+    ERROR("amqp1: init failed, no transport configured");
+    return -1;
+  }
+
+  if (proactor == NULL) {
+    pthread_mutex_init(&send_lock, /* attr = */ NULL);
+    /* start_thread */
+    int status =
+        plugin_thread_create(&event_thread_id, NULL /* no attributes */,
+                             event_thread, NULL /* no argument */, "handle");
+    if (status != 0) {
+      ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
+    } else {
+      event_thread_running = true;
+    }
+  }
+  return 0;
+} /* }}} int amqp1_init */
+
+static int amqp1_shutdown(void) /* {{{ */
+{
+  stopping = true;
+
+  /* Stop the proactor thread */
+  if (event_thread_running) {
+    DEBUG("amqp1 plugin: Shutting down proactor thread.");
+    pn_connection_wake(conn);
+  }
+  pthread_join(event_thread_id, NULL /* no return value */);
+  memset(&event_thread_id, 0, sizeof(event_thread_id));
+
+  DEBUG("amqp1 plugin: proactor thread exited.");
+
+  if (transport) {
+    amqp1_config_transport_free(transport);
+  }
+
+  return 0;
+} /* }}} int amqp1_shutdown */
+
+void module_register(void) {
+  plugin_register_complex_config("amqp1", amqp1_config);
+  plugin_register_init("amqp1", amqp1_init);
+  plugin_register_shutdown("amqp1", amqp1_shutdown);
+} /* void module_register */
index 66ab20c..a54d998 100644 (file)
@@ -27,6 +27,9 @@
 
 #include <fcntl.h>
 #include <linux/i2c-dev.h>
+#if HAVE_I2C_SMBUS_H
+#include <i2c/smbus.h>
+#endif
 #include <math.h>
 #include <stdint.h>
 #include <sys/ioctl.h>
index da33dbf..c2aa915 100644 (file)
@@ -90,6 +90,7 @@
 
 #@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
 #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
+#@BUILD_PLUGIN_AMQP1_TRUE@LoadPlugin amqp1
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors
 #  </Publish>
 #</Plugin>
 
+#<Plugin amqp1>
+#  <Transport "name">
+#    Host "localhost"
+#    Port "5672"
+#    User "guest"
+#    Password "guest"
+#    Address "collectd"
+#    RetryDelay 1
+#    <Instance "log">
+#        Format JSON
+#        PreSettle false
+#    </Instance>
+#    <Instance "notify">
+#        Format JSON
+#        PreSettle true
+#    </Instance>
+#    <Instance "telemetry">
+#        Format JSON
+#        PreSettle false
+#    </Instance>
+#  </Transport>
+#</Plugin>
+
 #<Plugin apache>
 #  <Instance "local">
 #    URL "http://localhost/status?auto"
 #    ReportSoftwareEvents true
 #    EventList "/var/cache/pmu/GenuineIntel-6-2D-core.json"
 #    HardwareEvents "L2_RQSTS.CODE_RD_HIT,L2_RQSTS.CODE_RD_MISS" "L2_RQSTS.ALL_CODE_RD"
+#    Cores "[0-3]"
 #</Plugin>
 
 #<Plugin "intel_rdt">
 #              RegisterType float
 #              Type gauge
 #              Instance "..."
+#              #Scale 1.0
+#              #Shift 0.0
 #      </Data>
 #
 #      <Host "name">
 #      Host "redis.example.com"
 #      Port "6379"
 #      Timeout 2000
+#      <Query "LLEN myqueue">
+#        #Database 0
+#        Type "queue_length"
+#        Instance "myqueue"
+#      <Query>
 #   </Node>
 #</Plugin>
 
 #      PluginInstanceFormat name
 #      Instances 1
 #      ExtraStats "cpu_util disk disk_err domain_state fs_info job_stats_background pcpu perf vcpupin"
+#      PersistentNotification false
 #</Plugin>
 
 #<Plugin vmem>
index dda3a18..69a1b1e 100644 (file)
@@ -530,9 +530,9 @@ are disabled by default.
 =head2 Plugin C<amqp>
 
 The I<AMQP plugin> can be used to communicate with other instances of
-I<collectd> or third party applications using an AMQP message broker. Values
-are sent to or received from the broker, which handles routing, queueing and
-possibly filtering out messages.
+I<collectd> or third party applications using an AMQP 0.9.1 message broker.
+Values are sent to or received from the broker, which handles routing,
+queueing and possibly filtering out messages.
 
 B<Synopsis:>
 
@@ -738,6 +738,171 @@ is preserved, i.e. passed through.
 
 =back
 
+=head2 Plugin C<amqp1>
+
+The I<AMQP1 plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP 1.0 message
+intermediary. Metric values or notifications are sent to the
+messaging intermediary which may handle direct messaging or
+queue based transfer.
+
+B<Synopsis:>
+
+ <Plugin "amqp1">
+   # Send values to an AMQP 1.0 intermediary
+  <Transport "name">
+    Host "localhost"
+    Port "5672"
+    User "guest"
+    Password "guest"
+    Address "collectd"
+#    RetryDelay 1
+    <Instance "some_name">
+        Format "command"
+        PreSettle false
+        Notify false
+ #      StoreRates false
+ #      GraphitePrefix "collectd."
+ #      GraphiteEscapeChar "_"
+ #      GraphiteSeparateInstances false
+ #      GraphiteAlwaysAppendDS false
+ #      GraphitePreserveSeparator false
+    </Instance>
+  </Transport>
+ </Plugin>
+
+The plugin's configuration consists of a I<Transport> that configures
+communications to the AMQP 1.0 messaging bus and one or more I<Instance>
+corresponding to metric or event publishers to the messaging system.
+
+The address in the I<Transport> block concatenated with the name given in the
+I<Instance> block starting tag will be used as the send-to address for
+communications over the messaging link.
+
+The following options are accepted within each I<Transport> block:
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP 1.0 intermediary. Defaults to the
+default behavior of the underlying communications library,
+I<libqpid-proton>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP 1.0 intermediary accepts
+connections. This argument must be a string, even if the numeric form
+is used. Defaults to "5672".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP 1.0 intermediary. By
+default "guest"/"guest" is used.
+
+=item B<Address> I<Address>
+
+This option specifies the prefix for the send-to value in the message.
+By default, "collectd" will be used.
+
+=item B<RetryDelay> I<RetryDelay>
+
+When the AMQP1 connection is lost, defines the time in seconds to wait
+before attempting to reconnect. Defaults to 1, which implies attempt
+to reconnect at 1 second intervals.
+
+=back
+
+The following options are accepted within each I<Instance> block:
+
+=over 4
+
+=item B<Format> B<Command>|B<JSON>|B<Graphite>
+
+Selects the format in which messages are sent to the intermediary. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values.
+
+=item B<PreSettle> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will wait for a message
+acknowledgement from the messaging bus before sending the next
+message. This indicates transfer of ownership to the messaging
+system. If set to B<true>, the plugin will not wait for a message
+acknowledgement and the message may be dropped prior to transfer of
+ownership.
+
+=item B<Notify> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will service the
+instance write call back as a value list. If set to B<true> the
+plugin will service the instance as a write notification callback
+for alert formatting.
+
+=item B<StoreRates> B<true>|B<false>
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=item B<GraphitePrefix>
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix>
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar>
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
+=item B<GraphiteSeparateInstances> B<true>|B<false>
+
+If set to B<true>, the plugin instance and type instance will be in their own
+path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+default), the plugin and plugin instance (and likewise the type and type
+instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=item B<GraphitePreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<GraphiteEscapeChar>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
+=back
+
 =head2 Plugin C<apache>
 
 To configure the C<apache>-plugin you first need to configure the Apache
@@ -3182,6 +3347,7 @@ B<Synopsis:>
     ReportSoftwareEvents true
     EventList "/var/cache/pmu/GenuineIntel-6-2D-core.json"
     HardwareEvents "L2_RQSTS.CODE_RD_HIT,L2_RQSTS.CODE_RD_MISS" "L2_RQSTS.ALL_CODE_RD"
+    Cores "0-3" "4,6" "[12-15]"
   </Plugin>
 
 B<Options:>
@@ -3253,6 +3419,23 @@ event_download.py script to download event list for current CPU.
 This field is a list of event names or groups of comma separated event names.
 This option requires B<EventList> option to be configured.
 
+=item B<Cores> I<cores groups>
+
+All events are reported on a per core basis. Monitoring of the events can be
+configured for a group of cores (aggregated statistics). This field defines
+groups of cores on which to monitor supported events. The field is represented
+as list of strings with core group values. Each string represents a list of
+cores in a group. If a group is enclosed in square brackets each core is added
+individually to a separate group (that is statistics are not aggregated).
+Allowed formats are:
+    0,1,2,3
+    0-10,20-18
+    1,3,5-8,10,0x10-12
+    [4-15,32-63]
+
+If an empty string is provided as value for this field default cores
+configuration is applied - that is separate group is created for each core.
+
 =back
 
 =head2 Plugin C<intel_rdt>
@@ -4074,8 +4257,9 @@ which the sizes of physical memory vary.
 
 The B<modbus plugin> connects to a Modbus "slave" via Modbus/TCP or Modbus/RTU and
 reads register values. It supports reading single registers (unsigned 16E<nbsp>bit
-values), large integer values (unsigned 32E<nbsp>bit values) and floating point
-values (two registers interpreted as IEEE floats in big endian notation).
+values), large integer values (unsigned 32E<nbsp>bit and 64E<nbsp>bit values) and
+floating point values (two registers interpreted as IEEE floats in big endian
+notation).
 
 B<Synopsis:>
 
@@ -4085,6 +4269,8 @@ B<Synopsis:>
    RegisterCmd ReadHolding
    Type voltage
    Instance "input-1"
+   #Scale 1.0
+   #Shift 0.0
  </Data>
 
  <Data "voltage-input-2">
@@ -4143,7 +4329,7 @@ Configures the base register to read from the device. If the option
 B<RegisterType> has been set to B<Uint32> or B<Float>, this and the next
 register will be read (the register number is increased by one).
 
-=item B<RegisterType> B<Int16>|B<Int32>|B<Uint16>|B<Uint32>|B<Float>|B<Int32LE>|B<Uint32LE>|B<FloatLE>
+=item B<RegisterType> B<Int16>|B<Int32>|B<Int64>|B<Uint16>|B<Uint32>|B<UInt64>|B<Float>|B<Int32LE>|B<Uint32LE>|B<FloatLE>
 
 Specifies what kind of data is returned by the device. This defaults to
 B<Uint16>.  If the type is B<Int32>, B<Int32LE>, B<Uint32>, B<Uint32LE>,
@@ -4155,7 +4341,10 @@ significant 16E<nbsp>bits are in the register at B<RegisterBase+1>.
 For B<Int32LE>, B<Uint32LE>, or B<Float32LE>, the high and low order
 registers are swapped with the most significant 16E<nbsp>bits in
 the B<RegisterBase+1> and the least significant 16E<nbsp>bits in
-B<RegisterBase>.
+B<RegisterBase>. If the type is B<Int64> or B<UInt64>, four 16E<nbsp>bit
+registers at B<RegisterBase>, B<RegisterBase+1>, B<RegisterBase+2> and
+B<RegisterBase+3> will be read and the data combined into one
+64E<nbsp>value.
 
 =item B<RegisterCmd> B<ReadHolding>|B<ReadInput>
 
@@ -4170,9 +4359,19 @@ supported.
 
 =item B<Instance> I<Instance>
 
-Sets the type instance to use when dispatching the value to I<collectd>. If
+Sets the type instance to use when dispatching the value to I<Instance>. If
 unset, an empty string (no type instance) is used.
 
+=item B<Scale> I<Value>
+
+The values taken from device are multiplied by I<Value>. The field is optional
+and the default is B<1.0>.
+
+=item B<Shift> I<Value>
+
+I<Value> is added to values from device after they have been multiplied by
+B<Scale> value. The field is optional and the default value is B<0.0>.
+
 =back
 
 =item E<lt>B<Host> I<Name>E<gt> blocks
@@ -7092,6 +7291,7 @@ which configures the connection parameters for this node.
         Port "6379"
         Timeout 2000
         <Query "LLEN myqueue">
+          #Database 0
           Type "queue_length"
           Instance "myqueue"
         <Query>
@@ -7137,6 +7337,11 @@ than B<Interval> defined globally.
 The B<Query> block identifies a query to execute against the redis server.
 There may be an arbitrary number of queries to execute.
 
+=item B<Database> I<Index>
+
+This index selects the Redis logical database to use for query. Defaults
+to C<0>.
+
 =item B<Type> I<Collectd type>
 
 Within a query definition, a valid collectd type to use as when submitting
@@ -8938,6 +9143,12 @@ B<Note>: I<perf> metrics can't be collected if I<intel_rdt> plugin is enabled.
 
 =back
 
+=item B<PersistentNotification> B<true>|B<false>
+Override default configuration to only send notifications when there is a change
+in the lifecycle state of a domain. When set to true notifications will be sent
+for every read cycle. Default is false. Does not affect the stats being
+dispatched.
+
 =back
 
 =head2 Plugin C<vmem>
index f47b24b..8f8334e 100644 (file)
@@ -75,6 +75,65 @@ int plugin_register_data_set(const data_set_t *ds) { return ENOTSUP; }
 
 int plugin_dispatch_values(value_list_t const *vl) { return ENOTSUP; }
 
+int plugin_dispatch_notification(__attribute__((unused))
+                                 const notification_t *notif) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_add_string(__attribute__((unused))
+                                        notification_t *n,
+                                        __attribute__((unused))
+                                        const char *name,
+                                        __attribute__((unused))
+                                        const char *value) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_add_signed_int(__attribute__((unused))
+                                            notification_t *n,
+                                            __attribute__((unused))
+                                            const char *name,
+                                            __attribute__((unused))
+                                            int64_t value) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_add_unsigned_int(__attribute__((unused))
+                                              notification_t *n,
+                                              __attribute__((unused))
+                                              const char *name,
+                                              __attribute__((unused))
+                                              uint64_t value) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_add_double(__attribute__((unused))
+                                        notification_t *n,
+                                        __attribute__((unused))
+                                        const char *name,
+                                        __attribute__((unused)) double value) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_add_boolean(__attribute__((unused))
+                                         notification_t *n,
+                                         __attribute__((unused))
+                                         const char *name,
+                                         __attribute__((unused)) _Bool value) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_copy(__attribute__((unused)) notification_t *dst,
+                                  __attribute__((unused))
+                                  const notification_t *src) {
+  return ENOTSUP;
+}
+
+int plugin_notification_meta_free(__attribute__((unused))
+                                  notification_meta_t *n) {
+  return ENOTSUP;
+}
+
 int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) {
   return ENOTSUP;
 }
index 4ef9f8c..1909d8c 100644 (file)
--- a/src/dbi.c
+++ b/src/dbi.c
@@ -547,12 +547,18 @@ static int cdbi_read_database_query(cdbi_database_t *db, /* {{{ */
     sstrncpy(column_names[i], column_name, DATA_MAX_NAME_LEN);
   } /* }}} for (i = 0; i < column_num; i++) */
 
-  udb_query_prepare_result(
+  status = udb_query_prepare_result(
       q, prep_area, (db->host ? db->host : hostname_g),
       /* plugin = */ (db->plugin_name != NULL) ? db->plugin_name : "dbi",
       db->name, column_names, column_num,
       /* interval = */ (db->interval > 0) ? db->interval : 0);
 
+  if (status != 0) {
+    ERROR("dbi plugin: udb_query_prepare_result failed with status %i.",
+          status);
+    BAIL_OUT(-1);
+  }
+
   /* 0 = error; 1 = success; */
   status = dbi_result_first_row(res); /* {{{ */
   if (status != 1) {
index 6fd642b..9970be0 100644 (file)
@@ -428,7 +428,7 @@ static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
   }
   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
 
-  for (int i = 0; i < ec->nb_ports; i++) {
+  for (unsigned int i = 0; i < ec->nb_ports; i++) {
     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
       struct rte_eth_link link;
       ec->link_info[i].read_time = cdtime();
@@ -497,7 +497,7 @@ static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
         ec->nb_ports);
 
   /* dispatch Link Status values to collectd */
-  for (int i = 0; i < ec->nb_ports; i++) {
+  for (unsigned int i = 0; i < ec->nb_ports; i++) {
     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
       if (!ec->config.link_status.send_updated ||
           ec->link_info[i].status_updated) {
index 2353684..c9bbb50 100644 (file)
@@ -1,7 +1,7 @@
 /**
  * collectd - src/intel_pmu.c
  *
- * Copyright(c) 2017 Intel Corporation. All rights reserved.
+ * Copyright(c) 2017-2018 Intel Corporation. All rights reserved.
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
  *
  * Authors:
  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
+ *   Kamil Wiatrowski <kamilx.wiatrowski@intel.com>
  **/
 
 #include "collectd.h"
 #include "common.h"
 
+#include "utils_config_cores.h"
+
 #include <jevents.h>
 #include <jsession.h>
 
@@ -70,6 +73,7 @@ struct intel_pmu_ctx_s {
   char event_list_fn[PATH_MAX];
   char **hw_events;
   size_t hw_events_count;
+  core_groups_list_t cores;
   struct eventlist *event_list;
 };
 typedef struct intel_pmu_ctx_s intel_pmu_ctx_t;
@@ -197,8 +201,61 @@ static void pmu_dump_config(void) {
   }
 }
 
+static void pmu_dump_cgroups(void) {
+
+  DEBUG(PMU_PLUGIN ": Core groups:");
+
+  for (size_t i = 0; i < g_ctx.cores.num_cgroups; i++) {
+    core_group_t *cgroup = g_ctx.cores.cgroups + i;
+    const size_t cores_size = cgroup->num_cores * 4 + 1;
+    char *cores = calloc(cores_size, sizeof(*cores));
+    if (cores == NULL) {
+      DEBUG(PMU_PLUGIN ": Failed to allocate string to list cores.");
+      return;
+    }
+    for (size_t j = 0; j < cgroup->num_cores; j++)
+      if (snprintf(cores + strlen(cores), cores_size - strlen(cores), " %d",
+                   cgroup->cores[j]) < 0) {
+        DEBUG(PMU_PLUGIN ": Failed to write list of cores to string.");
+        sfree(cores);
+        return;
+      }
+
+    DEBUG(PMU_PLUGIN ":   group[%" PRIsz "]", i);
+    DEBUG(PMU_PLUGIN ":     description: %s", cgroup->desc);
+    DEBUG(PMU_PLUGIN ":     cores count: %" PRIsz, cgroup->num_cores);
+    DEBUG(PMU_PLUGIN ":     cores      :%s", cores);
+    sfree(cores);
+  }
+}
+
 #endif /* COLLECT_DEBUG */
 
+static int pmu_validate_cgroups(core_group_t *cgroups, size_t len,
+                                int max_cores) {
+  /* i - group index, j - core index */
+  for (size_t i = 0; i < len; i++) {
+    for (size_t j = 0; j < cgroups[i].num_cores; j++) {
+      int core = (int)cgroups[i].cores[j];
+
+      /* Core index cannot exceed number of cores in system,
+         note that max_cores include both online and offline CPUs. */
+      if (core >= max_cores) {
+        ERROR(PMU_PLUGIN ": Core %d is not valid, max core index: %d.", core,
+              max_cores - 1);
+        return -1;
+      }
+    }
+    /* Check if cores are set in remaining groups */
+    for (size_t k = i + 1; k < len; k++)
+      if (config_cores_cmp_cgroups(&cgroups[i], &cgroups[k]) != 0) {
+        ERROR(PMU_PLUGIN ": Same cores cannot be set in different groups.");
+        return -1;
+      }
+  }
+  return 0;
+}
+
 static int pmu_config_hw_events(oconfig_item_t *ci) {
 
   if (strcasecmp("HardwareEvents", ci->key) != 0) {
@@ -253,6 +310,8 @@ static int pmu_config(oconfig_item_t *ci) {
       ret = pmu_config_hw_events(child);
     } else if (strcasecmp("ReportSoftwareEvents", child->key) == 0) {
       ret = cf_util_get_boolean(child, &g_ctx.sw_events);
+    } else if (strcasecmp("Cores", child->key) == 0) {
+      ret = config_cores_parse(child, &g_ctx.cores);
     } else {
       ERROR(PMU_PLUGIN ": Unknown configuration parameter \"%s\".", child->key);
       ret = -1;
@@ -271,20 +330,17 @@ static int pmu_config(oconfig_item_t *ci) {
   return 0;
 }
 
-static void pmu_submit_counter(int cpu, char *event, counter_t value,
-                               meta_data_t *meta) {
+static void pmu_submit_counter(const char *cgroup, const char *event,
+                               counter_t value, meta_data_t *meta) {
   value_list_t vl = VALUE_LIST_INIT;
 
   vl.values = &(value_t){.counter = value};
   vl.values_len = 1;
 
   sstrncpy(vl.plugin, PMU_PLUGIN, sizeof(vl.plugin));
-  if (cpu == -1) {
-    snprintf(vl.plugin_instance, sizeof(vl.plugin_instance), "all");
-  } else {
+  sstrncpy(vl.plugin_instance, cgroup, sizeof(vl.plugin_instance));
+  if (meta)
     vl.meta = meta;
-    snprintf(vl.plugin_instance, sizeof(vl.plugin_instance), "%d", cpu);
-  }
   sstrncpy(vl.type, "counter", sizeof(vl.type));
   sstrncpy(vl.type_instance, event, sizeof(vl.type_instance));
 
@@ -317,49 +373,65 @@ static void pmu_dispatch_data(void) {
   struct event *e;
 
   for (e = g_ctx.event_list->eventlist; e; e = e->next) {
-    uint64_t all_value = 0;
-    int event_enabled = 0;
-    for (int i = 0; i < g_ctx.event_list->num_cpus; i++) {
-
-      if (e->efd[i].fd < 0)
-        continue;
-
-      event_enabled++;
-
-      /* If there are more events than counters, the kernel uses time
-       * multiplexing. With multiplexing, at the end of the run,
-       * the counter is scaled basing on total time enabled vs time running.
-       * final_count = raw_count * time_enabled/time_running
-       */
-      uint64_t value = event_scaled_value(e, i);
-      all_value += value;
-
-      /* get meta data with information about scaling */
-      meta_data_t *meta = pmu_meta_data_create(&e->efd[i]);
-
-      /* dispatch per CPU value */
-      pmu_submit_counter(i, e->event, value, meta);
-
-      meta_data_destroy(meta);
-    }
+    for (size_t i = 0; i < g_ctx.cores.num_cgroups; i++) {
+      core_group_t *cgroup = g_ctx.cores.cgroups + i;
+      uint64_t cgroup_value = 0;
+      int event_enabled_cgroup = 0;
+      meta_data_t *meta = NULL;
+
+      for (size_t j = 0; j < cgroup->num_cores; j++) {
+        int core = (int)cgroup->cores[j];
+        if (e->efd[core].fd < 0)
+          continue;
+
+        event_enabled_cgroup++;
+
+        /* If there are more events than counters, the kernel uses time
+         * multiplexing. With multiplexing, at the end of the run,
+         * the counter is scaled basing on total time enabled vs time running.
+         * final_count = raw_count * time_enabled/time_running
+         */
+        uint64_t value = event_scaled_value(e, core);
+        cgroup_value += value;
+
+        /* get meta data with information about scaling */
+        if (cgroup->num_cores == 1)
+          meta = pmu_meta_data_create(&e->efd[core]);
+      }
 
-    if (event_enabled > 0) {
-      DEBUG(PMU_PLUGIN ": %-20s %'10lu", e->event, all_value);
-      /* dispatch all CPU value */
-      pmu_submit_counter(-1, e->event, all_value, NULL);
+      if (event_enabled_cgroup > 0) {
+        DEBUG(PMU_PLUGIN ": %s/%s = %lu", e->event, cgroup->desc, cgroup_value);
+        /* dispatch per core group value */
+        pmu_submit_counter(cgroup->desc, e->event, cgroup_value, meta);
+        meta_data_destroy(meta);
+      }
     }
   }
 }
 
 static int pmu_read(__attribute__((unused)) user_data_t *ud) {
   int ret;
+  struct event *e;
 
   DEBUG(PMU_PLUGIN ": %s:%d", __FUNCTION__, __LINE__);
 
-  ret = read_all_events(g_ctx.event_list);
-  if (ret != 0) {
-    ERROR(PMU_PLUGIN ": Failed to read values of all events.");
-    return ret;
+  /* read all events only for configured cores */
+  for (e = g_ctx.event_list->eventlist; e; e = e->next) {
+    for (size_t i = 0; i < g_ctx.cores.num_cgroups; i++) {
+      core_group_t *cgroup = g_ctx.cores.cgroups + i;
+      for (size_t j = 0; j < cgroup->num_cores; j++) {
+        int core = (int)cgroup->cores[j];
+        if (e->efd[core].fd < 0)
+          continue;
+
+        ret = read_event(e, core);
+        if (ret != 0) {
+          ERROR(PMU_PLUGIN ": Failed to read value of %s/%d event.", e->event,
+                core);
+          return ret;
+        }
+      }
+    }
   }
 
   pmu_dispatch_data();
@@ -404,7 +476,7 @@ static int pmu_add_hw_events(struct eventlist *el, char **e, size_t count) {
     if (!events)
       return -1;
 
-    char *s, *tmp;
+    char *s, *tmp = NULL;
     for (s = strtok_r(events, ",", &tmp); s; s = strtok_r(NULL, ",", &tmp)) {
 
       /* Allocate memory for event struct that contains array of efd structs
@@ -460,6 +532,7 @@ static void pmu_free_events(struct eventlist *el) {
 
   while (e) {
     struct event *next = e->next;
+    sfree(e->event);
     sfree(e);
     e = next;
   }
@@ -474,13 +547,18 @@ static int pmu_setup_events(struct eventlist *el, bool measure_all,
 
   for (e = el->eventlist; e; e = e->next) {
 
-    for (int i = 0; i < el->num_cpus; i++) {
-      if (setup_event(e, i, leader, measure_all, measure_pid) < 0) {
-        WARNING(PMU_PLUGIN ": perf event '%s' is not available (cpu=%d).",
-                e->event, i);
-      } else {
-        /* success if at least one event was set */
-        ret = 0;
+    for (size_t i = 0; i < g_ctx.cores.num_cgroups; i++) {
+      core_group_t *cgroup = g_ctx.cores.cgroups + i;
+      for (size_t j = 0; j < cgroup->num_cores; j++) {
+        int core = (int)cgroup->cores[j];
+
+        if (setup_event(e, core, leader, measure_all, measure_pid) < 0) {
+          WARNING(PMU_PLUGIN ": perf event '%s' is not available (cpu=%d).",
+                  e->event, core);
+        } else {
+          /* success if at least one event was set */
+          ret = 0;
+        }
       }
     }
 
@@ -504,6 +582,24 @@ static int pmu_init(void) {
     return -ENOMEM;
   }
 
+  if (g_ctx.cores.num_cgroups == 0) {
+    ret = config_cores_default(g_ctx.event_list->num_cpus, &g_ctx.cores);
+    if (ret != 0) {
+      ERROR(PMU_PLUGIN ": Failed to set default core groups.");
+      goto init_error;
+    }
+  } else {
+    ret = pmu_validate_cgroups(g_ctx.cores.cgroups, g_ctx.cores.num_cgroups,
+                               g_ctx.event_list->num_cpus);
+    if (ret != 0) {
+      ERROR(PMU_PLUGIN ": Invalid core groups configuration.");
+      goto init_error;
+    }
+  }
+#if COLLECT_DEBUG
+  pmu_dump_cgroups();
+#endif
+
   if (g_ctx.hw_cache_events) {
     ret =
         pmu_add_events(g_ctx.event_list, PERF_TYPE_HW_CACHE, g_hw_cache_events,
@@ -579,6 +675,8 @@ init_error:
   sfree(g_ctx.hw_events);
   g_ctx.hw_events_count = 0;
 
+  config_cores_cleanup(&g_ctx.cores);
+
   return ret;
 }
 
@@ -594,6 +692,8 @@ static int pmu_shutdown(void) {
   sfree(g_ctx.hw_events);
   g_ctx.hw_events_count = 0;
 
+  config_cores_cleanup(&g_ctx.cores);
+
   return 0;
 }
 
index 3d4c988..667033c 100644 (file)
@@ -1,7 +1,7 @@
 /**
  * collectd - src/intel_rdt.c
  *
- * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ * Copyright(c) 2016-2018 Intel Corporation. All rights reserved.
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -25,8 +25,9 @@
  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
  **/
 
-#include "collectd.h"
 #include "common.h"
+#include "utils_config_cores.h"
+#include "collectd.h"
 
 #include <pqos.h>
 
@@ -41,16 +42,9 @@ typedef enum {
   CONFIGURATION_ERROR,
 } rdt_config_status;
 
-struct rdt_core_group_s {
-  char *desc;
-  size_t num_cores;
-  unsigned *cores;
-  enum pqos_mon_event events;
-};
-typedef struct rdt_core_group_s rdt_core_group_t;
-
 struct rdt_ctx_s {
-  rdt_core_group_t cgroups[RDT_MAX_CORES];
+  core_groups_list_t cores;
+  enum pqos_mon_event events[RDT_MAX_CORES];
   struct pqos_mon_data *pgroups[RDT_MAX_CORES];
   size_t num_groups;
   const struct pqos_cpuinfo *pqos_cpu;
@@ -63,243 +57,6 @@ static rdt_ctx_t *g_rdt;
 
 static rdt_config_status g_state = UNKNOWN;
 
-static int isdup(const uint64_t *nums, size_t size, uint64_t val) {
-  for (size_t i = 0; i < size; i++)
-    if (nums[i] == val)
-      return 1;
-  return 0;
-}
-
-static int strtouint64(const char *s, uint64_t *n) {
-  char *endptr = NULL;
-
-  assert(s != NULL);
-  assert(n != NULL);
-
-  *n = strtoull(s, &endptr, 0);
-
-  if (!(*s != '\0' && *endptr == '\0')) {
-    DEBUG(RDT_PLUGIN ": Error converting '%s' to unsigned number.", s);
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-/*
- * NAME
- *   strlisttonums
- *
- * DESCRIPTION
- *   Converts string of characters representing list of numbers into array of
- *   numbers. Allowed formats are:
- *     0,1,2,3
- *     0-10,20-18
- *     1,3,5-8,10,0x10-12
- *
- *   Numbers can be in decimal or hexadecimal format.
- *
- * PARAMETERS
- *   `s'         String representing list of unsigned numbers.
- *   `nums'      Array to put converted numeric values into.
- *   `max'       Maximum number of elements that nums can accommodate.
- *
- * RETURN VALUE
- *    Number of elements placed into nums.
- */
-static size_t strlisttonums(char *s, uint64_t *nums, size_t max) {
-  int ret;
-  size_t index = 0;
-  char *saveptr = NULL;
-
-  if (s == NULL || nums == NULL || max == 0)
-    return index;
-
-  for (;;) {
-    char *p = NULL;
-    char *token = NULL;
-
-    token = strtok_r(s, ",", &saveptr);
-    if (token == NULL)
-      break;
-
-    s = NULL;
-
-    while (isspace(*token))
-      token++;
-    if (*token == '\0')
-      continue;
-
-    p = strchr(token, '-');
-    if (p != NULL) {
-      uint64_t n, start, end;
-      *p = '\0';
-      ret = strtouint64(token, &start);
-      if (ret < 0)
-        return 0;
-      ret = strtouint64(p + 1, &end);
-      if (ret < 0)
-        return 0;
-      if (start > end) {
-        return 0;
-      }
-      for (n = start; n <= end; n++) {
-        if (!(isdup(nums, index, n))) {
-          nums[index] = n;
-          index++;
-        }
-        if (index >= max)
-          return index;
-      }
-    } else {
-      uint64_t val;
-
-      ret = strtouint64(token, &val);
-      if (ret < 0)
-        return 0;
-
-      if (!(isdup(nums, index, val))) {
-        nums[index] = val;
-        index++;
-      }
-      if (index >= max)
-        return index;
-    }
-  }
-
-  return index;
-}
-
-/*
- * NAME
- *   cgroup_cmp
- *
- * DESCRIPTION
- *   Function to compare cores in 2 core groups.
- *
- * PARAMETERS
- *   `cg_a'      Pointer to core group a.
- *   `cg_b'      Pointer to core group b.
- *
- * RETURN VALUE
- *    1 if both groups contain the same cores
- *    0 if none of their cores match
- *    -1 if some but not all cores match
- */
-static int cgroup_cmp(const rdt_core_group_t *cg_a,
-                      const rdt_core_group_t *cg_b) {
-  int found = 0;
-
-  assert(cg_a != NULL);
-  assert(cg_b != NULL);
-
-  const int sz_a = cg_a->num_cores;
-  const int sz_b = cg_b->num_cores;
-  const unsigned *tab_a = cg_a->cores;
-  const unsigned *tab_b = cg_b->cores;
-
-  for (int i = 0; i < sz_a; i++) {
-    for (int j = 0; j < sz_b; j++)
-      if (tab_a[i] == tab_b[j])
-        found++;
-  }
-  /* if no cores are the same */
-  if (!found)
-    return 0;
-  /* if group contains same cores */
-  if (sz_a == sz_b && sz_b == found)
-    return 1;
-  /* if not all cores are the same */
-  return -1;
-}
-
-static int cgroup_set(rdt_core_group_t *cg, char *desc, uint64_t *cores,
-                      size_t num_cores) {
-  assert(cg != NULL);
-  assert(desc != NULL);
-  assert(cores != NULL);
-  assert(num_cores > 0);
-
-  cg->cores = calloc(num_cores, sizeof(unsigned));
-  if (cg->cores == NULL) {
-    ERROR(RDT_PLUGIN ": Error allocating core group table");
-    return -ENOMEM;
-  }
-  cg->num_cores = num_cores;
-  cg->desc = strdup(desc);
-  if (cg->desc == NULL) {
-    ERROR(RDT_PLUGIN ": Error allocating core group description");
-    sfree(cg->cores);
-    return -ENOMEM;
-  }
-
-  for (size_t i = 0; i < num_cores; i++)
-    cg->cores[i] = (unsigned)cores[i];
-
-  return 0;
-}
-
-/*
- * NAME
- *   oconfig_to_cgroups
- *
- * DESCRIPTION
- *   Function to set the descriptions and cores for each core group.
- *   Takes a config option containing list of strings that are used to set
- *   core group values.
- *
- * PARAMETERS
- *   `item'        Config option containing core groups.
- *   `groups'      Table of core groups to set values in.
- *   `max_groups'  Maximum number of core groups allowed.
- *
- * RETURN VALUE
- *   On success, the number of core groups set up. On error, appropriate
- *   negative error value.
- */
-static int oconfig_to_cgroups(oconfig_item_t *item, rdt_core_group_t *groups,
-                              size_t max_groups) {
-  int index = 0;
-
-  assert(groups != NULL);
-  assert(max_groups > 0);
-  assert(item != NULL);
-
-  for (int j = 0; j < item->values_num; j++) {
-    int ret;
-    size_t n;
-    uint64_t cores[RDT_MAX_CORES] = {0};
-    char value[DATA_MAX_NAME_LEN];
-
-    if ((item->values[j].value.string == NULL) ||
-        (strlen(item->values[j].value.string) == 0))
-      continue;
-
-    sstrncpy(value, item->values[j].value.string, sizeof(value));
-
-    n = strlisttonums(value, cores, STATIC_ARRAY_SIZE(cores));
-    if (n == 0) {
-      ERROR(RDT_PLUGIN ": Error parsing core group (%s)",
-            item->values[j].value.string);
-      return -EINVAL;
-    }
-
-    /* set core group info */
-    ret = cgroup_set(&groups[index], item->values[j].value.string, cores, n);
-    if (ret < 0)
-      return ret;
-
-    index++;
-
-    if (index >= max_groups) {
-      WARNING(RDT_PLUGIN ": Too many core groups configured");
-      return index;
-    }
-  }
-
-  return index;
-}
-
 #if COLLECT_DEBUG
 static void rdt_dump_cgroups(void) {
   char cores[RDT_MAX_CORES * 4];
@@ -310,18 +67,19 @@ static void rdt_dump_cgroups(void) {
   DEBUG(RDT_PLUGIN ": Core Groups Dump");
   DEBUG(RDT_PLUGIN ":  groups count: %" PRIsz, g_rdt->num_groups);
 
-  for (int i = 0; i < g_rdt->num_groups; i++) {
+  for (size_t i = 0; i < g_rdt->num_groups; i++) {
+    core_group_t *cgroup = g_rdt->cores.cgroups + i;
 
     memset(cores, 0, sizeof(cores));
-    for (int j = 0; j < g_rdt->cgroups[i].num_cores; j++) {
+    for (int j = 0; j < cgroup->num_cores; j++) {
       snprintf(cores + strlen(cores), sizeof(cores) - strlen(cores) - 1, " %d",
-               g_rdt->cgroups[i].cores[j]);
+               cgroup->cores[j]);
     }
 
-    DEBUG(RDT_PLUGIN ":  group[%d]:", i);
-    DEBUG(RDT_PLUGIN ":    description: %s", g_rdt->cgroups[i].desc);
+    DEBUG(RDT_PLUGIN ":  group[%zu]:", i);
+    DEBUG(RDT_PLUGIN ":    description: %s", cgroup->desc);
     DEBUG(RDT_PLUGIN ":    cores: %s", cores);
-    DEBUG(RDT_PLUGIN ":    events: 0x%X", g_rdt->cgroups[i].events);
+    DEBUG(RDT_PLUGIN ":    events: 0x%X", g_rdt->events[i]);
   }
 
   return;
@@ -350,45 +108,59 @@ static void rdt_dump_data(void) {
     double mbr = bytes_to_mb(pv->mbm_remote_delta);
     double mbl = bytes_to_mb(pv->mbm_local_delta);
 
-    DEBUG(" [%s] %8u %10.1f %10.1f %10.1f", g_rdt->cgroups[i].desc,
+    DEBUG(" [%s] %8u %10.1f %10.1f %10.1f", g_rdt->cores.cgroups[i].desc,
           g_rdt->pgroups[i]->poll_ctx[0].rmid, llc, mbl, mbr);
   }
 }
 #endif /* COLLECT_DEBUG */
 
 static void rdt_free_cgroups(void) {
+  config_cores_cleanup(&g_rdt->cores);
   for (int i = 0; i < RDT_MAX_CORES; i++) {
-    sfree(g_rdt->cgroups[i].desc);
-
-    sfree(g_rdt->cgroups[i].cores);
-    g_rdt->cgroups[i].num_cores = 0;
-
     sfree(g_rdt->pgroups[i]);
   }
 }
 
 static int rdt_default_cgroups(void) {
-  int ret;
+  unsigned num_cores = g_rdt->pqos_cpu->num_cores;
+
+  g_rdt->cores.cgroups = calloc(num_cores, sizeof(*(g_rdt->cores.cgroups)));
+  if (g_rdt->cores.cgroups == NULL) {
+    ERROR(RDT_PLUGIN ": Error allocating core groups array");
+    return -ENOMEM;
+  }
+  g_rdt->cores.num_cgroups = num_cores;
 
   /* configure each core in separate group */
-  for (unsigned i = 0; i < g_rdt->pqos_cpu->num_cores; i++) {
+  for (unsigned i = 0; i < num_cores; i++) {
+    core_group_t *cgroup = g_rdt->cores.cgroups + i;
     char desc[DATA_MAX_NAME_LEN];
-    uint64_t core = i;
-
-    snprintf(desc, sizeof(desc), "%d", g_rdt->pqos_cpu->cores[i].lcore);
 
     /* set core group info */
-    ret = cgroup_set(&g_rdt->cgroups[i], desc, &core, 1);
-    if (ret < 0)
-      return ret;
+    cgroup->cores = calloc(1, sizeof(*(cgroup->cores)));
+    if (cgroup->cores == NULL) {
+      ERROR(RDT_PLUGIN ": Error allocating cores array");
+      rdt_free_cgroups();
+      return -ENOMEM;
+    }
+    cgroup->num_cores = 1;
+    cgroup->cores[0] = i;
+
+    snprintf(desc, sizeof(desc), "%d", g_rdt->pqos_cpu->cores[i].lcore);
+    cgroup->desc = strdup(desc);
+    if (cgroup->desc == NULL) {
+      ERROR(RDT_PLUGIN ": Error allocating core group description");
+      rdt_free_cgroups();
+      return -ENOMEM;
+    }
   }
 
-  return g_rdt->pqos_cpu->num_cores;
+  return num_cores;
 }
 
-static int rdt_is_core_id_valid(int core_id) {
+static int rdt_is_core_id_valid(unsigned int core_id) {
 
-  for (int i = 0; i < g_rdt->pqos_cpu->num_cores; i++)
+  for (unsigned int i = 0; i < g_rdt->pqos_cpu->num_cores; i++)
     if (core_id == g_rdt->pqos_cpu->cores[i].lcore)
       return 1;
 
@@ -396,38 +168,23 @@ static int rdt_is_core_id_valid(int core_id) {
 }
 
 static int rdt_config_cgroups(oconfig_item_t *item) {
-  int n = 0;
+  size_t n = 0;
   enum pqos_mon_event events = 0;
 
-  if (item == NULL) {
-    DEBUG(RDT_PLUGIN ": cgroups_config: Invalid argument.");
-    return -EINVAL;
-  }
-
-  DEBUG(RDT_PLUGIN ": Core groups [%d]:", item->values_num);
-  for (int j = 0; j < item->values_num; j++) {
-    if (item->values[j].type != OCONFIG_TYPE_STRING) {
-      ERROR(RDT_PLUGIN ": given core group value is not a string [idx=%d]", j);
-      return -EINVAL;
-    }
-    DEBUG(RDT_PLUGIN ":  [%d]: %s", j, item->values[j].value.string);
-  }
-
-  n = oconfig_to_cgroups(item, g_rdt->cgroups, g_rdt->pqos_cpu->num_cores);
-  if (n < 0) {
+  if (config_cores_parse(item, &g_rdt->cores) < 0) {
     rdt_free_cgroups();
     ERROR(RDT_PLUGIN ": Error parsing core groups configuration.");
     return -EINVAL;
   }
+  n = g_rdt->cores.num_cgroups;
 
   /* validate configured core id values */
-  for (int group_idx = 0; group_idx < n; group_idx++) {
-    for (int core_idx = 0; core_idx < g_rdt->cgroups[group_idx].num_cores;
-         core_idx++) {
-      if (!rdt_is_core_id_valid(g_rdt->cgroups[group_idx].cores[core_idx])) {
-        ERROR(RDT_PLUGIN ": Core group '%s' contains invalid core id '%d'",
-              g_rdt->cgroups[group_idx].desc,
-              (int)g_rdt->cgroups[group_idx].cores[core_idx]);
+  for (size_t group_idx = 0; group_idx < n; group_idx++) {
+    core_group_t *cgroup = g_rdt->cores.cgroups + group_idx;
+    for (size_t core_idx = 0; core_idx < cgroup->num_cores; core_idx++) {
+      if (!rdt_is_core_id_valid(cgroup->cores[core_idx])) {
+        ERROR(RDT_PLUGIN ": Core group '%s' contains invalid core id '%u'",
+              cgroup->desc, cgroup->cores[core_idx]);
         rdt_free_cgroups();
         return -EINVAL;
       }
@@ -436,18 +193,19 @@ static int rdt_config_cgroups(oconfig_item_t *item) {
 
   if (n == 0) {
     /* create default core groups if "Cores" config option is empty */
-    n = rdt_default_cgroups();
-    if (n < 0) {
+    int ret = rdt_default_cgroups();
+    if (ret < 0) {
       rdt_free_cgroups();
       ERROR(RDT_PLUGIN ": Error creating default core groups configuration.");
-      return n;
+      return ret;
     }
+    n = (size_t)ret;
     INFO(RDT_PLUGIN
          ": No core groups configured. Default core groups created.");
   }
 
   /* Get all available events on this platform */
-  for (int i = 0; i < g_rdt->cap_mon->u.mon->num_events; i++)
+  for (unsigned int i = 0; i < g_rdt->cap_mon->u.mon->num_events; i++)
     events |= g_rdt->cap_mon->u.mon->events[i].type;
 
   events &= ~(PQOS_PERF_EVENT_LLC_MISS);
@@ -457,10 +215,11 @@ static int rdt_config_cgroups(oconfig_item_t *item) {
   DEBUG(RDT_PLUGIN ": Available events to monitor: %#x", events);
 
   g_rdt->num_groups = n;
-  for (int i = 0; i < n; i++) {
-    for (int j = 0; j < i; j++) {
+  for (size_t i = 0; i < n; i++) {
+    for (size_t j = 0; j < i; j++) {
       int found = 0;
-      found = cgroup_cmp(&g_rdt->cgroups[j], &g_rdt->cgroups[i]);
+      found = config_cores_cmp_cgroups(&g_rdt->cores.cgroups[j],
+                                       &g_rdt->cores.cgroups[i]);
       if (found != 0) {
         rdt_free_cgroups();
         ERROR(RDT_PLUGIN ": Cannot monitor same cores in different groups.");
@@ -468,7 +227,7 @@ static int rdt_config_cgroups(oconfig_item_t *item) {
       }
     }
 
-    g_rdt->cgroups[i].events = events;
+    g_rdt->events[i] = events;
     g_rdt->pgroups[i] = calloc(1, sizeof(*g_rdt->pgroups[i]));
     if (g_rdt->pgroups[i] == NULL) {
       rdt_free_cgroups();
@@ -627,7 +386,9 @@ static int rdt_read(__attribute__((unused)) user_data_t *ud) {
   rdt_dump_data();
 #endif /* COLLECT_DEBUG */
 
-  for (int i = 0; i < g_rdt->num_groups; i++) {
+  for (size_t i = 0; i < g_rdt->num_groups; i++) {
+    core_group_t *cgroup = g_rdt->cores.cgroups + i;
+
     enum pqos_mon_event mbm_events =
         (PQOS_MON_EVENT_LMEM_BW | PQOS_MON_EVENT_TMEM_BW |
          PQOS_MON_EVENT_RMEM_BW);
@@ -636,16 +397,16 @@ static int rdt_read(__attribute__((unused)) user_data_t *ud) {
 
     /* Submit only monitored events data */
 
-    if (g_rdt->cgroups[i].events & PQOS_MON_EVENT_L3_OCCUP)
-      rdt_submit_gauge(g_rdt->cgroups[i].desc, "bytes", "llc", pv->llc);
+    if (g_rdt->events[i] & PQOS_MON_EVENT_L3_OCCUP)
+      rdt_submit_gauge(cgroup->desc, "bytes", "llc", pv->llc);
 
-    if (g_rdt->cgroups[i].events & PQOS_PERF_EVENT_IPC)
-      rdt_submit_gauge(g_rdt->cgroups[i].desc, "ipc", NULL, pv->ipc);
+    if (g_rdt->events[i] & PQOS_PERF_EVENT_IPC)
+      rdt_submit_gauge(cgroup->desc, "ipc", NULL, pv->ipc);
 
-    if (g_rdt->cgroups[i].events & mbm_events) {
-      rdt_submit_derive(g_rdt->cgroups[i].desc, "memory_bandwidth", "local",
+    if (g_rdt->events[i] & mbm_events) {
+      rdt_submit_derive(cgroup->desc, "memory_bandwidth", "local",
                         pv->mbm_local_delta);
-      rdt_submit_derive(g_rdt->cgroups[i].desc, "memory_bandwidth", "remote",
+      rdt_submit_derive(cgroup->desc, "memory_bandwidth", "remote",
                         pv->mbm_remote_delta);
     }
   }
@@ -664,11 +425,11 @@ static int rdt_init(void) {
     return ret;
 
   /* Start monitoring */
-  for (int i = 0; i < g_rdt->num_groups; i++) {
-    rdt_core_group_t *cg = &g_rdt->cgroups[i];
+  for (size_t i = 0; i < g_rdt->num_groups; i++) {
+    core_group_t *cg = g_rdt->cores.cgroups + i;
 
-    ret = pqos_mon_start(cg->num_cores, cg->cores, cg->events, (void *)cg->desc,
-                         g_rdt->pgroups[i]);
+    ret = pqos_mon_start(cg->num_cores, cg->cores, g_rdt->events[i],
+                         (void *)cg->desc, g_rdt->pgroups[i]);
 
     if (ret != PQOS_RETVAL_OK)
       ERROR(RDT_PLUGIN ": Error starting monitoring group %s (pqos status=%d)",
@@ -687,7 +448,7 @@ static int rdt_shutdown(void) {
     return 0;
 
   /* Stop monitoring */
-  for (int i = 0; i < g_rdt->num_groups; i++) {
+  for (size_t i = 0; i < g_rdt->num_groups; i++) {
     pqos_mon_stop(g_rdt->pgroups[i]);
   }
 
index 2642f64..fb99bad 100644 (file)
@@ -358,7 +358,7 @@ static const char *sensor_unit_to_type(ipmi_sensor_t *sensor) {
 
   /* find the db type by using sensor base unit type */
   enum ipmi_unit_type_e ipmi_type = ipmi_sensor_get_base_unit(sensor);
-  for (int i = 0; i < STATIC_ARRAY_SIZE(ipmi_db_type_map); i++)
+  for (size_t i = 0; i < STATIC_ARRAY_SIZE(ipmi_db_type_map); i++)
     if (ipmi_db_type_map[i].type == ipmi_type)
       return ipmi_db_type_map[i].type_name;
 
index efcf6be..bb9eaa0 100644 (file)
@@ -80,6 +80,8 @@ enum mb_register_type_e /* {{{ */
   REG_TYPE_UINT16,
   REG_TYPE_UINT32,
   REG_TYPE_UINT32_CDAB,
+  REG_TYPE_INT64,
+  REG_TYPE_UINT64,
   REG_TYPE_FLOAT,
   REG_TYPE_FLOAT_CDAB }; /* }}} */
 
@@ -105,6 +107,8 @@ struct mb_data_s /* {{{ */
   mb_mreg_type_t modbus_register_type;
   char type[DATA_MAX_NAME_LEN];
   char instance[DATA_MAX_NAME_LEN];
+  double scale;
+  double shift;
 
   mb_data_t *next;
 }; /* }}} */
@@ -392,21 +396,21 @@ static int mb_init_connection(mb_host_t *host) /* {{{ */
 } /* }}} int mb_init_connection */
 #endif /* !LEGACY_LIBMODBUS */
 
-#define CAST_TO_VALUE_T(ds, vt, raw)                                           \
+#define CAST_TO_VALUE_T(ds, vt, raw, scale, shift)                             \
   do {                                                                         \
     if ((ds)->ds[0].type == DS_TYPE_COUNTER)                                   \
-      (vt).counter = (counter_t)(raw);                                         \
+      (vt).counter = (((counter_t)(raw)*scale) + shift);                       \
     else if ((ds)->ds[0].type == DS_TYPE_GAUGE)                                \
-      (vt).gauge = (gauge_t)(raw);                                             \
+      (vt).gauge = (((gauge_t)(raw)*scale) + shift);                           \
     else if ((ds)->ds[0].type == DS_TYPE_DERIVE)                               \
-      (vt).derive = (derive_t)(raw);                                           \
+      (vt).derive = (((derive_t)(raw)*scale) + shift);                         \
     else /* if (ds->ds[0].type == DS_TYPE_ABSOLUTE) */                         \
-      (vt).absolute = (absolute_t)(raw);                                       \
+      (vt).absolute = (((absolute_t)(raw)*scale) + shift);                     \
   } while (0)
 
 static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
                         mb_data_t *data) {
-  uint16_t values[2] = {0};
+  uint16_t values[4] = {0};
   int values_num;
   const data_set_t *ds;
   int status = 0;
@@ -431,11 +435,13 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
       (data->register_type != REG_TYPE_INT32) &&
       (data->register_type != REG_TYPE_INT32_CDAB) &&
       (data->register_type != REG_TYPE_UINT32) &&
-      (data->register_type != REG_TYPE_UINT32_CDAB)) {
+      (data->register_type != REG_TYPE_UINT32_CDAB) &&
+      (data->register_type != REG_TYPE_INT64) &&
+      (data->register_type != REG_TYPE_UINT64)) {
     NOTICE(
         "Modbus plugin: The data source of type \"%s\" is %s, not gauge. "
         "This will most likely result in problems, because the register type "
-        "is not UINT32.",
+        "is not UINT32 or UINT64.",
         data->type, DS_TYPE_TO_STRING(ds->ds[0].type));
   }
 
@@ -446,6 +452,9 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
       (data->register_type == REG_TYPE_FLOAT) ||
       (data->register_type == REG_TYPE_FLOAT_CDAB))
     values_num = 2;
+  else if ((data->register_type == REG_TYPE_INT64) ||
+           (data->register_type == REG_TYPE_UINT64))
+    values_num = 4;
   else
     values_num = 1;
 
@@ -530,7 +539,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned float value is %g",
           (double)float_value);
 
-    CAST_TO_VALUE_T(ds, vt, float_value);
+    CAST_TO_VALUE_T(ds, vt, float_value, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_FLOAT_CDAB) {
     float float_value;
@@ -541,7 +550,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned float value is %g",
           (double)float_value);
 
-    CAST_TO_VALUE_T(ds, vt, float_value);
+    CAST_TO_VALUE_T(ds, vt, float_value, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT32) {
     union {
@@ -555,7 +564,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int32 value is %" PRIi32,
           v.i32);
 
-    CAST_TO_VALUE_T(ds, vt, v.i32);
+    CAST_TO_VALUE_T(ds, vt, v.i32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT32_CDAB) {
     union {
@@ -569,7 +578,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int32 value is %" PRIi32,
           v.i32);
 
-    CAST_TO_VALUE_T(ds, vt, v.i32);
+    CAST_TO_VALUE_T(ds, vt, v.i32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT16) {
     union {
@@ -584,7 +593,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int16 value is %" PRIi16,
           v.i16);
 
-    CAST_TO_VALUE_T(ds, vt, v.i16);
+    CAST_TO_VALUE_T(ds, vt, v.i16, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_UINT32) {
     uint32_t v32;
@@ -595,7 +604,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint32 value is %" PRIu32,
           v32);
 
-    CAST_TO_VALUE_T(ds, vt, v32);
+    CAST_TO_VALUE_T(ds, vt, v32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_UINT32_CDAB) {
     uint32_t v32;
@@ -606,7 +615,34 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint32 value is %" PRIu32,
           v32);
 
-    CAST_TO_VALUE_T(ds, vt, v32);
+    CAST_TO_VALUE_T(ds, vt, v32, data->scale, data->shift);
+    mb_submit(host, slave, data, vt);
+  } else if (data->register_type == REG_TYPE_UINT64) {
+    uint64_t v64;
+    value_t vt;
+
+    v64 = (((uint64_t)values[0]) << 48) | (((uint64_t)values[1]) << 32) |
+          (((uint64_t)values[2]) << 16) | (((uint64_t)values[3]));
+    DEBUG("Modbus plugin: mb_read_data: "
+          "Returned uint64 value is %" PRIu64,
+          v64);
+
+    CAST_TO_VALUE_T(ds, vt, v64, data->scale, data->shift);
+    mb_submit(host, slave, data, vt);
+  } else if (data->register_type == REG_TYPE_INT64) {
+    union {
+      uint64_t u64;
+      int64_t i64;
+    } v;
+    value_t vt;
+
+    v.u64 = (((uint64_t)values[0]) << 48) | (((uint64_t)values[1]) << 32) |
+            (((uint64_t)values[2]) << 16) | ((uint64_t)values[3]);
+    DEBUG("Modbus plugin: mb_read_data: "
+          "Returned uint64 value is %" PRIi64,
+          v.i64);
+
+    CAST_TO_VALUE_T(ds, vt, v.i64, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else /* if (data->register_type == REG_TYPE_UINT16) */
   {
@@ -616,7 +652,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint16 value is %" PRIu16,
           values[0]);
 
-    CAST_TO_VALUE_T(ds, vt, values[0]);
+    CAST_TO_VALUE_T(ds, vt, values[0], data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   }
 
@@ -723,6 +759,8 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
   data.name = NULL;
   data.register_type = REG_TYPE_UINT16;
   data.next = NULL;
+  data.scale = 1;
+  data.shift = 0;
 
   status = cf_util_get_string(ci, &data.name);
   if (status != 0)
@@ -736,6 +774,10 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Instance", child->key) == 0)
       status = cf_util_get_string_buffer(child, data.instance,
                                          sizeof(data.instance));
+    else if (strcasecmp("Scale", child->key) == 0)
+      status = cf_util_get_double(child, &data.scale);
+    else if (strcasecmp("Shift", child->key) == 0)
+      status = cf_util_get_double(child, &data.shift);
     else if (strcasecmp("RegisterBase", child->key) == 0)
       status = cf_util_get_int(child, &data.register_base);
     else if (strcasecmp("RegisterType", child->key) == 0) {
@@ -759,6 +801,10 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
         data.register_type = REG_TYPE_FLOAT;
       else if (strcasecmp("FloatLE", tmp) == 0)
         data.register_type = REG_TYPE_FLOAT_CDAB;
+      else if (strcasecmp("Uint64", tmp) == 0)
+        data.register_type = REG_TYPE_UINT64;
+      else if (strcasecmp("Int64", tmp) == 0)
+        data.register_type = REG_TYPE_INT64;
       else {
         ERROR("Modbus plugin: The register type \"%s\" is unknown.", tmp);
         status = -1;
index 0bd598c..a1f52a4 100644 (file)
@@ -543,7 +543,7 @@ static int qos_filter_cb(const struct nlmsghdr *nlh, void *args) {
 
       int r = snprintf(type_instance, sizeof(type_instance), "%s-%s", tc_type,
                        tc_inst);
-      if (r >= sizeof(type_instance)) {
+      if ((size_t)r >= sizeof(type_instance)) {
         ERROR("netlink plugin: type_instance truncated to %zu bytes, need %d",
               sizeof(type_instance), r);
         return MNL_CB_ERROR;
@@ -582,7 +582,7 @@ static int qos_filter_cb(const struct nlmsghdr *nlh, void *args) {
 
       int r = snprintf(type_instance, sizeof(type_instance), "%s-%s", tc_type,
                        tc_inst);
-      if (r >= sizeof(type_instance)) {
+      if ((size_t)r >= sizeof(type_instance)) {
         ERROR("netlink plugin: type_instance truncated to %zu bytes, need %d",
               sizeof(type_instance), r);
         return MNL_CB_ERROR;
index c76bce2..4bca838 100644 (file)
@@ -583,6 +583,8 @@ static int o_read_database_query(o_database_t *db, /* {{{ */
     }
   } /* }}} while (42) */
 
+  udb_query_finish_result(q, prep_area);
+
   /* DEBUG ("oracle plugin: o_read_database_query: This statement succeeded:
    * %s", q->statement); */
   FREE_ALL;
index cfc5790..ffe6c5a 100644 (file)
@@ -995,8 +995,9 @@ static int ps_read_tasks_status(process_entry_t *ps) {
 
     tpid = ent->d_name;
 
-    if (snprintf(filename, sizeof(filename), "/proc/%li/task/%s/status", ps->id,
-                 tpid) >= sizeof(filename)) {
+    int r = snprintf(filename, sizeof(filename), "/proc/%li/task/%s/status",
+                     ps->id, tpid);
+    if ((size_t)r >= sizeof(filename)) {
       DEBUG("Filename too long: `%s'", filename);
       continue;
     }
index 2d33e2d..5734098 100644 (file)
@@ -36,7 +36,7 @@
 #define REDIS_DEF_PASSWD ""
 #define REDIS_DEF_PORT 6379
 #define REDIS_DEF_TIMEOUT 2000
-#define REDIS_DEF_DB_COUNT 16
+#define REDIS_DEF_DB_COUNT 256
 #define MAX_REDIS_NODE_NAME 64
 #define MAX_REDIS_PASSWD_LENGTH 512
 #define MAX_REDIS_VAL_SIZE 256
@@ -60,6 +60,8 @@ struct redis_query_s {
   char query[MAX_REDIS_QUERY];
   char type[DATA_MAX_NAME_LEN];
   char instance[DATA_MAX_NAME_LEN];
+  int database;
+
   redis_query_t *next;
 };
 
@@ -137,6 +139,8 @@ static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
   replace_special(rq->instance, sizeof(rq->instance));
 
+  rq->database = 0;
+
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *option = ci->children + i;
 
@@ -145,6 +149,13 @@ static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
     } else if (strcasecmp("Instance", option->key) == 0) {
       status =
           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
+    } else if (strcasecmp("Database", option->key) == 0) {
+      status = cf_util_get_int(option, &rq->database);
+      if (rq->database < 0) {
+        WARNING("redis plugin: The \"Database\" option must be positive "
+                "integer or zero");
+        status = -1;
+      }
     } else {
       WARNING("redis plugin: unknown configuration option: %s", option->key);
       status = -1;
@@ -313,6 +324,12 @@ static int redis_handle_query(redisContext *rh, redis_node_t *rn,
     return -1;
   }
 
+  if ((rr = redisCommand(rh, "SELECT %d", rq->database)) == NULL) {
+    WARNING("redis plugin: unable to switch to database `%d' on node `%s'.",
+            rq->database, rn->name);
+    return -1;
+  }
+
   if ((rr = redisCommand(rh, rq->query)) == NULL) {
     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
     return -1;
@@ -364,8 +381,8 @@ static int redis_db_stats(char *node, char const *info_line) /* {{{ */
 
   for (int db = 0; db < REDIS_DEF_DB_COUNT; db++) {
     static char buf[MAX_REDIS_VAL_SIZE];
-    static char field_name[11];
-    static char db_id[3];
+    static char field_name[12];
+    static char db_id[4];
     value_t val;
     char *str;
     int i;
index 3e5e381..7c3ebc8 100644 (file)
@@ -589,7 +589,6 @@ static int csnmp_config_add_host(oconfig_item_t *ci) {
 
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *option = ci->children + i;
-    status = 0;
 
     if (strcasecmp("Address", option->key) == 0)
       status = cf_util_get_string(option, &hd->address);
index 28ee337..444e8ea 100644 (file)
@@ -352,9 +352,8 @@ static int statsd_handle_set(char const *name, /* {{{ */
   status = c_avl_insert(metric->set, set_key, /* value = */ NULL);
   if (status < 0) {
     pthread_mutex_unlock(&metrics_lock);
-    if (status < 0)
-      ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
-            set_key, status);
+    ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
+          set_key, status);
     sfree(set_key);
     return -1;
   } else if (status > 0) /* key already exists */
index ae26c78..189d605 100644 (file)
@@ -396,7 +396,7 @@ static int tbl_result_dispatch(tbl_t *tbl, tbl_result_t *res, char **fields,
     else
       r = snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-%s",
                    res->instance_prefix, instances_str);
-    if (r >= sizeof(vl.type_instance))
+    if ((size_t)r >= sizeof(vl.type_instance))
       log_warn("Truncated type instance: %s.", vl.type_instance);
   }
 
index 2bc7e3f..68cf412 100644 (file)
@@ -1289,15 +1289,15 @@ static int allocate_counters(struct thread_data **threads,
   *cores = calloc(total_cores, sizeof(struct core_data));
   if (*cores == NULL) {
     ERROR("turbostat plugin: calloc failed");
-    sfree(threads);
+    sfree(*threads);
     return -1;
   }
 
   *packages = calloc(topology.num_packages, sizeof(struct pkg_data));
   if (*packages == NULL) {
     ERROR("turbostat plugin: calloc failed");
-    sfree(cores);
-    sfree(threads);
+    sfree(*cores);
+    sfree(*threads);
     return -1;
   }
 
diff --git a/src/utils_config_cores.c b/src/utils_config_cores.c
new file mode 100644 (file)
index 0000000..9465745
--- /dev/null
@@ -0,0 +1,372 @@
+/**
+ * collectd - src/utils_config_cores.c
+ *
+ * Copyright(c) 2018 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Kamil Wiatrowski <kamilx.wiatrowski@intel.com>
+ **/
+
+#include "collectd.h"
+
+#include "common.h"
+
+#include "utils_config_cores.h"
+
+#define UTIL_NAME "utils_config_cores"
+
+#define MAX_SOCKETS 8
+#define MAX_SOCKET_CORES 64
+#define MAX_CORES (MAX_SOCKET_CORES * MAX_SOCKETS)
+
+static inline _Bool is_in_list(unsigned val, const unsigned *list, size_t len) {
+  for (size_t i = 0; i < len; i++)
+    if (list[i] == val)
+      return 1;
+  return 0;
+}
+
+static int str_to_uint(const char *s, unsigned *n) {
+  if (s == NULL || n == NULL)
+    return -EINVAL;
+  char *endptr = NULL;
+
+  *n = (unsigned)strtoul(s, &endptr, 0);
+  if (*s == '\0' || *endptr != '\0') {
+    ERROR(UTIL_NAME ": Failed to parse '%s' into unsigned number", s);
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+/*
+ * NAME
+ *   str_list_to_nums
+ *
+ * DESCRIPTION
+ *   Converts string of characters representing list of numbers into array of
+ *   numbers. Allowed formats are:
+ *     0,1,2,3
+ *     0-10,20-18
+ *     1,3,5-8,10,0x10-12
+ *
+ *   Numbers can be in decimal or hexadecimal format.
+ *
+ * PARAMETERS
+ *   `s'         String representing list of unsigned numbers.
+ *   `nums'      Array to put converted numeric values into.
+ *   `nums_len'  Maximum number of elements that nums can accommodate.
+ *
+ * RETURN VALUE
+ *    Number of elements placed into nums.
+ */
+static size_t str_list_to_nums(char *s, unsigned *nums, size_t nums_len) {
+  char *saveptr = NULL;
+  char *token;
+  size_t idx = 0;
+
+  while ((token = strtok_r(s, ",", &saveptr))) {
+    char *pos;
+    unsigned start, end = 0;
+    s = NULL;
+
+    while (isspace(*token))
+      token++;
+    if (*token == '\0')
+      continue;
+
+    pos = strchr(token, '-');
+    if (pos) {
+      *pos = '\0';
+    }
+
+    if (str_to_uint(token, &start))
+      return 0;
+
+    if (pos) {
+      if (str_to_uint(pos + 1, &end))
+        return 0;
+    } else {
+      end = start;
+    }
+
+    if (start > end) {
+      unsigned swap = start;
+      start = end;
+      end = swap;
+    }
+
+    for (unsigned i = start; i <= end; i++) {
+      if (is_in_list(i, nums, idx))
+        continue;
+      if (idx >= nums_len) {
+        WARNING(UTIL_NAME ": exceeded the cores number limit: %" PRIsz,
+                nums_len);
+        return idx;
+      }
+      nums[idx] = i;
+      idx++;
+    }
+  }
+  return idx;
+}
+
+/*
+ * NAME
+ *   check_core_grouping
+ *
+ * DESCRIPTION
+ *   Look for [...] brackets in *in string and if found copy the
+ *   part between brackets into *out string and set grouped to 0.
+ *   Otherwise grouped is set to 1 and input is copied without leading
+ *   whitespaces.
+ *
+ * PARAMETERS
+ *   `out'       Output string to store result.
+ *   `in'        Input string to be parsed and copied.
+ *   `out_size'  Maximum number of elements that out can accommodate.
+ *   `grouped'   Set by function depending if cores should be grouped or not.
+ *
+ * RETURN VALUE
+ *    Zero upon success or non-zero if an error occurred.
+ */
+static int check_core_grouping(char *out, const char *in, size_t out_size,
+                               _Bool *grouped) {
+  const char *start = in;
+  char *end;
+  while (isspace(*start))
+    ++start;
+  if (start[0] == '[') {
+    *grouped = 0;
+    ++start;
+    end = strchr(start, ']');
+    if (end == NULL) {
+      ERROR(UTIL_NAME ": Missing closing bracket ] in option %s.", in);
+      return -EINVAL;
+    }
+    if ((size_t)(end - start) >= out_size) {
+      ERROR(UTIL_NAME ": Out buffer is too small.");
+      return -EINVAL;
+    }
+    sstrncpy(out, start, end - start + 1);
+    DEBUG(UTIL_NAME ": Mask for individual (not aggregated) cores: %s", out);
+  } else {
+    *grouped = 1;
+    sstrncpy(out, start, out_size);
+  }
+  return 0;
+}
+
+int config_cores_parse(const oconfig_item_t *ci, core_groups_list_t *cgl) {
+  if (ci == NULL || cgl == NULL)
+    return -EINVAL;
+  if (ci->values_num == 0 || ci->values_num > MAX_CORES)
+    return -EINVAL;
+  core_group_t cgroups[MAX_CORES] = {{0}};
+  size_t cg_idx = 0; /* index for cgroups array */
+  int ret = 0;
+
+  for (int i = 0; i < ci->values_num; i++) {
+    if (ci->values[i].type != OCONFIG_TYPE_STRING) {
+      WARNING(UTIL_NAME ": The %s option requires string arguments.", ci->key);
+      return -EINVAL;
+    }
+  }
+
+  if (ci->values_num == 1 && ci->values[0].value.string &&
+      strlen(ci->values[0].value.string) == 0)
+    return 0;
+
+  for (int i = 0; i < ci->values_num; i++) {
+    size_t n;
+    _Bool grouped = 1;
+    char str[DATA_MAX_NAME_LEN];
+    unsigned cores[MAX_CORES] = {0};
+
+    if (cg_idx >= STATIC_ARRAY_SIZE(cgroups)) {
+      ERROR(UTIL_NAME
+            ": Configuration exceeds maximum number of cores: %" PRIsz,
+            STATIC_ARRAY_SIZE(cgroups));
+      ret = -EINVAL;
+      goto parse_error;
+    }
+    if ((ci->values[i].value.string == NULL) ||
+        (strlen(ci->values[i].value.string) == 0)) {
+      ERROR(UTIL_NAME ": Failed to parse parameters for %s option.", ci->key);
+      ret = -EINVAL;
+      goto parse_error;
+    }
+
+    ret = check_core_grouping(str, ci->values[i].value.string, sizeof(str),
+                              &grouped);
+    if (ret != 0) {
+      ERROR(UTIL_NAME ": Failed to parse config option [%d] %s.", i,
+            ci->values[i].value.string);
+      goto parse_error;
+    }
+    n = str_list_to_nums(str, cores, STATIC_ARRAY_SIZE(cores));
+    if (n == 0) {
+      ERROR(UTIL_NAME ": Failed to parse config option [%d] %s.", i,
+            ci->values[i].value.string);
+      ret = -EINVAL;
+      goto parse_error;
+    }
+
+    if (grouped) {
+      cgroups[cg_idx].desc = strdup(ci->values[i].value.string);
+      if (cgroups[cg_idx].desc == NULL) {
+        ERROR(UTIL_NAME ": Failed to allocate description.");
+        ret = -ENOMEM;
+        goto parse_error;
+      }
+
+      cgroups[cg_idx].cores = calloc(n, sizeof(*cgroups[cg_idx].cores));
+      if (cgroups[cg_idx].cores == NULL) {
+        ERROR(UTIL_NAME ": Failed to allocate cores for cgroup.");
+        ret = -ENOMEM;
+        goto parse_error;
+      }
+
+      for (size_t j = 0; j < n; j++)
+        cgroups[cg_idx].cores[j] = cores[j];
+
+      cgroups[cg_idx].num_cores = n;
+      cg_idx++;
+    } else {
+      for (size_t j = 0; j < n && cg_idx < STATIC_ARRAY_SIZE(cgroups); j++) {
+        char desc[DATA_MAX_NAME_LEN];
+        snprintf(desc, sizeof(desc), "%u", cores[j]);
+
+        cgroups[cg_idx].desc = strdup(desc);
+        if (cgroups[cg_idx].desc == NULL) {
+          ERROR(UTIL_NAME ": Failed to allocate desc for core %u.", cores[j]);
+          ret = -ENOMEM;
+          goto parse_error;
+        }
+
+        cgroups[cg_idx].cores = calloc(1, sizeof(*(cgroups[cg_idx].cores)));
+        if (cgroups[cg_idx].cores == NULL) {
+          ERROR(UTIL_NAME ": Failed to allocate cgroup for core %u.", cores[j]);
+          ret = -ENOMEM;
+          goto parse_error;
+        }
+        cgroups[cg_idx].num_cores = 1;
+        cgroups[cg_idx].cores[0] = cores[j];
+        cg_idx++;
+      }
+    }
+  }
+
+  cgl->cgroups = calloc(cg_idx, sizeof(*cgl->cgroups));
+  if (cgl->cgroups == NULL) {
+    ERROR(UTIL_NAME ": Failed to allocate core groups.");
+    ret = -ENOMEM;
+    goto parse_error;
+  }
+
+  cgl->num_cgroups = cg_idx;
+  for (size_t i = 0; i < cg_idx; i++)
+    cgl->cgroups[i] = cgroups[i];
+
+  return 0;
+
+parse_error:
+
+  cg_idx = 0;
+  while (cg_idx < STATIC_ARRAY_SIZE(cgroups) && cgroups[cg_idx].desc != NULL) {
+    sfree(cgroups[cg_idx].desc);
+    sfree(cgroups[cg_idx].cores);
+    cg_idx++;
+  }
+  return ret;
+}
+
+int config_cores_default(int num_cores, core_groups_list_t *cgl) {
+  if (cgl == NULL || num_cores < 0 || num_cores > MAX_CORES)
+    return -EINVAL;
+
+  cgl->cgroups = calloc(num_cores, sizeof(*(cgl->cgroups)));
+  if (cgl->cgroups == NULL) {
+    ERROR(UTIL_NAME ": Failed to allocate memory for core groups.");
+    return -ENOMEM;
+  }
+  cgl->num_cgroups = num_cores;
+
+  for (int i = 0; i < num_cores; i++) {
+    char desc[DATA_MAX_NAME_LEN];
+    snprintf(desc, sizeof(desc), "%d", i);
+
+    cgl->cgroups[i].cores = calloc(1, sizeof(*(cgl->cgroups[i].cores)));
+    if (cgl->cgroups[i].cores == NULL) {
+      ERROR(UTIL_NAME ": Failed to allocate default cores for cgroup %d.", i);
+      config_cores_cleanup(cgl);
+      return -ENOMEM;
+    }
+    cgl->cgroups[i].num_cores = 1;
+    cgl->cgroups[i].cores[0] = i;
+
+    cgl->cgroups[i].desc = strdup(desc);
+    if (cgl->cgroups[i].desc == NULL) {
+      ERROR(UTIL_NAME ": Failed to allocate description for cgroup %d.", i);
+      config_cores_cleanup(cgl);
+      return -ENOMEM;
+    }
+  }
+  return 0;
+}
+
+void config_cores_cleanup(core_groups_list_t *cgl) {
+  if (cgl == NULL)
+    return;
+  for (size_t i = 0; i < cgl->num_cgroups; i++) {
+    sfree(cgl->cgroups[i].desc);
+    sfree(cgl->cgroups[i].cores);
+  }
+  sfree(cgl->cgroups);
+  cgl->num_cgroups = 0;
+}
+
+int config_cores_cmp_cgroups(const core_group_t *cg_a,
+                             const core_group_t *cg_b) {
+  size_t found = 0;
+
+  assert(cg_a != NULL);
+  assert(cg_b != NULL);
+
+  const size_t sz_a = cg_a->num_cores;
+  const size_t sz_b = cg_b->num_cores;
+  const unsigned *tab_a = cg_a->cores;
+  const unsigned *tab_b = cg_b->cores;
+
+  for (size_t i = 0; i < sz_a; i++)
+    if (is_in_list(tab_a[i], tab_b, sz_b))
+      found++;
+
+  /* if no cores are the same */
+  if (!found)
+    return 0;
+  /* if group contains same cores */
+  if (sz_a == sz_b && sz_b == found)
+    return 1;
+  /* if not all cores are the same */
+  return -1;
+}
diff --git a/src/utils_config_cores.h b/src/utils_config_cores.h
new file mode 100644 (file)
index 0000000..d45f848
--- /dev/null
@@ -0,0 +1,136 @@
+/**
+ * collectd - src/utils_config_cores.h
+ *
+ * Copyright(c) 2018 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Kamil Wiatrowski <kamilx.wiatrowski@intel.com>
+ **/
+
+#ifndef UTILS_CONFIG_CORES_H
+#define UTILS_CONFIG_CORES_H 1
+
+#include "configfile.h"
+
+#ifndef PRIsz
+#define PRIsz "zu"
+#endif /* PRIsz */
+
+struct core_group_s {
+  char *desc;
+  unsigned int *cores;
+  size_t num_cores;
+};
+typedef struct core_group_s core_group_t;
+
+struct core_groups_list_s {
+  core_group_t *cgroups;
+  size_t num_cgroups;
+};
+typedef struct core_groups_list_s core_groups_list_t;
+
+/*
+ * NAME
+ *   config_cores_parse
+ *
+ * DESCRIPTION
+ *   Convert strings from config item into list of core groups.
+ *
+ * PARAMETERS
+ *   `ci'      Pointer to config item.
+ *   `cgl'     Pointer to core groups list to be filled.
+ *
+ * RETURN VALUE
+ *    Zero upon success or non-zero if an error occurred.
+ *
+ * NOTES
+ *    In case of an error, *cgl is not modified.
+ *    Numbers can be in decimal or hexadecimal format.
+ *    The memory allocated for *cgroups in list needs to be freed
+ *    with config_cores_cleanup.
+ *
+ * EXAMPLES
+ *    If config is "0-3" "[4-15]" it means that cores 0-3 are aggregated
+ *    into one group and cores 4 to 15 are stored individualily in
+ *    separate groups. Examples of allowed formats:
+ *    "0,3,4" "10-15" - cores collected into two groups
+ *    "0" "0x3" "7" - 3 cores, each in individual group
+ *    "[32-63]" - 32 cores, each in individual group
+ *
+ *    For empty string "" *cgl is not modified and zero is returned.
+ */
+int config_cores_parse(const oconfig_item_t *ci, core_groups_list_t *cgl);
+
+/*
+ * NAME
+ *   config_cores_default
+ *
+ * DESCRIPTION
+ *   Set number of cores starting from zero into individual
+ *   core groups in *cgl list.
+ *
+ * PARAMETERS
+ *   `num_cores'  Number of cores to be configured.
+ *   `cgl'        Pointer to core groups list.
+ *
+ * RETURN VALUE
+ *    Zero upon success or non-zero if an error occurred.
+ *
+ * NOTES
+ *    The memory allocated for *cgroups in list needs to be freed
+ *    with config_cores_cleanup. In case of error the memory is
+ *    freed by the function itself.
+ */
+int config_cores_default(int num_cores, core_groups_list_t *cgl);
+
+/*
+ * NAME
+ *   config_cores_cleanup
+ *
+ * DESCRIPTION
+ *   Free the memory allocated for cgroups and set
+ *   num_cgroups to zero.
+ *
+ * PARAMETERS
+ *   `cgl'     Pointer to core groups list.
+ */
+void config_cores_cleanup(core_groups_list_t *cgl);
+
+/*
+ * NAME
+ *   config_cores_cmp_cgroups
+ *
+ * DESCRIPTION
+ *   Function to compare cores in 2 core groups.
+ *
+ * PARAMETERS
+ *   `cg_a'      Pointer to core group a.
+ *   `cg_b'      Pointer to core group b.
+ *
+ * RETURN VALUE
+ *    1 if both groups contain the same cores
+ *    0 if none of their cores match
+ *    -1 if some but not all cores match
+ */
+int config_cores_cmp_cgroups(const core_group_t *cg_a,
+                             const core_group_t *cg_b);
+
+#endif /* UTILS_CONFIG_CORES_H */
diff --git a/src/utils_config_cores_test.c b/src/utils_config_cores_test.c
new file mode 100644 (file)
index 0000000..2c6f5b6
--- /dev/null
@@ -0,0 +1,249 @@
+/**
+ * collectd - src/utils_config_cores_test.c
+ *
+ * Copyright(c) 2018 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Kamil Wiatrowski <kamilx.wiatrowski@intel.com>
+ **/
+
+#include "collectd.h"
+
+#include "testing.h"
+#include "utils_config_cores.c" /* sic */
+
+oconfig_value_t test_cfg_values[] = {{{"0"}, OCONFIG_TYPE_STRING},
+                                     {{"1-2"}, OCONFIG_TYPE_STRING},
+                                     {{"[3-4]"}, OCONFIG_TYPE_STRING}};
+
+oconfig_item_t test_cfg = {
+    "Cores", test_cfg_values, STATIC_ARRAY_SIZE(test_cfg_values), NULL, NULL,
+    0};
+
+static int compare_with_test_config(core_groups_list_t *cgl) {
+  if (cgl->num_cgroups == 4 && cgl->cgroups[0].num_cores == 1 &&
+      strcmp("0", cgl->cgroups[0].desc) == 0 && cgl->cgroups[0].cores[0] == 0 &&
+      cgl->cgroups[1].num_cores == 2 &&
+      strcmp("1-2", cgl->cgroups[1].desc) == 0 &&
+      cgl->cgroups[1].cores[0] == 1 && cgl->cgroups[1].cores[1] == 2 &&
+      cgl->cgroups[2].num_cores == 1 &&
+      strcmp("3", cgl->cgroups[2].desc) == 0 && cgl->cgroups[2].cores[0] == 3 &&
+      cgl->cgroups[3].num_cores == 1 &&
+      strcmp("4", cgl->cgroups[3].desc) == 0 && cgl->cgroups[3].cores[0] == 4)
+    return 0;
+
+  return -1;
+}
+
+DEF_TEST(string_to_uint) {
+  int ret = 0;
+  char *s = "13", *s1 = "0xd", *s2 = "g";
+  unsigned n = 0;
+
+  ret = str_to_uint(s, &n);
+  EXPECT_EQ_INT(0, ret);
+  EXPECT_EQ_INT(13, n);
+
+  ret = str_to_uint(s1, &n);
+  EXPECT_EQ_INT(0, ret);
+  EXPECT_EQ_INT(13, n);
+
+  ret = str_to_uint(s2, &n);
+  OK(ret < 0);
+
+  ret = str_to_uint(NULL, &n);
+  OK(ret < 0);
+  return 0;
+}
+
+DEF_TEST(cores_list_to_numbers) {
+  size_t n = 0;
+  unsigned nums[MAX_CORES];
+  char str[64] = "";
+
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(0, n);
+
+  strncpy(str, "1", STATIC_ARRAY_SIZE(str));
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(1, n);
+  EXPECT_EQ_INT(1, nums[0]);
+
+  strncpy(str, "0,2-3", STATIC_ARRAY_SIZE(str));
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(3, n);
+  EXPECT_EQ_INT(0, nums[0]);
+  EXPECT_EQ_INT(2, nums[1]);
+  EXPECT_EQ_INT(3, nums[2]);
+
+  strncpy(str, "11-0xa", STATIC_ARRAY_SIZE(str));
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(2, n);
+  EXPECT_EQ_INT(10, nums[0]);
+  EXPECT_EQ_INT(11, nums[1]);
+
+  snprintf(str, sizeof(str), "0-%d", (MAX_CORES - 1));
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(MAX_CORES, n);
+  EXPECT_EQ_INT(0, nums[0]);
+  EXPECT_EQ_INT(MAX_CORES - 1, nums[MAX_CORES - 1]);
+
+  /* Should return 0 for incorrect syntax. */
+  strncpy(str, "5g", STATIC_ARRAY_SIZE(str));
+  n = str_list_to_nums(str, nums, STATIC_ARRAY_SIZE(nums));
+  EXPECT_EQ_INT(0, n);
+  return 0;
+}
+
+DEF_TEST(check_grouped_cores) {
+  int ret = 0;
+  _Bool grouped;
+  char src[64] = "[5-15]";
+  char dest[64];
+
+  ret = check_core_grouping(dest, src, sizeof(dest), &grouped);
+  EXPECT_EQ_INT(0, ret);
+  EXPECT_EQ_INT(0, grouped);
+  EXPECT_EQ_STR("5-15", dest);
+
+  strncpy(src, "  5-15", STATIC_ARRAY_SIZE(src));
+  ret = check_core_grouping(dest, src, sizeof(dest), &grouped);
+  EXPECT_EQ_INT(0, ret);
+  EXPECT_EQ_INT(1, grouped);
+  EXPECT_EQ_STR("5-15", dest);
+  return 0;
+}
+
+DEF_TEST(cores_option_parse) {
+  int ret = 0;
+  core_groups_list_t cgl = {0};
+
+  ret = config_cores_parse(&test_cfg, &cgl);
+  EXPECT_EQ_INT(0, ret);
+  CHECK_NOT_NULL(cgl.cgroups);
+  EXPECT_EQ_INT(0, compare_with_test_config(&cgl));
+
+  config_cores_cleanup(&cgl);
+  return 0;
+}
+
+DEF_TEST(cores_option_parse_fail) {
+  int ret = 0;
+  core_groups_list_t cgl = {0};
+  /* Wrong value, missing closing bracket ] */
+  oconfig_value_t values = {{"[0-15"}, OCONFIG_TYPE_STRING};
+  oconfig_item_t cfg = {"Cores", &values, 1, NULL, NULL, 0};
+
+  ret = config_cores_parse(&cfg, &cgl);
+  EXPECT_EQ_INT(-EINVAL, ret);
+  EXPECT_EQ_INT(0, cgl.num_cgroups);
+  OK(NULL == cgl.cgroups);
+  return 0;
+}
+
+DEF_TEST(cores_default_list) {
+  int ret = 0;
+  core_groups_list_t cgl = {0};
+
+  ret = config_cores_default(2, &cgl);
+  EXPECT_EQ_INT(0, ret);
+  EXPECT_EQ_INT(2, cgl.num_cgroups);
+  CHECK_NOT_NULL(cgl.cgroups);
+
+  CHECK_NOT_NULL(cgl.cgroups[0].cores);
+  CHECK_NOT_NULL(cgl.cgroups[0].desc);
+  EXPECT_EQ_STR("0", cgl.cgroups[0].desc);
+  EXPECT_EQ_INT(1, cgl.cgroups[0].num_cores);
+  EXPECT_EQ_INT(0, cgl.cgroups[0].cores[0]);
+
+  CHECK_NOT_NULL(cgl.cgroups[1].cores);
+  CHECK_NOT_NULL(cgl.cgroups[1].desc);
+  EXPECT_EQ_STR("1", cgl.cgroups[1].desc);
+  EXPECT_EQ_INT(1, cgl.cgroups[1].num_cores);
+  EXPECT_EQ_INT(1, cgl.cgroups[1].cores[0]);
+
+  config_cores_cleanup(&cgl);
+  return 0;
+}
+
+DEF_TEST(cores_default_list_fail) {
+  int ret = 0;
+  core_groups_list_t cgl = {0};
+
+  ret = config_cores_default(-1, &cgl);
+  OK(ret < 0);
+  ret = config_cores_default(MAX_CORES + 1, &cgl);
+  OK(ret < 0);
+  ret = config_cores_default(1, NULL);
+  OK(ret < 0);
+  return 0;
+}
+
+DEF_TEST(cores_group_cleanup) {
+  core_groups_list_t cgl;
+  cgl.cgroups = calloc(1, sizeof(*cgl.cgroups));
+  CHECK_NOT_NULL(cgl.cgroups);
+  cgl.num_cgroups = 1;
+  cgl.cgroups[0].desc = strdup("1");
+  cgl.cgroups[0].cores = calloc(1, sizeof(*cgl.cgroups[0].cores));
+  CHECK_NOT_NULL(cgl.cgroups[0].cores);
+  cgl.cgroups[0].cores[0] = 1;
+  cgl.cgroups[0].num_cores = 1;
+
+  config_cores_cleanup(&cgl);
+  OK(NULL == cgl.cgroups);
+  EXPECT_EQ_INT(0, cgl.num_cgroups);
+  return 0;
+}
+
+DEF_TEST(cores_group_cmp) {
+  unsigned cores_mock[] = {0, 1, 2};
+  core_group_t group_mock = {"0,1,2", cores_mock, 3};
+  unsigned cores_mock_2[] = {2, 3};
+  core_group_t group_mock_2 = {"2,3", cores_mock_2, 2};
+
+  int ret = config_cores_cmp_cgroups(&group_mock, &group_mock);
+  EXPECT_EQ_INT(1, ret);
+
+  ret = config_cores_cmp_cgroups(&group_mock, &group_mock_2);
+  EXPECT_EQ_INT(-1, ret);
+
+  cores_mock_2[0] = 4;
+  ret = config_cores_cmp_cgroups(&group_mock, &group_mock_2);
+  EXPECT_EQ_INT(0, ret);
+  return 0;
+}
+
+int main(void) {
+  RUN_TEST(string_to_uint);
+  RUN_TEST(cores_list_to_numbers);
+  RUN_TEST(check_grouped_cores);
+
+  RUN_TEST(cores_group_cleanup);
+  RUN_TEST(cores_option_parse);
+  RUN_TEST(cores_option_parse_fail);
+  RUN_TEST(cores_default_list);
+  RUN_TEST(cores_default_list_fail);
+
+  RUN_TEST(cores_group_cmp);
+
+  END_TEST;
+}
index a58bc06..61fe2e4 100644 (file)
@@ -268,6 +268,7 @@ static int udb_result_submit(udb_result_t *r, /* {{{ */
     vl.meta = meta_data_create();
     if (vl.meta == NULL) {
       ERROR("db query utils:: meta_data_create failed.");
+      free(vl.values);
       return -ENOMEM;
     }
 
@@ -278,6 +279,7 @@ static int udb_result_submit(udb_result_t *r, /* {{{ */
         ERROR("db query utils:: meta_data_add_string failed.");
         meta_data_destroy(vl.meta);
         vl.meta = NULL;
+        free(vl.values);
         return status;
       }
     }
@@ -336,22 +338,20 @@ static int udb_result_prepare_result(udb_result_t const *r, /* {{{ */
   if ((r == NULL) || (prep_area == NULL))
     return -EINVAL;
 
+#if COLLECT_DEBUG
+  assert(prep_area->ds == NULL);
+  assert(prep_area->instances_pos == NULL);
+  assert(prep_area->values_pos == NULL);
+  assert(prep_area->metadata_pos == NULL);
+  assert(prep_area->instances_buffer == NULL);
+  assert(prep_area->values_buffer == NULL);
+  assert(prep_area->metadata_buffer == NULL);
+#endif
+
 #define BAIL_OUT(status)                                                       \
-  prep_area->ds = NULL;                                                        \
-  sfree(prep_area->instances_pos);                                             \
-  sfree(prep_area->values_pos);                                                \
-  sfree(prep_area->metadata_pos);                                              \
-  sfree(prep_area->instances_buffer);                                          \
-  sfree(prep_area->values_buffer);                                             \
-  sfree(prep_area->metadata_buffer);                                           \
+  udb_result_finish_result(r, prep_area);                                      \
   return (status)
 
-  /* Make sure previous preparations are cleaned up. */
-  udb_result_finish_result(r, prep_area);
-  prep_area->instances_pos = NULL;
-  prep_area->values_pos = NULL;
-  prep_area->metadata_pos = NULL;
-
   /* Read `ds' and check number of values {{{ */
   prep_area->ds = plugin_get_ds(r->type);
   if (prep_area->ds == NULL) {
@@ -929,7 +929,13 @@ int udb_query_prepare_result(udb_query_t const *q, /* {{{ */
   if ((q == NULL) || (prep_area == NULL))
     return -EINVAL;
 
-  udb_query_finish_result(q, prep_area);
+#if COLLECT_DEBUG
+  assert(prep_area->column_num == 0);
+  assert(prep_area->host == NULL);
+  assert(prep_area->plugin == NULL);
+  assert(prep_area->db_name == NULL);
+  assert(prep_area->interval == 0);
+#endif
 
   prep_area->column_num = column_num;
   prep_area->host = strdup(host);
diff --git a/src/utils_deq.h b/src/utils_deq.h
new file mode 100644 (file)
index 0000000..3182baa
--- /dev/null
@@ -0,0 +1,214 @@
+/**
+ * collectd - src/utils_deq.h
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Andy Smith <ansmith@redhat.com>
+ */
+
+#ifndef utils_deq_h
+#define utils_deq_h 1
+
+#include <assert.h>
+#include <memory.h>
+#include <stdlib.h>
+
+#define CT_ASSERT(exp)                                                         \
+  { assert(exp); }
+
+#define NEW(t) (t *)malloc(sizeof(t))
+#define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n))
+#define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n))
+
+#define ZERO(p) memset(p, 0, sizeof(*p))
+
+#define DEQ_DECLARE(i, d)                                                      \
+  typedef struct {                                                             \
+    i *head;                                                                   \
+    i *tail;                                                                   \
+    i *scratch;                                                                \
+    size_t size;                                                               \
+  } d
+
+#define DEQ_LINKS_N(n, t)                                                      \
+  t *prev##n;                                                                  \
+  t *next##n
+#define DEQ_LINKS(t) DEQ_LINKS_N(, t)
+#define DEQ_EMPTY                                                              \
+  { 0, 0, 0, 0 }
+
+#define DEQ_INIT(d)                                                            \
+  do {                                                                         \
+    (d).head = 0;                                                              \
+    (d).tail = 0;                                                              \
+    (d).scratch = 0;                                                           \
+    (d).size = 0;                                                              \
+  } while (0)
+#define DEQ_IS_EMPTY(d) ((d).head == 0)
+#define DEQ_ITEM_INIT_N(n, i)                                                  \
+  do {                                                                         \
+    (i)->next##n = 0;                                                          \
+    (i)->prev##n = 0;                                                          \
+  } while (0)
+#define DEQ_ITEM_INIT(i) DEQ_ITEM_INIT_N(, i)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
+#define DEQ_NEXT_N(n, i) (i)->next##n
+#define DEQ_NEXT(i) DEQ_NEXT_N(, i)
+#define DEQ_PREV_N(n, i) (i)->prev##n
+#define DEQ_PREV(i) DEQ_PREV_N(, i)
+#define DEQ_MOVE(d1, d2)                                                       \
+  do {                                                                         \
+    d2 = d1;                                                                   \
+    DEQ_INIT(d1);                                                              \
+  } while (0)
+/**
+ *@pre ptr points to first element of deq
+ *@post ptr points to first element of deq that passes test, or 0. Test should
+ *involve ptr.
+ */
+#define DEQ_FIND_N(n, ptr, test)                                               \
+  while ((ptr) && !(test))                                                     \
+    ptr = DEQ_NEXT_N(n, ptr);
+#define DEQ_FIND(ptr, test) DEQ_FIND_N(, ptr, test)
+
+#define DEQ_INSERT_HEAD_N(n, d, i)                                             \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    if ((d).head) {                                                            \
+      (i)->next##n = (d).head;                                                 \
+      (d).head->prev##n = i;                                                   \
+    } else {                                                                   \
+      (d).tail = i;                                                            \
+      (i)->next##n = 0;                                                        \
+      CT_ASSERT((d).size == 0);                                                \
+    }                                                                          \
+    (i)->prev##n = 0;                                                          \
+    (d).head = i;                                                              \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_HEAD(d, i) DEQ_INSERT_HEAD_N(, d, i)
+
+#define DEQ_INSERT_TAIL_N(n, d, i)                                             \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    if ((d).tail) {                                                            \
+      (i)->prev##n = (d).tail;                                                 \
+      (d).tail->next##n = i;                                                   \
+    } else {                                                                   \
+      (d).head = i;                                                            \
+      (i)->prev##n = 0;                                                        \
+      CT_ASSERT((d).size == 0);                                                \
+    }                                                                          \
+    (i)->next##n = 0;                                                          \
+    (d).tail = i;                                                              \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_TAIL(d, i) DEQ_INSERT_TAIL_N(, d, i)
+
+#define DEQ_REMOVE_HEAD_N(n, d)                                                \
+  do {                                                                         \
+    CT_ASSERT((d).head);                                                       \
+    if ((d).head) {                                                            \
+      (d).scratch = (d).head;                                                  \
+      (d).head = (d).head->next##n;                                            \
+      if ((d).head == 0) {                                                     \
+        (d).tail = 0;                                                          \
+        CT_ASSERT((d).size == 1);                                              \
+      } else                                                                   \
+        (d).head->prev##n = 0;                                                 \
+      (d).size--;                                                              \
+      (d).scratch->next##n = 0;                                                \
+      (d).scratch->prev##n = 0;                                                \
+    }                                                                          \
+  } while (0)
+#define DEQ_REMOVE_HEAD(d) DEQ_REMOVE_HEAD_N(, d)
+
+#define DEQ_REMOVE_TAIL_N(n, d)                                                \
+  do {                                                                         \
+    CT_ASSERT((d).tail);                                                       \
+    if ((d).tail) {                                                            \
+      (d).scratch = (d).tail;                                                  \
+      (d).tail = (d).tail->prev##n;                                            \
+      if ((d).tail == 0) {                                                     \
+        (d).head = 0;                                                          \
+        CT_ASSERT((d).size == 1);                                              \
+      } else                                                                   \
+        (d).tail->next##n = 0;                                                 \
+      (d).size--;                                                              \
+      (d).scratch->next##n = 0;                                                \
+      (d).scratch->prev##n = 0;                                                \
+    }                                                                          \
+  } while (0)
+#define DEQ_REMOVE_TAIL(d) DEQ_REMOVE_TAIL_N(, d)
+
+#define DEQ_INSERT_AFTER_N(n, d, i, a)                                         \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    CT_ASSERT(a);                                                              \
+    if ((a)->next##n)                                                          \
+      (a)->next##n->prev##n = (i);                                             \
+    else                                                                       \
+      (d).tail = (i);                                                          \
+    (i)->next##n = (a)->next##n;                                               \
+    (i)->prev##n = (a);                                                        \
+    (a)->next##n = (i);                                                        \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_AFTER(d, i, a) DEQ_INSERT_AFTER_N(, d, i, a)
+
+#define DEQ_REMOVE_N(n, d, i)                                                  \
+  do {                                                                         \
+    if ((i)->next##n)                                                          \
+      (i)->next##n->prev##n = (i)->prev##n;                                    \
+    else                                                                       \
+      (d).tail = (i)->prev##n;                                                 \
+    if ((i)->prev##n)                                                          \
+      (i)->prev##n->next##n = (i)->next##n;                                    \
+    else                                                                       \
+      (d).head = (i)->next##n;                                                 \
+    CT_ASSERT((d).size > 0);                                                   \
+    (d).size--;                                                                \
+    (i)->next##n = 0;                                                          \
+    (i)->prev##n = 0;                                                          \
+    CT_ASSERT((d).size || (!(d).head && !(d).tail));                           \
+  } while (0)
+#define DEQ_REMOVE(d, i) DEQ_REMOVE_N(, d, i)
+
+#define DEQ_APPEND_N(n, d1, d2)                                                \
+  do {                                                                         \
+    if (!(d1).head)                                                            \
+      (d1) = (d2);                                                             \
+    else if ((d2).head) {                                                      \
+      (d1).tail->next##n = (d2).head;                                          \
+      (d2).head->prev##n = (d1).tail;                                          \
+      (d1).tail = (d2).tail;                                                   \
+      (d1).size += (d2).size;                                                  \
+    }                                                                          \
+    DEQ_INIT(d2);                                                              \
+  } while (0)
+#define DEQ_APPEND(d1, d2) DEQ_APPEND_N(, d1, d2)
+
+#endif
index 3b7236c..4ca86ae 100644 (file)
@@ -1051,6 +1051,8 @@ ovs_db_t *ovs_db_init(const char *node, const char *service,
     ret = ovs_db_destroy(pdb);
     if (ret > 0)
       goto failure;
+    else
+      return NULL;
   }
 
   /* init polling thread */
@@ -1059,6 +1061,8 @@ ovs_db_t *ovs_db_init(const char *node, const char *service,
     if (ret > 0) {
       ovs_db_event_thread_data_destroy(pdb);
       goto failure;
+    } else {
+      return NULL;
     }
   }
   return pdb;
diff --git a/src/valgrind.suppress b/src/valgrind.suppress
new file mode 100644 (file)
index 0000000..f4c3f34
--- /dev/null
@@ -0,0 +1,7 @@
+{
+   libnl1_virt_initialization_unpreventable_leak
+   Memcheck:Leak
+   ...
+   obj:*libnl.so.1.*
+   ...
+}
\ No newline at end of file
index e97fd84..c6280d1 100644 (file)
 #include <libxml/tree.h>
 #include <libxml/xpath.h>
 #include <libxml/xpathInternals.h>
+#include <stdbool.h>
 
 /* Plugin name */
 #define PLUGIN_NAME "virt"
 
+/* Secure strcat macro assuring null termination. Parameter (n) is the size of
+   buffer (d), allowing this macro to be safe for static and dynamic buffers */
+#define SSTRNCAT(d, s, n)                                                      \
+  do {                                                                         \
+    size_t _l = strlen(d);                                                     \
+    sstrncpy((d) + _l, (s), (n)-_l);                                           \
+  } while (0)
+
 #ifdef LIBVIR_CHECK_VERSION
 
 #if LIBVIR_CHECK_VERSION(0, 9, 2)
 #define HAVE_DOM_REASON_RUNNING_WAKEUP 1
 #endif
 
+/*
+  virConnectListAllDomains() appeared in 0.10.2
+  Note that LIBVIR_CHECK_VERSION appeared a year later, so
+  in some systems which actually have virConnectListAllDomains()
+  we can't detect this.
+ */
+#if LIBVIR_CHECK_VERSION(0, 10, 2)
+#define HAVE_LIST_ALL_DOMAINS 1
+#endif
+
 #if LIBVIR_CHECK_VERSION(1, 0, 1)
 #define HAVE_DOM_REASON_PAUSED_SNAPSHOT 1
 #endif
 
 #endif /* LIBVIR_CHECK_VERSION */
 
+/* structure used for aggregating notification-thread data*/
+typedef struct virt_notif_thread_s {
+  pthread_t event_loop_tid;
+  int domain_event_cb_id;
+  pthread_mutex_t active_mutex; /* protects 'is_active' member access*/
+  bool is_active;
+} virt_notif_thread_t;
+
 static const char *config_keys[] = {"Connection",
 
                                     "RefreshInterval",
@@ -108,8 +135,15 @@ static const char *config_keys[] = {"Connection",
 
                                     "Instances",
                                     "ExtraStats",
+                                    "PersistentNotification",
                                     NULL};
 
+/* PersistentNotification is false by default */
+static bool persistent_notification = false;
+
+/* Thread used for handling libvirt notifications events */
+static virt_notif_thread_t notif_thread;
+
 const char *domain_states[] = {
         [VIR_DOMAIN_NOSTATE] = "no state",
         [VIR_DOMAIN_RUNNING] = "the domain is running",
@@ -124,7 +158,202 @@ const char *domain_states[] = {
 #endif
 };
 
+static int map_domain_event_to_state(int event) {
+  int ret;
+  switch (event) {
+  case VIR_DOMAIN_EVENT_STARTED:
+    ret = VIR_DOMAIN_RUNNING;
+    break;
+  case VIR_DOMAIN_EVENT_SUSPENDED:
+    ret = VIR_DOMAIN_PAUSED;
+    break;
+  case VIR_DOMAIN_EVENT_RESUMED:
+    ret = VIR_DOMAIN_RUNNING;
+    break;
+  case VIR_DOMAIN_EVENT_STOPPED:
+    ret = VIR_DOMAIN_SHUTOFF;
+    break;
+  case VIR_DOMAIN_EVENT_SHUTDOWN:
+    ret = VIR_DOMAIN_SHUTDOWN;
+    break;
+#ifdef HAVE_DOM_STATE_PMSUSPENDED
+  case VIR_DOMAIN_EVENT_PMSUSPENDED:
+    ret = VIR_DOMAIN_PMSUSPENDED;
+    break;
+#endif
+#ifdef HAVE_DOM_REASON_CRASHED
+  case VIR_DOMAIN_EVENT_CRASHED:
+    ret = VIR_DOMAIN_CRASHED;
+    break;
+#endif
+  default:
+    ret = VIR_DOMAIN_NOSTATE;
+  }
+  return ret;
+}
+
 #ifdef HAVE_DOM_REASON
+static int map_domain_event_detail_to_reason(int event, int detail) {
+  int ret;
+  switch (event) {
+  case VIR_DOMAIN_EVENT_STARTED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_STARTED_BOOTED: /* Normal startup from boot */
+      ret = VIR_DOMAIN_RUNNING_BOOTED;
+      break;
+    case VIR_DOMAIN_EVENT_STARTED_MIGRATED: /* Incoming migration from another
+                                               host */
+      ret = VIR_DOMAIN_RUNNING_MIGRATED;
+      break;
+    case VIR_DOMAIN_EVENT_STARTED_RESTORED: /* Restored from a state file */
+      ret = VIR_DOMAIN_RUNNING_RESTORED;
+      break;
+    case VIR_DOMAIN_EVENT_STARTED_FROM_SNAPSHOT: /* Restored from snapshot */
+      ret = VIR_DOMAIN_RUNNING_FROM_SNAPSHOT;
+      break;
+#ifdef HAVE_DOM_REASON_RUNNING_WAKEUP
+    case VIR_DOMAIN_EVENT_STARTED_WAKEUP: /* Started due to wakeup event */
+      ret = VIR_DOMAIN_RUNNING_WAKEUP;
+      break;
+#endif
+    default:
+      ret = VIR_DOMAIN_RUNNING_UNKNOWN;
+    }
+    break;
+  case VIR_DOMAIN_EVENT_SUSPENDED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_SUSPENDED_PAUSED: /* Normal suspend due to admin
+                                               pause */
+      ret = VIR_DOMAIN_PAUSED_USER;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED: /* Suspended for offline
+                                                 migration */
+      ret = VIR_DOMAIN_PAUSED_MIGRATION;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_IOERROR: /* Suspended due to a disk I/O
+                                                error */
+      ret = VIR_DOMAIN_PAUSED_IOERROR;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_WATCHDOG: /* Suspended due to a watchdog
+                                                 firing */
+      ret = VIR_DOMAIN_PAUSED_WATCHDOG;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_RESTORED: /* Restored from paused state
+                                                 file */
+      ret = VIR_DOMAIN_PAUSED_UNKNOWN;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_FROM_SNAPSHOT: /* Restored from paused
+                                                      snapshot */
+      ret = VIR_DOMAIN_PAUSED_FROM_SNAPSHOT;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_API_ERROR: /* Suspended after failure during
+                                                  libvirt API call */
+      ret = VIR_DOMAIN_PAUSED_UNKNOWN;
+      break;
+#ifdef HAVE_DOM_REASON_POSTCOPY
+    case VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY: /* Suspended for post-copy
+                                                 migration */
+      ret = VIR_DOMAIN_PAUSED_POSTCOPY;
+      break;
+    case VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY_FAILED: /* Suspended after failed
+                                                        post-copy */
+      ret = VIR_DOMAIN_PAUSED_POSTCOPY_FAILED;
+      break;
+#endif
+    default:
+      ret = VIR_DOMAIN_PAUSED_UNKNOWN;
+    }
+    break;
+  case VIR_DOMAIN_EVENT_RESUMED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_RESUMED_UNPAUSED: /* Normal resume due to admin
+                                               unpause */
+      ret = VIR_DOMAIN_RUNNING_UNPAUSED;
+      break;
+    case VIR_DOMAIN_EVENT_RESUMED_MIGRATED: /* Resumed for completion of
+                                               migration */
+      ret = VIR_DOMAIN_RUNNING_MIGRATED;
+      break;
+    case VIR_DOMAIN_EVENT_RESUMED_FROM_SNAPSHOT: /* Resumed from snapshot */
+      ret = VIR_DOMAIN_RUNNING_FROM_SNAPSHOT;
+      break;
+#ifdef HAVE_DOM_REASON_POSTCOPY
+    case VIR_DOMAIN_EVENT_RESUMED_POSTCOPY: /* Resumed, but migration is still
+                                               running in post-copy mode */
+      ret = VIR_DOMAIN_RUNNING_POSTCOPY;
+      break;
+#endif
+    default:
+      ret = VIR_DOMAIN_RUNNING_UNKNOWN;
+    }
+    break;
+  case VIR_DOMAIN_EVENT_STOPPED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_STOPPED_SHUTDOWN: /* Normal shutdown */
+      ret = VIR_DOMAIN_SHUTOFF_SHUTDOWN;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_DESTROYED: /* Forced poweroff from host */
+      ret = VIR_DOMAIN_SHUTOFF_DESTROYED;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_CRASHED: /* Guest crashed */
+      ret = VIR_DOMAIN_SHUTOFF_CRASHED;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_MIGRATED: /* Migrated off to another host */
+      ret = VIR_DOMAIN_SHUTOFF_MIGRATED;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_SAVED: /* Saved to a state file */
+      ret = VIR_DOMAIN_SHUTOFF_SAVED;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_FAILED: /* Host emulator/mgmt failed */
+      ret = VIR_DOMAIN_SHUTOFF_FAILED;
+      break;
+    case VIR_DOMAIN_EVENT_STOPPED_FROM_SNAPSHOT: /* Offline snapshot loaded */
+      ret = VIR_DOMAIN_SHUTOFF_FROM_SNAPSHOT;
+      break;
+    default:
+      ret = VIR_DOMAIN_SHUTOFF_UNKNOWN;
+    }
+    break;
+  case VIR_DOMAIN_EVENT_SHUTDOWN:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_SHUTDOWN_FINISHED: /* Guest finished shutdown
+                                                sequence */
+      ret = VIR_DOMAIN_SHUTDOWN_USER;
+      break;
+    default:
+      ret = VIR_DOMAIN_SHUTDOWN_UNKNOWN;
+    }
+    break;
+#ifdef HAVE_DOM_STATE_PMSUSPENDED
+  case VIR_DOMAIN_EVENT_PMSUSPENDED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_PMSUSPENDED_MEMORY: /* Guest was PM suspended to
+                                                 memory */
+      ret = VIR_DOMAIN_PMSUSPENDED_UNKNOWN;
+      break;
+    case VIR_DOMAIN_EVENT_PMSUSPENDED_DISK: /* Guest was PM suspended to disk */
+      ret = VIR_DOMAIN_PMSUSPENDED_DISK_UNKNOWN;
+      break;
+    default:
+      ret = VIR_DOMAIN_PMSUSPENDED_UNKNOWN;
+    }
+    break;
+#endif
+  case VIR_DOMAIN_EVENT_CRASHED:
+    switch (detail) {
+    case VIR_DOMAIN_EVENT_CRASHED_PANICKED: /* Guest was panicked */
+      ret = VIR_DOMAIN_CRASHED_PANICKED;
+      break;
+    default:
+      ret = VIR_DOMAIN_CRASHED_UNKNOWN;
+    }
+    break;
+  default:
+    ret = VIR_DOMAIN_NOSTATE_UNKNOWN;
+  }
+  return ret;
+}
+
 #define DOMAIN_STATE_REASON_MAX_SIZE 20
 const char *domain_reasons[][DOMAIN_STATE_REASON_MAX_SIZE] = {
         [VIR_DOMAIN_NOSTATE][VIR_DOMAIN_NOSTATE_UNKNOWN] =
@@ -158,7 +387,6 @@ const char *domain_reasons[][DOMAIN_STATE_REASON_MAX_SIZE] = {
         [VIR_DOMAIN_RUNNING][VIR_DOMAIN_RUNNING_POSTCOPY] =
             "running in post-copy migration mode",
 #endif
-
         [VIR_DOMAIN_BLOCKED][VIR_DOMAIN_BLOCKED_UNKNOWN] =
             "the reason is unknown",
 
@@ -198,7 +426,6 @@ const char *domain_reasons[][DOMAIN_STATE_REASON_MAX_SIZE] = {
         [VIR_DOMAIN_PAUSED][VIR_DOMAIN_PAUSED_POSTCOPY_FAILED] =
             "paused after failed post-copy",
 #endif
-
         [VIR_DOMAIN_SHUTDOWN][VIR_DOMAIN_SHUTDOWN_UNKNOWN] =
             "the reason is unknown",
         [VIR_DOMAIN_SHUTDOWN][VIR_DOMAIN_SHUTDOWN_USER] =
@@ -278,6 +505,7 @@ struct interface_device {
 typedef struct domain_s {
   virDomainPtr ptr;
   virDomainInfo info;
+  bool active;
 } domain_t;
 
 struct lv_read_state {
@@ -293,7 +521,8 @@ struct lv_read_state {
 };
 
 static void free_domains(struct lv_read_state *state);
-static int add_domain(struct lv_read_state *state, virDomainPtr dom);
+static int add_domain(struct lv_read_state *state, virDomainPtr dom,
+                      bool active);
 
 static void free_block_devices(struct lv_read_state *state);
 static int add_block_device(struct lv_read_state *state, virDomainPtr dom,
@@ -533,7 +762,6 @@ static int lv_domain_info(virDomainPtr dom, struct lv_info *info) {
 }
 
 static void init_value_list(value_list_t *vl, virDomainPtr dom) {
-  int n;
   const char *name;
   char uuid[VIR_UUID_STRING_BUFLEN];
 
@@ -546,44 +774,34 @@ static void init_value_list(value_list_t *vl, virDomainPtr dom) {
     if (hostname_format[i] == hf_none)
       continue;
 
-    n = DATA_MAX_NAME_LEN - strlen(vl->host) - 2;
-
-    if (i > 0 && n >= 1) {
-      strncat(vl->host, ":", 1);
-      n--;
-    }
+    if (i > 0)
+      SSTRNCAT(vl->host, ":", sizeof(vl->host));
 
     switch (hostname_format[i]) {
     case hf_none:
       break;
     case hf_hostname:
-      strncat(vl->host, hostname_g, n);
+      SSTRNCAT(vl->host, hostname_g, sizeof(vl->host));
       break;
     case hf_name:
       name = virDomainGetName(dom);
       if (name)
-        strncat(vl->host, name, n);
+        SSTRNCAT(vl->host, name, sizeof(vl->host));
       break;
     case hf_uuid:
       if (virDomainGetUUIDString(dom, uuid) == 0)
-        strncat(vl->host, uuid, n);
+        SSTRNCAT(vl->host, uuid, sizeof(vl->host));
       break;
     }
   }
 
-  vl->host[sizeof(vl->host) - 1] = '\0';
-
   /* Construct the plugin instance field according to PluginInstanceFormat. */
   for (int i = 0; i < PLGINST_MAX_FIELDS; ++i) {
     if (plugin_instance_format[i] == plginst_none)
       continue;
 
-    n = sizeof(vl->plugin_instance) - strlen(vl->plugin_instance) - 2;
-
-    if (i > 0 && n >= 1) {
-      strncat(vl->plugin_instance, ":", 1);
-      n--;
-    }
+    if (i > 0)
+      SSTRNCAT(vl->plugin_instance, ":", sizeof(vl->plugin_instance));
 
     switch (plugin_instance_format[i]) {
     case plginst_none:
@@ -591,17 +809,15 @@ static void init_value_list(value_list_t *vl, virDomainPtr dom) {
     case plginst_name:
       name = virDomainGetName(dom);
       if (name)
-        strncat(vl->plugin_instance, name, n);
+        SSTRNCAT(vl->plugin_instance, name, sizeof(vl->plugin_instance));
       break;
     case plginst_uuid:
       if (virDomainGetUUIDString(dom, uuid) == 0)
-        strncat(vl->plugin_instance, uuid, n);
+        SSTRNCAT(vl->plugin_instance, uuid, sizeof(vl->plugin_instance));
       break;
     }
   }
 
-  vl->plugin_instance[sizeof(vl->plugin_instance) - 1] = '\0';
-
 } /* void init_value_list */
 
 static int init_notif(notification_t *notif, const virDomainPtr domain,
@@ -799,9 +1015,8 @@ static unsigned int parse_ex_stats_flags(char **exstats, int numexstats) {
   return ex_stats_flags;
 }
 
-static void domain_state_submit(virDomainPtr dom, int state, int reason) {
-
-  if ((state < 0) || (state >= STATIC_ARRAY_SIZE(domain_states))) {
+static void domain_state_submit_notif(virDomainPtr dom, int state, int reason) {
+  if ((state < 0) || ((size_t)state >= STATIC_ARRAY_SIZE(domain_states))) {
     ERROR(PLUGIN_NAME ": Array index out of bounds: state=%d", state);
     return;
   }
@@ -809,7 +1024,7 @@ static void domain_state_submit(virDomainPtr dom, int state, int reason) {
   char msg[DATA_MAX_NAME_LEN];
   const char *state_str = domain_states[state];
 #ifdef HAVE_DOM_REASON
-  if ((reason < 0) || (reason >= STATIC_ARRAY_SIZE(domain_reasons[0]))) {
+  if ((reason < 0) || ((size_t)reason >= STATIC_ARRAY_SIZE(domain_reasons[0]))) {
     ERROR(PLUGIN_NAME ": Array index out of bounds: reason=%d", reason);
     return;
   }
@@ -908,7 +1123,7 @@ static int lv_config(const char *key, const char *value) {
     return 0;
   }
   if (strcasecmp(key, "BlockDeviceFormatBasename") == 0) {
-    blockdevice_format_basename = IS_TRUE(value);
+    blockdevice_format_basename = IS_TRUE(value) ? true : false;
     return 0;
   }
   if (strcasecmp(key, "InterfaceDevice") == 0) {
@@ -1070,6 +1285,11 @@ static int lv_config(const char *key, const char *value) {
     }
   }
 
+  if (strcasecmp(key, "PersistentNotification") == 0) {
+    persistent_notification = IS_TRUE(value);
+    return 0;
+  }
+
   /* Unrecognised option. */
   return -1;
 }
@@ -1223,6 +1443,15 @@ static int get_vcpu_stats(virDomainPtr domain, unsigned short nr_virt_cpu) {
 }
 
 #ifdef HAVE_DOM_REASON
+
+static void domain_state_submit(virDomainPtr dom, int state, int reason) {
+  value_t values[] = {
+      {.gauge = (gauge_t)state}, {.gauge = (gauge_t)reason},
+  };
+
+  submit(dom, "domain_state", NULL, values, STATIC_ARRAY_SIZE(values));
+}
+
 static int get_domain_state(virDomainPtr domain) {
   int domain_state = 0;
   int domain_reason = 0;
@@ -1235,8 +1464,28 @@ static int get_domain_state(virDomainPtr domain) {
   }
 
   domain_state_submit(domain, domain_state, domain_reason);
+
   return status;
 }
+
+#ifdef HAVE_LIST_ALL_DOMAINS
+static int get_domain_state_notify(virDomainPtr domain) {
+  int domain_state = 0;
+  int domain_reason = 0;
+
+  int status = virDomainGetState(domain, &domain_state, &domain_reason, 0);
+  if (status != 0) {
+    ERROR(PLUGIN_NAME " plugin: virDomainGetState failed with status %i.",
+          status);
+    return status;
+  }
+
+  if (persistent_notification)
+    domain_state_submit_notif(domain, domain_state, domain_reason);
+
+  return status;
+}
+#endif /* HAVE_LIST_ALL_DOMAINS */
 #endif /* HAVE_DOM_REASON */
 
 static int get_memory_stats(virDomainPtr domain) {
@@ -1335,7 +1584,7 @@ static int get_block_stats(struct block_device *block_dev) {
 
 #define NM_ADD_STR_ITEMS(_items, _size)                                        \
   do {                                                                         \
-    for (int _i = 0; _i < _size; ++_i) {                                       \
+    for (size_t _i = 0; _i < _size; ++_i) {                                       \
       DEBUG(PLUGIN_NAME                                                        \
             " plugin: Adding notification metadata name=%s value=%s",          \
             _items[_i].name, _items[_i].value);                                \
@@ -1360,7 +1609,7 @@ static int fs_info_notify(virDomainPtr domain, virDomainFSInfoPtr fs_info) {
       {.name = "name", .value = fs_info->name},
       {.name = "fstype", .value = fs_info->fstype}};
 
-  for (int i = 0; i < fs_info->ndevAlias; ++i) {
+  for (size_t i = 0; i < fs_info->ndevAlias; ++i) {
     fs_dev_alias[i].name = "devAlias";
     fs_dev_alias[i].value = fs_info->devAlias[i];
   }
@@ -1488,10 +1737,6 @@ static int get_domain_metrics(domain_t *domain) {
      * We need to get it from virDomainGetState.
      */
     GET_STATS(get_domain_state, "domain reason", domain->ptr);
-#else
-    /* virDomainGetState is not available. Submit 0, which corresponds to
-     * unknown reason. */
-    domain_state_submit(domain->ptr, info.di.state, 0);
 #endif
   }
 
@@ -1530,6 +1775,7 @@ static int get_domain_metrics(domain_t *domain) {
 
   /* Update cached virDomainInfo. It has to be done after cpu_submit */
   memcpy(&domain->info, &info.di, sizeof(domain->info));
+
   return 0;
 }
 
@@ -1578,6 +1824,192 @@ static int get_if_dev_stats(struct interface_device *if_dev) {
   return 0;
 }
 
+static int domain_lifecycle_event_cb(__attribute__((unused)) virConnectPtr conn,
+                                     virDomainPtr dom, int event, int detail,
+                                     __attribute__((unused)) void *opaque) {
+  int domain_state = map_domain_event_to_state(event);
+  int domain_reason = 0; /* 0 means UNKNOWN reason for any state */
+#ifdef HAVE_DOM_REASON
+  domain_reason = map_domain_event_detail_to_reason(event, detail);
+#endif
+  domain_state_submit_notif(dom, domain_state, domain_reason);
+
+  return 0;
+}
+
+static int register_event_impl(void) {
+  if (virEventRegisterDefaultImpl() < 0) {
+    virErrorPtr err = virGetLastError();
+    ERROR(PLUGIN_NAME
+          " plugin: error while event implementation registering: %s",
+          err && err->message ? err->message : "Unknown error");
+    return -1;
+  }
+
+  return 0;
+}
+
+static void virt_notif_thread_set_active(virt_notif_thread_t *thread_data,
+                                         const bool active) {
+  assert(thread_data != NULL);
+  pthread_mutex_lock(&thread_data->active_mutex);
+  thread_data->is_active = active;
+  pthread_mutex_unlock(&thread_data->active_mutex);
+}
+
+static bool virt_notif_thread_is_active(virt_notif_thread_t *thread_data) {
+  bool active = false;
+
+  assert(thread_data != NULL);
+  pthread_mutex_lock(&thread_data->active_mutex);
+  active = thread_data->is_active;
+  pthread_mutex_unlock(&thread_data->active_mutex);
+
+  return active;
+}
+
+/* worker function running default event implementation */
+static void *event_loop_worker(void *arg) {
+  virt_notif_thread_t *thread_data = (virt_notif_thread_t *)arg;
+
+  while (virt_notif_thread_is_active(thread_data)) {
+    if (virEventRunDefaultImpl() < 0) {
+      virErrorPtr err = virGetLastError();
+      ERROR(PLUGIN_NAME " plugin: failed to run event loop: %s\n",
+            err && err->message ? err->message : "Unknown error");
+    }
+  }
+
+  return NULL;
+}
+
+static int virt_notif_thread_init(virt_notif_thread_t *thread_data) {
+  int ret;
+
+  assert(thread_data != NULL);
+  ret = pthread_mutex_init(&thread_data->active_mutex, NULL);
+  if (ret != 0) {
+    ERROR(PLUGIN_NAME ": Failed to initialize mutex, err %u", ret);
+    return ret;
+  }
+
+  /**
+   * '0' and positive integers are meaningful ID's, therefore setting
+   * domain_event_cb_id to '-1'
+   */
+  thread_data->domain_event_cb_id = -1;
+  pthread_mutex_lock(&thread_data->active_mutex);
+  thread_data->is_active = false;
+  pthread_mutex_unlock(&thread_data->active_mutex);
+
+  return 0;
+}
+
+/* register domain event callback and start event loop thread */
+static int start_event_loop(virt_notif_thread_t *thread_data) {
+  assert(thread_data != NULL);
+  thread_data->domain_event_cb_id = virConnectDomainEventRegisterAny(
+      conn, NULL, VIR_DOMAIN_EVENT_ID_LIFECYCLE,
+      VIR_DOMAIN_EVENT_CALLBACK(domain_lifecycle_event_cb), NULL, NULL);
+  if (thread_data->domain_event_cb_id == -1) {
+    ERROR(PLUGIN_NAME " plugin: error while callback registering");
+    return -1;
+  }
+
+  virt_notif_thread_set_active(thread_data, 1);
+  if (pthread_create(&thread_data->event_loop_tid, NULL, event_loop_worker,
+                     thread_data)) {
+    ERROR(PLUGIN_NAME " plugin: failed event loop thread creation");
+    virConnectDomainEventDeregisterAny(conn, thread_data->domain_event_cb_id);
+    return -1;
+  }
+
+  return 0;
+}
+
+/* stop event loop thread and deregister callback */
+static void stop_event_loop(virt_notif_thread_t *thread_data) {
+  /* stopping loop and de-registering event handler*/
+  virt_notif_thread_set_active(thread_data, 0);
+  if (conn != NULL && thread_data->domain_event_cb_id != -1)
+    virConnectDomainEventDeregisterAny(conn, thread_data->domain_event_cb_id);
+
+  if (pthread_join(notif_thread.event_loop_tid, NULL) != 0)
+    ERROR(PLUGIN_NAME " plugin: stopping notification thread failed");
+}
+
+static int persistent_domains_state_notification(void) {
+  int status = 0;
+  int n;
+#ifdef HAVE_LIST_ALL_DOMAINS
+  virDomainPtr *domains = NULL;
+  n = virConnectListAllDomains(conn, &domains,
+                               VIR_CONNECT_LIST_DOMAINS_PERSISTENT);
+  if (n < 0) {
+    VIRT_ERROR(conn, "reading list of persistent domains");
+    status = -1;
+  } else {
+    DEBUG(PLUGIN_NAME " plugin: getting state of %i persistent domains", n);
+    /* Fetch each persistent domain's state and notify it */
+    int n_notified = n;
+    for (int i = 0; i < n; ++i) {
+      status = get_domain_state_notify(domains[i]);
+      if (status != 0) {
+        n_notified--;
+        ERROR(PLUGIN_NAME " plugin: could not notify state of domain %s",
+              virDomainGetName(domains[i]));
+      }
+      virDomainFree(domains[i]);
+    }
+
+    sfree(domains);
+    DEBUG(PLUGIN_NAME " plugin: notified state of %i persistent domains",
+          n_notified);
+  }
+#else
+  n = virConnectNumOfDomains(conn);
+  if (n > 0) {
+    int *domids;
+    /* Get list of domains. */
+    domids = calloc(n, sizeof(*domids));
+    if (domids == NULL) {
+      ERROR(PLUGIN_NAME " plugin: calloc failed.");
+      return -1;
+    }
+    n = virConnectListDomains(conn, domids, n);
+    if (n < 0) {
+      VIRT_ERROR(conn, "reading list of domains");
+      sfree(domids);
+      return -1;
+    }
+    /* Fetch info of each active domain and notify it */
+    for (int i = 0; i < n; ++i) {
+      virDomainInfo info;
+      virDomainPtr dom = NULL;
+      dom = virDomainLookupByID(conn, domids[i]);
+      if (dom == NULL) {
+        VIRT_ERROR(conn, "virDomainLookupByID");
+        /* Could be that the domain went away -- ignore it anyway. */
+        continue;
+      }
+      status = virDomainGetInfo(dom, &info);
+      if (status == 0)
+        /* virDomainGetState is not available. Submit 0, which corresponds to
+         * unknown reason. */
+        domain_state_submit_notif(dom, info.state, 0);
+      else
+        ERROR(PLUGIN_NAME " plugin: virDomainGetInfo failed with status %i.",
+              status);
+
+      virDomainFree(dom);
+    }
+    sfree(domids);
+  }
+#endif
+
+  return status;
+}
+
 static int lv_read(user_data_t *ud) {
   time_t t;
   struct lv_read_instance *inst = NULL;
@@ -1591,9 +2023,19 @@ static int lv_read(user_data_t *ud) {
   inst = ud->data;
   state = &inst->read_state;
 
+  bool reconnect = conn == NULL ? true : false;
+  /* event implementation must be registered before connection is opened */
   if (inst->id == 0) {
+    if (!persistent_notification && reconnect)
+      if (register_event_impl() != 0)
+        return -1;
+
     if (lv_connect() < 0)
       return -1;
+
+    if (!persistent_notification && reconnect && conn != NULL)
+      if (start_event_loop(&notif_thread) != 0)
+        return -1;
   }
 
   time(&t);
@@ -1602,32 +2044,53 @@ static int lv_read(user_data_t *ud) {
   if ((last_refresh == (time_t)0) ||
       ((interval > 0) && ((last_refresh + interval) <= t))) {
     if (refresh_lists(inst) != 0) {
-      if (inst->id == 0)
+      if (inst->id == 0) {
+        if (!persistent_notification)
+          stop_event_loop(&notif_thread);
         lv_disconnect();
+      }
       return -1;
     }
     last_refresh = t;
   }
 
-#if 0
-    for (int i = 0; i < nr_domains; ++i)
-        fprintf (stderr, "domain %s\n", virDomainGetName (state->domains[i].ptr));
-    for (int i = 0; i < nr_block_devices; ++i)
-        fprintf  (stderr, "block device %d %s:%s\n",
-                  i, virDomainGetName (block_devices[i].dom),
-                  block_devices[i].path);
-    for (int i = 0; i < nr_interface_devices; ++i)
-        fprintf (stderr, "interface device %d %s:%s\n",
-                 i, virDomainGetName (interface_devices[i].dom),
-                 interface_devices[i].path);
+  /* persistent domains state notifications are handled by instance 0 */
+  if (inst->id == 0 && persistent_notification) {
+    int status = persistent_domains_state_notification();
+    if (status != 0)
+      DEBUG(PLUGIN_NAME " plugin: persistent_domains_state_notifications "
+                        "returned with status %i",
+            status);
+  }
+
+#if COLLECT_DEBUG
+  for (int i = 0; i < state->nr_domains; ++i)
+    DEBUG(PLUGIN_NAME " plugin: domain %s",
+          virDomainGetName(state->domains[i].ptr));
+  for (int i = 0; i < state->nr_block_devices; ++i)
+    DEBUG(PLUGIN_NAME " plugin: block device %d %s:%s", i,
+          virDomainGetName(state->block_devices[i].dom),
+          state->block_devices[i].path);
+  for (int i = 0; i < state->nr_interface_devices; ++i)
+    DEBUG(PLUGIN_NAME " plugin: interface device %d %s:%s", i,
+          virDomainGetName(state->interface_devices[i].dom),
+          state->interface_devices[i].path);
 #endif
 
   /* Get domains' metrics */
   for (int i = 0; i < state->nr_domains; ++i) {
-    int status = get_domain_metrics(&state->domains[i]);
+    domain_t *dom = &state->domains[i];
+    int status = 0;
+    if (dom->active)
+      status = get_domain_metrics(dom);
+#ifdef HAVE_DOM_REASON
+    else
+      status = get_domain_state(dom->ptr);
+#endif
+
     if (status != 0)
       ERROR(PLUGIN_NAME " failed to get metrics for domain=%s",
-            virDomainGetName(state->domains[i].ptr));
+            virDomainGetName(dom->ptr));
   }
 
   /* Get block device stats for each domain. */
@@ -1667,6 +2130,7 @@ static int lv_init_instance(size_t i, plugin_read_cb callback) {
   ud->free_func = NULL;
 
   INFO(PLUGIN_NAME " plugin: reader %s initialized", inst->tag);
+
   return plugin_register_complex_read(NULL, inst->tag, callback, 0, ud);
 }
 
@@ -1681,6 +2145,7 @@ static void lv_fini_instance(size_t i) {
   struct lv_read_state *state = &(inst->read_state);
 
   lv_clean_read_state(state);
+
   INFO(PLUGIN_NAME " plugin: reader %s finalized", inst->tag);
 }
 
@@ -1688,13 +2153,27 @@ static int lv_init(void) {
   if (virInitialize() != 0)
     return -1;
 
+  /* event implementation must be registered before connection is opened */
+  if (!persistent_notification)
+    if (register_event_impl() != 0)
+      return -1;
+
   if (lv_connect() != 0)
     return -1;
 
+  DEBUG(PLUGIN_NAME " plugin: starting event loop");
+
+  if (!persistent_notification) {
+    virt_notif_thread_init(&notif_thread);
+    if (start_event_loop(&notif_thread) != 0)
+      return -1;
+  }
+
   DEBUG(PLUGIN_NAME " plugin: starting %i instances", nr_instances);
 
   for (int i = 0; i < nr_instances; ++i)
-    lv_init_instance(i, lv_read);
+    if (lv_init_instance(i, lv_read) != 0)
+      return -1;
 
   return 0;
 }
@@ -1793,224 +2272,247 @@ static int lv_instance_include_domain(struct lv_read_instance *inst,
   return 0;
 }
 
-/*
-  virConnectListAllDomains() appeared in 0.10.2
-  Note that LIBVIR_CHECK_VERSION appeared a year later, so
-  in some systems which actually have virConnectListAllDomains()
-  we can't detect this.
- */
-#ifdef LIBVIR_CHECK_VERSION
-#if LIBVIR_CHECK_VERSION(0, 10, 2)
-#define HAVE_LIST_ALL_DOMAINS 1
-#endif
-#endif
-
 static int refresh_lists(struct lv_read_instance *inst) {
   struct lv_read_state *state = &inst->read_state;
   int n;
 
+#ifndef HAVE_LIST_ALL_DOMAINS
   n = virConnectNumOfDomains(conn);
   if (n < 0) {
     VIRT_ERROR(conn, "reading number of domains");
     return -1;
   }
+#endif
 
   lv_clean_read_state(state);
 
-  if (n > 0) {
+#ifndef HAVE_LIST_ALL_DOMAINS
+  if (n == 0)
+    goto end;
+#endif
+
 #ifdef HAVE_LIST_ALL_DOMAINS
-    virDomainPtr *domains;
-    n = virConnectListAllDomains(conn, &domains,
-                                 VIR_CONNECT_LIST_DOMAINS_ACTIVE);
+  virDomainPtr *domains, *domains_inactive;
+  int m = virConnectListAllDomains(conn, &domains_inactive,
+                                   VIR_CONNECT_LIST_DOMAINS_INACTIVE);
+  n = virConnectListAllDomains(conn, &domains, VIR_CONNECT_LIST_DOMAINS_ACTIVE);
 #else
-    int *domids;
+  int *domids;
 
-    /* Get list of domains. */
-    domids = malloc(sizeof(*domids) * n);
-    if (domids == NULL) {
-      ERROR(PLUGIN_NAME " plugin: malloc failed.");
-      return -1;
-    }
+  /* Get list of domains. */
+  domids = calloc(n, sizeof(*domids));
+  if (domids == NULL) {
+    ERROR(PLUGIN_NAME " plugin: calloc failed.");
+    return -1;
+  }
 
-    n = virConnectListDomains(conn, domids, n);
+  n = virConnectListDomains(conn, domids, n);
 #endif
 
-    if (n < 0) {
-      VIRT_ERROR(conn, "reading list of domains");
+  if (n < 0) {
+    VIRT_ERROR(conn, "reading list of domains");
 #ifndef HAVE_LIST_ALL_DOMAINS
-      sfree(domids);
+    sfree(domids);
+#else
+    for (int i = 0; i < m; ++i)
+      virDomainFree(domains_inactive[i]);
+    sfree(domains_inactive);
 #endif
-      return -1;
+    return -1;
+  }
+
+#ifdef HAVE_LIST_ALL_DOMAINS
+  for (int i = 0; i < m; ++i)
+    if (add_domain(state, domains_inactive[i], 0) < 0) {
+      ERROR(PLUGIN_NAME " plugin: malloc failed.");
+      virDomainFree(domains_inactive[i]);
+      domains_inactive[i] = NULL;
+      continue;
     }
+#endif
 
-    /* Fetch each domain and add it to the list, unless ignore. */
-    for (int i = 0; i < n; ++i) {
-      const char *name;
-      char *xml = NULL;
-      xmlDocPtr xml_doc = NULL;
-      xmlXPathContextPtr xpath_ctx = NULL;
-      xmlXPathObjectPtr xpath_obj = NULL;
-      char tag[PARTITION_TAG_MAX_LEN] = {'\0'};
-      virDomainInfo info;
-      int status;
+  /* Fetch each domain and add it to the list, unless ignore. */
+  for (int i = 0; i < n; ++i) {
+    const char *name;
+    char *xml = NULL;
+    xmlDocPtr xml_doc = NULL;
+    xmlXPathContextPtr xpath_ctx = NULL;
+    xmlXPathObjectPtr xpath_obj = NULL;
+    char tag[PARTITION_TAG_MAX_LEN] = {'\0'};
+    virDomainInfo info;
+    int status;
 
 #ifdef HAVE_LIST_ALL_DOMAINS
-      virDomainPtr dom = domains[i];
+    virDomainPtr dom = domains[i];
 #else
-      virDomainPtr dom = NULL;
-      dom = virDomainLookupByID(conn, domids[i]);
-      if (dom == NULL) {
-        VIRT_ERROR(conn, "virDomainLookupByID");
-        /* Could be that the domain went away -- ignore it anyway. */
-        continue;
-      }
+    virDomainPtr dom = NULL;
+    dom = virDomainLookupByID(conn, domids[i]);
+    if (dom == NULL) {
+      VIRT_ERROR(conn, "virDomainLookupByID");
+      /* Could be that the domain went away -- ignore it anyway. */
+      continue;
+    }
 #endif
 
-      name = virDomainGetName(dom);
-      if (name == NULL) {
-        VIRT_ERROR(conn, "virDomainGetName");
-        goto cont;
-      }
+    if (add_domain(state, dom, 1) < 0) {
+      /*
+       * When domain is already tracked, then there is
+       * no problem with memory handling (will be freed
+       * with the rest of domains cached data)
+       * But in case of error like this (error occurred
+       * before adding domain to track) we have to take
+       * care it ourselves and call virDomainFree
+       */
+      ERROR(PLUGIN_NAME " plugin: malloc failed.");
+      virDomainFree(dom);
+      goto cont;
+    }
 
-      status = virDomainGetInfo(dom, &info);
-      if (status != 0) {
-        ERROR(PLUGIN_NAME " plugin: virDomainGetInfo failed with status %i.",
-              status);
-        continue;
-      }
+    name = virDomainGetName(dom);
+    if (name == NULL) {
+      VIRT_ERROR(conn, "virDomainGetName");
+      goto cont;
+    }
 
-      if (info.state != VIR_DOMAIN_RUNNING) {
-        DEBUG(PLUGIN_NAME " plugin: skipping inactive domain %s", name);
-        continue;
-      }
+    status = virDomainGetInfo(dom, &info);
+    if (status != 0) {
+      ERROR(PLUGIN_NAME " plugin: virDomainGetInfo failed with status %i.",
+            status);
+      continue;
+    }
 
-      if (il_domains && ignorelist_match(il_domains, name) != 0)
-        goto cont;
+    if (info.state != VIR_DOMAIN_RUNNING) {
+      DEBUG(PLUGIN_NAME " plugin: skipping inactive domain %s", name);
+      continue;
+    }
 
-      /* Get a list of devices for this domain. */
-      xml = virDomainGetXMLDesc(dom, 0);
-      if (!xml) {
-        VIRT_ERROR(conn, "virDomainGetXMLDesc");
-        goto cont;
-      }
+    if (il_domains && ignorelist_match(il_domains, name) != 0)
+      goto cont;
 
-      /* Yuck, XML.  Parse out the devices. */
-      xml_doc = xmlReadDoc((xmlChar *)xml, NULL, NULL, XML_PARSE_NONET);
-      if (xml_doc == NULL) {
-        VIRT_ERROR(conn, "xmlReadDoc");
-        goto cont;
-      }
+    /* Get a list of devices for this domain. */
+    xml = virDomainGetXMLDesc(dom, 0);
+    if (!xml) {
+      VIRT_ERROR(conn, "virDomainGetXMLDesc");
+      goto cont;
+    }
 
-      xpath_ctx = xmlXPathNewContext(xml_doc);
+    /* Yuck, XML.  Parse out the devices. */
+    xml_doc = xmlReadDoc((xmlChar *)xml, NULL, NULL, XML_PARSE_NONET);
+    if (xml_doc == NULL) {
+      VIRT_ERROR(conn, "xmlReadDoc");
+      goto cont;
+    }
 
-      if (lv_domain_get_tag(xpath_ctx, name, tag) < 0) {
-        ERROR(PLUGIN_NAME " plugin: lv_domain_get_tag failed.");
-        goto cont;
-      }
+    xpath_ctx = xmlXPathNewContext(xml_doc);
 
-      if (!lv_instance_include_domain(inst, name, tag))
-        goto cont;
+    if (lv_domain_get_tag(xpath_ctx, name, tag) < 0) {
+      ERROR(PLUGIN_NAME " plugin: lv_domain_get_tag failed.");
+      goto cont;
+    }
 
-      if (add_domain(state, dom) < 0) {
-        ERROR(PLUGIN_NAME " plugin: malloc failed.");
-        goto cont;
-      }
+    if (!lv_instance_include_domain(inst, name, tag))
+      goto cont;
 
-      /* Block devices. */
-      const char *bd_xmlpath = "/domain/devices/disk/target[@dev]";
-      if (blockdevice_format == source)
-        bd_xmlpath = "/domain/devices/disk/source[@dev]";
-      xpath_obj = xmlXPathEval((const xmlChar *)bd_xmlpath, xpath_ctx);
+    /* Block devices. */
+    const char *bd_xmlpath = "/domain/devices/disk/target[@dev]";
+    if (blockdevice_format == source)
+      bd_xmlpath = "/domain/devices/disk/source[@dev]";
+    xpath_obj = xmlXPathEval((const xmlChar *)bd_xmlpath, xpath_ctx);
 
-      if (xpath_obj == NULL || xpath_obj->type != XPATH_NODESET ||
-          xpath_obj->nodesetval == NULL)
-        goto cont;
+    if (xpath_obj == NULL || xpath_obj->type != XPATH_NODESET ||
+        xpath_obj->nodesetval == NULL)
+      goto cont;
 
-      for (int j = 0; j < xpath_obj->nodesetval->nodeNr; ++j) {
-        xmlNodePtr node;
-        char *path = NULL;
+    for (int j = 0; j < xpath_obj->nodesetval->nodeNr; ++j) {
+      xmlNodePtr node;
+      char *path = NULL;
 
-        node = xpath_obj->nodesetval->nodeTab[j];
-        if (!node)
-          continue;
-        path = (char *)xmlGetProp(node, (xmlChar *)"dev");
-        if (!path)
-          continue;
+      node = xpath_obj->nodesetval->nodeTab[j];
+      if (!node)
+        continue;
+      path = (char *)xmlGetProp(node, (xmlChar *)"dev");
+      if (!path)
+        continue;
 
-        if (il_block_devices &&
-            ignore_device_match(il_block_devices, name, path) != 0)
-          goto cont2;
+      if (il_block_devices &&
+          ignore_device_match(il_block_devices, name, path) != 0)
+        goto cont2;
 
-        add_block_device(state, dom, path);
-      cont2:
-        if (path)
-          xmlFree(path);
-      }
-      xmlXPathFreeObject(xpath_obj);
+      add_block_device(state, dom, path);
+    cont2:
+      if (path)
+        xmlFree(path);
+    }
+    xmlXPathFreeObject(xpath_obj);
+
+    /* Network interfaces. */
+    xpath_obj = xmlXPathEval(
+        (xmlChar *)"/domain/devices/interface[target[@dev]]", xpath_ctx);
+    if (xpath_obj == NULL || xpath_obj->type != XPATH_NODESET ||
+        xpath_obj->nodesetval == NULL)
+      goto cont;
 
-      /* Network interfaces. */
-      xpath_obj = xmlXPathEval(
-          (xmlChar *)"/domain/devices/interface[target[@dev]]", xpath_ctx);
-      if (xpath_obj == NULL || xpath_obj->type != XPATH_NODESET ||
-          xpath_obj->nodesetval == NULL)
-        goto cont;
+    xmlNodeSetPtr xml_interfaces = xpath_obj->nodesetval;
 
-      xmlNodeSetPtr xml_interfaces = xpath_obj->nodesetval;
+    for (int j = 0; j < xml_interfaces->nodeNr; ++j) {
+      char *path = NULL;
+      char *address = NULL;
+      xmlNodePtr xml_interface;
 
-      for (int j = 0; j < xml_interfaces->nodeNr; ++j) {
-        char *path = NULL;
-        char *address = NULL;
-        xmlNodePtr xml_interface;
+      xml_interface = xml_interfaces->nodeTab[j];
+      if (!xml_interface)
+        continue;
 
-        xml_interface = xml_interfaces->nodeTab[j];
-        if (!xml_interface)
+      for (xmlNodePtr child = xml_interface->children; child;
+           child = child->next) {
+        if (child->type != XML_ELEMENT_NODE)
           continue;
 
-        for (xmlNodePtr child = xml_interface->children; child;
-             child = child->next) {
-          if (child->type != XML_ELEMENT_NODE)
+        if (xmlStrEqual(child->name, (const xmlChar *)"target")) {
+          path = (char *)xmlGetProp(child, (const xmlChar *)"dev");
+          if (!path)
+            continue;
+        } else if (xmlStrEqual(child->name, (const xmlChar *)"mac")) {
+          address = (char *)xmlGetProp(child, (const xmlChar *)"address");
+          if (!address)
             continue;
-
-          if (xmlStrEqual(child->name, (const xmlChar *)"target")) {
-            path = (char *)xmlGetProp(child, (const xmlChar *)"dev");
-            if (!path)
-              continue;
-          } else if (xmlStrEqual(child->name, (const xmlChar *)"mac")) {
-            address = (char *)xmlGetProp(child, (const xmlChar *)"address");
-            if (!address)
-              continue;
-          }
         }
-
-        if (il_interface_devices &&
-            (ignore_device_match(il_interface_devices, name, path) != 0 ||
-             ignore_device_match(il_interface_devices, name, address) != 0))
-          goto cont3;
-
-        add_interface_device(state, dom, path, address, j + 1);
-      cont3:
-        if (path)
-          xmlFree(path);
-        if (address)
-          xmlFree(address);
       }
 
-    cont:
-      if (xpath_obj)
-        xmlXPathFreeObject(xpath_obj);
-      if (xpath_ctx)
-        xmlXPathFreeContext(xpath_ctx);
-      if (xml_doc)
-        xmlFreeDoc(xml_doc);
-      sfree(xml);
+      if (il_interface_devices &&
+          (ignore_device_match(il_interface_devices, name, path) != 0 ||
+           ignore_device_match(il_interface_devices, name, address) != 0))
+        goto cont3;
+
+      add_interface_device(state, dom, path, address, j + 1);
+    cont3:
+      if (path)
+        xmlFree(path);
+      if (address)
+        xmlFree(address);
     }
 
+  cont:
+    if (xpath_obj)
+      xmlXPathFreeObject(xpath_obj);
+    if (xpath_ctx)
+      xmlXPathFreeContext(xpath_ctx);
+    if (xml_doc)
+      xmlFreeDoc(xml_doc);
+    sfree(xml);
+  }
+
 #ifdef HAVE_LIST_ALL_DOMAINS
-    sfree(domains);
+  /* NOTE: domains_active and domains_inactive data will be cleared during
+     refresh of all domains (inside lv_clean_read_state function) so we need
+     to free here only allocated arrays */
+  sfree(domains);
+  sfree(domains_inactive);
 #else
-    sfree(domids);
+  sfree(domids);
+
+end:
 #endif
-  }
 
   DEBUG(PLUGIN_NAME " plugin#%s: refreshing"
                     " domains=%i block_devices=%i iface_devices=%i",
@@ -2030,7 +2532,8 @@ static void free_domains(struct lv_read_state *state) {
   state->nr_domains = 0;
 }
 
-static int add_domain(struct lv_read_state *state, virDomainPtr dom) {
+static int add_domain(struct lv_read_state *state, virDomainPtr dom,
+                      bool active) {
   domain_t *new_ptr;
   int new_size = sizeof(state->domains[0]) * (state->nr_domains + 1);
 
@@ -2044,6 +2547,7 @@ static int add_domain(struct lv_read_state *state, virDomainPtr dom) {
 
   state->domains = new_ptr;
   state->domains[state->nr_domains].ptr = dom;
+  state->domains[state->nr_domains].active = active;
   memset(&state->domains[state->nr_domains].info, 0,
          sizeof(state->domains[state->nr_domains].info));
 
@@ -2166,6 +2670,11 @@ static int lv_shutdown(void) {
     lv_fini_instance(i);
   }
 
+  DEBUG(PLUGIN_NAME " plugin: stopping event loop");
+
+  if (!persistent_notification)
+    stop_event_loop(&notif_thread);
+
   lv_disconnect();
 
   ignorelist_free(il_domains);
index 489a367..458faca 100644 (file)
 #include "testing.h"
 #include "virt.c" /* sic */
 
-#include <unistd.h>
+static virDomainPtr *domains;
+static int nr_domains;
 
-static const char minimal_xml[] =
-    ""
-    "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
-    "<domain type=\"kvm\" xmlns:ovirt=\"http://ovirt.org/vm/tune/1.0\">"
-    "  <metadata/>"
-    "</domain>";
-
-static const char minimal_metadata_xml[] =
-    ""
-    "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
-    "<domain type=\"kvm\" xmlns:ovirt=\"http://ovirt.org/vm/tune/1.0\">"
-    "  <metadata>"
-    "    <ovirtmap:tag "
-    "xmlns:ovirtmap=\"http://ovirt.org/ovirtmap/tag/1.0\">virt-0</ovirtmap:tag>"
-    "  </metadata>"
-    "</domain>";
-
-struct xml_state {
-  xmlDocPtr xml_doc;
-  xmlXPathContextPtr xpath_ctx;
-  xmlXPathObjectPtr xpath_obj;
-  char tag[PARTITION_TAG_MAX_LEN];
-};
-
-static int init_state(struct xml_state *st, const char *xml) {
-  memset(st, 0, sizeof(*st));
-
-  st->xml_doc = xmlReadDoc((const xmlChar *)xml, NULL, NULL, XML_PARSE_NONET);
-  if (st->xml_doc == NULL) {
-    return -1;
-  }
-  st->xpath_ctx = xmlXPathNewContext(st->xml_doc);
-  if (st->xpath_ctx == NULL) {
+static int setup(void) {
+  if (virInitialize() != 0) {
+    printf("ERROR: virInitialize() != 0\n");
     return -1;
   }
-  return 0;
-}
 
-static void fini_state(struct xml_state *st) {
-  if (st->xpath_ctx) {
-    xmlXPathFreeContext(st->xpath_ctx);
-    st->xpath_ctx = NULL;
-  }
-  if (st->xml_doc) {
-    xmlFreeDoc(st->xml_doc);
-    st->xml_doc = NULL;
+  conn = virConnectOpen("test:///default");
+  if (conn == NULL) {
+    printf("ERROR: virConnectOpen == NULL\n");
+    return -1;
   }
-}
-
-#define TAG "virt-0"
-
-DEF_TEST(lv_domain_get_tag_no_metadata_xml) {
-  int err;
-  struct xml_state st;
-  err = init_state(&st, minimal_xml);
-  EXPECT_EQ_INT(0, err);
-
-  err = lv_domain_get_tag(st.xpath_ctx, "test", st.tag);
-
-  EXPECT_EQ_INT(0, err);
-  EXPECT_EQ_STR("", st.tag);
-
-  fini_state(&st);
-  return 0;
-}
-
-DEF_TEST(lv_domain_get_tag_valid_xml) {
-  int err;
-  struct xml_state st;
-  err = init_state(&st, minimal_metadata_xml);
-  EXPECT_EQ_INT(0, err);
-
-  err = lv_domain_get_tag(st.xpath_ctx, "test", st.tag);
-
-  EXPECT_EQ_INT(0, err);
-  EXPECT_EQ_STR(TAG, st.tag);
-
-  return 0;
-}
-
-DEF_TEST(lv_default_instance_include_domain_without_tag) {
-  struct lv_read_instance *inst = NULL;
-  int ret;
-
-  ret = lv_init_instance(0, lv_read);
-  inst = &(lv_read_user_data[0].inst);
-  EXPECT_EQ_STR("virt-0", inst->tag);
-
-  ret = lv_instance_include_domain(inst, "testing", "");
-  EXPECT_EQ_INT(1, ret);
-
-  lv_fini_instance(0);
-  return 0;
-}
-
-DEF_TEST(lv_regular_instance_skip_domain_without_tag) {
-  struct lv_read_instance *inst = NULL;
-  int ret;
-
-  ret = lv_init_instance(1, lv_read);
-  inst = &(lv_read_user_data[1].inst);
-  EXPECT_EQ_STR("virt-1", inst->tag);
 
-  ret = lv_instance_include_domain(inst, "testing", "");
-  EXPECT_EQ_INT(0, ret);
-
-  lv_fini_instance(0);
   return 0;
 }
 
-DEF_TEST(lv_include_domain_matching_tags) {
-  struct lv_read_instance *inst = NULL;
-  int ret;
-
-  ret = lv_init_instance(0, lv_read);
-  inst = &(lv_read_user_data[0].inst);
-  EXPECT_EQ_STR("virt-0", inst->tag);
-
-  ret = lv_instance_include_domain(inst, "testing", "virt-0");
-  EXPECT_EQ_INT(1, ret);
-
-  ret = lv_init_instance(1, lv_read);
-  inst = &(lv_read_user_data[1].inst);
-  EXPECT_EQ_STR("virt-1", inst->tag);
-
-  ret = lv_instance_include_domain(inst, "testing", "virt-1");
-  EXPECT_EQ_INT(1, ret);
+static int teardown(void) {
+  if (domains) {
+    for (int i = 0; i < nr_domains; ++i)
+      virDomainFree(domains[i]);
+    sfree(domains);
+  }
+  nr_domains = 0;
+  if (conn != NULL)
+    virConnectClose(conn);
 
-  lv_fini_instance(0);
-  lv_fini_instance(1);
   return 0;
 }
 
-DEF_TEST(lv_default_instance_include_domain_with_unknown_tag) {
-  struct lv_read_instance *inst = NULL;
-  int ret;
-
-  ret = lv_init_instance(0, lv_read);
-  inst = &(lv_read_user_data[0].inst);
-  EXPECT_EQ_STR("virt-0", inst->tag);
-
-  ret = lv_instance_include_domain(inst, "testing", "unknownFormat-tag");
-  EXPECT_EQ_INT(1, ret);
+#ifdef HAVE_LIST_ALL_DOMAINS
+DEF_TEST(get_domain_state_notify) {
+  if (setup() == 0) {
+    nr_domains = virConnectListAllDomains(conn, &domains,
+                                          VIR_CONNECT_LIST_DOMAINS_PERSISTENT);
+    if (nr_domains <= 0) {
+      printf("ERROR: virConnectListAllDomains: nr_domains <= 0\n");
+      return -1;
+    }
+
+    int ret = get_domain_state_notify(domains[0]);
+    EXPECT_EQ_INT(0, ret);
+  }
+  teardown();
 
-  lv_fini_instance(0);
   return 0;
 }
+#endif
 
-DEF_TEST(lv_regular_instance_skip_domain_with_unknown_tag) {
-  struct lv_read_instance *inst = NULL;
-  int ret;
-
-  ret = lv_init_instance(1, lv_read);
-  inst = &(lv_read_user_data[1].inst);
-  EXPECT_EQ_STR("virt-1", inst->tag);
-
-  ret = lv_instance_include_domain(inst, "testing", "unknownFormat-tag");
-  EXPECT_EQ_INT(0, ret);
+DEF_TEST(persistent_domains_state_notification) {
+  if (setup() == 0) {
+    int ret = persistent_domains_state_notification();
+    EXPECT_EQ_INT(0, ret);
+  }
+  teardown();
 
-  lv_fini_instance(0);
   return 0;
 }
-#undef TAG
 
 int main(void) {
-  RUN_TEST(lv_domain_get_tag_no_metadata_xml);
-  RUN_TEST(lv_domain_get_tag_valid_xml);
-
-  RUN_TEST(lv_default_instance_include_domain_without_tag);
-  RUN_TEST(lv_regular_instance_skip_domain_without_tag);
-  RUN_TEST(lv_include_domain_matching_tags);
-  RUN_TEST(lv_default_instance_include_domain_with_unknown_tag);
-  RUN_TEST(lv_regular_instance_skip_domain_with_unknown_tag);
+#ifdef HAVE_LIST_ALL_DOMAINS
+  RUN_TEST(get_domain_state_notify);
+#endif
+  RUN_TEST(persistent_domains_state_notification);
 
   END_TEST;
 }
-
-/* vim: set sw=2 sts=2 et : */