Merge remote-tracking branch 'github/pr/706'
authorFlorian Forster <octo@collectd.org>
Fri, 22 Aug 2014 05:23:28 +0000 (07:23 +0200)
committerFlorian Forster <octo@collectd.org>
Fri, 22 Aug 2014 05:23:28 +0000 (07:23 +0200)
Conflicts:
src/write_riemann.c

31 files changed:
AUTHORS
ChangeLog
README
configure.ac
contrib/collection3/etc/collection.conf
contrib/redhat/collectd.spec
src/Makefile.am
src/aggregation.c
src/amqp.c
src/apache.c
src/collectd-unixsock.pod
src/collectd.conf.in
src/collectd.conf.pod
src/common.c
src/configfile.c
src/curl.c
src/curl_json.c
src/curl_xml.c
src/exec.c
src/lvm.c
src/network.c
src/processes.c
src/pyvalues.c
src/snmp.c
src/statsd.c
src/tcpconns.c
src/utils_vl_lookup.c
src/write_http.c
src/write_riemann.c
src/write_tsdb.c [new file with mode: 0644]
version-gen.sh

diff --git a/AUTHORS b/AUTHORS
index 6d0eae7..6ecc688 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -5,7 +5,7 @@ Florian "octo" Forster <octo at verplant.org>
  - Initial author.
 
 Sebastian "tokkee" Harl <sh at tokkee.org>
- - Bugfixes and enhancments in many places all around the project.
+ - Bugfixes and enhancements in many places all around the project.
  - perl plugin.
  - users plugin.
  - vserver plugin.
@@ -53,9 +53,12 @@ Benjamin Gilbert <bgilbert at cs.cmu.edu>
 Bert Vermeulen <bert at biot.com>
  - sigrok plugin
 
+Brett Hawn <bhawn at llnw.com>
+ - write_tsdb plugin for http://opentsdb.net/
+
 Bruno Prémont <bonbons at linux-vserver.org>
  - BIND plugin.
- - Many bugreports and -fixes in various plugins,
+ - Many bug reports and -fixes in various plugins,
    especially a nasty bug in the network plugin.
  - Wireshark dissector.
 
@@ -112,6 +115,9 @@ J. Javier Maestro <jjmaestro at ieee.org>
 Jérôme Renard <jerome.renard at gmail.com>
  - varnish plugin.
 
+Kevin Bowling <kbowling at llnw.com>
+ - write_tsdb plugin for http://opentsdb.net/
+
 Kris Nielander <nielander at fox-it.com>
  - tail_csv plugin.
 
index 8cd1579..8112afa 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,27 @@
+2014-01-26, Version 5.4.1
+       * amqp plugin: Add support for RabbitMQ 0.4.x to avoid compiler
+         warnings. Thanks to Sebastian Harl for implementing this.
+       * apache / network plugins: Improved initialization order hopefully
+         resolved gcrypt initialization problems.
+       * aquaero plugin: The type used to submit fan utilization was fixed.
+         Thanks to Alex Deymo for the patch.
+       * cgroups plugin: A small memory leak was fixed. Checking the existence
+         of a mount option without a value was fixed. More permissive parsing
+         of the cpuacct.stats file fixes support for some versions of Linux.
+         Thanks to Marc Fournier for bug reports and patches.
+       * curl plugin: Fix <Match> blocks without an instance. Thanks to
+         Alexander Golovko for reporting and Sebastian Harl for fixing this.
+       * curl_json plugin: Potentially invalid memory access has been
+         sanitized. Thanks to Jim Radford for his patch.
+       * interface plugin: Fix behavior under FreeBSD 10: Reporting of
+         per-address statistics caused duplicate updates to the same metric.
+         Thanks to demon / @trtrmitya for the patch.
+       * write_graphite plugin: Use TCP to connect to Graphite by default. The
+         default changed from TCP to UDP between 5.3.1 and 5.4.0, which is a
+         regression. Thanks to Marc Fournier for fixing this. Reconnect
+         behavior was improved. Thanks to Michael Hart for his patch.
+       * zfs_arc plugin: Collect "allocated" and "stolen" on FreeBSD only.
+
 2013-08-18, Version 5.4.0
        * collectd: The "LoadPlugin" config option no longer attempts to load
          plugins twice. If more than one "LoadPlugin" statement or block is
diff --git a/README b/README
index 60de1ed..1956609 100644 (file)
--- a/README
+++ b/README
@@ -16,18 +16,18 @@ Features
   * collectd is able to collect the following data:
 
     - apache
-      Apache server utilization: Number of bytes transfered, number of
+      Apache server utilization: Number of bytes transferred, number of
       requests handled and detailed scoreboard statistics
 
     - apcups
       APC UPS Daemon: UPS charge, load, input/output/battery voltage, etc.
 
     - apple_sensors
-      Sensors in Macs running Mac OS X / Darwin: Temperature, fanspeed and
+      Sensors in Macs running Mac OS X / Darwin: Temperature, fan speed and
       voltage sensors.
 
     - aquaero
-      Various sensors in the Aquaero 5 watercooling board made by Aquacomputer.
+      Various sensors in the Aquaero 5 water cooling board made by Aquacomputer.
 
     - ascent
       Statistics about Ascent, a free server for the game `World of Warcraft'.
@@ -85,7 +85,7 @@ Features
 
     - dns
       DNS traffic: Query types, response codes, opcodes and traffic/octets
-      transfered.
+      transferred.
 
     - drbd
       Collect individual drbd resource statistics.
@@ -114,7 +114,7 @@ Features
       Receive multicast traffic from Ganglia instances.
 
     - hddtemp
-      Harddisk temperatures using hddtempd.
+      Hard disk temperatures using hddtempd.
 
     - interface
       Interface traffic: Number of octets, packets and errors for each
@@ -158,7 +158,7 @@ Features
       interfaces that use the Atheros chipset and the MadWifi driver.
 
     - mbmon
-      Motherboard sensors: temperature, fanspeed and voltage information,
+      Motherboard sensors: temperature, fan speed and voltage information,
       using mbmon(1).
 
     - md
@@ -275,7 +275,7 @@ Features
       See collectd-python(5) for details.
 
     - redis
-      The redis plugin gathers information from a redis server, including:
+      The redis plugin gathers information from a Redis server, including:
       uptime, used memory, total connections etc.
 
     - routeros
@@ -306,13 +306,13 @@ Features
       clients and calculating rates and other aggregates out of these values.
 
     - swap
-      Pages swapped out onto harddisk or whatever is called `swap' by the OS..
+      Pages swapped out onto hard disk or whatever is called `swap' by the OS..
 
     - table
       Parse table-like structured files.
 
     - tail
-      Follows (tails) logfiles, parses them by lines and submits matched
+      Follows (tails) log files, parses them by lines and submits matched
       values.
 
     - tail_csv
@@ -411,6 +411,10 @@ Features
       can be configured to avoid logging send errors (especially useful when
       using UDP).
 
+    - write_tsdb
+      Sends data OpenTSDB, a scalable no master, no shared state time series
+      database.
+
     - write_http
       Sends the values collected by collectd to a web-server using HTTP POST
       requests. The transmitted data is either in a form understood by the
@@ -519,7 +523,7 @@ Features
       values are out of bounds. See collectd-threshold(5) for details.
 
     - uuid
-      Sets the hostname to an unique identifier. This is meant for setups
+      Sets the hostname to a unique identifier. This is meant for setups
       where each client may migrate to another physical host, possibly going
       through one or more name changes in the process.
 
@@ -527,7 +531,7 @@ Features
     time starting up again and again. With the exception of the exec plugin no
     processes are forked. Caching in output plugins, such as the rrdtool and
     network plugins, makes sure your resources are used efficiently. Also,
-    since collectd is programmed multithreaded it benefits from hyperthreading
+    since collectd is programmed multithreaded it benefits from hyper-threading
     and multicore processors and makes sure that the daemon isn't idle if only
     one plugin waits for an IO-operation to complete.
 
@@ -539,7 +543,7 @@ Operation
 ---------
 
   * collectd's configuration file can be found at `sysconfdir'/collectd.conf.
-    Run `collectd -h' for a list of builtin defaults. See `collectd.conf(5)'
+    Run `collectd -h' for a list of built-in defaults. See `collectd.conf(5)'
     for a list of options and a syntax description.
 
   * When the `csv' or `rrdtool' plugins are loaded they'll write the values to
index 5235112..0df5678 100644 (file)
@@ -404,6 +404,18 @@ AC_CHECK_HEADERS(linux/if.h, [], [],
 #  include <sys/socket.h>
 #endif
 ])
