- Initial author.
Sebastian "tokkee" Harl <sh at tokkee.org>
- - Bugfixes and enhancments in many places all around the project.
+ - Bugfixes and enhancements in many places all around the project.
- perl plugin.
- users plugin.
- vserver plugin.
Bert Vermeulen <bert at biot.com>
- sigrok plugin
+Brett Hawn <bhawn at llnw.com>
+ - write_tsdb plugin for http://opentsdb.net/
+
Bruno Prémont <bonbons at linux-vserver.org>
- BIND plugin.
- - Many bugreports and -fixes in various plugins,
+ - Many bug reports and -fixes in various plugins,
especially a nasty bug in the network plugin.
- Wireshark dissector.
Jérôme Renard <jerome.renard at gmail.com>
- varnish plugin.
+Kevin Bowling <kbowling at llnw.com>
+ - write_tsdb plugin for http://opentsdb.net/
+
Kris Nielander <nielander at fox-it.com>
- tail_csv plugin.
+2014-01-26, Version 5.4.1
+ * amqp plugin: Add support for RabbitMQ 0.4.x to avoid compiler
+ warnings. Thanks to Sebastian Harl for implementing this.
+ * apache / network plugins: Improved initialization order hopefully
+ resolved gcrypt initialization problems.
+ * aquaero plugin: The type used to submit fan utilization was fixed.
+ Thanks to Alex Deymo for the patch.
+ * cgroups plugin: A small memory leak was fixed. Checking the existence
+ of a mount option without a value was fixed. More permissive parsing
+ of the cpuacct.stats file fixes support for some versions of Linux.
+ Thanks to Marc Fournier for bug reports and patches.
+ * curl plugin: Fix <Match> blocks without an instance. Thanks to
+ Alexander Golovko for reporting and Sebastian Harl for fixing this.
+ * curl_json plugin: Potentially invalid memory access has been
+ sanitized. Thanks to Jim Radford for his patch.
+ * interface plugin: Fix behavior under FreeBSD 10: Reporting of
+ per-address statistics caused duplicate updates to the same metric.
+ Thanks to demon / @trtrmitya for the patch.
+ * write_graphite plugin: Use TCP to connect to Graphite by default. The
+ default changed from TCP to UDP between 5.3.1 and 5.4.0, which is a
+ regression. Thanks to Marc Fournier for fixing this. Reconnect
+ behavior was improved. Thanks to Michael Hart for his patch.
+ * zfs_arc plugin: Collect "allocated" and "stolen" on FreeBSD only.
+
2013-08-18, Version 5.4.0
* collectd: The "LoadPlugin" config option no longer attempts to load
plugins twice. If more than one "LoadPlugin" statement or block is
* collectd is able to collect the following data:
- apache
- Apache server utilization: Number of bytes transfered, number of
+ Apache server utilization: Number of bytes transferred, number of
requests handled and detailed scoreboard statistics
- apcups
APC UPS Daemon: UPS charge, load, input/output/battery voltage, etc.
- apple_sensors
- Sensors in Macs running Mac OS X / Darwin: Temperature, fanspeed and
+ Sensors in Macs running Mac OS X / Darwin: Temperature, fan speed and
voltage sensors.
- aquaero
- Various sensors in the Aquaero 5 watercooling board made by Aquacomputer.
+ Various sensors in the Aquaero 5 water cooling board made by Aquacomputer.
- ascent
Statistics about Ascent, a free server for the game `World of Warcraft'.
- dns
DNS traffic: Query types, response codes, opcodes and traffic/octets
- transfered.
+ transferred.
- drbd
Collect individual drbd resource statistics.
Receive multicast traffic from Ganglia instances.
- hddtemp
- Harddisk temperatures using hddtempd.
+ Hard disk temperatures using hddtempd.
- interface
Interface traffic: Number of octets, packets and errors for each
interfaces that use the Atheros chipset and the MadWifi driver.
- mbmon
- Motherboard sensors: temperature, fanspeed and voltage information,
+ Motherboard sensors: temperature, fan speed and voltage information,
using mbmon(1).
- md
See collectd-python(5) for details.
- redis
- The redis plugin gathers information from a redis server, including:
+ The redis plugin gathers information from a Redis server, including:
uptime, used memory, total connections etc.
- routeros
clients and calculating rates and other aggregates out of these values.
- swap
- Pages swapped out onto harddisk or whatever is called `swap' by the OS..
+ Pages swapped out onto hard disk or whatever is called `swap' by the OS..
- table
Parse table-like structured files.
- tail
- Follows (tails) logfiles, parses them by lines and submits matched
+ Follows (tails) log files, parses them by lines and submits matched
values.
- tail_csv
can be configured to avoid logging send errors (especially useful when
using UDP).
+ - write_tsdb
+ Sends data OpenTSDB, a scalable no master, no shared state time series
+ database.
+
- write_http
Sends the values collected by collectd to a web-server using HTTP POST
requests. The transmitted data is either in a form understood by the
values are out of bounds. See collectd-threshold(5) for details.
- uuid
- Sets the hostname to an unique identifier. This is meant for setups
+ Sets the hostname to a unique identifier. This is meant for setups
where each client may migrate to another physical host, possibly going
through one or more name changes in the process.
time starting up again and again. With the exception of the exec plugin no
processes are forked. Caching in output plugins, such as the rrdtool and
network plugins, makes sure your resources are used efficiently. Also,
- since collectd is programmed multithreaded it benefits from hyperthreading
+ since collectd is programmed multithreaded it benefits from hyper-threading
and multicore processors and makes sure that the daemon isn't idle if only
one plugin waits for an IO-operation to complete.
---------
* collectd's configuration file can be found at `sysconfdir'/collectd.conf.
- Run `collectd -h' for a list of builtin defaults. See `collectd.conf(5)'
+ Run `collectd -h' for a list of built-in defaults. See `collectd.conf(5)'
for a list of options and a syntax description.
* When the `csv' or `rrdtool' plugins are loaded they'll write the values to
# include <sys/socket.h>
#endif
])
+AC_CHECK_HEADERS(linux/inet_diag.h, [], [],
+[
+#if HAVE_SYS_TYPES_H
+# include <sys/types.h>
+#endif
+#if HAVE_SYS_SOCKET_H
+# include <sys/socket.h>
+#endif
+#if HAVE_LINUX_INET_DIAG_H
+# include <linux/inet_diag.h>
+#endif
+])
AC_CHECK_HEADERS(linux/netdevice.h, [], [],
[
#if HAVE_SYS_TYPES_H
fi
# Check for htonll
-AC_MSG_CHECKING([if have htonll defined])
-
- have_htonll="no"
- AC_LINK_IFELSE([AC_LANG_PROGRAM(
+AC_CACHE_CHECK([if have htonll defined],
+ [c_cv_have_htonll],
+ AC_LINK_IFELSE([AC_LANG_PROGRAM(
[[[
#include <sys/types.h>
#include <netinet/in.h>
return htonll(0);
]]]
)],
- [
- have_htonll="yes"
- AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.])
- ])
-
-AC_MSG_RESULT([$have_htonll])
+ [c_cv_have_htonll="yes"],
+ [c_cv_have_htonll="no"]
+ )
+)
+if test "x$c_cv_have_htonll" = "xyes"
+then
+ AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.])
+fi
# Check for structures
AC_CHECK_MEMBERS([struct if_data.ifi_ibytes, struct if_data.ifi_opackets, struct if_data.ifi_ierrors],
#include <linux/if.h>
#include <linux/netdevice.h>
])
+AC_CHECK_MEMBERS([struct inet_diag_req.id, struct inet_diag_req.idiag_states],
+ [AC_DEFINE(HAVE_STRUCT_LINUX_INET_DIAG_REQ, 1, [Define if struct inet_diag_req exists and is usable.])],
+ [],
+ [
+ #include <linux/inet_diag.h>
+ ])
+
AC_CHECK_MEMBERS([struct ip_mreqn.imr_ifindex], [],
[],
[with_libhal="no"])
if test "x$with_libhal" = "xyes"; then
if test "x$PKG_CONFIG" != "x"; then
- BUILD_WITH_LIBHAL_CFLAGS="`pkg-config --cflags hal`"
- BUILD_WITH_LIBHAL_LIBS="`pkg-config --libs hal`"
+ BUILD_WITH_LIBHAL_CFLAGS="`$PKG_CONFIG --cflags hal`"
+ BUILD_WITH_LIBHAL_LIBS="`$PKG_CONFIG --libs hal`"
AC_SUBST(BUILD_WITH_LIBHAL_CFLAGS)
AC_SUBST(BUILD_WITH_LIBHAL_LIBS)
fi
CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"
AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+
+with_amqp_tcp_socket="no"
+if test "x$with_librabbitmq" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ SAVE_LIBS="$LIBS"
+ CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+ LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+ LIBS="-lrabbitmq"
+
+ AC_CHECK_HEADERS(amqp_tcp_socket.h amqp_socket.h)
+ AC_CHECK_FUNC(amqp_tcp_socket_new, [with_amqp_tcp_socket="yes"], [with_amqp_tcp_socket="no"])
+ if test "x$with_amqp_tcp_socket" = "xyes"
+ then
+ AC_DEFINE(HAVE_AMQP_TCP_SOCKET, 1,
+ [Define if librabbitmq provides the new TCP socket interface.])
+ fi
+
+ AC_CHECK_DECLS(amqp_socket_close,
+ [amqp_socket_close_decl="yes"], [amqp_socket_close_decl="no"],
+ [[
+#include <amqp.h>
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+ ]])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+ LIBS="$SAVE_LIBS"
+fi
# }}}
# --with-librdkafka {{{
with_libsigrok_ldflags="-L$withval/lib"
fi
fi
-],[])
+],[with_libsigrok="yes"])
# libsigrok has a glib dependency
if test "x$with_libsigrok" = "xyes"
then
- if test -z "m4_ifdef([AM_PATH_GLIB_2_0], [yes], [])"
- then
- with_libsigrok="no (glib not available)"
- else
- AM_PATH_GLIB_2_0([2.28.0],
- [with_libsigrok_cflags="$with_libsigrok_cflags $GLIB_CFLAGS"; with_libsigrok_ldflags="$with_libsigrok_ldflags $GLIB_LIBS"])
- fi
+m4_ifdef([AM_PATH_GLIB_2_0],
+ [
+ AM_PATH_GLIB_2_0([2.28.0],
+ [with_libsigrok_cflags="$with_libsigrok_cflags $GLIB_CFLAGS"; with_libsigrok_ldflags="$with_libsigrok_ldflags $GLIB_LIBS"])
+ ],
+ [
+ with_libsigrok="no (glib not available)"
+ ]
+)
fi
# libsigrok headers
if $PKG_CONFIG --exists tokyotyrant
then
with_libtokyotyrant_cppflags="$with_libtokyotyrant_cppflags `$PKG_CONFIG --cflags tokyotyrant`"
- with_libtokyotyrant_ldflags="$with_libtokyotyrant_ldflags `pkg-config --libs-only-L tokyotyrant`"
- with_libtokyotyrant_libs="$with_libtokyotyrant_libs `pkg-config --libs-only-l tokyotyrant`"
+ with_libtokyotyrant_ldflags="$with_libtokyotyrant_ldflags `$PKG_CONFIG --libs-only-L tokyotyrant`"
+ with_libtokyotyrant_libs="$with_libtokyotyrant_libs `$PKG_CONFIG --libs-only-l tokyotyrant`"
fi
fi
with_libvirt_ldflags=""
if test "x$PKG_CONFIG" != "x"
then
- pkg-config --exists 'libxml-2.0' 2>/dev/null
+ $PKG_CONFIG --exists 'libxml-2.0' 2>/dev/null
if test "$?" = "0"
then
with_libxml2="yes"
with_libxml2="no (pkg-config doesn't know libxml-2.0)"
fi
- pkg-config --exists libvirt 2>/dev/null
+ $PKG_CONFIG --exists libvirt 2>/dev/null
if test "$?" = "0"
then
with_libvirt="yes"
fi
if test "x$with_libxml2" = "xyes"
then
- with_libxml2_cflags="`pkg-config --cflags libxml-2.0`"
+ with_libxml2_cflags="`$PKG_CONFIG --cflags libxml-2.0`"
if test $? -ne 0
then
with_libxml2="no"
fi
- with_libxml2_ldflags="`pkg-config --libs libxml-2.0`"
+ with_libxml2_ldflags="`$PKG_CONFIG --libs libxml-2.0`"
if test $? -ne 0
then
with_libxml2="no"
fi
if test "x$with_libvirt" = "xyes"
then
- with_libvirt_cflags="`pkg-config --cflags libvirt`"
+ with_libvirt_cflags="`$PKG_CONFIG --cflags libvirt`"
if test $? -ne 0
then
with_libvirt="no"
fi
- with_libvirt_ldflags="`pkg-config --libs libvirt`"
+ with_libvirt_ldflags="`$PKG_CONFIG --libs libvirt`"
if test $? -ne 0
then
with_libvirt="no"
AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin])
AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin])
AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin])
+AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin])
AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics])
AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics])
LOAD_PLUGIN_LOGFILE="##"
fi
-if test "x$enable_logfile" = "xyes"
+if test "x$enable_log_logstash" = "xyes"
then
LOAD_PLUGIN_LOG_LOGSTASH="#"
else
write_mongodb . . . . $enable_write_mongodb
write_redis . . . . . $enable_write_redis
write_riemann . . . . $enable_write_riemann
+ write_tsdb . . . . . $enable_write_tsdb
xmms . . . . . . . . $enable_xmms
zfs_arc . . . . . . . $enable_zfs_arc
RRDTitle "Apache Traffic"
RRDVerticalLabel "Bytes/s"
RRDFormat "%5.1lf%s"
- Color count 0000ff
+ Color value 0000ff
</Type>
<Type apache_requests>
DataSources value
RRDTitle "Apache Traffic"
RRDVerticalLabel "Requests/s"
RRDFormat "%5.2lf"
- Color count 00d000
+ Color value 00d000
</Type>
<Type apache_scoreboard>
Module GenericStacked
RRDTitle "Frequency ({type_instance})"
RRDVerticalLabel "Hertz"
RRDFormat "%4.1lfHz"
- Color frequency a000a0
+ Color value a000a0
</Type>
<Type humidity>
DataSources value
RRDTitle "Percent ({type_instance})"
RRDVerticalLabel "Percent"
RRDFormat "%4.1lf%%"
- Color percent 0000ff
+ Color value 0000ff
</Type>
<Type ping>
DataSources value
RRDTitle "Users ({type_instance}) on {hostname}"
RRDVerticalLabel "Users"
RRDFormat "%.1lf"
- Color users 0000f0
+ Color value 0000f0
</Type>
<Type voltage>
DataSources value
%{?el6:%global _has_ip_vs_h 1}
%{?el6:%global _has_lvm2app_h 1}
%{?el6:%global _has_perl_extutils_embed 1}
+%{?el6:%global _has_libmodbus 1}
# plugins enabled by default
%define with_aggregation 0%{!?_without_aggregation:1}
%define with_memcached 0%{!?_without_memcached:1}
%define with_memory 0%{!?_without_memory:1}
%define with_multimeter 0%{!?_without_multimeter:1}
+%define with_modbus 0%{!?_without_modbus:0%{?_has_libmodbus}}
%define with_mysql 0%{!?_without_mysql:1}
%define with_netlink 0%{!?_without_netlink:1}
%define with_network 0%{!?_without_network:1}
consumption from Intel Many Integrated Core (MIC) CPUs.
%endif
+%if %{with_modbus}
+%package modbus
+Summary: modbus plugin for collectd
+Group: System Environment/Daemons
+Requires: %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: libmodbus-devel
+%description modbus
+The modbus plugin collects values from Modbus/TCP enabled devices
+%endif
+
%if %{with_mysql}
%package mysql
Summary: MySQL plugin for collectd
%define _with_multimeter --disable-multimeter
%endif
+%if %{with_modbus}
+%define _with_modbus --enable-modbus
+%else
+%define _with_modbus --disable-modbus
+%endif
+
%if %{with_mysql}
%define _with_mysql --enable-mysql
%else
- Enable cgroups, lvm and statsd plugins
- Enable (but don't build by default) mic, aquaero and sigrok plugins
+* Wed Aug 06 2014 Marc Fournier <marc.fournier@camptocamp.com> 5.3.1-2
+- Enabled modbus plugin
+
* Tue Aug 06 2013 Marc Fournier <marc.fournier@camptocamp.com> 5.3.1-1
- New upstream version
- Added RHEL5 support:
pkglib_LTLIBRARIES += iptables.la
iptables_la_SOURCES = iptables.c
iptables_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBIPTC_CPPFLAGS)
-iptables_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBIPTC_LDFLAGS)
-iptables_la_LIBADD = -liptc
+iptables_la_LDFLAGS = -module -avoid-version
+iptables_la_LIBADD = $(BUILD_WITH_LIBIPTC_LDFLAGS)
collectd_LDADD += "-dlopen" iptables.la
collectd_DEPENDENCIES += iptables.la
endif
collectd_DEPENDENCIES += write_riemann.la
endif
+if BUILD_PLUGIN_WRITE_TSDB
+pkglib_LTLIBRARIES += write_tsdb.la
+write_tsdb_la_SOURCES = write_tsdb.c
+write_tsdb_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" write_tsdb.la
+collectd_DEPENDENCIES += write_tsdb.la
+endif
+
if BUILD_PLUGIN_XMMS
pkglib_LTLIBRARIES += xmms.la
xmms_la_SOURCES = xmms.c
/* lookup_class_callback_t for utils_vl_lookup */
static void *agg_lookup_class_callback ( /* {{{ */
- __attribute__((unused)) data_set_t const *ds,
- value_list_t const *vl, void *user_class)
+ data_set_t const *ds, value_list_t const *vl, void *user_class)
{
return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
} /* }}} void *agg_class_callback */
#include <amqp.h>
#include <amqp_framing.h>
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+#ifdef HAVE_AMQP_TCP_SOCKET
+#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
+/* rabbitmq-c does not currently ship amqp_socket.h
+ * and, thus, does not define this function. */
+int amqp_socket_close(amqp_socket_t *);
+#endif
+#endif
+
/* Defines for the delivery mode. I have no idea why they're not defined by the
* library.. */
#define CAMQP_DM_VOLATILE 1
static int camqp_connect (camqp_config_t *conf) /* {{{ */
{
amqp_rpc_reply_t reply;
- int sockfd;
int status;
+#ifdef HAVE_AMQP_TCP_SOCKET
+ amqp_socket_t *socket;
+#else
+ int sockfd;
+#endif
if (conf->connection != NULL)
return (0);
return (ENOMEM);
}
+#ifdef HAVE_AMQP_TCP_SOCKET
+# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */
+ /* TODO: add support for SSL using amqp_ssl_socket_new
+ * and related functions */
+ socket = amqp_tcp_socket_new (conf->connection);
+ if (! socket)
+ {
+ ERROR ("amqp plugin: amqp_tcp_socket_new failed.");
+ amqp_destroy_connection (conf->connection);
+ conf->connection = NULL;
+ return (ENOMEM);
+ }
+
+ status = amqp_socket_open (socket, CONF(conf, host), conf->port);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ status *= -1;
+ ERROR ("amqp plugin: amqp_socket_open failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ amqp_destroy_connection (conf->connection);
+ conf->connection = NULL;
+ return (status);
+ }
+#else /* HAVE_AMQP_TCP_SOCKET */
+# define CLOSE_SOCKET() close(sockfd)
+ /* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket (CONF(conf, host), conf->port);
if (sockfd < 0)
{
return (status);
}
amqp_set_sockfd (conf->connection, sockfd);
+#endif
reply = amqp_login (conf->connection, CONF(conf, vhost),
/* channel max = */ 0,
ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
CONF(conf, vhost), CONF(conf, user));
amqp_destroy_connection (conf->connection);
- close (sockfd);
+ CLOSE_SOCKET ();
conf->connection = NULL;
return (1);
}
ERROR ("amqp plugin: amqp_channel_open failed.");
amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conf->connection);
- close(sockfd);
+ CLOSE_SOCKET ();
conf->connection = NULL;
return (1);
}
return (0);
} /* }}} int apache_read_host */
+static int apache_init (void) /* {{{ */
+{
+ /* Call this while collectd is still single-threaded to avoid
+ * initialization issues in libgcrypt. */
+ curl_global_init (CURL_GLOBAL_SSL);
+ return (0);
+} /* }}} int apache_init */
+
void module_register (void)
{
plugin_register_complex_config ("apache", config);
+ plugin_register_init ("apache", apache_init);
} /* void module_register */
/* vim: set sw=8 noet fdm=marker : */
SocketFile "/path/to/socket"
SocketGroup "collectd"
SocketPerms "0770"
+ DeleteSocket false
</Plugin>
=head1 DESCRIPTION
#@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb
#@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
#@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann
+#@BUILD_PLUGIN_WRITE_TSDB_TRUE@LoadPlugin write_tsdb
#@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms
#@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc
# </View>
#</Plugin>
-#<Plugin cgroup>
+#<Plugin cgroups>
# CGroup "libvirt"
# IgnoreSelected false
#</Plugin>
# Attribute "foo" "bar"
#</Plugin>
+#<Plugin write_tsdb>
+# <Node>
+# Host "localhost"
+# Port "4242"
+# HostTags "status=production"
+# StoreRates false
+# AlwaysAppendDS false
+# </Node>
+#</Plugin>
+
##############################################################################
# Filter configuration #
#----------------------------------------------------------------------------#
# StoreRates false
# GraphitePrefix "collectd."
# GraphiteEscapeChar "_"
+ # GraphiteSeparateInstances false
+ # GraphiteAlwaysAppendDS false
</Publish>
# Receive values from an AMQP broker
metric parts (host, plugin, type).
Default is "_" (I<Underscore>).
+=item B<GraphiteSeparateInstances> B<true>|B<false>
+
+If set to B<true>, the plugin instance and type instance will be in their own
+path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+default), the plugin and plugin instance (and likewise the type and type
+instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
=back
=head2 Plugin C<apache>
=item B<AlwaysAppendDS> B<false>|B<true>
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=back
+
+=head2 Plugin C<write_tsdb>
+
+The C<write_tsdb> plugin writes data to I<OpenTSDB>, a scalable open-source
+time series database. The plugin connects to a I<TSD>, a masterless, no shared
+state daemon that ingests metrics and stores them in HBase. The plugin uses
+I<TCP> over the "line based" protocol with a default port 4242. The data will
+be sent in blocks of at most 1428 bytes to minimize the number of network
+packets.
+
+Synopsis:
+
+ <Plugin write_tsdb>
+ <Node "example">
+ Host "tsd-1.my.domain"
+ Port "4242"
+ HostTags "status=production"
+ </Node>
+ </Plugin>
+
+The configuration consists of one or more E<lt>B<Node>E<nbsp>I<Name>E<gt>
+blocks. Inside the B<Node> blocks, the following options are recognized:
+
+=over 4
+
+=item B<Host> I<Address>
+
+Hostname or address to connect to. Defaults to C<localhost>.
+
+=item B<Port> I<Service>
+
+Service name or port number to connect to. Defaults to C<4242>.
+
+
+=item B<HostTags> I<String>
+
+When set, I<HostTags> is added to the end of the metric. It is intended to be
+used for name=value pairs that the TSD will tag the metric with. Dots and
+whitespace are I<not> escaped in this string.
+
+=item B<StoreRates> B<false>|B<true>
+
+If set to B<true>, convert counter values to rates. If set to B<false>
+(the default) counter values are stored as is, as an increasing
+integer number.
+
+=item B<AlwaysAppendDS> B<false>|B<true>
+
If set the B<true>, append the name of the I<Data Source> (DS) to the "metric"
identifier. If set to B<false> (the default), this is only done when there is
more than one DS.
=item B<Plugin> I<Name>
Name of the write plugin to which the data should be sent. This option may be
-given multiple times to send the data to more than one write plugin.
+given multiple times to send the data to more than one write plugin. If the
+plugin supports multiple instances, the plugin's instance(s) must also be
+specified.
=back
If no plugin is explicitly specified, the values will be sent to all available
write plugins.
-Example:
+Single-instance plugin example:
<Target "write">
Plugin "rrdtool"
</Target>
+Multi-instance plugin example:
+
+ <Plugin "write_graphite">
+ <Node "foo">
+ ...
+ </Node>
+ <Node "bar">
+ ...
+ </Node>
+ </Plugin>
+ ...
+ <Target "write">
+ Plugin "write_graphite/foo"
+ </Target>
+
=item B<jump>
Starts processing the rules of another chain, see L<"Flow control"> above. If
}
if (value == endptr) {
- sfree (value);
ERROR ("parse_value: Failed to parse string as %s: %s.",
DS_TYPE_TO_STRING (ds_type), value);
+ sfree (value);
return -1;
}
else if ((NULL != endptr) && ('\0' != *endptr))
/* Resize the memory containing the children to be big enough to hold
* all children. */
+ if (dst->children_num + src->children_num - 1 == 0)
+ {
+ dst->children_num = 0;
+ return (0);
+ }
+
temp = (oconfig_item_t *) realloc (dst->children,
sizeof (oconfig_item_t)
* (dst->children_num + src->children_num - 1));
return (-1);
/* Now replace the i'th child in `root' with `new'. */
- cf_ci_replace_child (root, new, i);
+ if (cf_ci_replace_child (root, new, i) < 0)
+ return (-1);
/* ... and go back to the new i'th child. */
--i;
INFO ("curl plugin: No pages have been defined.");
return (-1);
}
+ curl_global_init (CURL_GLOBAL_SSL);
return (0);
} /* }}} int cc_init */
if (status == 0)
{
user_data_t ud;
- char cb_name[DATA_MAX_NAME_LEN];
+ char *cb_name;
struct timespec interval = { 0, 0 };
CDTIME_T_TO_TIMESPEC (db->interval, &interval);
ud.data = (void *) db;
ud.free_func = cj_free;
- ssnprintf (cb_name, sizeof (cb_name), "curl_json-%s-%s",
+ cb_name = ssnprintf_alloc ("curl_json-%s-%s",
db->instance, db->url ? db->url : db->sock);
plugin_register_complex_read (/* group = */ NULL, cb_name, cj_read,
/* interval = */ (db->interval > 0) ? &interval : NULL,
&ud);
+ sfree (cb_name);
}
else
{
return cj_perform (db);
} /* }}} int cj_read */
+static int cj_init (void) /* {{{ */
+{
+ /* Call this while collectd is still single-threaded to avoid
+ * initialization issues in libgcrypt. */
+ curl_global_init (CURL_GLOBAL_SSL);
+ return (0);
+} /* }}} int cj_init */
+
void module_register (void)
{
plugin_register_complex_config ("curl_json", cj_config);
+ plugin_register_init ("curl_json", cj_init);
} /* void module_register */
/* vim: set sw=2 sts=2 et fdm=marker : */
/* If the base xpath returns more than one block, the result is assumed to be
* a table. The `Instance' option is not optional in this case. Check for the
* condition and inform the user. */
- if (is_table && (vl->type_instance == NULL))
+ if (is_table)
{
WARNING ("curl_xml plugin: "
"Base-XPath %s is a table (more than one result was returned), "
return (0);
} /* }}} int cx_config */
+static int cx_init (void) /* {{{ */
+{
+ /* Call this while collectd is still single-threaded to avoid
+ * initialization issues in libgcrypt. */
+ curl_global_init (CURL_GLOBAL_SSL);
+ return (0);
+} /* }}} int cx_init */
+
void module_register (void)
{
plugin_register_complex_config ("curl_xml", cx_config);
+ plugin_register_init ("curl_xml", cx_init);
} /* void module_register */
/* vim: set sw=2 sts=2 et fdm=marker : */
fprintf (fh,
"Severity: %s\n"
- "Time: %.3f\n",
- severity, CDTIME_T_TO_DOUBLE (n->time));
+ "Time: %u\n",
+ severity, (unsigned int)CDTIME_T_TO_TIME_T(n->time));
/* Print the optional fields */
if (strlen (n->host) > 0)
}
dm_list_iterate_items(lvl, lvs) {
+ lvm_submit(vg_name, lvm_lv_get_name(lvl->lv), lvm_lv_get_size(lvl->lv));
+ }
+
+ dm_list_iterate_items(lvl, lvs) {
name = lvm_lv_get_name(lvl->lv);
attrs = get_lv_property_string(lvl->lv, "lv_attr");
size = lvm_lv_get_size(lvl->lv);
if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P))
return;
- gcry_check_version (NULL); /* before calling any other functions */
+ /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html
+ * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS
+ * *before* initalizing Libgcrypt with gcry_check_version(), which itself must
+ * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P
+ * above doesn't count, as it doesn't implicitly initalize Libgcrypt.
+ *
+ * tl;dr: keep all these gry_* statements in this exact order please. */
gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+ gcry_check_version (NULL);
gcry_control (GCRYCTL_INIT_SECMEM, 32768);
gcry_control (GCRYCTL_INITIALIZATION_FINISHED);
} /* }}} void network_init_gcrypt */
int getprocs64 (void *procsinfo, int sizproc, void *fdsinfo, int sizfd, pid_t *index, int count);
int getthrds64( pid_t, void *, int, tid64_t *, int );
#endif
-int getargs (struct procentry64 *processBuffer, int bufferLen, char *argsBuffer, int argsLen);
+int getargs (void *processBuffer, int bufferLen, char *argsBuffer, int argsLen);
#endif /* HAVE_PROCINFO_H */
/* put name of process from config to list_head_g tree
}
static PyMemberDef Values_members[] = {
- {"interval", T_INT, offsetof(Values, interval), 0, interval_doc},
+ {"interval", T_DOUBLE, offsetof(Values, interval), 0, interval_doc},
{"values", T_OBJECT_EX, offsetof(Values, values), 0, values_doc},
{"meta", T_OBJECT_EX, offsetof(Values, meta), 0, meta_doc},
{NULL}
snmp_free_pdu (res);
res = NULL;
+ if (req != NULL)
+ snmp_free_pdu (req);
+ req = NULL;
+
if (status == 0)
csnmp_dispatch_table (host, data, instance_list_head, value_list_head);
} /* }}} int statsd_handle_gauge */
static int statsd_handle_timer (char const *name, /* {{{ */
- char const *value_str)
+ char const *value_str,
+ char const *extra)
{
statsd_metric_t *metric;
value_t value_ms;
+ value_t scale;
cdtime_t value;
int status;
+ if ((extra != NULL) && (extra[0] != '@'))
+ return (-1);
+
+ scale.gauge = 1.0;
+ if (extra != NULL)
+ {
+ status = statsd_parse_value (extra + 1, &scale);
+ if (status != 0)
+ return (status);
+
+ if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
+ return (-1);
+ }
+
value_ms.derive = 0;
status = statsd_parse_value (value_str, &value_ms);
if (status != 0)
return (status);
- value = MS_TO_CDTIME_T (value_ms.gauge);
+ value = MS_TO_CDTIME_T (value_ms.gauge / scale.gauge);
pthread_mutex_lock (&metrics_lock);
if (strcmp ("c", type) == 0)
return (statsd_handle_counter (name, value, extra));
+ else if (strcmp ("ms", type) == 0)
+ return (statsd_handle_timer (name, value, extra));
- /* extra is only valid for counters */
+ /* extra is only valid for counters and timers */
if (extra != NULL)
return (-1);
if (strcmp ("g", type) == 0)
return (statsd_handle_gauge (name, value));
- else if (strcmp ("ms", type) == 0)
- return (statsd_handle_timer (name, value));
else if (strcmp ("s", type) == 0)
return (statsd_handle_set (name, value));
else
/* sys/socket.h is necessary to compile when using netlink on older systems. */
# include <sys/socket.h>
# include <linux/netlink.h>
+#if HAVE_LINUX_INET_DIAG_H
# include <linux/inet_diag.h>
+#endif
# include <sys/socket.h>
# include <arpa/inet.h>
/* #endif KERNEL_LINUX */
#endif /* KERNEL_AIX */
#if KERNEL_LINUX
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
struct nlreq {
struct nlmsghdr nlh;
struct inet_diag_req r;
};
+#endif
static const char *tcp_state[] =
{
static port_entry_t *port_list_head = NULL;
#if KERNEL_LINUX
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
+/* This depends on linux inet_diag_req because if this structure is missing,
+ * sequence_number is useless and we get a compilation warning.
+ */
static uint32_t sequence_number = 0;
+#endif
enum
{
* zero on other errors. */
static int conn_read_netlink (void)
{
+#if HAVE_STRUCT_LINUX_INET_DIAG_REQ
int fd;
struct sockaddr_nl nladdr;
struct nlreq req;
/* Not reached because the while() loop above handles the exit condition. */
return (0);
+#else
+ return (1);
+#endif /* HAVE_STRUCT_LINUX_INET_DIAG_REQ */
} /* int conn_read_netlink */
static int conn_handle_line (char *buffer)
#include "collectd.h"
+#include <pthread.h>
#include <regex.h>
#include "common.h"
struct user_class_s
{
+ pthread_mutex_t lock;
void *user_class;
identifier_match_t match;
user_obj_t *user_obj_list; /* list of user_obj */
return (0);
} /* }}} int lu_copy_ident_to_match */
+/* user_class->lock must be held when calling this function */
static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
data_set_t const *ds, value_list_t const *vl,
user_class_t *user_class)
return (user_obj);
} /* }}} void *lu_create_user_obj */
+/* user_class->lock must be held when calling this function */
static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
value_list_t const *vl)
{
|| !lu_part_matches (&user_class->match.host, vl->host))
return (1);
+ pthread_mutex_lock (&user_class->lock);
user_obj = lu_find_user_obj (user_class, vl);
if (user_obj == NULL)
{
/* call lookup_class_callback_t() and insert into the list of user objects. */
user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+ pthread_mutex_unlock (&user_class->lock);
if (user_obj == NULL)
return (-1);
}
+ pthread_mutex_unlock (&user_class->lock);
status = obj->cb_user_obj (ds, vl,
user_class->user_class, user_obj->user_obj);
identifier_match_t const *match = &user_class_list->entry.match;
/* Lookup user_class_list from the per-plugin structure. If this is the first
- * user_class to be added, the blocks return immediately. Otherwise they will
+ * user_class to be added, the block returns immediately. Otherwise they will
* set "ptr" to non-NULL. */
if (match->plugin.is_regex)
{
lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
user_class_list->entry.user_obj_list = NULL;
+ pthread_mutex_destroy (&user_class_list->entry.lock);
sfree (user_class_list);
user_class_list = next;
return (ENOMEM);
}
memset (user_class_obj, 0, sizeof (*user_class_obj));
+ pthread_mutex_init (&user_class_obj->entry.lock, /* attr = */ NULL);
user_class_obj->entry.user_class = user_class;
lu_copy_ident_to_match (&user_class_obj->entry.match, ident, group_by);
user_class_obj->entry.user_obj_list = NULL;
return (0);
} /* }}} int wh_config */
+static int wh_init (void) /* {{{ */
+{
+ /* Call this while collectd is still single-threaded to avoid
+ * initialization issues in libgcrypt. */
+ curl_global_init (CURL_GLOBAL_SSL);
+ return (0);
+} /* }}} int wh_init */
+
void module_register (void) /* {{{ */
{
plugin_register_complex_config ("write_http", wh_config);
+ plugin_register_init ("write_http", wh_init);
} /* }}} void module_register */
/* vim: set fdm=marker sw=8 ts=8 tw=78 et : */
#define F_CONNECT 0x01
uint8_t flags;
pthread_mutex_t lock;
- _Bool notifications;
- _Bool check_thresholds;
+ _Bool notifications;
+ _Bool check_thresholds;
_Bool store_rates;
_Bool always_append_ds;
char *node;
sfree (event);
} /* }}} void riemann_event_protobuf_free */
-static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
+static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
{
size_t i;
return (strarray_add (&event->tags, &event->n_tags, tag));
} /* }}} int riemann_event_add_tag */
-static int riemann_event_add_attribute (Event *event, /* {{{ */
+static int riemann_event_add_attribute(Event *event, /* {{{ */
char const *key, char const *value)
{
Attribute **new_attributes;
return (0);
} /* }}} int riemann_event_add_attribute */
-static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
+static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
notification_t const *n)
{
Msg *msg;
return (msg);
} /* }}} Msg *riemann_notification_to_protobuf */
-static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
+static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
data_set_t const *ds,
value_list_t const *vl, size_t index,
gauge_t const *rates,
event->time = CDTIME_T_TO_TIME_T (vl->time);
event->has_time = 1;
- if (host->check_thresholds) {
- switch (status) {
- case STATE_OKAY:
- event->state = strdup("ok");
- break;
- case STATE_ERROR:
- event->state = strdup("critical");
- break;
- case STATE_WARNING:
- event->state = strdup("warning");
- break;
- case STATE_MISSING:
- event->state = strdup("unknown");
- break;
- }
- }
+ if (host->check_thresholds) {
+ switch (status) {
+ case STATE_OKAY:
+ event->state = strdup("ok");
+ break;
+ case STATE_ERROR:
+ event->state = strdup("critical");
+ break;
+ case STATE_WARNING:
+ event->state = strdup("warning");
+ break;
+ case STATE_MISSING:
+ event->state = strdup("unknown");
+ break;
+ }
+ }
ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
event->ttl = (float) ttl;
/* host = */ "", vl->plugin, vl->plugin_instance,
vl->type, vl->type_instance);
if (host->always_append_ds || (ds->ds_num > 1))
- if (host->event_service_prefix == NULL || host->event_service_prefix[0] == '\0')
- ssnprintf (service_buffer, sizeof (service_buffer),
- "%s/%s", &name_buffer[1], ds->ds[index].name);
+ {
+ if (host->event_service_prefix == NULL)
+ ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s",
+ &name_buffer[1], ds->ds[index].name);
else
- ssnprintf (service_buffer, sizeof (service_buffer),
- "%s%s/%s", host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
+ ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s",
+ host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
+ }
else
- if (host->event_service_prefix == NULL || host->event_service_prefix[0] == '\0')
- sstrncpy (service_buffer, &name_buffer[1],
- sizeof (service_buffer));
+ {
+ if (host->event_service_prefix == NULL)
+ sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer));
else
- ssnprintf (service_buffer, sizeof (service_buffer),
- "%s%s", host->event_service_prefix, &name_buffer[1]);
+ ssnprintf (service_buffer, sizeof (service_buffer), "%s%s",
+ host->event_service_prefix, &name_buffer[1]);
+ }
event->service = strdup (service_buffer);
struct riemann_host *host = ud->data;
Msg *msg;
- if (!host->notifications)
- return 0;
+ if (!host->notifications)
+ return 0;
msg = riemann_notification_to_protobuf (host, n);
if (msg == NULL)
struct riemann_host *host = ud->data;
Msg *msg;
- if (host->check_thresholds)
- write_riemann_threshold_check(ds, vl, statuses);
+ if (host->check_thresholds)
+ write_riemann_threshold_check(ds, vl, statuses);
msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
if (msg == NULL)
return (-1);
host->reference_count = 1;
host->node = NULL;
host->service = NULL;
- host->notifications = 1;
- host->check_thresholds = 0;
+ host->notifications = 1;
+ host->check_thresholds = 0;
host->store_rates = 1;
host->always_append_ds = 0;
host->use_tcp = 0;
status = cf_util_get_string (child, &host->node);
if (status != 0)
break;
- } else if (strcasecmp ("Notifications", child->key) == 0) {
- status = cf_util_get_boolean(child, &host->notifications);
- if (status != 0)
- break;
- } else if (strcasecmp ("EventServicePrefix", child->key) == 0) {
- status = cf_util_get_string (child, &host->event_service_prefix);
- if (status != 0)
- break;
- } else if (strcasecmp ("CheckThresholds", child->key) == 0) {
- status = cf_util_get_boolean(child, &host->check_thresholds);
- if (status != 0)
- break;
+ } else if (strcasecmp ("Notifications", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->notifications);
+ if (status != 0)
+ break;
+ } else if (strcasecmp ("EventServicePrefix", child->key) == 0) {
+ status = cf_util_get_string (child, &host->event_service_prefix);
+ if (status != 0)
+ break;
+ } else if (strcasecmp ("CheckThresholds", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->check_thresholds);
+ if (status != 0)
+ break;
} else if (strcasecmp ("Port", child->key) == 0) {
status = cf_util_get_service (child, &host->service);
if (status != 0) {
child->key);
}
}
- return 0;
+ return (0);
} /* }}} int riemann_config */
void module_register(void)
--- /dev/null
+/**
+ * collectd - src/write_tsdb.c
+ * Copyright (C) 2012 Pierre-Yves Ritschard
+ * Copyright (C) 2011 Scott Sanders
+ * Copyright (C) 2009 Paul Sadauskas
+ * Copyright (C) 2009 Doug MacEachern
+ * Copyright (C) 2007-2012 Florian octo Forster
+ * Copyright (C) 2013-2014 Limelight Networks, Inc.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Based on the write_graphite plugin. Authors:
+ * Florian octo Forster <octo at collectd.org>
+ * Doug MacEachern <dougm at hyperic.com>
+ * Paul Sadauskas <psadauskas at gmail.com>
+ * Scott Sanders <scott at jssjr.com>
+ * Pierre-Yves Ritschard <pyr at spootnik.org>
+ * write_tsdb Authors:
+ * Brett Hawn <bhawn at llnw.com>
+ * Kevin Bowling <kbowling@llnw.com>
+ **/
+
+/* write_tsdb plugin configuation example
+ *
+ * <Plugin write_tsdb>
+ * <Node>
+ * Host "localhost"
+ * Port "4242"
+ * HostTags "status=production deviceclass=www"
+ * </Node>
+ * </Plugin>
+ */
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "configfile.h"
+
+#include "utils_cache.h"
+#include "utils_parse_option.h"
+
+#include <pthread.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#ifndef WT_DEFAULT_NODE
+# define WT_DEFAULT_NODE "localhost"
+#endif
+
+#ifndef WT_DEFAULT_SERVICE
+# define WT_DEFAULT_SERVICE "4242"
+#endif
+
+#ifndef WT_DEFAULT_ESCAPE
+# define WT_DEFAULT_ESCAPE '.'
+#endif
+
+/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
+#ifndef WT_SEND_BUF_SIZE
+# define WT_SEND_BUF_SIZE 1428
+#endif
+
+/*
+ * Private variables
+ */
+struct wt_callback
+{
+ int sock_fd;
+
+ char *node;
+ char *service;
+ char *host_tags;
+
+ _Bool store_rates;
+ _Bool always_append_ds;
+
+ char send_buf[WT_SEND_BUF_SIZE];
+ size_t send_buf_free;
+ size_t send_buf_fill;
+ cdtime_t send_buf_init_time;
+
+ pthread_mutex_t send_lock;
+};
+
+
+/*
+ * Functions
+ */
+static void wt_reset_buffer(struct wt_callback *cb)
+{
+ memset(cb->send_buf, 0, sizeof(cb->send_buf));
+ cb->send_buf_free = sizeof(cb->send_buf);
+ cb->send_buf_fill = 0;
+ cb->send_buf_init_time = cdtime();
+}
+
+static int wt_send_buffer(struct wt_callback *cb)
+{
+ ssize_t status = 0;
+
+ status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
+ if (status < 0)
+ {
+ char errbuf[1024];
+ ERROR("write_tsdb plugin: send failed with status %zi (%s)",
+ status, sstrerror (errno, errbuf, sizeof (errbuf)));
+
+ close (cb->sock_fd);
+ cb->sock_fd = -1;
+
+ return -1;
+ }
+
+ return 0;
+}
+
+/* NOTE: You must hold cb->send_lock when calling this function! */
+static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb)
+{
+ int status;
+
+ DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; "
+ "send_buf_fill = %zu;",
+ (double)timeout,
+ cb->send_buf_fill);
+
+ /* timeout == 0 => flush unconditionally */
+ if (timeout > 0)
+ {
+ cdtime_t now;
+
+ now = cdtime();
+ if ((cb->send_buf_init_time + timeout) > now)
+ return 0;
+ }
+
+ if (cb->send_buf_fill <= 0)
+ {
+ cb->send_buf_init_time = cdtime();
+ return 0;
+ }
+
+ status = wt_send_buffer(cb);
+ wt_reset_buffer(cb);
+
+ return status;
+}
+
+static int wt_callback_init(struct wt_callback *cb)
+{
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list;
+ struct addrinfo *ai_ptr;
+ int status;
+
+ const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
+ const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE;
+
+ if (cb->sock_fd > 0)
+ return 0;
+
+ memset(&ai_hints, 0, sizeof(ai_hints));
+#ifdef AI_ADDRCONFIG
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_STREAM;
+
+ ai_list = NULL;
+
+ status = getaddrinfo(node, service, &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s",
+ node, service, gai_strerror (status));
+ return -1;
+ }
+
+ assert (ai_list != NULL);
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ cb->sock_fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype,
+ ai_ptr->ai_protocol);
+ if (cb->sock_fd < 0)
+ continue;
+
+ status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ if (status != 0)
+ {
+ close(cb->sock_fd);
+ cb->sock_fd = -1;
+ continue;
+ }
+
+ break;
+ }
+
+ freeaddrinfo(ai_list);
+
+ if (cb->sock_fd < 0)
+ {
+ char errbuf[1024];
+ ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
+ "The last error was: %s", node, service,
+ sstrerror (errno, errbuf, sizeof(errbuf)));
+ close(cb->sock_fd);
+ return -1;
+ }
+
+ wt_reset_buffer(cb);
+
+ return 0;
+}
+
+static void wt_callback_free(void *data)
+{
+ struct wt_callback *cb;
+
+ if (data == NULL)
+ return;
+
+ cb = data;
+
+ pthread_mutex_lock(&cb->send_lock);
+
+ wt_flush_nolock(0, cb);
+
+ close(cb->sock_fd);
+ cb->sock_fd = -1;
+
+ sfree(cb->node);
+ sfree(cb->service);
+ sfree(cb->host_tags);
+
+ pthread_mutex_destroy(&cb->send_lock);
+
+ sfree(cb);
+}
+
+static int wt_flush(cdtime_t timeout,
+ const char *identifier __attribute__((unused)),
+ user_data_t *user_data)
+{
+ struct wt_callback *cb;
+ int status;
+
+ if (user_data == NULL)
+ return -EINVAL;
+
+ cb = user_data->data;
+
+ pthread_mutex_lock(&cb->send_lock);
+
+ if (cb->sock_fd < 0)
+ {
+ status = wt_callback_init(cb);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: wt_callback_init failed.");
+ pthread_mutex_unlock(&cb->send_lock);
+ return -1;
+ }
+ }
+
+ status = wt_flush_nolock(timeout, cb);
+ pthread_mutex_unlock(&cb->send_lock);
+
+ return status;
+}
+
+static int wt_format_values(char *ret, size_t ret_len,
+ int ds_num, const data_set_t *ds,
+ const value_list_t *vl,
+ _Bool store_rates)
+{
+ size_t offset = 0;
+ int status;
+ gauge_t *rates = NULL;
+
+ assert(0 == strcmp (ds->type, vl->type));
+
+ memset(ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+ status = ssnprintf (ret + offset, ret_len - offset, \
+ __VA_ARGS__); \
+ if (status < 1) \
+ { \
+ sfree(rates); \
+ return -1; \
+ } \
+ else if (((size_t) status) >= (ret_len - offset)) \
+ { \
+ sfree(rates); \
+ return -1; \
+ } \
+ else \
+ offset += ((size_t) status); \
+} while (0)
+
+ if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+ BUFFER_ADD("%f", vl->values[ds_num].gauge);
+ else if (store_rates)
+ {
+ if (rates == NULL)
+ rates = uc_get_rate (ds, vl);
+ if (rates == NULL)
+ {
+ WARNING("format_values: "
+ "uc_get_rate failed.");
+ return -1;
+ }
+ BUFFER_ADD("%f", rates[ds_num]);
+ }
+ else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+ BUFFER_ADD("%llu", vl->values[ds_num].counter);
+ else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+ BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive);
+ else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+ BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute);
+ else
+ {
+ ERROR("format_values plugin: Unknown data source type: %i",
+ ds->ds[ds_num].type);
+ sfree(rates);
+ return -1;
+ }
+
+#undef BUFFER_ADD
+
+ sfree(rates);
+ return 0;
+}
+
+static int wt_format_name(char *ret, int ret_len,
+ const value_list_t *vl,
+ const struct wt_callback *cb,
+ const char *ds_name)
+{
+ int status;
+ char *temp = NULL;
+ char *prefix = "";
+ const char *meta_prefix = "tsdb_prefix";
+
+ if (vl->meta) {
+ status = meta_data_get_string(vl->meta, meta_prefix, &temp);
+ if (status == -ENOENT) {
+ /* defaults to empty string */
+ } else if (status < 0) {
+ sfree(temp);
+ return status;
+ } else {
+ prefix = temp;
+ }
+ }
+
+ if (ds_name != NULL) {
+ if (vl->plugin_instance[0] == '\0') {
+ ssnprintf(ret, ret_len, "%s%s.%s",
+ prefix, vl->plugin, ds_name);
+ } else if (vl->type_instance == '\0') {
+ ssnprintf(ret, ret_len, "%s%s.%s.%s.%s",
+ prefix, vl->plugin, vl->plugin_instance,
+ vl->type_instance, ds_name);
+ } else {
+ ssnprintf(ret, ret_len, "%s%s.%s.%s.%s",
+ prefix, vl->plugin, vl->plugin_instance, vl->type,
+ ds_name);
+ }
+ } else if (vl->plugin_instance[0] == '\0') {
+ if (vl->type_instance[0] == '\0')
+ ssnprintf(ret, ret_len, "%s%s.%s",
+ prefix, vl->plugin, vl->type);
+ else
+ ssnprintf(ret, ret_len, "%s%s.%s",
+ prefix, vl->plugin, vl->type_instance);
+ } else if (vl->type_instance[0] == '\0') {
+ ssnprintf(ret, ret_len, "%s%s.%s.%s",
+ prefix, vl->plugin, vl->plugin_instance, vl->type);
+ } else {
+ ssnprintf(ret, ret_len, "%s%s.%s.%s",
+ prefix, vl->plugin, vl->plugin_instance, vl->type_instance);
+ }
+
+ sfree(temp);
+ return 0;
+}
+
+static int wt_send_message (const char* key, const char* value,
+ cdtime_t time, struct wt_callback *cb,
+ const char* host, meta_data_t *md)
+{
+ int status;
+ int message_len;
+ char *temp = NULL;
+ char *tags = "";
+ char message[1024];
+ char *host_tags = cb->host_tags ? cb->host_tags : "";
+ const char *meta_tsdb = "tsdb_tags";
+
+ /* skip if value is NaN */
+ if (value[0] == 'n')
+ return 0;
+
+ if (md) {
+ status = meta_data_get_string(md, meta_tsdb, &temp);
+ if (status == -ENOENT) {
+ /* defaults to empty string */
+ } else if (status < 0) {
+ ERROR("write_tsdb plugin: tags metadata get failure");
+ sfree(temp);
+ pthread_mutex_unlock(&cb->send_lock);
+ return status;
+ } else {
+ tags = temp;
+ }
+ }
+
+ message_len = ssnprintf (message,
+ sizeof(message),
+ "put %s %.0f %s fqdn=%s %s %s\r\n",
+ key,
+ CDTIME_T_TO_DOUBLE(time),
+ value,
+ host,
+ tags,
+ host_tags);
+
+ sfree(temp);
+
+ if (message_len >= sizeof(message)) {
+ ERROR("write_tsdb plugin: message buffer too small: "
+ "Need %d bytes.", message_len + 1);
+ return -1;
+ }
+
+ pthread_mutex_lock(&cb->send_lock);
+
+ if (cb->sock_fd < 0)
+ {
+ status = wt_callback_init(cb);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: wt_callback_init failed.");
+ pthread_mutex_unlock(&cb->send_lock);
+ return -1;
+ }
+ }
+
+ if (message_len >= cb->send_buf_free)
+ {
+ status = wt_flush_nolock(0, cb);
+ if (status != 0)
+ {
+ pthread_mutex_unlock(&cb->send_lock);
+ return status;
+ }
+ }
+
+ /* Assert that we have enough space for this message. */
+ assert(message_len < cb->send_buf_free);
+
+ /* `message_len + 1' because `message_len' does not include the
+ * trailing null byte. Neither does `send_buffer_fill'. */
+ memcpy(cb->send_buf + cb->send_buf_fill,
+ message, message_len + 1);
+ cb->send_buf_fill += message_len;
+ cb->send_buf_free -= message_len;
+
+ DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"",
+ cb->node,
+ cb->service,
+ cb->send_buf_fill, sizeof(cb->send_buf),
+ 100.0 * ((double) cb->send_buf_fill) /
+ ((double) sizeof(cb->send_buf)),
+ message);
+
+ pthread_mutex_unlock(&cb->send_lock);
+
+ return 0;
+}
+
+static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
+ struct wt_callback *cb)
+{
+ char key[10*DATA_MAX_NAME_LEN];
+ char values[512];
+
+ int status, i;
+
+ if (0 != strcmp(ds->type, vl->type))
+ {
+ ERROR("write_tsdb plugin: DS type does not match "
+ "value list type");
+ return -1;
+ }
+
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ const char *ds_name = NULL;
+
+ if (cb->always_append_ds || (ds->ds_num > 1))
+ ds_name = ds->ds[i].name;
+
+ /* Copy the identifier to 'key' and escape it. */
+ status = wt_format_name(key, sizeof(key), vl, cb, ds_name);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: error with format_name");
+ return status;
+ }
+
+ escape_string(key, sizeof(key));
+ /* Convert the values to an ASCII representation and put that into
+ * 'values'. */
+ status = wt_format_values(values, sizeof(values), i, ds, vl,
+ cb->store_rates);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: error with "
+ "wt_format_values");
+ return status;
+ }
+
+ /* Send the message to tsdb */
+ status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta);
+ if (status != 0)
+ {
+ ERROR("write_tsdb plugin: error with "
+ "wt_send_message");
+ return status;
+ }
+ }
+
+ return 0;
+}
+
+static int wt_write(const data_set_t *ds, const value_list_t *vl,
+ user_data_t *user_data)
+{
+ struct wt_callback *cb;
+ int status;
+
+ if (user_data == NULL)
+ return EINVAL;
+
+ cb = user_data->data;
+
+ status = wt_write_messages(ds, vl, cb);
+
+ return status;
+}
+
+static int wt_config_tsd(oconfig_item_t *ci)
+{
+ struct wt_callback *cb;
+ user_data_t user_data;
+ char callback_name[DATA_MAX_NAME_LEN];
+ int i;
+
+ cb = malloc(sizeof(*cb));
+ if (cb == NULL)
+ {
+ ERROR("write_tsdb plugin: malloc failed.");
+ return -1;
+ }
+ memset(cb, 0, sizeof(*cb));
+ cb->sock_fd = -1;
+ cb->node = NULL;
+ cb->service = NULL;
+ cb->host_tags = NULL;
+ cb->store_rates = 0;
+
+ pthread_mutex_init (&cb->send_lock, NULL);
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Host", child->key) == 0)
+ cf_util_get_string(child, &cb->node);
+ else if (strcasecmp("Port", child->key) == 0)
+ cf_util_get_service(child, &cb->service);
+ else if (strcasecmp("HostTags", child->key) == 0)
+ cf_util_get_string(child, &cb->host_tags);
+ else if (strcasecmp("StoreRates", child->key) == 0)
+ cf_util_get_boolean(child, &cb->store_rates);
+ else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
+ cf_util_get_boolean(child, &cb->always_append_ds);
+ else
+ {
+ ERROR("write_tsdb plugin: Invalid configuration "
+ "option: %s.", child->key);
+ }
+ }
+
+ ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
+ cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
+ cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
+
+ memset(&user_data, 0, sizeof(user_data));
+ user_data.data = cb;
+ user_data.free_func = wt_callback_free;
+ plugin_register_write(callback_name, wt_write, &user_data);
+
+ user_data.free_func = NULL;
+ plugin_register_flush(callback_name, wt_flush, &user_data);
+
+ return 0;
+}
+
+static int wt_config(oconfig_item_t *ci)
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Node", child->key) == 0)
+ wt_config_tsd(child);
+ else
+ {
+ ERROR("write_tsdb plugin: Invalid configuration "
+ "option: %s.", child->key);
+ }
+ }
+
+ return 0;
+}
+
+void module_register(void)
+{
+ plugin_register_complex_config("write_tsdb", wt_config);
+}
+
+/* vim: set sw=4 ts=4 sts=4 tw=78 et : */
#!/usr/bin/env bash
-DEFAULT_VERSION="5.4.0.git"
+DEFAULT_VERSION="5.4.1.git"
VERSION="`git describe 2> /dev/null | grep collectd | sed -e 's/^collectd-//'`"