+AC_CHECK_HEADERS(linux/inet_diag.h, [], [],
+[
+#if HAVE_SYS_TYPES_H
+#  include <sys/types.h>
+#endif
+#if HAVE_SYS_SOCKET_H
+#  include <sys/socket.h>
+#endif
+#if HAVE_LINUX_INET_DIAG_H
+# include <linux/inet_diag.h>
+#endif
+])
 AC_CHECK_HEADERS(linux/netdevice.h, [], [],
 [
 #if HAVE_SYS_TYPES_H
@@ -1218,10 +1230,9 @@ if test "x$have_getmntent" = "xgen"; then
 fi
 
 # Check for htonll
-AC_MSG_CHECKING([if have htonll defined])
-
-    have_htonll="no"
-    AC_LINK_IFELSE([AC_LANG_PROGRAM(
+AC_CACHE_CHECK([if have htonll defined],
+                  [c_cv_have_htonll],
+                  AC_LINK_IFELSE([AC_LANG_PROGRAM(
 [[[
 #include <sys/types.h>
 #include <netinet/in.h>
@@ -1233,12 +1244,14 @@ AC_MSG_CHECKING([if have htonll defined])
           return htonll(0);
 ]]]
     )],
-    [
-      have_htonll="yes"
-      AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.])
-    ])
-
-AC_MSG_RESULT([$have_htonll])
+    [c_cv_have_htonll="yes"],
+    [c_cv_have_htonll="no"]
+  )
+)
+if test "x$c_cv_have_htonll" = "xyes"
+then
+    AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.])
+fi
 
 # Check for structures
 AC_CHECK_MEMBERS([struct if_data.ifi_ibytes, struct if_data.ifi_opackets, struct if_data.ifi_ierrors],
@@ -1258,6 +1271,13 @@ AC_CHECK_MEMBERS([struct net_device_stats.rx_bytes, struct net_device_stats.tx_p
        #include <linux/if.h>
        #include <linux/netdevice.h>
        ])
+AC_CHECK_MEMBERS([struct inet_diag_req.id, struct inet_diag_req.idiag_states],
+       [AC_DEFINE(HAVE_STRUCT_LINUX_INET_DIAG_REQ, 1, [Define if struct inet_diag_req exists and is usable.])],
+       [],
+       [
+       #include <linux/inet_diag.h>
+       ])
+
 
 AC_CHECK_MEMBERS([struct ip_mreqn.imr_ifindex], [],
        [],
@@ -1368,8 +1388,8 @@ AC_CHECK_LIB(hal,libhal_device_property_exists,
             [with_libhal="no"])
 if test "x$with_libhal" = "xyes"; then
        if test "x$PKG_CONFIG" != "x"; then
-               BUILD_WITH_LIBHAL_CFLAGS="`pkg-config --cflags hal`"
-               BUILD_WITH_LIBHAL_LIBS="`pkg-config --libs hal`"
+               BUILD_WITH_LIBHAL_CFLAGS="`$PKG_CONFIG --cflags hal`"
+               BUILD_WITH_LIBHAL_LIBS="`$PKG_CONFIG --libs hal`"
                AC_SUBST(BUILD_WITH_LIBHAL_CFLAGS)
                AC_SUBST(BUILD_WITH_LIBHAL_LIBS)
        fi
@@ -3576,6 +3596,41 @@ fi
 CPPFLAGS="$SAVE_CPPFLAGS"
 LDFLAGS="$SAVE_LDFLAGS"
 AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+
+with_amqp_tcp_socket="no"
+if test "x$with_librabbitmq" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       SAVE_LDFLAGS="$LDFLAGS"
+       SAVE_LIBS="$LIBS"
+       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+       LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+       LIBS="-lrabbitmq"
+
+       AC_CHECK_HEADERS(amqp_tcp_socket.h amqp_socket.h)
+       AC_CHECK_FUNC(amqp_tcp_socket_new, [with_amqp_tcp_socket="yes"], [with_amqp_tcp_socket="no"])
+       if test "x$with_amqp_tcp_socket" = "xyes"
+       then
+               AC_DEFINE(HAVE_AMQP_TCP_SOCKET, 1,
+                               [Define if librabbitmq provides the new TCP socket interface.])
+       fi
+
+       AC_CHECK_DECLS(amqp_socket_close,
+                               [amqp_socket_close_decl="yes"], [amqp_socket_close_decl="no"],
+                               [[
+#include <amqp.h>
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+                               ]])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+       LDFLAGS="$SAVE_LDFLAGS"
+       LIBS="$SAVE_LIBS"
+fi
 # }}}
 
 # --with-librdkafka {{{
@@ -3844,18 +3899,20 @@ AC_ARG_WITH(libsigrok, [AS_HELP_STRING([--with-libsigrok@<:@=PREFIX@:>@], [Path
                        with_libsigrok_ldflags="-L$withval/lib"
                fi
        fi
-],[])
+],[with_libsigrok="yes"])
 
 # libsigrok has a glib dependency
 if test "x$with_libsigrok" = "xyes"
 then
-       if test -z "m4_ifdef([AM_PATH_GLIB_2_0], [yes], [])"
-       then
-               with_libsigrok="no (glib not available)"
-       else
-               AM_PATH_GLIB_2_0([2.28.0],
-                       [with_libsigrok_cflags="$with_libsigrok_cflags $GLIB_CFLAGS"; with_libsigrok_ldflags="$with_libsigrok_ldflags $GLIB_LIBS"])
-       fi
+m4_ifdef([AM_PATH_GLIB_2_0],
+       [
+        AM_PATH_GLIB_2_0([2.28.0],
+               [with_libsigrok_cflags="$with_libsigrok_cflags $GLIB_CFLAGS"; with_libsigrok_ldflags="$with_libsigrok_ldflags $GLIB_LIBS"])
+       ],
+       [
+        with_libsigrok="no (glib not available)"
+       ]
+)
 fi
 
 # libsigrok headers
@@ -4039,8 +4096,8 @@ then
   if $PKG_CONFIG --exists tokyotyrant
   then
     with_libtokyotyrant_cppflags="$with_libtokyotyrant_cppflags `$PKG_CONFIG --cflags tokyotyrant`"
-    with_libtokyotyrant_ldflags="$with_libtokyotyrant_ldflags `pkg-config --libs-only-L tokyotyrant`"
-    with_libtokyotyrant_libs="$with_libtokyotyrant_libs `pkg-config --libs-only-l tokyotyrant`"
+    with_libtokyotyrant_ldflags="$with_libtokyotyrant_ldflags `$PKG_CONFIG --libs-only-L tokyotyrant`"
+    with_libtokyotyrant_libs="$with_libtokyotyrant_libs `$PKG_CONFIG --libs-only-l tokyotyrant`"
   fi
 fi
 
@@ -4557,7 +4614,7 @@ with_libvirt_cflags=""
 with_libvirt_ldflags=""
 if test "x$PKG_CONFIG" != "x"
 then
-       pkg-config --exists 'libxml-2.0' 2>/dev/null
+       $PKG_CONFIG --exists 'libxml-2.0' 2>/dev/null
        if test "$?" = "0"
        then
                with_libxml2="yes"
@@ -4565,7 +4622,7 @@ then
                with_libxml2="no (pkg-config doesn't know libxml-2.0)"
        fi
 
-       pkg-config --exists libvirt 2>/dev/null
+       $PKG_CONFIG --exists libvirt 2>/dev/null
        if test "$?" = "0"
        then
                with_libvirt="yes"
@@ -4575,12 +4632,12 @@ then
 fi
 if test "x$with_libxml2" = "xyes"
 then
-       with_libxml2_cflags="`pkg-config --cflags libxml-2.0`"
+       with_libxml2_cflags="`$PKG_CONFIG --cflags libxml-2.0`"
        if test $? -ne 0
        then
                with_libxml2="no"
        fi
-       with_libxml2_ldflags="`pkg-config --libs libxml-2.0`"
+       with_libxml2_ldflags="`$PKG_CONFIG --libs libxml-2.0`"
        if test $? -ne 0
        then
                with_libxml2="no"
@@ -4620,12 +4677,12 @@ if test "x$with_libxml2" = "xyes"; then
 fi
 if test "x$with_libvirt" = "xyes"
 then
-       with_libvirt_cflags="`pkg-config --cflags libvirt`"
+       with_libvirt_cflags="`$PKG_CONFIG --cflags libvirt`"
        if test $? -ne 0
        then
                with_libvirt="no"
        fi
-       with_libvirt_ldflags="`pkg-config --libs libvirt`"
+       with_libvirt_ldflags="`$PKG_CONFIG --libs libvirt`"
        if test $? -ne 0
        then
                with_libvirt="no"
@@ -5354,6 +5411,7 @@ AC_PLUGIN([write_kafka],  [$with_librdkafka],  [Kafka output plugin])
 AC_PLUGIN([write_mongodb], [$with_libmongoc],  [MongoDB output plugin])
 AC_PLUGIN([write_redis], [$with_libcredis],    [Redis output plugin])
 AC_PLUGIN([write_riemann], [$have_protoc_c],   [Riemann output plugin])
+AC_PLUGIN([write_tsdb],  [yes],                [TSDB output plugin])
 AC_PLUGIN([xmms],        [$with_libxmms],      [XMMS statistics])
 AC_PLUGIN([zfs_arc],     [$plugin_zfs_arc],    [ZFS ARC statistics])
 
@@ -5384,7 +5442,7 @@ else
        LOAD_PLUGIN_LOGFILE="##"
 fi
 
-if test "x$enable_logfile" = "xyes"
+if test "x$enable_log_logstash" = "xyes"
 then
   LOAD_PLUGIN_LOG_LOGSTASH="#"
 else
@@ -5717,6 +5775,7 @@ Configuration:
     write_mongodb . . . . $enable_write_mongodb
     write_redis . . . . . $enable_write_redis
     write_riemann . . . . $enable_write_riemann
+    write_tsdb  . . . . . $enable_write_tsdb
     xmms  . . . . . . . . $enable_xmms
     zfs_arc . . . . . . . $enable_zfs_arc
 
index 5fe4313..3e19bfc 100644 (file)
@@ -7,7 +7,7 @@ GraphWidth 400
   RRDTitle "Apache Traffic"
   RRDVerticalLabel "Bytes/s"
   RRDFormat "%5.1lf%s"
-  Color count 0000ff
+  Color value 0000ff
 </Type>
 <Type apache_requests>
   DataSources value
@@ -15,7 +15,7 @@ GraphWidth 400
   RRDTitle "Apache Traffic"
   RRDVerticalLabel "Requests/s"
   RRDFormat "%5.2lf"
-  Color count 00d000
+  Color value 00d000
 </Type>
 <Type apache_scoreboard>
   Module GenericStacked
@@ -272,7 +272,7 @@ GraphWidth 400
   RRDTitle "Frequency ({type_instance})"
   RRDVerticalLabel "Hertz"
   RRDFormat "%4.1lfHz"
-  Color frequency a000a0
+  Color value a000a0
 </Type>
 <Type humidity>
   DataSources value
@@ -547,7 +547,7 @@ GraphWidth 400
   RRDTitle "Percent ({type_instance})"
   RRDVerticalLabel "Percent"
   RRDFormat "%4.1lf%%"
-  Color percent 0000ff
+  Color value 0000ff
 </Type>
 <Type ping>
   DataSources value
@@ -705,7 +705,7 @@ GraphWidth 400
   RRDTitle "Users ({type_instance}) on {hostname}"
   RRDVerticalLabel "Users"
   RRDFormat "%.1lf"
-  Color users 0000f0
+  Color value 0000f0
 </Type>
 <Type voltage>
   DataSources value
index 6af91e8..9facca8 100644 (file)
@@ -45,6 +45,7 @@
 %{?el6:%global _has_ip_vs_h 1}
 %{?el6:%global _has_lvm2app_h 1}
 %{?el6:%global _has_perl_extutils_embed 1}
+%{?el6:%global _has_libmodbus 1}
 
 # plugins enabled by default
 %define with_aggregation 0%{!?_without_aggregation:1}
@@ -92,6 +93,7 @@
 %define with_memcached 0%{!?_without_memcached:1}
 %define with_memory 0%{!?_without_memory:1}
 %define with_multimeter 0%{!?_without_multimeter:1}
+%define with_modbus 0%{!?_without_modbus:0%{?_has_libmodbus}}
 %define with_mysql 0%{!?_without_mysql:1}
 %define with_netlink 0%{!?_without_netlink:1}
 %define with_network 0%{!?_without_network:1}
@@ -422,6 +424,16 @@ The mic plugin collects CPU usage, memory usage, temperatures and power
 consumption from Intel Many Integrated Core (MIC) CPUs.
 %endif
 
+%if %{with_modbus}
+%package modbus
+Summary:       modbus plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: libmodbus-devel
+%description modbus
+The modbus plugin collects values from Modbus/TCP enabled devices
+%endif
+
 %if %{with_mysql}
 %package mysql
 Summary:       MySQL plugin for collectd
@@ -997,6 +1009,12 @@ Development files for libcollectdclient
 %define _with_multimeter --disable-multimeter
 %endif
 
+%if %{with_modbus}
+%define _with_modbus --enable-modbus
+%else
+%define _with_modbus --disable-modbus
+%endif
+
 %if %{with_mysql}
 %define _with_mysql --enable-mysql
 %else
@@ -2013,6 +2031,9 @@ fi
 - Enable cgroups, lvm and statsd plugins
 - Enable (but don't build by default) mic, aquaero and sigrok plugins
 
+* Wed Aug 06 2014 Marc Fournier <marc.fournier@camptocamp.com> 5.3.1-2
+- Enabled modbus plugin
+
 * Tue Aug 06 2013 Marc Fournier <marc.fournier@camptocamp.com> 5.3.1-1
 - New upstream version
 - Added RHEL5 support:
index 1a8ebbb..d727566 100644 (file)
@@ -520,8 +520,8 @@ if BUILD_PLUGIN_IPTABLES
 pkglib_LTLIBRARIES += iptables.la
 iptables_la_SOURCES = iptables.c
 iptables_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBIPTC_CPPFLAGS)
-iptables_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBIPTC_LDFLAGS)
-iptables_la_LIBADD = -liptc
+iptables_la_LDFLAGS = -module -avoid-version
+iptables_la_LIBADD = $(BUILD_WITH_LIBIPTC_LDFLAGS)
 collectd_LDADD += "-dlopen" iptables.la
 collectd_DEPENDENCIES += iptables.la
 endif
@@ -1467,6 +1467,14 @@ collectd_LDADD += "-dlopen" write_riemann.la
 collectd_DEPENDENCIES += write_riemann.la
 endif
 
+if BUILD_PLUGIN_WRITE_TSDB
+pkglib_LTLIBRARIES += write_tsdb.la
+write_tsdb_la_SOURCES = write_tsdb.c
+write_tsdb_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" write_tsdb.la
+collectd_DEPENDENCIES += write_tsdb.la
+endif
+
 if BUILD_PLUGIN_XMMS
 pkglib_LTLIBRARIES += xmms.la
 xmms_la_SOURCES = xmms.c
index 0c0f19d..8175c66 100644 (file)
@@ -440,8 +440,7 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
 
 /* lookup_class_callback_t for utils_vl_lookup */
 static void *agg_lookup_class_callback ( /* {{{ */
-    __attribute__((unused)) data_set_t const *ds,
-    value_list_t const *vl, void *user_class)
+    data_set_t const *ds, value_list_t const *vl, void *user_class)
 {
   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
 } /* }}} void *agg_class_callback */
index 56718f0..1764129 100644 (file)
 #include <amqp.h>
 #include <amqp_framing.h>
 
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+#ifdef HAVE_AMQP_TCP_SOCKET
+#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
+/* rabbitmq-c does not currently ship amqp_socket.h
+ * and, thus, does not define this function. */
+int amqp_socket_close(amqp_socket_t *);
+#endif
+#endif
+
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
 #define CAMQP_DM_VOLATILE   1
@@ -392,8 +406,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
-    int sockfd;
     int status;
+#ifdef HAVE_AMQP_TCP_SOCKET
+    amqp_socket_t *socket;
+#else
+    int sockfd;
+#endif
 
     if (conf->connection != NULL)
         return (0);
@@ -405,6 +423,33 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (ENOMEM);
     }
 
+#ifdef HAVE_AMQP_TCP_SOCKET
+# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */
+    /* TODO: add support for SSL using amqp_ssl_socket_new
+     *       and related functions */
+    socket = amqp_tcp_socket_new (conf->connection);
+    if (! socket)
+    {
+        ERROR ("amqp plugin: amqp_tcp_socket_new failed.");
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (ENOMEM);
+    }
+
+    status = amqp_socket_open (socket, CONF(conf, host), conf->port);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status *= -1;
+        ERROR ("amqp plugin: amqp_socket_open failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (status);
+    }
+#else /* HAVE_AMQP_TCP_SOCKET */
+# define CLOSE_SOCKET() close(sockfd)
+    /* this interface is deprecated as of rabbitmq-c 0.4 */
     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
     {
@@ -417,6 +462,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (status);
     }
     amqp_set_sockfd (conf->connection, sockfd);
+#endif
 
     reply = amqp_login (conf->connection, CONF(conf, vhost),
             /* channel max = */      0,
@@ -429,7 +475,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
                 CONF(conf, vhost), CONF(conf, user));
         amqp_destroy_connection (conf->connection);
-        close (sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
@@ -442,7 +488,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_channel_open failed.");
         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
         amqp_destroy_connection (conf->connection);
-        close(sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
index 7e4c79b..ad5975c 100644 (file)
@@ -671,9 +671,18 @@ static int apache_read_host (user_data_t *user_data) /* {{{ */
        return (0);
 } /* }}} int apache_read_host */
 
+static int apache_init (void) /* {{{ */
+{
+       /* Call this while collectd is still single-threaded to avoid
+        * initialization issues in libgcrypt. */
+       curl_global_init (CURL_GLOBAL_SSL);
+       return (0);
+} /* }}} int apache_init */
+
 void module_register (void)
 {
        plugin_register_complex_config ("apache", config);
+       plugin_register_init ("apache", apache_init);
 } /* void module_register */
 
 /* vim: set sw=8 noet fdm=marker : */
index eb1d14f..b241a9f 100644 (file)
@@ -13,6 +13,7 @@ collectd-unixsock - Documentation of collectd's C<unixsock plugin>
     SocketFile "/path/to/socket"
     SocketGroup "collectd"
     SocketPerms "0770"
+    DeleteSocket false
   </Plugin>
 
 =head1 DESCRIPTION
index 52904a2..21de844 100644 (file)
 #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb
 #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
 #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann
+#@BUILD_PLUGIN_WRITE_TSDB_TRUE@LoadPlugin write_tsdb
 #@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms
 #@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc
 
 #  </View>
 #</Plugin>
 
-#<Plugin cgroup>
+#<Plugin cgroups>
 #  CGroup "libvirt"
 #  IgnoreSelected false
 #</Plugin>
 #       Attribute "foo" "bar"
 #</Plugin>
 
+#<Plugin write_tsdb>
+#      <Node>
+#              Host "localhost"
+#              Port "4242"
+#              HostTags "status=production"
+#              StoreRates false
+#              AlwaysAppendDS false
+#      </Node>
+#</Plugin>
+
 ##############################################################################
 # Filter configuration                                                       #
 #----------------------------------------------------------------------------#
index 31e2e5a..bba9055 100644 (file)
@@ -494,6 +494,8 @@ possibly filtering or messages.
  #   StoreRates false
  #   GraphitePrefix "collectd."
  #   GraphiteEscapeChar "_"
+ #   GraphiteSeparateInstances false
+ #   GraphiteAlwaysAppendDS false
    </Publish>
 
    # Receive values from an AMQP broker
@@ -647,6 +649,19 @@ In I<Graphite> metric name, dots are used as separators between different
 metric parts (host, plugin, type).
 Default is "_" (I<Underscore>).
 
+=item B<GraphiteSeparateInstances> B<true>|B<false>
+
+If set to B<true>, the plugin instance and type instance will be in their own
+path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+default), the plugin and plugin instance (and likewise the type and type
+instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
 =back
 
 =head2 Plugin C<apache>
@@ -6512,6 +6527,59 @@ instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
 
 =item B<AlwaysAppendDS> B<false>|B<true>
 
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=back
+
+=head2 Plugin C<write_tsdb>
+
+The C<write_tsdb> plugin writes data to I<OpenTSDB>, a scalable open-source
+time series database. The plugin connects to a I<TSD>, a masterless, no shared
+state daemon that ingests metrics and stores them in HBase. The plugin uses
+I<TCP> over the "line based" protocol with a default port 4242. The data will
+be sent in blocks of at most 1428 bytes to minimize the number of network
+packets.
+
+Synopsis:
+
+ <Plugin write_tsdb>
+   <Node "example">
+     Host "tsd-1.my.domain"
+     Port "4242"
+     HostTags "status=production"
+   </Node>
+ </Plugin>
+
+The configuration consists of one or more E<lt>B<Node>E<nbsp>I<Name>E<gt>
+blocks. Inside the B<Node> blocks, the following options are recognized:
+
+=over 4
+
+=item B<Host> I<Address>
+
+Hostname or address to connect to. Defaults to C<localhost>.
+
+=item B<Port> I<Service>
+
+Service name or port number to connect to. Defaults to C<4242>.
+
+
+=item B<HostTags> I<String>
+
+When set, I<HostTags> is added to the end of the metric. It is intended to be
+used for name=value pairs that the TSD will tag the metric with. Dots and
+whitespace are I<not> escaped in this string.
+
+=item B<StoreRates> B<false>|B<true>
+
+If set to B<true>, convert counter values to rates. If set to B<false>
+(the default) counter values are stored as is, as an increasing
+integer number.
+
+=item B<AlwaysAppendDS> B<false>|B<true>
+
 If set the B<true>, append the name of the I<Data Source> (DS) to the "metric"
 identifier. If set to B<false> (the default), this is only done when there is
 more than one DS.
@@ -7340,19 +7408,36 @@ Available options:
 =item B<Plugin> I<Name>
 
 Name of the write plugin to which the data should be sent. This option may be
-given multiple times to send the data to more than one write plugin.
+given multiple times to send the data to more than one write plugin. If the
+plugin supports multiple instances, the plugin's instance(s) must also be
+specified.
 
 =back
 
 If no plugin is explicitly specified, the values will be sent to all available
 write plugins.
 
-Example:
+Single-instance plugin example:
 
  <Target "write">
    Plugin "rrdtool"
  </Target>
 
+Multi-instance plugin example:
+
+ <Plugin "write_graphite">
+   <Node "foo">
+   ...
+   </Node>
+   <Node "bar">
+   ...
+   </Node>
+ </Plugin>
+  ...
+ <Target "write">
+   Plugin "write_graphite/foo"
+ </Target>
+
 =item B<jump>
 
 Starts processing the rules of another chain, see L<"Flow control"> above. If
index 93c1ca1..8691d3e 100644 (file)
@@ -1073,9 +1073,9 @@ int parse_value (const char *value_orig, value_t *ret_value, int ds_type)
   }
 
   if (value == endptr) {
-    sfree (value);
     ERROR ("parse_value: Failed to parse string as %s: %s.",
         DS_TYPE_TO_STRING (ds_type), value);
+    sfree (value);
     return -1;
   }
   else if ((NULL != endptr) && ('\0' != *endptr))
index 855681b..d2a307d 100644 (file)
@@ -481,6 +481,12 @@ static int cf_ci_replace_child (oconfig_item_t *dst, oconfig_item_t *src,
 
        /* Resize the memory containing the children to be big enough to hold
         * all children. */
+       if (dst->children_num + src->children_num - 1 == 0)
+       {
+               dst->children_num = 0;
+               return (0);
+       }
+
        temp = (oconfig_item_t *) realloc (dst->children,
                        sizeof (oconfig_item_t)
                        * (dst->children_num + src->children_num - 1));
@@ -595,7 +601,8 @@ static int cf_include_all (oconfig_item_t *root, int depth)
                        return (-1);
 
                /* Now replace the i'th child in `root' with `new'. */
-               cf_ci_replace_child (root, new, i);
+               if (cf_ci_replace_child (root, new, i) < 0)
+                       return (-1);
 
                /* ... and go back to the new i'th child. */
                --i;
index 3e7c5a5..f605c07 100644 (file)
@@ -579,6 +579,7 @@ static int cc_init (void) /* {{{ */
     INFO ("curl plugin: No pages have been defined.");
     return (-1);
   }
+  curl_global_init (CURL_GLOBAL_SSL);
   return (0);
 } /* }}} int cc_init */
 
index 6a01590..a84cba0 100644 (file)
@@ -726,7 +726,7 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */
   if (status == 0)
   {
     user_data_t ud;
-    char cb_name[DATA_MAX_NAME_LEN];
+    char *cb_name;
     struct timespec interval = { 0, 0 };
 
     CDTIME_T_TO_TIMESPEC (db->interval, &interval);
@@ -741,12 +741,13 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */
     ud.data = (void *) db;
     ud.free_func = cj_free;
 
-    ssnprintf (cb_name, sizeof (cb_name), "curl_json-%s-%s",
+    cb_name = ssnprintf_alloc ("curl_json-%s-%s",
                db->instance, db->url ? db->url : db->sock);
 
     plugin_register_complex_read (/* group = */ NULL, cb_name, cj_read,
                                   /* interval = */ (db->interval > 0) ? &interval : NULL,
                                   &ud);
+    sfree (cb_name);
   }
   else
   {
@@ -975,9 +976,18 @@ static int cj_read (user_data_t *ud) /* {{{ */
   return cj_perform (db);
 } /* }}} int cj_read */
 
+static int cj_init (void) /* {{{ */
+{
+  /* Call this while collectd is still single-threaded to avoid
+   * initialization issues in libgcrypt. */
+  curl_global_init (CURL_GLOBAL_SSL);
+  return (0);
+} /* }}} int cj_init */
+
 void module_register (void)
 {
   plugin_register_complex_config ("curl_json", cj_config);
+  plugin_register_init ("curl_json", cj_init);
 } /* void module_register */
 
 /* vim: set sw=2 sts=2 et fdm=marker : */
index 6d36d29..a743753 100644 (file)
@@ -386,7 +386,7 @@ static int cx_handle_instance_xpath (xmlXPathContextPtr xpath_ctx, /* {{{ */
   /* If the base xpath returns more than one block, the result is assumed to be
    * a table. The `Instance' option is not optional in this case. Check for the
    * condition and inform the user. */
-  if (is_table && (vl->type_instance == NULL))
+  if (is_table)
   {
     WARNING ("curl_xml plugin: "
         "Base-XPath %s is a table (more than one result was returned), "
@@ -1042,9 +1042,18 @@ static int cx_config (oconfig_item_t *ci) /* {{{ */
   return (0);
 } /* }}} int cx_config */
 
+static int cx_init (void) /* {{{ */
+{
+  /* Call this while collectd is still single-threaded to avoid
+   * initialization issues in libgcrypt. */
+  curl_global_init (CURL_GLOBAL_SSL);
+  return (0);
+} /* }}} int cx_init */
+
 void module_register (void)
 {
   plugin_register_complex_config ("curl_xml", cx_config);
+  plugin_register_init ("curl_xml", cx_init);
 } /* void module_register */
 
 /* vim: set sw=2 sts=2 et fdm=marker : */
index d56c07f..cb6844b 100644 (file)
@@ -744,8 +744,8 @@ static void *exec_notification_one (void *arg) /* {{{ */
 
   fprintf (fh,
       "Severity: %s\n"
-      "Time: %.3f\n",
-      severity, CDTIME_T_TO_DOUBLE (n->time));
+      "Time: %u\n",
+      severity, (unsigned int)CDTIME_T_TO_TIME_T(n->time));
 
   /* Print the optional fields */
   if (strlen (n->host) > 0)
index 9e24542..82d7f6f 100644 (file)
--- a/src/lvm.c
+++ b/src/lvm.c
@@ -125,6 +125,10 @@ static void vg_read(vg_t vg, char const *vg_name)
     }
 
     dm_list_iterate_items(lvl, lvs) {
+         lvm_submit(vg_name, lvm_lv_get_name(lvl->lv), lvm_lv_get_size(lvl->lv));
+    }
+
+    dm_list_iterate_items(lvl, lvs) {
         name = lvm_lv_get_name(lvl->lv);
         attrs = get_lv_property_string(lvl->lv, "lv_attr");
         size = lvm_lv_get_size(lvl->lv);
index 5769da7..b191983 100644 (file)
@@ -501,8 +501,15 @@ static void network_init_gcrypt (void) /* {{{ */
   if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P))
     return;
 
-  gcry_check_version (NULL); /* before calling any other functions */
+ /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html
+  * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS
+  * *before* initalizing Libgcrypt with gcry_check_version(), which itself must
+  * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P
+  * above doesn't count, as it doesn't implicitly initalize Libgcrypt.
+  *
+  * tl;dr: keep all these gry_* statements in this exact order please. */
   gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+  gcry_check_version (NULL);
   gcry_control (GCRYCTL_INIT_SECMEM, 32768);
   gcry_control (GCRYCTL_INITIALIZATION_FINISHED);
 } /* }}} void network_init_gcrypt */
index 210e6f1..5601d29 100644 (file)
@@ -223,7 +223,7 @@ static int pagesize;
 int     getprocs64 (void *procsinfo, int sizproc, void *fdsinfo, int sizfd, pid_t *index, int count);
 int     getthrds64( pid_t, void *, int, tid64_t *, int );
 #endif
-int getargs (struct procentry64 *processBuffer, int bufferLen, char *argsBuffer, int argsLen);
+int getargs (void *processBuffer, int bufferLen, char *argsBuffer, int argsLen);
 #endif /* HAVE_PROCINFO_H */
 
 /* put name of process from config to list_head_g tree
index 307af17..4a658d0 100644 (file)
@@ -767,7 +767,7 @@ static void Values_dealloc(PyObject *self) {
 }
 
 static PyMemberDef Values_members[] = {
-       {"interval", T_INT, offsetof(Values, interval), 0, interval_doc},
+       {"interval", T_DOUBLE, offsetof(Values, interval), 0, interval_doc},
        {"values", T_OBJECT_EX, offsetof(Values, values), 0, values_doc},
        {"meta", T_OBJECT_EX, offsetof(Values, meta), 0, meta_doc},
        {NULL}
index 3be2ae2..7d6e0a1 100644 (file)
@@ -1618,6 +1618,10 @@ static int csnmp_read_table (host_definition_t *host, data_definition_t *data)
     snmp_free_pdu (res);
   res = NULL;
 
+  if (req != NULL)
+    snmp_free_pdu (req);
+  req = NULL;
+
   if (status == 0)
     csnmp_dispatch_table (host, data, instance_list_head, value_list_head);
 
index 72b8e2b..0885e23 100644 (file)
@@ -255,19 +255,35 @@ static int statsd_handle_gauge (char const *name, /* {{{ */
 } /* }}} int statsd_handle_gauge */
 
 static int statsd_handle_timer (char const *name, /* {{{ */
-    char const *value_str)
+    char const *value_str,
+    char const *extra)
 {
   statsd_metric_t *metric;
   value_t value_ms;
+  value_t scale;
   cdtime_t value;
   int status;
 
+  if ((extra != NULL) && (extra[0] != '@'))
+    return (-1);
+
+  scale.gauge = 1.0;
+  if (extra != NULL)
+  {
+    status = statsd_parse_value (extra + 1, &scale);
+    if (status != 0)
+      return (status);
+
+    if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
+      return (-1);
+  }
+
   value_ms.derive = 0;
   status = statsd_parse_value (value_str, &value_ms);
   if (status != 0)
     return (status);
 
-  value = MS_TO_CDTIME_T (value_ms.gauge);
+  value = MS_TO_CDTIME_T (value_ms.gauge / scale.gauge);
 
   pthread_mutex_lock (&metrics_lock);
 
@@ -377,15 +393,15 @@ static int statsd_parse_line (char *buffer) /* {{{ */
 
   if (strcmp ("c", type) == 0)
     return (statsd_handle_counter (name, value, extra));
+  else if (strcmp ("ms", type) == 0)
+    return (statsd_handle_timer (name, value, extra));
 
-  /* extra is only valid for counters */
+  /* extra is only valid for counters and timers */
   if (extra != NULL)
     return (-1);
 
   if (strcmp ("g", type) == 0)
     return (statsd_handle_gauge (name, value));
-  else if (strcmp ("ms", type) == 0)
-    return (statsd_handle_timer (name, value));
   else if (strcmp ("s", type) == 0)
     return (statsd_handle_set (name, value));
   else
index 80435db..5a04231 100644 (file)
@@ -74,7 +74,9 @@
 /* sys/socket.h is necessary to compile when using netlink on older systems. */
 # include <sys/socket.h>
 # include <linux/netlink.h>
+#if HAVE_LINUX_INET_DIAG_H
 # include <linux/inet_diag.h>
+#endif
 # include <sys/socket.h>
 # include <arpa/inet.h>
 /* #endif KERNEL_LINUX */
 #endif /* KERNEL_AIX */
 
 #if KERNEL_LINUX
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
 struct nlreq {
   struct nlmsghdr nlh;
   struct inet_diag_req r;
 };
+#endif
 
 static const char *tcp_state[] =
 {
@@ -276,7 +280,12 @@ static int port_collect_listening = 0;
 static port_entry_t *port_list_head = NULL;
 
 #if KERNEL_LINUX
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
+/* This depends on linux inet_diag_req because if this structure is missing,
+ * sequence_number is useless and we get a compilation warning.
+ */
 static uint32_t sequence_number = 0;
+#endif
 
 enum
 {
@@ -446,6 +455,7 @@ static int conn_handle_ports (uint16_t port_local, uint16_t port_remote, uint8_t
  * zero on other errors. */
 static int conn_read_netlink (void)
 {
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
   int fd;
   struct sockaddr_nl nladdr;
   struct nlreq req;
@@ -574,6 +584,9 @@ static int conn_read_netlink (void)
 
   /* Not reached because the while() loop above handles the exit condition. */
   return (0);
+#else
+  return (1);
+#endif /* HAVE_STRUCT_LINUX_INET_DIAG_REQ */
 } /* int conn_read_netlink */
 
 static int conn_handle_line (char *buffer)
index 01d33ff..75c0206 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "collectd.h"
 
+#include <pthread.h>
 #include <regex.h>
 
 #include "common.h"
@@ -86,6 +87,7 @@ struct user_obj_s
 
 struct user_class_s
 {
+  pthread_mutex_t lock;
   void *user_class;
   identifier_match_t match;
   user_obj_t *user_obj_list; /* list of user_obj */
@@ -191,6 +193,7 @@ static int lu_copy_ident_to_match (identifier_match_t *match, /* {{{ */
   return (0);
 } /* }}} int lu_copy_ident_to_match */
 
+/* user_class->lock must be held when calling this function */
 static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
     data_set_t const *ds, value_list_t const *vl,
     user_class_t *user_class)
@@ -245,6 +248,7 @@ static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
   return (user_obj);
 } /* }}} void *lu_create_user_obj */
 
+/* user_class->lock must be held when calling this function */
 static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
     value_list_t const *vl)
 {
@@ -294,14 +298,17 @@ static int lu_handle_user_class (lookup_t *obj, /* {{{ */
       || !lu_part_matches (&user_class->match.host, vl->host))
     return (1);
 
+  pthread_mutex_lock (&user_class->lock);
   user_obj = lu_find_user_obj (user_class, vl);
   if (user_obj == NULL)
   {
     /* call lookup_class_callback_t() and insert into the list of user objects. */
     user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+    pthread_mutex_unlock (&user_class->lock);
     if (user_obj == NULL)
       return (-1);
   }
+  pthread_mutex_unlock (&user_class->lock);
 
   status = obj->cb_user_obj (ds, vl,
       user_class->user_class, user_obj->user_obj);
@@ -402,7 +409,7 @@ static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
   identifier_match_t const *match = &user_class_list->entry.match;
 
   /* Lookup user_class_list from the per-plugin structure. If this is the first
-   * user_class to be added, the blocks return immediately. Otherwise they will
+   * user_class to be added, the block returns immediately. Otherwise they will
    * set "ptr" to non-NULL. */
   if (match->plugin.is_regex)
   {
@@ -487,6 +494,7 @@ static void lu_destroy_user_class_list (lookup_t *obj, /* {{{ */
 
     lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
     user_class_list->entry.user_obj_list = NULL;
+    pthread_mutex_destroy (&user_class_list->entry.lock);
 
     sfree (user_class_list);
     user_class_list = next;
@@ -599,6 +607,7 @@ int lookup_add (lookup_t *obj, /* {{{ */
     return (ENOMEM);
   }
   memset (user_class_obj, 0, sizeof (*user_class_obj));
+  pthread_mutex_init (&user_class_obj->entry.lock, /* attr = */ NULL);
   user_class_obj->entry.user_class = user_class;
   lu_copy_ident_to_match (&user_class_obj->entry.match, ident, group_by);
   user_class_obj->entry.user_obj_list = NULL;
index aabca3e..eee5a1c 100644 (file)
@@ -601,9 +601,18 @@ static int wh_config (oconfig_item_t *ci) /* {{{ */
         return (0);
 } /* }}} int wh_config */
 
+static int wh_init (void) /* {{{ */
+{
+        /* Call this while collectd is still single-threaded to avoid
+         * initialization issues in libgcrypt. */
+        curl_global_init (CURL_GLOBAL_SSL);
+        return (0);
+} /* }}} int wh_init */
+
 void module_register (void) /* {{{ */
 {
         plugin_register_complex_config ("write_http", wh_config);
+        plugin_register_init ("write_http", wh_init);
 } /* }}} void module_register */
 
 /* vim: set fdm=marker sw=8 ts=8 tw=78 et : */
index 61c1391..c3740e1 100644 (file)
@@ -52,8 +52,8 @@ struct riemann_host {
 #define F_CONNECT               0x01
        uint8_t                  flags;
        pthread_mutex_t          lock;
-    _Bool            notifications;
-    _Bool            check_thresholds;
+       _Bool                    notifications;
+       _Bool                    check_thresholds;
        _Bool                    store_rates;
        _Bool                    always_append_ds;
        char                    *node;
@@ -98,7 +98,7 @@ static void riemann_event_protobuf_free (Event *event) /* {{{ */
        sfree (event);
 } /* }}} void riemann_event_protobuf_free */
 
-static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
+static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
 {
        size_t i;
 
@@ -320,7 +320,7 @@ static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
        return (strarray_add (&event->tags, &event->n_tags, tag));
 } /* }}} int riemann_event_add_tag */
 
-static int riemann_event_add_attribute (Event *event, /* {{{ */
+static int riemann_event_add_attribute(Event *event, /* {{{ */
                char const *key, char const *value)
 {
        Attribute **new_attributes;
@@ -353,7 +353,7 @@ static int riemann_event_add_attribute (Event *event, /* {{{ */
        return (0);
 } /* }}} int riemann_event_add_attribute */
 
-static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
+static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
                notification_t const *n)
 {
        Msg *msg;
@@ -460,7 +460,7 @@ static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{
        return (msg);
 } /* }}} Msg *riemann_notification_to_protobuf */
 
-static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
+static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
                data_set_t const *ds,
                value_list_t const *vl, size_t index,
                                         gauge_t const *rates,
@@ -485,22 +485,22 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{
        event->time = CDTIME_T_TO_TIME_T (vl->time);
        event->has_time = 1;
 
-    if (host->check_thresholds) {
-        switch (status) {
-        case STATE_OKAY:
-            event->state = strdup("ok");
-            break;
-        case STATE_ERROR:
-            event->state = strdup("critical");
-            break;
-        case STATE_WARNING:
-            event->state = strdup("warning");
-            break;
-        case STATE_MISSING:
-            event->state = strdup("unknown");
-            break;
-        }
-    }
+       if (host->check_thresholds) {
+               switch (status) {
+                       case STATE_OKAY:
+                               event->state = strdup("ok");
+                               break;
+                       case STATE_ERROR:
+                               event->state = strdup("critical");
+                               break;
+                       case STATE_WARNING:
+                               event->state = strdup("warning");
+                               break;
+                       case STATE_MISSING:
+                               event->state = strdup("unknown");
+                               break;
+               }
+       }
 
        ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
        event->ttl = (float) ttl;
@@ -570,19 +570,22 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{
                        /* host = */ "", vl->plugin, vl->plugin_instance,
                        vl->type, vl->type_instance);
        if (host->always_append_ds || (ds->ds_num > 1))
-               if (host->event_service_prefix == NULL || host->event_service_prefix[0] == '\0')
-                       ssnprintf (service_buffer, sizeof (service_buffer),
-                                       "%s/%s", &name_buffer[1], ds->ds[index].name);
+       {
+               if (host->event_service_prefix == NULL)
+                       ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s",
+                                       &name_buffer[1], ds->ds[index].name);
                else
-                       ssnprintf (service_buffer, sizeof (service_buffer),
-                                       "%s%s/%s", host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
+                       ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s",
+                                       host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
+       }
        else
-               if (host->event_service_prefix == NULL || host->event_service_prefix[0] == '\0')
-                       sstrncpy (service_buffer, &name_buffer[1],
-                                       sizeof (service_buffer));
+       {
+               if (host->event_service_prefix == NULL)
+                       sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer));
                else
-                       ssnprintf (service_buffer, sizeof (service_buffer),
-                                       "%s%s", host->event_service_prefix, &name_buffer[1]);
+                       ssnprintf (service_buffer, sizeof (service_buffer), "%s%s",
+                                       host->event_service_prefix, &name_buffer[1]);
+       }
 
        event->service = strdup (service_buffer);
 
@@ -654,8 +657,8 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{
        struct riemann_host     *host = ud->data;
        Msg                     *msg;
 
-    if (!host->notifications)
-        return 0;
+       if (!host->notifications)
+               return 0;
 
        msg = riemann_notification_to_protobuf (host, n);
        if (msg == NULL)
@@ -679,8 +682,8 @@ static int riemann_write(const data_set_t *ds, /* {{{ */
        struct riemann_host     *host = ud->data;
        Msg                     *msg;
 
-    if (host->check_thresholds)
-        write_riemann_threshold_check(ds, vl, statuses);
+       if (host->check_thresholds)
+               write_riemann_threshold_check(ds, vl, statuses);
        msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
        if (msg == NULL)
                return (-1);
@@ -734,8 +737,8 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
        host->reference_count = 1;
        host->node = NULL;
        host->service = NULL;
-    host->notifications = 1;
-    host->check_thresholds = 0;
+       host->notifications = 1;
+       host->check_thresholds = 0;
        host->store_rates = 1;
        host->always_append_ds = 0;
        host->use_tcp = 0;
@@ -760,18 +763,18 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
                        status = cf_util_get_string (child, &host->node);
                        if (status != 0)
                                break;
-        } else if (strcasecmp ("Notifications", child->key) == 0) {
-            status = cf_util_get_boolean(child, &host->notifications);
-            if (status != 0)
-                break;
-        } else if (strcasecmp ("EventServicePrefix", child->key) == 0) {
-            status = cf_util_get_string (child, &host->event_service_prefix);
-            if (status != 0)
-                break;
-        } else if (strcasecmp ("CheckThresholds", child->key) == 0) {
-            status = cf_util_get_boolean(child, &host->check_thresholds);
-            if (status != 0)
-                break;
+               } else if (strcasecmp ("Notifications", child->key) == 0) {
+                       status = cf_util_get_boolean(child, &host->notifications);
+                       if (status != 0)
+                               break;
+               } else if (strcasecmp ("EventServicePrefix", child->key) == 0) {
+                       status = cf_util_get_string (child, &host->event_service_prefix);
+                       if (status != 0)
+                               break;
+               } else if (strcasecmp ("CheckThresholds", child->key) == 0) {
+                       status = cf_util_get_boolean(child, &host->check_thresholds);
+                       if (status != 0)
+                               break;
                } else if (strcasecmp ("Port", child->key) == 0) {
                        status = cf_util_get_service (child, &host->service);
                        if (status != 0) {
@@ -941,7 +944,7 @@ static int riemann_config(oconfig_item_t *ci) /* {{{ */
                                 child->key);
                }
        }
-    return 0;
+       return (0);
 } /* }}} int riemann_config */
 
 void module_register(void)
diff --git a/src/write_tsdb.c b/src/write_tsdb.c
new file mode 100644 (file)
index 0000000..2eca77e
--- /dev/null
@@ -0,0 +1,647 @@
+/**
+ * collectd - src/write_tsdb.c
+ * Copyright (C) 2012       Pierre-Yves Ritschard
+ * Copyright (C) 2011       Scott Sanders
+ * Copyright (C) 2009       Paul Sadauskas
+ * Copyright (C) 2009       Doug MacEachern
+ * Copyright (C) 2007-2012  Florian octo Forster
+ * Copyright (C) 2013-2014  Limelight Networks, Inc.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Based on the write_graphite plugin. Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Doug MacEachern <dougm at hyperic.com>
+ *   Paul Sadauskas <psadauskas at gmail.com>
+ *   Scott Sanders <scott at jssjr.com>
+ *   Pierre-Yves Ritschard <pyr at spootnik.org>
+ * write_tsdb Authors:
+ *   Brett Hawn <bhawn at llnw.com>
+ *   Kevin Bowling <kbowling@llnw.com>
+ **/
+
+/* write_tsdb plugin configuation example
+ *
+ * <Plugin write_tsdb>
+ *   <Node>
+ *     Host "localhost"
+ *     Port "4242"
+ *     HostTags "status=production deviceclass=www"
+ *   </Node>
+ * </Plugin>
+ */
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "configfile.h"
+
+#include "utils_cache.h"
+#include "utils_parse_option.h"
+
+#include <pthread.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#ifndef WT_DEFAULT_NODE
+# define WT_DEFAULT_NODE "localhost"
+#endif
+
+#ifndef WT_DEFAULT_SERVICE
+# define WT_DEFAULT_SERVICE "4242"
+#endif
+
+#ifndef WT_DEFAULT_ESCAPE
+# define WT_DEFAULT_ESCAPE '.'
+#endif
+
+/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
+#ifndef WT_SEND_BUF_SIZE
+# define WT_SEND_BUF_SIZE 1428
+#endif
+
+/*
+ * Private variables
+ */
+struct wt_callback
+{
+    int      sock_fd;
+
+    char     *node;
+    char     *service;
+    char     *host_tags;
+
+    _Bool    store_rates;
+    _Bool    always_append_ds;
+
+    char     send_buf[WT_SEND_BUF_SIZE];
+    size_t   send_buf_free;
+    size_t   send_buf_fill;
+    cdtime_t send_buf_init_time;
+
+    pthread_mutex_t send_lock;
+};
+
+
+/*
+ * Functions
+ */
+static void wt_reset_buffer(struct wt_callback *cb)
+{
+    memset(cb->send_buf, 0, sizeof(cb->send_buf));
+    cb->send_buf_free = sizeof(cb->send_buf);
+    cb->send_buf_fill = 0;
+    cb->send_buf_init_time = cdtime();
+}
+
+static int wt_send_buffer(struct wt_callback *cb)
+{
+    ssize_t status = 0;
+
+    status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
+    if (status < 0)
+    {
+        char errbuf[1024];
+        ERROR("write_tsdb plugin: send failed with status %zi (%s)",
+              status, sstrerror (errno, errbuf, sizeof (errbuf)));
+
+        close (cb->sock_fd);
+        cb->sock_fd = -1;
+
+        return -1;
+    }
+
+    return 0;
+}
+
+/* NOTE: You must hold cb->send_lock when calling this function! */
+static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb)
+{
+    int status;
+
+    DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; "
+          "send_buf_fill = %zu;",
+          (double)timeout,
+          cb->send_buf_fill);
+
+    /* timeout == 0  => flush unconditionally */
+    if (timeout > 0)
+    {
+        cdtime_t now;
+
+        now = cdtime();
+        if ((cb->send_buf_init_time + timeout) > now)
+            return 0;
+    }
+
+    if (cb->send_buf_fill <= 0)
+    {
+        cb->send_buf_init_time = cdtime();
+        return 0;
+    }
+
+    status = wt_send_buffer(cb);
+    wt_reset_buffer(cb);
+
+    return status;
+}
+
+static int wt_callback_init(struct wt_callback *cb)
+{
+    struct addrinfo ai_hints;
+    struct addrinfo *ai_list;
+    struct addrinfo *ai_ptr;
+    int status;
+
+    const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
+    const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE;
+
+    if (cb->sock_fd > 0)
+        return 0;
+
+    memset(&ai_hints, 0, sizeof(ai_hints));
+#ifdef AI_ADDRCONFIG
+    ai_hints.ai_flags    |= AI_ADDRCONFIG;
+#endif
+    ai_hints.ai_family   = AF_UNSPEC;
+    ai_hints.ai_socktype = SOCK_STREAM;
+
+    ai_list = NULL;
+
+    status = getaddrinfo(node, service, &ai_hints, &ai_list);
+    if (status != 0)
+    {
+        ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s",
+              node, service, gai_strerror (status));
+        return -1;
+    }
+
+    assert (ai_list != NULL);
+    for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+    {
+        cb->sock_fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype,
+                             ai_ptr->ai_protocol);
+        if (cb->sock_fd < 0)
+            continue;
+
+        status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+        if (status != 0)
+        {
+            close(cb->sock_fd);
+            cb->sock_fd = -1;
+            continue;
+        }
+
+        break;
+    }
+
+    freeaddrinfo(ai_list);
+
+    if (cb->sock_fd < 0)
+    {
+        char errbuf[1024];
+        ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
+              "The last error was: %s", node, service,
+              sstrerror (errno, errbuf, sizeof(errbuf)));
+        close(cb->sock_fd);
+        return -1;
+    }
+
+    wt_reset_buffer(cb);
+
+    return 0;
+}
+
+static void wt_callback_free(void *data)
+{
+    struct wt_callback *cb;
+
+    if (data == NULL)
+        return;
+
+    cb = data;
+
+    pthread_mutex_lock(&cb->send_lock);
+
+    wt_flush_nolock(0, cb);
+
+    close(cb->sock_fd);
+    cb->sock_fd = -1;
+
+    sfree(cb->node);
+    sfree(cb->service);
+    sfree(cb->host_tags);
+
+    pthread_mutex_destroy(&cb->send_lock);
+
+    sfree(cb);
+}
+
+static int wt_flush(cdtime_t timeout,
+                    const char *identifier __attribute__((unused)),
+                    user_data_t *user_data)
+{
+    struct wt_callback *cb;
+    int status;
+
+    if (user_data == NULL)
+        return -EINVAL;
+
+    cb = user_data->data;
+
+    pthread_mutex_lock(&cb->send_lock);
+
+    if (cb->sock_fd < 0)
+    {
+        status = wt_callback_init(cb);
+        if (status != 0)
+        {
+            ERROR("write_tsdb plugin: wt_callback_init failed.");
+            pthread_mutex_unlock(&cb->send_lock);
+            return -1;
+        }
+    }
+
+    status = wt_flush_nolock(timeout, cb);
+    pthread_mutex_unlock(&cb->send_lock);
+
+    return status;
+}
+
+static int wt_format_values(char *ret, size_t ret_len,
+                            int ds_num, const data_set_t *ds,
+                            const value_list_t *vl,
+                            _Bool store_rates)
+{
+    size_t offset = 0;
+    int status;
+    gauge_t *rates = NULL;
+
+    assert(0 == strcmp (ds->type, vl->type));
+
+    memset(ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+        status = ssnprintf (ret + offset, ret_len - offset, \
+                            __VA_ARGS__); \
+        if (status < 1) \
+        { \
+            sfree(rates); \
+            return -1; \
+        } \
+        else if (((size_t) status) >= (ret_len - offset)) \
+        { \
+            sfree(rates); \
+            return -1; \
+        } \
+        else \
+            offset += ((size_t) status); \
+} while (0)
+
+    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+        BUFFER_ADD("%f", vl->values[ds_num].gauge);
+    else if (store_rates)
+    {
+        if (rates == NULL)
+            rates = uc_get_rate (ds, vl);
+        if (rates == NULL)
+        {
+            WARNING("format_values: "
+                    "uc_get_rate failed.");
+            return -1;
+        }
+        BUFFER_ADD("%f", rates[ds_num]);
+    }
+    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+        BUFFER_ADD("%llu", vl->values[ds_num].counter);
+    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+        BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive);
+    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+        BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute);
+    else
+    {
+        ERROR("format_values plugin: Unknown data source type: %i",
+              ds->ds[ds_num].type);
+        sfree(rates);
+        return -1;
+    }
+
+#undef BUFFER_ADD
+
+    sfree(rates);
+    return 0;
+}
+
+static int wt_format_name(char *ret, int ret_len,
+                          const value_list_t *vl,
+                          const struct wt_callback *cb,
+                          const char *ds_name)
+{
+    int status;
+    char *temp = NULL;
+    char *prefix = "";
+    const char *meta_prefix = "tsdb_prefix";
+
+    if (vl->meta) {
+        status = meta_data_get_string(vl->meta, meta_prefix, &temp);
+        if (status == -ENOENT) {
+            /* defaults to empty string */
+        } else if (status < 0) {
+            sfree(temp);
+            return status;
+        } else {
+            prefix = temp;
+        }
+    }
+
+    if (ds_name != NULL) {
+        if (vl->plugin_instance[0] == '\0') {
+            ssnprintf(ret, ret_len, "%s%s.%s",
+                      prefix, vl->plugin, ds_name);
+        } else if (vl->type_instance == '\0') {
+            ssnprintf(ret, ret_len, "%s%s.%s.%s.%s",
+                      prefix, vl->plugin, vl->plugin_instance,
+                      vl->type_instance, ds_name);
+        } else {
+            ssnprintf(ret, ret_len, "%s%s.%s.%s.%s",
+                      prefix, vl->plugin, vl->plugin_instance, vl->type,
+                      ds_name);
+        }
+    } else if (vl->plugin_instance[0] == '\0') {
+        if (vl->type_instance[0] == '\0')
+            ssnprintf(ret, ret_len, "%s%s.%s",
+                      prefix, vl->plugin, vl->type);
+        else
+            ssnprintf(ret, ret_len, "%s%s.%s",
+                      prefix, vl->plugin, vl->type_instance);
+    } else if (vl->type_instance[0] == '\0') {
+        ssnprintf(ret, ret_len, "%s%s.%s.%s",
+                  prefix, vl->plugin, vl->plugin_instance, vl->type);
+    } else {
+        ssnprintf(ret, ret_len, "%s%s.%s.%s",
+                  prefix, vl->plugin, vl->plugin_instance, vl->type_instance);
+    }
+
+    sfree(temp);
+    return 0;
+}
+
+static int wt_send_message (const char* key, const char* value,
+                            cdtime_t time, struct wt_callback *cb,
+                            const char* host, meta_data_t *md)
+{
+    int status;
+    int message_len;
+    char *temp = NULL;
+    char *tags = "";
+    char message[1024];
+    char *host_tags = cb->host_tags ? cb->host_tags : "";
+    const char *meta_tsdb = "tsdb_tags";
+
+    /* skip if value is NaN */
+    if (value[0] == 'n')
+        return 0;
+
+    if (md) {
+        status = meta_data_get_string(md, meta_tsdb, &temp);
+        if (status == -ENOENT) {
+            /* defaults to empty string */
+        } else if (status < 0) {
+            ERROR("write_tsdb plugin: tags metadata get failure");
+            sfree(temp);
+            pthread_mutex_unlock(&cb->send_lock);
+            return status;
+        } else {
+            tags = temp;
+        }
+    }
+
+    message_len = ssnprintf (message,
+                             sizeof(message),
+                             "put %s %.0f %s fqdn=%s %s %s\r\n",
+                             key,
+                             CDTIME_T_TO_DOUBLE(time),
+                             value,
+                             host,
+                             tags,
+                             host_tags);
+
+    sfree(temp);
+
+    if (message_len >= sizeof(message)) {
+        ERROR("write_tsdb plugin: message buffer too small: "
+              "Need %d bytes.", message_len + 1);
+        return -1;
+    }
+
+    pthread_mutex_lock(&cb->send_lock);
+
+    if (cb->sock_fd < 0)
+    {
+        status = wt_callback_init(cb);
+        if (status != 0)
+        {
+            ERROR("write_tsdb plugin: wt_callback_init failed.");
+            pthread_mutex_unlock(&cb->send_lock);
+            return -1;
+        }
+    }
+
+    if (message_len >= cb->send_buf_free)
+    {
+        status = wt_flush_nolock(0, cb);
+        if (status != 0)
+        {
+            pthread_mutex_unlock(&cb->send_lock);
+            return status;
+        }
+    }
+
+    /* Assert that we have enough space for this message. */
+    assert(message_len < cb->send_buf_free);
+
+    /* `message_len + 1' because `message_len' does not include the
+     * trailing null byte. Neither does `send_buffer_fill'. */
+    memcpy(cb->send_buf + cb->send_buf_fill,
+            message, message_len + 1);
+    cb->send_buf_fill += message_len;
+    cb->send_buf_free -= message_len;
+
+    DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"",
+          cb->node,
+          cb->service,
+          cb->send_buf_fill, sizeof(cb->send_buf),
+          100.0 * ((double) cb->send_buf_fill) /
+          ((double) sizeof(cb->send_buf)),
+          message);
+
+    pthread_mutex_unlock(&cb->send_lock);
+
+    return 0;
+}
+
+static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
+                             struct wt_callback *cb)
+{
+    char key[10*DATA_MAX_NAME_LEN];
+    char values[512];
+
+    int status, i;
+
+    if (0 != strcmp(ds->type, vl->type))
+    {
+        ERROR("write_tsdb plugin: DS type does not match "
+              "value list type");
+        return -1;
+    }
+
+    for (i = 0; i < ds->ds_num; i++)
+    {
+        const char *ds_name = NULL;
+
+        if (cb->always_append_ds || (ds->ds_num > 1))
+            ds_name = ds->ds[i].name;
+
+        /* Copy the identifier to 'key' and escape it. */
+        status = wt_format_name(key, sizeof(key), vl, cb, ds_name);
+        if (status != 0)
+        {
+            ERROR("write_tsdb plugin: error with format_name");
+            return status;
+        }
+
+        escape_string(key, sizeof(key));
+        /* Convert the values to an ASCII representation and put that into
+         * 'values'. */
+        status = wt_format_values(values, sizeof(values), i, ds, vl,
+                                  cb->store_rates);
+        if (status != 0)
+        {
+            ERROR("write_tsdb plugin: error with "
+                  "wt_format_values");
+            return status;
+        }
+
+        /* Send the message to tsdb */
+        status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta);
+        if (status != 0)
+        {
+            ERROR("write_tsdb plugin: error with "
+                  "wt_send_message");
+            return status;
+        }
+    }
+
+    return 0;
+}
+
+static int wt_write(const data_set_t *ds, const value_list_t *vl,
+                    user_data_t *user_data)
+{
+    struct wt_callback *cb;
+    int status;
+
+    if (user_data == NULL)
+        return EINVAL;
+
+    cb = user_data->data;
+
+    status = wt_write_messages(ds, vl, cb);
+
+    return status;
+}
+
+static int wt_config_tsd(oconfig_item_t *ci)
+{
+    struct wt_callback *cb;
+    user_data_t user_data;
+    char callback_name[DATA_MAX_NAME_LEN];
+    int i;
+
+    cb = malloc(sizeof(*cb));
+    if (cb == NULL)
+    {
+        ERROR("write_tsdb plugin: malloc failed.");
+        return -1;
+    }
+    memset(cb, 0, sizeof(*cb));
+    cb->sock_fd = -1;
+    cb->node = NULL;
+    cb->service = NULL;
+    cb->host_tags = NULL;
+    cb->store_rates = 0;
+
+    pthread_mutex_init (&cb->send_lock, NULL);
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp("Host", child->key) == 0)
+            cf_util_get_string(child, &cb->node);
+        else if (strcasecmp("Port", child->key) == 0)
+            cf_util_get_service(child, &cb->service);
+        else if (strcasecmp("HostTags", child->key) == 0)
+            cf_util_get_string(child, &cb->host_tags);
+        else if (strcasecmp("StoreRates", child->key) == 0)
+            cf_util_get_boolean(child, &cb->store_rates);
+        else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
+            cf_util_get_boolean(child, &cb->always_append_ds);
+        else
+        {
+            ERROR("write_tsdb plugin: Invalid configuration "
+                  "option: %s.", child->key);
+        }
+    }
+
+    ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
+              cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
+              cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
+
+    memset(&user_data, 0, sizeof(user_data));
+    user_data.data = cb;
+    user_data.free_func = wt_callback_free;
+    plugin_register_write(callback_name, wt_write, &user_data);
+
+    user_data.free_func = NULL;
+    plugin_register_flush(callback_name, wt_flush, &user_data);
+
+    return 0;
+}
+
+static int wt_config(oconfig_item_t *ci)
+{
+    int i;
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp("Node", child->key) == 0)
+            wt_config_tsd(child);
+        else
+        {
+            ERROR("write_tsdb plugin: Invalid configuration "
+                  "option: %s.", child->key);
+        }
+    }
+
+    return 0;
+}
+
+void module_register(void)
+{
+    plugin_register_complex_config("write_tsdb", wt_config);
+}
+
+/* vim: set sw=4 ts=4 sts=4 tw=78 et : */
index 7fbc867..6b5e40e 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/env bash
 
-DEFAULT_VERSION="5.4.0.git"
+DEFAULT_VERSION="5.4.1.git"
 
 VERSION="`git describe 2> /dev/null | grep collectd | sed -e 's/^collectd-//'`"