Phoenix Kayo <kayo.k11.4 at gmail.com>
- pinba plugin.
+Pierre-Yves Ritschard <pyr at spootnik.org>
+ - Write-Riemann plugin.
+ - Write-Graphite plugin: Notification support.
+
Piotr Hosowicz <the55 at wp.pl>
- SMF manifest for collectd.
requests. The transmitted data is either in a form understood by the
Exec plugin or formatted in JSON.
+ - write_mongodb
+ Sends data to MongoDB, a NoSQL database.
+
- write_redis
Sends the values to a Redis key-value database server.
+ - write_riemann
+ Sends data to Riemann, a stream processing and monitoring system.
+
* Logging is, as everything in collectd, provided by plugins. The following
plugins keep up informed about what's going on:
AC_HEADER_DIRENT
AC_HEADER_STDBOOL
-AC_CHECK_HEADERS(stdio.h errno.h math.h stdarg.h syslog.h fcntl.h signal.h assert.h sys/types.h sys/socket.h sys/select.h poll.h netdb.h arpa/inet.h sys/resource.h sys/param.h kstat.h regex.h sys/ioctl.h endian.h sys/isa_defs.h)
+AC_CHECK_HEADERS(stdio.h errno.h math.h stdarg.h syslog.h fcntl.h signal.h assert.h sys/types.h sys/socket.h sys/select.h poll.h netdb.h arpa/inet.h sys/resource.h sys/param.h kstat.h regex.h sys/ioctl.h endian.h sys/isa_defs.h fnmatch.h libgen.h)
# For ping library
AC_CHECK_HEADERS(netinet/in_systm.h, [], [],
then
AC_CHECK_HEADERS(mach/mach_init.h mach/host_priv.h mach/mach_error.h mach/mach_host.h mach/mach_port.h mach/mach_types.h mach/message.h mach/processor_set.h mach/processor.h mach/processor_info.h mach/task.h mach/thread_act.h mach/vm_region.h mach/vm_map.h mach/vm_prot.h mach/vm_statistics.h mach/kern_return.h)
AC_CHECK_HEADERS(CoreFoundation/CoreFoundation.h IOKit/IOKitLib.h IOKit/IOTypes.h IOKit/ps/IOPSKeys.h IOKit/IOBSD.h IOKit/storage/IOBlockStorageDriver.h)
+ # For the battery plugin
+ AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
+[
+#if HAVE_IOKIT_IOKITLIB_H
+# include <IOKit/IOKitLib.h>
+#endif
+#if HAVE_IOKIT_IOTYPES_H
+# include <IOKit/IOTypes.h>
+#endif
+])
+
fi
+
AC_CHECK_HEADERS(sys/sysctl.h, [], [],
[
#if HAVE_SYS_TYPES_H
fi
# For hddtemp module
-AC_CHECK_HEADERS(linux/major.h libgen.h)
+AC_CHECK_HEADERS(linux/major.h)
# For md module (Linux only)
if test "x$ac_system" = "xLinux"
have_linux_raid_md_u_h="no"
fi
-# For the battery plugin
-AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
-[
-#if HAVE_IOKIT_IOKITLIB_H
-# include <IOKit/IOKitLib.h>
-#endif
-#if HAVE_IOKIT_IOTYPES_H
-# include <IOKit/IOTypes.h>
-#endif
-])
-
# For the swap module
have_linux_wireless_h="no"
if test "x$ac_system" = "xLinux"
AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics])
AC_PLUGIN([write_graphite], [yes], [Graphite / Carbon output plugin])
AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin])
-AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin])
AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin])
+AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin])
+AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin])
AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics])
AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics])
wireless . . . . . . $enable_wireless
write_graphite . . . $enable_write_graphite
write_http . . . . . $enable_write_http
- write_redis . . . . . $enable_write_redis
write_mongodb . . . . $enable_write_mongodb
+ write_redis . . . . . $enable_write_redis
+ write_riemann . . . . $enable_write_riemann
xmms . . . . . . . . $enable_xmms
zfs_arc . . . . . . . $enable_zfs_arc
collectd_DEPENDENCIES += write_redis.la
endif
+if BUILD_PLUGIN_WRITE_RIEMANN
+BUILT_SOURCES += riemann.pb-c.c riemann.pb-c.h
+CLEANFILES += riemann.pb-c.c riemann.pb-c.h
+pkglib_LTLIBRARIES += write_riemann.la
+write_riemann_la_SOURCES = write_riemann.c riemann.pb-c.c
+write_riemann_la_LDFLAGS = -module -avoid-version
+write_riemann_la_LIBADD = -lprotobuf-c
+collectd_LDADD += "-dlopen" write_riemann.la
+collectd_DEPENDENCIES += write_riemann.la
+endif
+
if BUILD_PLUGIN_XMMS
pkglib_LTLIBRARIES += xmms.la
xmms_la_SOURCES = xmms.c
#collectd_1_SOURCES = collectd.pod
-EXTRA_DIST = types.db pinba.proto
+EXTRA_DIST = types.db pinba.proto riemann.proto
EXTRA_DIST += collectd.conf.pod \
collectd-email.pod \
pinba.pb-c.c pinba.pb-c.h: pinba.proto
protoc-c -I$(srcdir) --c_out . $(srcdir)/pinba.proto
+riemann.pb-c.c riemann.pb-c.h: riemann.proto
+ protoc-c --c_out . riemann.proto
+
install-exec-hook:
$(mkinstalldirs) $(DESTDIR)$(sysconfdir)
if test -e $(DESTDIR)$(sysconfdir)/collectd.conf; \
**/
#include "collectd.h"
+
+#include <pthread.h>
+
#include "plugin.h"
#include "common.h"
#include "configfile.h"
#include "meta_data.h"
#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_subst.h"
#include "utils_vl_lookup.h"
-#include <pthread.h>
+#define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
+#define AGG_FUNC_PLACEHOLDER "%{aggregation}"
struct aggregation_s /* {{{ */
{
identifier_t ident;
+ unsigned int group_by;
+
+ unsigned int regex_fields;
+
+ char *set_host;
+ char *set_plugin;
+ char *set_plugin_instance;
+ char *set_type_instance;
_Bool calc_num;
_Bool calc_sum;
static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
static agg_instance_t *agg_instance_list_head = NULL;
+static _Bool agg_is_regex (char const *str) /* {{{ */
+{
+ size_t len;
+
+ if (str == NULL)
+ return (0);
+
+ len = strlen (str);
+ if (len < 3)
+ return (0);
+
+ if ((str[0] == '/') && (str[len - 1] == '/'))
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool agg_is_regex */
+
static void agg_destroy (aggregation_t *agg) /* {{{ */
{
sfree (agg);
inst->max = NAN;
} /* }}} void agg_instance_destroy */
+static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
+ value_list_t const *vl, aggregation_t const *agg)
+{
+#define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
+ if (agg->set_ ## field != NULL) \
+ sstrncpy (buffer, agg->set_ ## field, buffer_size); \
+ else if ((agg->regex_fields & group_mask) \
+ && (agg->group_by & group_mask)) \
+ sstrncpy (buffer, vl->field, buffer_size); \
+ else if ((agg->regex_fields & group_mask) \
+ && (AGG_MATCHES_ALL (agg->ident.field))) \
+ sstrncpy (buffer, all_value, buffer_size); \
+ else \
+ sstrncpy (buffer, agg->ident.field, buffer_size); \
+} while (0)
+
+ /* Host */
+ COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
+ host, LU_GROUP_BY_HOST, "global");
+
+ /* Plugin */
+ if (agg->set_plugin != NULL)
+ sstrncpy (inst->ident.plugin, agg->set_plugin,
+ sizeof (inst->ident.plugin));
+ else
+ sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
+
+ /* Plugin instance */
+ if (agg->set_plugin_instance != NULL)
+ sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
+ sizeof (inst->ident.plugin_instance));
+ else
+ {
+ char tmp_plugin[DATA_MAX_NAME_LEN];
+ char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
+
+ if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+ && (agg->group_by & LU_GROUP_BY_PLUGIN))
+ sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
+ else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+ && (AGG_MATCHES_ALL (agg->ident.plugin)))
+ sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
+ else
+ sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
+
+ if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+ && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
+ sstrncpy (tmp_plugin_instance, vl->plugin_instance,
+ sizeof (tmp_plugin_instance));
+ else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+ && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
+ sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
+ else
+ sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
+ sizeof (tmp_plugin_instance));
+
+ if ((strcmp ("", tmp_plugin) == 0)
+ && (strcmp ("", tmp_plugin_instance) == 0))
+ sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
+ sizeof (inst->ident.plugin_instance));
+ else if (strcmp ("", tmp_plugin) != 0)
+ ssnprintf (inst->ident.plugin_instance,
+ sizeof (inst->ident.plugin_instance),
+ "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
+ else if (strcmp ("", tmp_plugin_instance) != 0)
+ ssnprintf (inst->ident.plugin_instance,
+ sizeof (inst->ident.plugin_instance),
+ "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+ else
+ ssnprintf (inst->ident.plugin_instance,
+ sizeof (inst->ident.plugin_instance),
+ "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+ }
+
+ /* Type */
+ sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
+
+ /* Type instance */
+ COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
+ type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
+
+#undef COPY_FIELD
+
+ return (0);
+} /* }}} int agg_instance_create_name */
+
/* Create a new aggregation instance. */
static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
value_list_t const *vl, aggregation_t *agg)
inst->ds_type = ds->ds[0].type;
-#define COPY_FIELD(fld) do { \
- sstrncpy (inst->ident.fld, \
- LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
- sizeof (inst->ident.fld)); \
-} while (0)
-
- COPY_FIELD (host);
- COPY_FIELD (plugin);
- COPY_FIELD (plugin_instance);
- COPY_FIELD (type);
- COPY_FIELD (type_instance);
-
-#undef COPY_FIELD
+ agg_instance_create_name (inst, vl, agg);
inst->min = NAN;
inst->max = NAN;
int status;
if (pi_prefix[0] != 0)
- ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
- pi_prefix, func);
+ subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
+ pi_prefix, AGG_FUNC_PLACEHOLDER, func);
else
sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
vl->values = &v;
vl->values_len = 1;
- plugin_dispatch_values_secure (vl);
+ plugin_dispatch_values (vl);
vl->values = NULL;
vl->values_len = 0;
static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
{
value_list_t vl = VALUE_LIST_INIT;
- char pi_prefix[DATA_MAX_NAME_LEN];
/* Pre-set all the fields in the value list that will not change per
* aggregation type (sum, average, ...). The struct will be re-used and must
}
meta_data_add_boolean (vl.meta, "aggregation:created", 1);
- if (LU_IS_ALL (inst->ident.host))
- sstrncpy (vl.host, "global", sizeof (vl.host));
- else
- sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
-
- sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
-
- if (LU_IS_ALL (inst->ident.plugin))
- {
- if (LU_IS_ALL (inst->ident.plugin_instance))
- sstrncpy (pi_prefix, "", sizeof (pi_prefix));
- else
- sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
- }
- else
- {
- if (LU_IS_ALL (inst->ident.plugin_instance))
- sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
- else
- ssnprintf (pi_prefix, sizeof (pi_prefix),
- "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
- }
-
+ sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+ sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
-
- if (!LU_IS_ALL (inst->ident.type_instance))
- sstrncpy (vl.type_instance, inst->ident.type_instance,
- sizeof (vl.type_instance));
+ sstrncpy (vl.type_instance, inst->ident.type_instance,
+ sizeof (vl.type_instance));
#define READ_FUNC(func, rate) do { \
if (inst->state_ ## func != NULL) { \
agg_instance_read_func (inst, #func, rate, \
- inst->state_ ## func, &vl, pi_prefix, t); \
+ inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
} \
} while (0)
value = ci->values[i].value.string;
if (strcasecmp ("Host", value) == 0)
- sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+ agg->group_by |= LU_GROUP_BY_HOST;
else if (strcasecmp ("Plugin", value) == 0)
- sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+ agg->group_by |= LU_GROUP_BY_PLUGIN;
else if (strcasecmp ("PluginInstance", value) == 0)
- sstrncpy (agg->ident.plugin_instance, LU_ANY,
- sizeof (agg->ident.plugin_instance));
+ agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
else if (strcasecmp ("TypeInstance", value) == 0)
- sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+ agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
else if (strcasecmp ("Type", value) == 0)
ERROR ("aggregation plugin: Grouping by type is not supported.");
else
}
memset (agg, 0, sizeof (*agg));
- sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
- sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
- sstrncpy (agg->ident.plugin_instance, LU_ALL,
+ sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
+ sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
+ sstrncpy (agg->ident.plugin_instance, "/.*/",
sizeof (agg->ident.plugin_instance));
- sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
- sstrncpy (agg->ident.type_instance, LU_ALL,
+ sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
+ sstrncpy (agg->ident.type_instance, "/.*/",
sizeof (agg->ident.type_instance));
for (i = 0; i < ci->children_num; i++)
else if (strcasecmp ("TypeInstance", child->key) == 0)
cf_util_get_string_buffer (child, agg->ident.type_instance,
sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("SetHost", child->key) == 0)
+ cf_util_get_string (child, &agg->set_host);
+ else if (strcasecmp ("SetPlugin", child->key) == 0)
+ cf_util_get_string (child, &agg->set_plugin);
+ else if (strcasecmp ("SetPluginInstance", child->key) == 0)
+ cf_util_get_string (child, &agg->set_plugin_instance);
+ else if (strcasecmp ("SetTypeInstance", child->key) == 0)
+ cf_util_get_string (child, &agg->set_type_instance);
else if (strcasecmp ("GroupBy", child->key) == 0)
agg_config_handle_group_by (child, agg);
else if (strcasecmp ("CalculateNum", child->key) == 0)
"<Aggregation /> blocks and will be ignored.", child->key);
}
+ if (agg_is_regex (agg->ident.host))
+ agg->regex_fields |= LU_GROUP_BY_HOST;
+ if (agg_is_regex (agg->ident.plugin))
+ agg->regex_fields |= LU_GROUP_BY_PLUGIN;
+ if (agg_is_regex (agg->ident.plugin_instance))
+ agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
+ if (agg_is_regex (agg->ident.type_instance))
+ agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
+
/* Sanity checking */
is_valid = 1;
- if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+ if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
{
ERROR ("aggregation plugin: It appears you did not specify the required "
"\"Type\" option in this aggregation. "
else if (strchr (agg->ident.type, '/') != NULL)
{
ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
- "character. Especially, it may not be a wildcard. The current "
+ "character. Especially, it may not be a regex. The current "
"value is \"%s\".", agg->ident.type);
is_valid = 0;
} /* }}} */
- if (!LU_IS_ALL (agg->ident.host) /* {{{ */
- && !LU_IS_ALL (agg->ident.plugin)
- && !LU_IS_ALL (agg->ident.plugin_instance)
- && !LU_IS_ALL (agg->ident.type_instance))
+ /* Check that there is at least one regex field without a grouping. {{{ */
+ if ((agg->regex_fields & ~agg->group_by) == 0)
{
ERROR ("aggregation plugin: An aggregation must contain at least one "
"wildcard. This is achieved by leaving at least one of the \"Host\", "
"\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
- "and not grouping by that field. "
+ "or using a regular expression and not grouping by that field. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ /* Check that all grouping fields are regular expressions. {{{ */
+ if (agg->group_by & ~agg->regex_fields)
+ {
+ ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
+ "regular expression is configured or which are left blank) can be "
+ "specified in the \"GroupBy\" option. "
"(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
"Type \"%s\", TypeInstance \"%s\")",
agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
return (-1);
} /* }}} */
- status = lookup_add (lookup, &agg->ident, agg);
+ status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
if (status != 0)
{
ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
/* Default values for contacting daemon */
static char *conf_host = NULL;
static int conf_port = NISPORT;
+/* Defaults to false for backwards compatibility. */
+static _Bool conf_report_seconds = 0;
static int global_sockfd = -1;
{
"Host",
"Port",
- NULL
+ "ReportSeconds"
};
-static int config_keys_num = 2;
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
static int net_shutdown (int *fd)
{
else if (strcmp ("LINEFREQ", key) == 0)
apcups_detail->linefreq = value;
else if (strcmp ("TIMELEFT", key) == 0)
+ {
+ /* Convert minutes to seconds if requested by
+ * the user. */
+ if (conf_report_seconds)
+ value *= 60.0;
apcups_detail->timeleft = value;
+ }
tokptr = strtok_r (NULL, ":", &toksaveptr);
} /* while (tokptr != NULL) */
}
conf_port = port_tmp;
}
+ else if (strcasecmp (key, "ReportSeconds") == 0)
+ {
+ if (IS_TRUE (value))
+ conf_report_seconds = 1;
+ else
+ conf_report_seconds = 0;
+ }
else
{
return (-1);
#Timeout 2
#ReadThreads 5
+#WriteThreads 5
##############################################################################
# Logging #
#@BUILD_PLUGIN_WIRELESS_TRUE@LoadPlugin wireless
#@BUILD_PLUGIN_WRITE_GRAPHITE_TRUE@LoadPlugin write_graphite
#@BUILD_PLUGIN_WRITE_HTTP_TRUE@LoadPlugin write_http
-#@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
#@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb
+#@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
+#@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann
#@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms
#@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc
#<Plugin apcups>
# Host "localhost"
# Port "3551"
+# ReportSeconds true
#</Plugin>
#<Plugin ascent>
# #SelectDB "custdb0"
# Query "num_of_customers"
# #Query "..."
+# #Host "..."
# </Database>
#</Plugin>
#</Plugin>
#<Plugin write_graphite>
-# <Carbon>
+# <Node "example">
# Host "localhost"
# Port "2003"
# Prefix "collectd"
# Postfix "collectd"
-# StoreRates false
+# StoreRates true
# AlwaysAppendDS false
# EscapeCharacter "_"
-# </Carbon>
+# </Node>
#</Plugin>
#<Plugin write_http>
# </URL>
#</Plugin>
+#<Plugin write_mongodb>
+# <Node "example">
+# Host "localhost"
+# Port "27017"
+# Timeout 1000
+# StoreRates false
+# </Node>
+#</Plugin>
+
#<Plugin write_redis>
# <Node "example">
# Host "localhost"
# </Node>
#</Plugin>
-#<Plugin write_mongodb>
+#<Plugin write_riemann>
# <Node "example">
# Host "localhost"
-# Port "27017"
-# Timeout 1000
-# StoreRates false
+# Port 5555
+# Protocol UDP
+# StoreRates true
+# AlwaysAppendDS false
# </Node>
+# Tag "foobar"
#</Plugin>
##############################################################################
=back
-=item B<Include> I<Path>
+=item B<Include> I<Path> [I<pattern>]
If I<Path> points to a file, includes that file. If I<Path> points to a
directory, recursively includes all files within that directory and its
Include "/etc/collectd.d/*.conf"
+If the C<fnmatch> function is available on your system, a shell-like wildcard
+I<pattern> may be specified to filter which files to include. This may be used
+in combination with recursively including a directory to easily be able to
+arbitrarily mix configuration files and other documents (e.g. README files).
+The following statement is similar to the example above but includes all files
+matching C<*.conf> in any subdirectory of C</etc/collectd.d>:
+
+ Include "/etc/collectd.d" "*.conf"
+
If more than one files are included by a single B<Include> option, the files
will be included in lexicographical order (as defined by the C<strcmp>
function). Thus, you can e.E<nbsp>g. use numbered prefixes to specify the
Number of threads to start for reading plugins. The default value is B<5>, but
you may want to increase this if you have more than five plugins that take a
-long time to read. Mostly those are plugin that do network-IO. Setting this to
-a value higher than the number of plugins you've loaded is totally useless.
+long time to read. Mostly those are plugins that do network-IO. Setting this to
+a value higher than the number of registered read callbacks is not recommended.
+
+=item B<WriteThreads> I<Num>
+
+Number of threads to start for dispatching value lists to write plugins. The
+default value is B<5>, but you may want to increase this if you have more than
+five plugins that may take relatively long to write to.
=item B<Hostname> I<Name>
Selects the value lists to be added to this aggregation. B<Type> must be a
valid data set name, see L<types.db(5)> for details.
+If the string starts with and ends with a slash (C</>), the string is
+interpreted as a I<regular expression>. The regex flavor used are POSIX
+extended regular expressions as described in L<regex(7)>. Example usage:
+
+ Host "/^db[0-9]\\.example\\.com$/"
+
=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
Group valued by the specified field. The B<GroupBy> option may be repeated to
group by multiple fields.
+=item B<SetHost> I<Host>
+
+=item B<SetPlugin> I<Plugin>
+
+=item B<SetPluginInstance> I<PluginInstance>
+
+=item B<SetTypeInstance> I<TypeInstance>
+
+Sets the appropriate part of the identifier to the provided string.
+
+The I<PluginInstance> should include the placeholder C<%{aggregation}> which
+will be replaced with the aggregation function, e.g. "average". Not including
+the placeholder will result in duplication warnings and/or messed up values if
+more than one aggregation function are enabled.
+
+The following example calculates the average usage of all "even" CPUs:
+
+ <Plugin "aggregation">
+ <Aggregation>
+ Plugin "cpu"
+ PluginInstance "/[0,2,4,6,8]$/"
+ Type "cpu"
+
+ SetPlugin "cpu"
+ SetPluginInstance "even-%{aggregation}"
+
+ GroupBy "Host"
+ GroupBy "TypeInstance"
+
+ CalculateAverage true
+ </Aggregation>
+ </Plugin>
+
+This will create the files:
+
+=over 4
+
+=item
+
+foo.example.com/cpu-even-average/cpu-idle
+
+=item
+
+foo.example.com/cpu-even-average/cpu-system
+
+=item
+
+foo.example.com/cpu-even-average/cpu-user
+
+=item
+
+...
+
+=back
+
=item B<CalculateNum> B<true>|B<false>
=item B<CalculateSum> B<true>|B<false>
TCP-Port to connect to. Defaults to B<3551>.
+=item B<ReportSeconds> B<true|false>
+
+If set to B<true>, the time reported in the C<timeleft> metric will be
+converted to seconds. This is the recommended setting. If set to B<false>, the
+default for backwards compatibility, the time will be reported in minutes.
+
=back
=head2 Plugin C<ascent>
L<http://libdbi-drivers.sourceforge.net/>. However, the options "host",
"username", "password", and "dbname" seem to be deE<nbsp>facto standards.
+DBDs can register two types of options: String options and numeric options. The
+plugin will use the C<dbi_conn_set_option> function when the configuration
+provides a string and the C<dbi_conn_require_option_numeric> function when the
+configuration provides a number. So these two lines will actually result in
+different calls being used:
+
+ DriverOption "Port" 1234 # numeric
+ DriverOption "Port" "1234" # string
+
Unfortunately, drivers are not too keen to report errors when an unknown option
is passed to them, so invalid settings here may go unnoticed. This is not the
plugin's fault, it will report errors if it gets them from the libraryE<nbsp>/
the driver. If a driver complains about an option, the plugin will dump a
-complete list of all options understood by that driver to the log.
+complete list of all options understood by that driver to the log. There is no
+way to programatically find out if an option expects a string or a numeric
+argument, so you will have to refer to the appropriate DBD's documentation to
+find this out. Sorry.
=item B<SelectDB> I<Database>
blocks you want to refer to must be placed above the database block you want to
refer to them from.
+=item B<Host> I<Hostname>
+
+Sets the B<host> field of I<value lists> to I<Hostname> when dispatching
+values. Defaults to the global hostname setting.
+
=back
=head2 Plugin C<df>
IgnoreSelectedSnapshot false
</VolumeUsage>
+ <Quota>
+ Interval 60
+ </Quota>
+
+ <Snapvault>
+ Interval 30
+ </Snapvault>
+
<System>
Interval 30
GetCPULoad true
GetDiskOps true
GetDiskIO true
</System>
+
+ <VFiler vfilerA>
+ Interval 60
+
+ SnapVault true
+ # ...
+ </VFiler>
</Host>
</Plugin>
=item B<Host> I<Name>
A host block defines one NetApp filer. It will appear in collectd with the name
-you specify here which does not have to be its real name nor its hostname.
+you specify here which does not have to be its real name nor its hostname (see
+the B<Address> option below).
+
+=item B<VFiler> I<Name>
+
+A B<VFiler> block may only be used inside a host block. It accepts all the
+same options as the B<Host> block (except for cascaded B<VFiler> blocks) and
+will execute all NetApp API commands in the context of the specified
+VFiler(R). It will appear in collectd with the name you specify here which
+does not have to be its real name. The VFiler name may be specified using the
+B<VFilerName> option. If this is not specified, it will default to the name
+you specify here.
+
+The VFiler block inherits all connection related settings from the surrounding
+B<Host> block (which appear before the B<VFiler> block) but they may be
+overwritten inside the B<VFiler> block.
+
+This feature is useful, for example, when using a VFiler as SnapVault target
+(supported since OnTap 8.1). In that case, the SnapVault statistics are not
+available in the host filer (vfiler0) but only in the respective VFiler
+context.
=item B<Protocol> B<httpd>|B<http>
Type: string
+=item B<VFilerName> I<Name>
+
+The name of the VFiler in which context to execute API commands. If not
+specified, the name provided to the B<VFiler> block will be used instead.
+
+Optional
+
+Type: string
+
+Default: name of the B<VFiler> block
+
+B<Note:> This option may only be used inside B<VFiler> blocks.
+
=item B<Interval> I<Interval>
B<TODO>
=back
+=head3 The Quota block
+
+This will collect (tree) quota statistics (used disk space and number of used
+files). This mechanism is useful to get usage information for single qtrees.
+In case the quotas are not used for any other purpose, an entry similar to the
+following in C</etc/quotas> would be sufficient:
+
+ /vol/volA/some_qtree tree - - - - -
+
+After adding the entry, issue C<quota on -w volA> on the NetApp filer.
+
+=over 4
+
+=item B<Interval> I<Seconds>
+
+Collect SnapVault(R) statistics every I<Seconds> seconds.
+
+=back
+
+=head3 The SnapVault block
+
+This will collect statistics about the time and traffic of SnapVault(R)
+transfers.
+
+=over 4
+
+=item B<Interval> I<Seconds>
+
+Collect SnapVault(R) statistics every I<Seconds> seconds.
+
+=back
+
=head2 Plugin C<netlink>
The C<netlink> plugin uses a netlink socket to query the Linux kernel about
Synopsis:
<Plugin write_graphite>
- <Carbon>
+ <Node "example">
Host "localhost"
Port "2003"
Prefix "collectd"
- </Carbon>
+ </Node>
</Plugin>
+The configuration consists of one or more E<lt>B<Node>E<nbsp>I<Name>E<gt>
+blocks. Inside the B<Node> blocks, the following options are recognized:
+
=over 4
=item B<Host> I<Address>
=back
+=head2 Plugin C<write_riemann>
+
+The I<write_riemann plugin> will send values to I<Riemann>, a powerfull stream
+aggregation and monitoring system. The plugin sends I<Protobuf> encoded data to
+I<Riemann> using UDP packets.
+
+Synopsis:
+
+ <Plugin "write_riemann">
+ <Node "example">
+ Host "localhost"
+ Port "5555"
+ Protocol UDP
+ StoreRates true
+ AlwaysAppendDS false
+ Delay 10
+ </Node>
+ Tag "foobar"
+ </Plugin>
+
+The following options are understood by the I<write_riemann plugin>:
+
+=over 4
+
+=item E<lt>B<Node> I<Name>E<gt>
+
+The plugin's configuration consists of one or more B<Node> blocks. Each block
+is given a unique I<Name> and specifies one connection to an instance of
+I<Riemann>. Indise the B<Node> block, the following per-connection options are
+understood:
+
+=over 4
+
+=item B<Host> I<Address>
+
+Hostname or address to connect to. Defaults to C<localhost>.
+
+=item B<Port> I<Service>
+
+Service name or port number to connect to. Defaults to C<5555>.
+
+=item B<Protocol> B<UDP>|B<TCP>
+
+Specify the protocol to use when communicating with I<Riemann>. Defaults to
+B<UDP>.
+
+=item B<StoreRates> B<true>|B<false>
+
+If set to B<true> (the default), convert counter values to rates. If set to
+B<false> counter values are stored as is, i.e. as an increasing integer number.
+
+This will be reflected in the C<ds_type> tag: If B<StoreRates> is enabled,
+converted values will have "rate" appended to the data source type, e.g.
+C<ds_type:derive:rate>.
+
+=item B<AlwaysAppendDS> B<false>|B<true>
+
+If set the B<true>, append the name of the I<Data Source> (DS) to the
+"service", i.e. the field that, together with the "host" field, uniquely
+identifies a metric in I<Riemann>. If set to B<false> (the default), this is
+only done when there is more than one DS.
+
+=back
+
+=item B<Tag> I<String>
+
+Add the given string as an additional tag to the metric being sent to
+I<Riemann>.
+
+=back
+
=head1 THRESHOLD CONFIGURATION
Starting with version C<4.3.0> collectd has support for B<monitoring>. By that
# define COLLECTD_DEFAULT_INTERVAL 10.0
#endif
-#define STATIC_ARRAY_LEN(array) (sizeof (array) / sizeof ((array)[0]))
-
/* Remove GNU specific __attribute__ settings when using another compiler */
#if !__GNUC__
# define __attribute__(x) /**/
*ret_value = tmp;
return (0);
} /* }}} int strtoderive */
+
+int strarray_add (char ***ret_array, size_t *ret_array_len, char const *str) /* {{{ */
+{
+ char **array;
+ size_t array_len = *ret_array_len;
+
+ if (str == NULL)
+ return (EINVAL);
+
+ array = realloc (*ret_array,
+ (array_len + 1) * sizeof (*array));
+ if (array == NULL)
+ return (ENOMEM);
+ *ret_array = array;
+
+ array[array_len] = strdup (str);
+ if (array[array_len] == NULL)
+ return (ENOMEM);
+
+ array_len++;
+ *ret_array_len = array_len;
+ return (0);
+} /* }}} int strarray_add */
+
+void strarray_free (char **array, size_t array_len) /* {{{ */
+{
+ size_t i;
+
+ for (i = 0; i < array_len; i++)
+ sfree (array[i]);
+ sfree (array);
+} /* }}} void strarray_free */
* failure. If failure is returned, ret_value is not touched. */
int strtoderive (const char *string, derive_t *ret_value);
+int strarray_add (char ***ret_array, size_t *ret_array_len, char const *str);
+void strarray_free (char **array, size_t array_len);
+
#endif /* COMMON_H */
# include <wordexp.h>
#endif /* HAVE_WORDEXP_H */
+#if HAVE_FNMATCH_H
+# include <fnmatch.h>
+#endif /* HAVE_FNMATCH_H */
+
+#if HAVE_LIBGEN_H
+# include <libgen.h>
+#endif /* HAVE_LIBGEN_H */
+
#define ESCAPE_NULL(str) ((str) == NULL ? "(null)" : (str))
/*
{"PluginDir", dispatch_value_plugindir},
{"LoadPlugin", dispatch_loadplugin}
};
-static int cf_value_map_num = STATIC_ARRAY_LEN (cf_value_map);
+static int cf_value_map_num = STATIC_ARRAY_SIZE (cf_value_map);
static cf_global_option_t cf_global_options[] =
{
{"FQDNLookup", NULL, "true"},
{"Interval", NULL, NULL},
{"ReadThreads", NULL, "5"},
+ {"WriteThreads", NULL, "5"},
{"Timeout", NULL, "2"},
{"PreCacheChain", NULL, "PreCache"},
{"PostCacheChain", NULL, "PostCache"}
};
-static int cf_global_options_num = STATIC_ARRAY_LEN (cf_global_options);
+static int cf_global_options_num = STATIC_ARRAY_SIZE (cf_global_options);
static int cf_default_typesdb = 1;
} /* int cf_ci_append_children */
#define CF_MAX_DEPTH 8
-static oconfig_item_t *cf_read_generic (const char *path, int depth);
+static oconfig_item_t *cf_read_generic (const char *path,
+ const char *pattern, int depth);
static int cf_include_all (oconfig_item_t *root, int depth)
{
oconfig_item_t *new;
oconfig_item_t *old;
- /* Ignore all blocks, including `Include' blocks. */
- if (root->children[i].children_num != 0)
- continue;
+ char *pattern = NULL;
+
+ int j;
if (strcasecmp (root->children[i].key, "Include") != 0)
continue;
continue;
}
- new = cf_read_generic (old->values[0].value.string, depth + 1);
+ for (j = 0; j < old->children_num; ++j)
+ {
+ oconfig_item_t *child = old->children + j;
+
+ if (strcasecmp (child->key, "Filter") == 0)
+ cf_util_get_string (child, &pattern);
+ else
+ ERROR ("configfile: Option `%s' not allowed in <Include> block.",
+ child->key);
+ }
+
+ new = cf_read_generic (old->values[0].value.string, pattern, depth + 1);
+ sfree (pattern);
+
if (new == NULL)
continue;
return (0);
} /* int cf_include_all */
-static oconfig_item_t *cf_read_file (const char *file, int depth)
+static oconfig_item_t *cf_read_file (const char *file,
+ const char *pattern, int depth)
{
oconfig_item_t *root;
assert (depth < CF_MAX_DEPTH);
+ if (pattern != NULL) {
+#if HAVE_FNMATCH_H && HAVE_LIBGEN_H
+ char *tmp = sstrdup (file);
+ char *filename = basename (tmp);
+
+ if ((filename != NULL) && (fnmatch (pattern, filename, 0) != 0)) {
+ DEBUG ("configfile: Not including `%s' because it "
+ "does not match pattern `%s'.",
+ filename, pattern);
+ free (tmp);
+ return (NULL);
+ }
+
+ free (tmp);
+#else
+ ERROR ("configfile: Cannot apply pattern filter '%s' "
+ "to file '%s': functions basename() and / or "
+ "fnmatch() not available.", pattern, file);
+#endif /* HAVE_FNMATCH_H && HAVE_LIBGEN_H */
+ }
+
root = oconfig_parse_file (file);
if (root == NULL)
{
return strcmp (*(const char **) p1, *(const char **) p2);
}
-static oconfig_item_t *cf_read_dir (const char *dir, int depth)
+static oconfig_item_t *cf_read_dir (const char *dir,
+ const char *pattern, int depth)
{
oconfig_item_t *root = NULL;
DIR *dh;
oconfig_item_t *temp;
char *name = filenames[i];
- temp = cf_read_generic (name, depth);
+ temp = cf_read_generic (name, pattern, depth);
if (temp == NULL)
{
/* An error should already have been reported. */
* simpler function is used which does not do any such expansion.
*/
#if HAVE_WORDEXP_H
-static oconfig_item_t *cf_read_generic (const char *path, int depth)
+static oconfig_item_t *cf_read_generic (const char *path,
+ const char *pattern, int depth)
{
oconfig_item_t *root = NULL;
int status;
}
if (S_ISREG (statbuf.st_mode))
- temp = cf_read_file (path_ptr, depth);
+ temp = cf_read_file (path_ptr, pattern, depth);
else if (S_ISDIR (statbuf.st_mode))
- temp = cf_read_dir (path_ptr, depth);
+ temp = cf_read_dir (path_ptr, pattern, depth);
else
{
WARNING ("configfile: %s is neither a file nor a "
/* #endif HAVE_WORDEXP_H */
#else /* if !HAVE_WORDEXP_H */
-static oconfig_item_t *cf_read_generic (const char *path, int depth)
+static oconfig_item_t *cf_read_generic (const char *path,
+ const char *pattern, int depth)
{
struct stat statbuf;
int status;
}
if (S_ISREG (statbuf.st_mode))
- return (cf_read_file (path, depth));
+ return (cf_read_file (path, pattern, depth));
else if (S_ISDIR (statbuf.st_mode))
- return (cf_read_dir (path, depth));
+ return (cf_read_dir (path, pattern, depth));
ERROR ("configfile: %s is neither a file nor a directory.", path);
return (NULL);
oconfig_item_t *conf;
int i;
- conf = cf_read_generic (filename, 0 /* depth */);
+ conf = cf_read_generic (filename, /* pattern = */ NULL, /* depth = */ 0);
if (conf == NULL)
{
ERROR ("Unable to read config file %s.", filename);
/**
* collectd - src/dbi.c
- * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
**/
#include "collectd.h"
struct cdbi_driver_option_s /* {{{ */
{
char *key;
- char *value;
+ union
+ {
+ char *string;
+ int numeric;
+ } value;
+ _Bool is_numeric;
};
typedef struct cdbi_driver_option_s cdbi_driver_option_t; /* }}} */
char *select_db;
char *driver;
+ char *host;
cdbi_driver_option_t *driver_options;
size_t driver_options_num;
for (i = 0; i < db->driver_options_num; i++)
{
sfree (db->driver_options[i].key);
- sfree (db->driver_options[i].value);
+ if (!db->driver_options[i].is_numeric)
+ sfree (db->driver_options[i].value.string);
}
sfree (db->driver_options);
if ((ci->values_num != 2)
|| (ci->values[0].type != OCONFIG_TYPE_STRING)
- || (ci->values[1].type != OCONFIG_TYPE_STRING))
+ || ((ci->values[1].type != OCONFIG_TYPE_STRING)
+ && (ci->values[1].type != OCONFIG_TYPE_NUMBER)))
{
WARNING ("dbi plugin: The `DriverOption' config option "
- "needs exactly two string arguments.");
+ "needs exactly two arguments.");
return (-1);
}
db->driver_options = option;
option = db->driver_options + db->driver_options_num;
+ memset (option, 0, sizeof (*option));
option->key = strdup (ci->values[0].value.string);
if (option->key == NULL)
return (-1);
}
- option->value = strdup (ci->values[1].value.string);
- if (option->value == NULL)
+ if (ci->values[1].type == OCONFIG_TYPE_STRING)
{
- ERROR ("dbi plugin: strdup failed.");
- sfree (option->key);
- return (-1);
+ option->value.string = strdup (ci->values[1].value.string);
+ if (option->value.string == NULL)
+ {
+ ERROR ("dbi plugin: strdup failed.");
+ sfree (option->key);
+ return (-1);
+ }
+ }
+ else
+ {
+ assert (ci->values[1].type == OCONFIG_TYPE_NUMBER);
+ option->value.numeric = (int) (ci->values[1].value.number + .5);
+ option->is_numeric = 1;
}
db->driver_options_num++;
else if (strcasecmp ("Query", child->key) == 0)
status = udb_query_pick_from_list (child, queries, queries_num,
&db->queries, &db->queries_num);
+ else if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &db->host);
else
{
WARNING ("dbi plugin: Option `%s' not allowed here.", child->key);
sstrncpy (column_names[i], column_name, DATA_MAX_NAME_LEN);
} /* }}} for (i = 0; i < column_num; i++) */
- udb_query_prepare_result (q, prep_area, hostname_g,
+ udb_query_prepare_result (q, prep_area, (db->host ? db->host : hostname_g),
/* plugin = */ "dbi", db->name,
column_names, column_num, /* interval = */ 0);
* trouble finding out how to configure the plugin correctly.. */
for (i = 0; i < db->driver_options_num; i++)
{
- DEBUG ("dbi plugin: cdbi_connect_database (%s): "
- "key = %s; value = %s;",
- db->name,
- db->driver_options[i].key,
- db->driver_options[i].value);
+ if (db->driver_options[i].is_numeric)
+ {
+ status = dbi_conn_set_option_numeric (connection,
+ db->driver_options[i].key, db->driver_options[i].value.numeric);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("dbi plugin: cdbi_connect_database (%s): "
+ "dbi_conn_set_option_numeric (\"%s\", %i) failed: %s.",
+ db->name,
+ db->driver_options[i].key, db->driver_options[i].value.numeric,
+ cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ }
+ }
+ else
+ {
+ status = dbi_conn_set_option (connection,
+ db->driver_options[i].key, db->driver_options[i].value.string);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("dbi plugin: cdbi_connect_database (%s): "
+ "dbi_conn_set_option (\"%s\", \"%s\") failed: %s.",
+ db->name,
+ db->driver_options[i].key, db->driver_options[i].value.string,
+ cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ }
+ }
- status = dbi_conn_set_option (connection,
- db->driver_options[i].key, db->driver_options[i].value);
if (status != 0)
{
- char errbuf[1024];
- const char *opt;
-
- ERROR ("dbi plugin: cdbi_connect_database (%s): "
- "dbi_conn_set_option (%s, %s) failed: %s.",
- db->name,
- db->driver_options[i].key, db->driver_options[i].value,
- cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ char const *opt;
INFO ("dbi plugin: This is a list of all options understood "
"by the `%s' driver:", db->driver);
/**
* collectd - src/netapp.c
* Copyright (C) 2009,2010 Sven Trenkel
+ * Copyright (C) 2012-2013 teamix GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* DEALINGS IN THE SOFTWARE.
*
* Authors:
- * Sven Trenkel <collectd at semidefinite.de>
+ * Sven Trenkel <collectd at semidefinite.de>
+ * Sebastian 'tokkee' Harl <sh@teamix.net>
**/
#include "collectd.h"
*
* \brief Configuration struct for volume usage data (free / used).
*/
-#define CFG_VOLUME_USAGE_DF 0x0002
-#define CFG_VOLUME_USAGE_SNAP 0x0004
-#define CFG_VOLUME_USAGE_ALL 0x0006
-#define HAVE_VOLUME_USAGE_NORM_FREE 0x0010
-#define HAVE_VOLUME_USAGE_NORM_USED 0x0020
-#define HAVE_VOLUME_USAGE_SNAP_RSVD 0x0040
-#define HAVE_VOLUME_USAGE_SNAP_USED 0x0080
-#define HAVE_VOLUME_USAGE_SIS_SAVED 0x0100
-#define HAVE_VOLUME_USAGE_ALL 0x01f0
-#define IS_VOLUME_USAGE_OFFLINE 0x0200
+#define CFG_VOLUME_USAGE_DF 0x0002
+#define CFG_VOLUME_USAGE_SNAP 0x0004
+#define CFG_VOLUME_USAGE_ALL 0x0006
+#define HAVE_VOLUME_USAGE_NORM_FREE 0x0010
+#define HAVE_VOLUME_USAGE_NORM_USED 0x0020
+#define HAVE_VOLUME_USAGE_SNAP_RSVD 0x0040
+#define HAVE_VOLUME_USAGE_SNAP_USED 0x0080
+#define HAVE_VOLUME_USAGE_SIS_SAVED 0x0100
+#define HAVE_VOLUME_USAGE_COMPRESS_SAVED 0x0200
+#define HAVE_VOLUME_USAGE_DEDUP_SAVED 0x0400
+#define HAVE_VOLUME_USAGE_ALL 0x07f0
+#define IS_VOLUME_USAGE_OFFLINE 0x0800
struct data_volume_usage_s;
typedef struct data_volume_usage_s data_volume_usage_t;
struct data_volume_usage_s {
uint64_t snap_reserved;
uint64_t snap_used;
uint64_t sis_saved;
+ uint64_t compress_saved;
+ uint64_t dedup_saved;
data_volume_usage_t *next;
};
} cfg_volume_usage_t;
/* }}} cfg_volume_usage_t */
+/*! Data types for quota statistics {{{
+ *
+ * \brief Persistent data for quota statistics
+ */
+typedef struct {
+ cna_interval_t interval;
+ na_elem_t *query;
+} cfg_quota_t;
+/* }}} cfg_quota_t */
+
+/*! Data types for SnapVault statistics {{{
+ *
+ * \brief Persistent data for SnapVault(R) statistics
+ */
+typedef struct {
+ cna_interval_t interval;
+ na_elem_t *query;
+} cfg_snapvault_t;
+/* }}} cfg_snapvault_t */
+
/*! Data types for system statistics {{{
*
* \brief Persistent data for system performance counters
int port;
char *username;
char *password;
+ char *vfiler;
cdtime_t interval;
na_server_t *srv;
cfg_disk_t *cfg_disk;
cfg_volume_perf_t *cfg_volume_perf;
cfg_volume_usage_t *cfg_volume_usage;
+ cfg_quota_t *cfg_quota;
+ cfg_snapvault_t *cfg_snapvault;
cfg_system_t *cfg_system;
struct host_config_s *next;
sfree (cvu);
} /* }}} void free_cfg_volume_usage */
+static void free_cfg_quota (cfg_quota_t *q) /* {{{ */
+{
+ if (q == NULL)
+ return;
+
+ if (q->query != NULL)
+ na_elem_free (q->query);
+
+ sfree (q);
+} /* }}} void free_cfg_quota */
+
+static void free_cfg_snapvault (cfg_snapvault_t *sv) /* {{{ */
+{
+ if (sv == NULL)
+ return;
+
+ if (sv->query != NULL)
+ na_elem_free (sv->query);
+
+ sfree (sv);
+} /* }}} void free_cfg_snapvault */
+
static void free_cfg_system (cfg_system_t *cs) /* {{{ */
{
if (cs == NULL)
sfree (hc->host);
sfree (hc->username);
sfree (hc->password);
+ sfree (hc->vfiler);
free_cfg_disk (hc->cfg_disk);
free_cfg_wafl (hc->cfg_wafl);
free_cfg_volume_perf (hc->cfg_volume_perf);
free_cfg_volume_usage (hc->cfg_volume_usage);
+ free_cfg_quota (hc->cfg_quota);
+ free_cfg_snapvault (hc->cfg_snapvault);
free_cfg_system (hc->cfg_system);
if (hc->srv != NULL)
uint64_t norm_used = v->norm_used;
uint64_t norm_free = v->norm_free;
uint64_t sis_saved = v->sis_saved;
+ uint64_t compress_saved = v->compress_saved;
+ uint64_t dedup_saved = v->dedup_saved;
uint64_t snap_reserve_used = 0;
uint64_t snap_reserve_free = v->snap_reserved;
uint64_t snap_norm_used = v->snap_used;
"df_complex", "sis_saved",
(double) sis_saved, /* timestamp = */ 0, interval);
+ if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_COMPRESS_SAVED))
+ submit_double (hostname, /* plugin instance = */ plugin_instance,
+ "df_complex", "compression_saved",
+ (double) compress_saved, /* timestamp = */ 0, interval);
+
+ if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_DEDUP_SAVED))
+ submit_double (hostname, /* plugin instance = */ plugin_instance,
+ "df_complex", "dedup_saved",
+ (double) dedup_saved, /* timestamp = */ 0, interval);
+
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_USED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "used",
v->flags |= HAVE_VOLUME_USAGE_SNAP_USED;
} /* }}} void cna_handle_volume_snap_usage */
+static void cna_handle_volume_sis_data (const host_config_t *host, /* {{{ */
+ data_volume_usage_t *v, na_elem_t *sis)
+{
+ const char *sis_state;
+ uint64_t sis_saved_reported;
+
+ if (na_elem_child(sis, "sis-info"))
+ sis = na_elem_child(sis, "sis-info");
+
+ sis_state = na_child_get_string(sis, "state");
+ if (sis_state == NULL)
+ return;
+
+ /* If SIS is not enabled, there's nothing left to do for this volume. */
+ if (strcmp ("enabled", sis_state) != 0)
+ return;
+
+ sis_saved_reported = na_child_get_uint64(sis, "size-saved", UINT64_MAX);
+ if (sis_saved_reported == UINT64_MAX)
+ return;
+
+ /* size-saved is actually a 32 bit number, so ... time for some guesswork. */
+ if ((sis_saved_reported >> 32) != 0) {
+ /* In case they ever fix this bug. */
+ v->sis_saved = sis_saved_reported;
+ v->flags |= HAVE_VOLUME_USAGE_SIS_SAVED;
+ } else { /* really hacky work-around code. {{{ */
+ uint64_t sis_saved_percent;
+ uint64_t sis_saved_guess;
+ uint64_t overflow_guess;
+ uint64_t guess1, guess2, guess3;
+
+ /* Check if we have v->norm_used. Without it, we cannot calculate
+ * sis_saved_guess. */
+ if ((v->flags & HAVE_VOLUME_USAGE_NORM_USED) == 0)
+ return;
+
+ sis_saved_percent = na_child_get_uint64(sis, "percentage-saved", UINT64_MAX);
+ if (sis_saved_percent > 100)
+ return;
+
+ /* The "size-saved" value is a 32bit unsigned integer. This is a bug and
+ * will hopefully be fixed in later versions. To work around the bug, try
+ * to figure out how often the 32bit integer wrapped around by using the
+ * "percentage-saved" value. Because the percentage is in the range
+ * [0-100], this should work as long as the saved space does not exceed
+ * 400 GBytes. */
+ /* percentage-saved = size-saved / (size-saved + size-used) */
+ if (sis_saved_percent < 100)
+ sis_saved_guess = v->norm_used * sis_saved_percent / (100 - sis_saved_percent);
+ else
+ sis_saved_guess = v->norm_used;
+
+ overflow_guess = sis_saved_guess >> 32;
+ guess1 = overflow_guess ? ((overflow_guess - 1) << 32) + sis_saved_reported : sis_saved_reported;
+ guess2 = (overflow_guess << 32) + sis_saved_reported;
+ guess3 = ((overflow_guess + 1) << 32) + sis_saved_reported;
+
+ if (sis_saved_guess < guess2) {
+ if ((sis_saved_guess - guess1) < (guess2 - sis_saved_guess))
+ v->sis_saved = guess1;
+ else
+ v->sis_saved = guess2;
+ } else {
+ if ((sis_saved_guess - guess2) < (guess3 - sis_saved_guess))
+ v->sis_saved = guess2;
+ else
+ v->sis_saved = guess3;
+ }
+ v->flags |= HAVE_VOLUME_USAGE_SIS_SAVED;
+ } /* }}} end of 32-bit workaround */
+} /* }}} void cna_handle_volume_sis_data */
+
+/* ONTAP >= 8.1 uses SIS for managing dedup and compression */
+static void cna_handle_volume_sis_saved (const host_config_t *host, /* {{{ */
+ data_volume_usage_t *v, na_elem_t *sis)
+{
+ uint64_t saved;
+
+ if (na_elem_child(sis, "sis-info"))
+ sis = na_elem_child(sis, "sis-info");
+
+ saved = na_child_get_uint64(sis, "compress-saved", UINT64_MAX);
+ if (saved != UINT64_MAX) {
+ v->compress_saved = saved;
+ v->flags |= HAVE_VOLUME_USAGE_COMPRESS_SAVED;
+ }
+
+ saved = na_child_get_uint64(sis, "dedup-saved", UINT64_MAX);
+ if (saved != UINT64_MAX) {
+ v->dedup_saved = saved;
+ v->flags |= HAVE_VOLUME_USAGE_DEDUP_SAVED;
+ }
+} /* }}} void cna_handle_volume_sis_saved */
+
static int cna_handle_volume_usage_data (const host_config_t *host, /* {{{ */
cfg_volume_usage_t *cfg_volume, na_elem_t *data)
{
uint64_t value;
na_elem_t *sis;
- const char *sis_state;
- uint64_t sis_saved_reported;
volume_name = na_child_get_string (elem_volume, "name");
if (volume_name == NULL)
}
sis = na_elem_child(elem_volume, "sis");
- if (sis == NULL)
- continue;
-
- if (na_elem_child(sis, "sis-info"))
- sis = na_elem_child(sis, "sis-info");
-
- sis_state = na_child_get_string(sis, "state");
- if (sis_state == NULL)
- continue;
-
- /* If SIS is not enabled, there's nothing left to do for this volume. */
- if (strcmp ("enabled", sis_state) != 0)
- continue;
-
- sis_saved_reported = na_child_get_uint64(sis, "size-saved", UINT64_MAX);
- if (sis_saved_reported == UINT64_MAX)
- continue;
-
- /* size-saved is actually a 32 bit number, so ... time for some guesswork. */
- if ((sis_saved_reported >> 32) != 0) {
- /* In case they ever fix this bug. */
- v->sis_saved = sis_saved_reported;
- v->flags |= HAVE_VOLUME_USAGE_SIS_SAVED;
- } else { /* really hacky work-around code. {{{ */
- uint64_t sis_saved_percent;
- uint64_t sis_saved_guess;
- uint64_t overflow_guess;
- uint64_t guess1, guess2, guess3;
-
- /* Check if we have v->norm_used. Without it, we cannot calculate
- * sis_saved_guess. */
- if ((v->flags & HAVE_VOLUME_USAGE_NORM_USED) == 0)
- continue;
-
- sis_saved_percent = na_child_get_uint64(sis, "percentage-saved", UINT64_MAX);
- if (sis_saved_percent > 100)
- continue;
-
- /* The "size-saved" value is a 32bit unsigned integer. This is a bug and
- * will hopefully be fixed in later versions. To work around the bug, try
- * to figure out how often the 32bit integer wrapped around by using the
- * "percentage-saved" value. Because the percentage is in the range
- * [0-100], this should work as long as the saved space does not exceed
- * 400 GBytes. */
- /* percentage-saved = size-saved / (size-saved + size-used) */
- if (sis_saved_percent < 100)
- sis_saved_guess = v->norm_used * sis_saved_percent / (100 - sis_saved_percent);
- else
- sis_saved_guess = v->norm_used;
-
- overflow_guess = sis_saved_guess >> 32;
- guess1 = overflow_guess ? ((overflow_guess - 1) << 32) + sis_saved_reported : sis_saved_reported;
- guess2 = (overflow_guess << 32) + sis_saved_reported;
- guess3 = ((overflow_guess + 1) << 32) + sis_saved_reported;
-
- if (sis_saved_guess < guess2) {
- if ((sis_saved_guess - guess1) < (guess2 - sis_saved_guess))
- v->sis_saved = guess1;
- else
- v->sis_saved = guess2;
- } else {
- if ((sis_saved_guess - guess2) < (guess3 - sis_saved_guess))
- v->sis_saved = guess2;
- else
- v->sis_saved = guess3;
- }
- v->flags |= HAVE_VOLUME_USAGE_SIS_SAVED;
- } /* }}} end of 32-bit workaround */
+ if (sis != NULL) {
+ cna_handle_volume_sis_data (host, v, sis);
+ cna_handle_volume_sis_saved (host, v, sis);
+ }
} /* for (elem_volume) */
return (cna_submit_volume_usage_data (host->name, cfg_volume, host->interval));
return (status);
} /* }}} int cna_query_volume_usage */
+/* Data corresponding to <Quota /> */
+static int cna_handle_quota_data (const host_config_t *host, /* {{{ */
+ cfg_quota_t *cfg_quota, na_elem_t *data)
+{
+ na_elem_t *elem_quota;
+ na_elem_t *elem_quotas;
+ na_elem_iter_t iter_quota;
+
+ elem_quotas = na_elem_child (data, "quotas");
+ if (elem_quotas == NULL)
+ {
+ ERROR ("netapp plugin: cna_handle_quota_data: "
+ "na_elem_child (\"quotas\") failed "
+ "for host %s.", host->name);
+ return (-1);
+ }
+
+ iter_quota = na_child_iterator (elem_quotas);
+ for (elem_quota = na_iterator_next (&iter_quota);
+ elem_quota != NULL;
+ elem_quota = na_iterator_next (&iter_quota))
+ {
+ const char *quota_type, *volume_name, *tree_name;
+ uint64_t value;
+
+ char plugin_instance[DATA_MAX_NAME_LEN];
+
+ quota_type = na_child_get_string (elem_quota, "quota-type");
+ if (quota_type == NULL)
+ continue;
+
+ /* possible TODO: support other types as well */
+ if (strcmp (quota_type, "tree") != 0)
+ continue;
+
+ tree_name = na_child_get_string (elem_quota, "tree");
+ if ((tree_name == NULL) || (*tree_name == '\0'))
+ continue;
+
+ volume_name = na_child_get_string (elem_quota, "volume");
+ if (volume_name == NULL)
+ continue;
+
+ ssnprintf (plugin_instance, sizeof (plugin_instance),
+ "quota-%s-%s", volume_name, tree_name);
+
+ value = na_child_get_uint64 (elem_quota, "disk-used", UINT64_MAX);
+ if (value != UINT64_MAX) {
+ value *= 1024; /* disk-used reports kilobytes */
+ submit_double (host->name, plugin_instance,
+ /* type = */ "df_complex", /* type instance = */ NULL,
+ (double)value, /* timestamp = */ 0, host->interval);
+ }
+
+ value = na_child_get_uint64 (elem_quota, "files-used", UINT64_MAX);
+ if (value != UINT64_MAX) {
+ submit_double (host->name, plugin_instance,
+ /* type = */ "files", /* type instance = */ NULL,
+ (double)value, /* timestamp = */ 0, host->interval);
+ }
+ } /* for (elem_quota) */
+
+ return (0);
+} /* }}} int cna_handle_volume_usage_data */
+
+static int cna_setup_quota (cfg_quota_t *cq) /* {{{ */
+{
+ if (cq == NULL)
+ return (EINVAL);
+
+ if (cq->query != NULL)
+ return (0);
+
+ cq->query = na_elem_new ("quota-report");
+ if (cq->query == NULL)
+ {
+ ERROR ("netapp plugin: na_elem_new failed.");
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int cna_setup_quota */
+
+static int cna_query_quota (host_config_t *host) /* {{{ */
+{
+ na_elem_t *data;
+ int status;
+ cdtime_t now;
+
+ if (host == NULL)
+ return (EINVAL);
+
+ /* If the user did not configure quota statistics, return without
+ * doing anything. */
+ if (host->cfg_quota == NULL)
+ return (0);
+
+ now = cdtime ();
+ if ((host->cfg_quota->interval.interval + host->cfg_quota->interval.last_read) > now)
+ return (0);
+
+ status = cna_setup_quota (host->cfg_quota);
+ if (status != 0)
+ return (status);
+ assert (host->cfg_quota->query != NULL);
+
+ data = na_server_invoke_elem (host->srv, host->cfg_quota->query);
+ if (na_results_status (data) != NA_OK)
+ {
+ ERROR ("netapp plugin: cna_query_quota: na_server_invoke_elem failed for host %s: %s",
+ host->name, na_results_reason (data));
+ na_elem_free (data);
+ return (-1);
+ }
+
+ status = cna_handle_quota_data (host, host->cfg_quota, data);
+
+ if (status == 0)
+ host->cfg_quota->interval.last_read = now;
+
+ na_elem_free (data);
+ return (status);
+} /* }}} int cna_query_quota */
+
+/* Data corresponding to <SnapVault /> */
+static int cna_handle_snapvault_data (const char *hostname, /* {{{ */
+ cfg_snapvault_t *cfg_snapvault, na_elem_t *data, cdtime_t interval)
+{
+ na_elem_t *status;
+ na_elem_iter_t status_iter;
+
+ status = na_elem_child (data, "status-list");
+ if (! status) {
+ ERROR ("netapp plugin: SnapVault status record missing status-list");
+ return (0);
+ }
+
+ status_iter = na_child_iterator (status);
+ for (status = na_iterator_next (&status_iter);
+ status != NULL;
+ status = na_iterator_next (&status_iter))
+ {
+ const char *dest_sys, *dest_path, *src_sys, *src_path;
+ char plugin_instance[DATA_MAX_NAME_LEN];
+ uint64_t value;
+
+ dest_sys = na_child_get_string (status, "destination-system");
+ dest_path = na_child_get_string (status, "destination-path");
+ src_sys = na_child_get_string (status, "source-system");
+ src_path = na_child_get_string (status, "source-path");
+
+ if ((! dest_sys) || (! dest_path) || (! src_sys) || (! src_path))
+ continue;
+
+ value = na_child_get_uint64 (status, "lag-time", UINT64_MAX);
+ if (value == UINT64_MAX) /* no successful baseline transfer yet */
+ continue;
+
+ /* possible TODO: make plugin instance configurable */
+ ssnprintf (plugin_instance, sizeof (plugin_instance),
+ "snapvault-%s", dest_path);
+ submit_double (hostname, plugin_instance, /* type = */ "delay", NULL,
+ (double)value, /* timestamp = */ 0, interval);
+
+ value = na_child_get_uint64 (status, "last-transfer-duration", UINT64_MAX);
+ if (value != UINT64_MAX)
+ submit_double (hostname, plugin_instance, /* type = */ "duration", "last_transfer",
+ (double)value, /* timestamp = */ 0, interval);
+
+ value = na_child_get_uint64 (status, "transfer-progress", UINT64_MAX);
+ if (value == UINT64_MAX)
+ value = na_child_get_uint64 (status, "last-transfer-size", UINT64_MAX);
+ if (value != UINT64_MAX) {
+ value *= 1024; /* this is kilobytes */
+ submit_derive (hostname, plugin_instance, /* type = */ "if_rx_octets", "transferred",
+ value, /* timestamp = */ 0, interval);
+ }
+ } /* for (status) */
+
+ return (0);
+} /* }}} int cna_handle_snapvault_data */
+
+static int cna_handle_snapvault_iter (host_config_t *host, /* {{{ */
+ na_elem_t *data)
+{
+ const char *tag;
+
+ uint32_t records_count;
+ uint32_t i;
+
+ records_count = na_child_get_uint32 (data, "records", UINT32_MAX);
+ if (records_count == UINT32_MAX)
+ return 0;
+
+ tag = na_child_get_string (data, "tag");
+ if (! tag)
+ return 0;
+
+ DEBUG ("netapp plugin: Iterating %u SV records (tag = %s)", records_count, tag);
+
+ for (i = 0; i < records_count; ++i) {
+ na_elem_t *elem;
+
+ elem = na_server_invoke (host->srv,
+ "snapvault-secondary-relationship-status-list-iter-next",
+ "maximum", "1", "tag", tag, NULL);
+
+ if (na_results_status (elem) != NA_OK)
+ {
+ ERROR ("netapp plugin: cna_handle_snapvault_iter: "
+ "na_server_invoke failed for host %s: %s",
+ host->name, na_results_reason (data));
+ na_elem_free (elem);
+ return (-1);
+ }
+
+ cna_handle_snapvault_data (host->name, host->cfg_snapvault, elem, host->interval);
+ na_elem_free (elem);
+ }
+
+ na_elem_free (na_server_invoke (host->srv,
+ "snapvault-secondary-relationship-status-list-iter-end",
+ "tag", tag, NULL));
+ return (0);
+} /* }}} int cna_handle_snapvault_iter */
+
+static int cna_setup_snapvault (cfg_snapvault_t *sv) /* {{{ */
+{
+ if (sv == NULL)
+ return (EINVAL);
+
+ if (sv->query != NULL)
+ return (0);
+
+ sv->query = na_elem_new ("snapvault-secondary-relationship-status-list-iter-start");
+ if (sv->query == NULL)
+ {
+ ERROR ("netapp plugin: na_elem_new failed.");
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int cna_setup_snapvault */
+
+static int cna_query_snapvault (host_config_t *host) /* {{{ */
+{
+ na_elem_t *data;
+ int status;
+ cdtime_t now;
+
+ if (host == NULL)
+ return EINVAL;
+
+ if (host->cfg_snapvault == NULL)
+ return 0;
+
+ now = cdtime ();
+ if ((host->cfg_snapvault->interval.interval + host->cfg_snapvault->interval.last_read) > now)
+ return (0);
+
+ status = cna_setup_snapvault (host->cfg_snapvault);
+ if (status != 0)
+ return (status);
+ assert (host->cfg_snapvault->query != NULL);
+
+ data = na_server_invoke_elem (host->srv, host->cfg_snapvault->query);
+ if (na_results_status (data) != NA_OK)
+ {
+ ERROR ("netapp plugin: cna_query_snapvault: na_server_invoke_elem failed for host %s: %s",
+ host->name, na_results_reason (data));
+ na_elem_free (data);
+ return (-1);
+ }
+
+ status = cna_handle_snapvault_iter (host, data);
+
+ if (status == 0)
+ host->cfg_snapvault->interval.last_read = now;
+
+ na_elem_free (data);
+ return (status);
+} /* }}} int cna_query_snapvault */
+
/* Data corresponding to <System /> */
static int cna_handle_system_data (const char *hostname, /* {{{ */
cfg_system_t *cfg_system, na_elem_t *data, int interval)
ignorelist_set_invert (il, /* invert = */ 1);
} /* }}} void cna_config_volume_usage_default */
+/* Corresponds to a <Quota /> block */
+static int cna_config_quota (host_config_t *host, oconfig_item_t *ci) /* {{{ */
+{
+ cfg_quota_t *cfg_quota;
+ int i;
+
+ if ((host == NULL) || (ci == NULL))
+ return (EINVAL);
+
+ if (host->cfg_quota == NULL)
+ {
+ cfg_quota = malloc (sizeof (*cfg_quota));
+ if (cfg_quota == NULL)
+ return (ENOMEM);
+ memset (cfg_quota, 0, sizeof (*cfg_quota));
+ cfg_quota->query = NULL;
+
+ host->cfg_quota = cfg_quota;
+ }
+ cfg_quota = host->cfg_quota;
+
+ for (i = 0; i < ci->children_num; ++i) {
+ oconfig_item_t *item = ci->children + i;
+
+ if (strcasecmp (item->key, "Interval") == 0)
+ cna_config_get_interval (item, &cfg_quota->interval);
+ else
+ WARNING ("netapp plugin: The option %s is not allowed within "
+ "`Quota' blocks.", item->key);
+ }
+
+ return (0);
+} /* }}} int cna_config_quota */
+
/* Corresponds to a <Disks /> block */
static int cna_config_disk(host_config_t *host, oconfig_item_t *ci) { /* {{{ */
cfg_disk_t *cfg_disk;
return (0);
} /* }}} int cna_config_volume_usage */
+/* Corresponds to a <SnapVault /> block */
+static int cna_config_snapvault (host_config_t *host, /* {{{ */
+ const oconfig_item_t *ci)
+{
+ cfg_snapvault_t *cfg_snapvault;
+ int i;
+
+ if ((host == NULL) || (ci == NULL))
+ return EINVAL;
+
+ if (host->cfg_snapvault == NULL)
+ {
+ cfg_snapvault = malloc (sizeof (*cfg_snapvault));
+ if (cfg_snapvault == NULL)
+ return ENOMEM;
+ memset (cfg_snapvault, 0, sizeof (*cfg_snapvault));
+ cfg_snapvault->query = NULL;
+
+ host->cfg_snapvault = cfg_snapvault;
+ }
+
+ cfg_snapvault = host->cfg_snapvault;
+
+ for (i = 0; i < ci->children_num; ++i) {
+ oconfig_item_t *item = ci->children + i;
+
+ if (strcasecmp (item->key, "Interval") == 0)
+ cna_config_get_interval (item, &cfg_snapvault->interval);
+ else
+ WARNING ("netapp plugin: The option %s is not allowed within "
+ "`SnapVault' blocks.", item->key);
+ }
+
+ return 0;
+} /* }}} int cna_config_snapvault */
+
/* Corresponds to a <System /> block */
static int cna_config_system (host_config_t *host, /* {{{ */
oconfig_item_t *ci)
} /* }}} int cna_config_system */
/* Corresponds to a <Host /> block. */
-static host_config_t *cna_config_host (const oconfig_item_t *ci) /* {{{ */
+static host_config_t *cna_alloc_host (void) /* {{{ */
{
- oconfig_item_t *item;
host_config_t *host;
- int status;
- int i;
-
- if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
- WARNING("netapp plugin: \"Host\" needs exactly one string argument. Ignoring host block.");
- return 0;
- }
host = malloc(sizeof(*host));
+ if (! host)
+ return (NULL);
memset (host, 0, sizeof (*host));
+
host->name = NULL;
host->protocol = NA_SERVER_TRANSPORT_HTTPS;
host->host = NULL;
host->username = NULL;
host->password = NULL;
+ host->vfiler = NULL;
host->srv = NULL;
host->cfg_wafl = NULL;
host->cfg_disk = NULL;
host->cfg_volume_perf = NULL;
host->cfg_volume_usage = NULL;
+ host->cfg_quota = NULL;
+ host->cfg_snapvault = NULL;
host->cfg_system = NULL;
- status = cf_util_get_string (ci, &host->name);
- if (status != 0)
- {
- sfree (host);
+ return (host);
+} /* }}} host_config_t *cna_alloc_host */
+
+static host_config_t *cna_shallow_clone_host (host_config_t *host) /* {{{ */
+{
+ host_config_t *clone;
+
+ if (host == NULL)
return (NULL);
+
+ clone = cna_alloc_host ();
+ if (clone == NULL)
+ return (NULL);
+
+ if (host->name != NULL) {
+ clone->name = strdup (host->name);
+ if (clone->name == NULL) {
+ free_host_config (clone);
+ return NULL;
+ }
+ }
+
+ clone->protocol = host->protocol;
+
+ if (host->host != NULL) {
+ clone->host = strdup (host->host);
+ if (clone->host == NULL) {
+ free_host_config (clone);
+ return NULL;
+ }
+ }
+
+ clone->port = host->port;
+
+ if (host->username != NULL) {
+ clone->username = strdup (host->username);
+ if (clone->username == NULL) {
+ free_host_config (clone);
+ return NULL;
+ }
}
+ if (host->password != NULL) {
+ clone->password = strdup (host->password);
+ if (clone->password == NULL) {
+ free_host_config (clone);
+ return NULL;
+ }
+ }
+
+ clone->interval = host->interval;
+
+ return (clone);
+} /* }}} host_config_t *cna_shallow_clone_host */
+
+static int cna_read (user_data_t *ud);
+
+static int cna_register_host (host_config_t *host) /* {{{ */
+{
+ char cb_name[256];
+ struct timespec interval;
+ user_data_t ud;
+
+ if (host->vfiler)
+ ssnprintf (cb_name, sizeof (cb_name), "netapp-%s-%s",
+ host->name, host->vfiler);
+ else
+ ssnprintf (cb_name, sizeof (cb_name), "netapp-%s", host->name);
+
+ CDTIME_T_TO_TIMESPEC (host->interval, &interval);
+
+ memset (&ud, 0, sizeof (ud));
+ ud.data = host;
+ ud.free_func = (void (*) (void *)) free_host_config;
+
+ plugin_register_complex_read (/* group = */ NULL, cb_name,
+ /* callback = */ cna_read,
+ /* interval = */ (host->interval > 0) ? &interval : NULL,
+ /* user data = */ &ud);
+
+ return (0);
+} /* }}} int cna_register_host */
+
+static int cna_config_host (host_config_t *host, /* {{{ */
+ const oconfig_item_t *ci)
+{
+ oconfig_item_t *item;
+ _Bool is_vfiler = 0;
+ int status;
+ int i;
+
+ if (! strcasecmp (ci->key, "VFiler"))
+ is_vfiler = 1;
+
+ if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+ WARNING ("netapp plugin: \"%s\" needs exactly one string argument. Ignoring host block.", ci->key);
+ return (1);
+ }
+
+ status = cf_util_get_string (ci, &host->name);
+ if (status != 0)
+ return (1);
for (i = 0; i < ci->children_num; ++i) {
item = ci->children + i;
} else if (!strcasecmp(item->key, "Protocol")) {
if ((item->values_num != 1) || (item->values[0].type != OCONFIG_TYPE_STRING) || (strcasecmp(item->values[0].value.string, "http") && strcasecmp(item->values[0].value.string, "https"))) {
WARNING("netapp plugin: \"Protocol\" needs to be either \"http\" or \"https\". Ignoring host block \"%s\".", ci->values[0].value.string);
- return 0;
+ return (1);
}
if (!strcasecmp(item->values[0].value.string, "http")) host->protocol = NA_SERVER_TRANSPORT_HTTP;
else host->protocol = NA_SERVER_TRANSPORT_HTTPS;
cna_config_volume_performance(host, item);
} else if (!strcasecmp(item->key, "VolumeUsage")) {
cna_config_volume_usage(host, item);
+ } else if (!strcasecmp(item->key, "Quota")) {
+ cna_config_quota(host, item);
+ } else if (!strcasecmp(item->key, "SnapVault")) {
+ cna_config_snapvault(host, item);
} else if (!strcasecmp(item->key, "System")) {
cna_config_system(host, item);
+ } else if ((!strcasecmp(item->key, "VFiler")) && (! is_vfiler)) {
+ host_config_t *vfiler;
+
+ vfiler = cna_shallow_clone_host (host);
+ if (! vfiler) {
+ ERROR ("netapp plugin: Failed to allocate host object for vfiler.");
+ continue;
+ }
+
+ if (cna_config_host (vfiler, item)) {
+ free_host_config (vfiler);
+ continue;
+ }
+
+ cna_register_host (vfiler);
+ } else if ((!strcasecmp(item->key, "VFilerName")) && is_vfiler) {
+ status = cf_util_get_string (item, &host->vfiler);
} else {
- WARNING("netapp plugin: Ignoring unknown config option \"%s\" in host block \"%s\".",
- item->key, ci->values[0].value.string);
+ WARNING ("netapp plugin: Ignoring unknown config option \"%s\" in %s block \"%s\".",
+ item->key, is_vfiler ? "vfiler" : "host", ci->values[0].value.string);
}
if (status != 0)
if (host->host == NULL)
host->host = strdup (host->name);
+ if (is_vfiler && (! host->vfiler))
+ host->vfiler = strdup (host->name);
+
if (host->host == NULL)
status = -1;
}
if (status != 0)
- {
- free_host_config (host);
- return (NULL);
- }
+ return status;
- return host;
+ return (0);
} /* }}} host_config_t *cna_config_host */
/*
*/
static int cna_init_host (host_config_t *host) /* {{{ */
{
+ /* Request version 1.1 of the ONTAP API */
+ int major_version = 1, minor_version = 1;
+
if (host == NULL)
return (EINVAL);
if (host->srv != NULL)
return (0);
- /* Request version 1.1 of the ONTAP API */
- host->srv = na_server_open(host->host,
- /* major version = */ 1, /* minor version = */ 1);
+ if (host->vfiler != NULL) /* Request version 1.7 of the ONTAP API */
+ minor_version = 7;
+
+ host->srv = na_server_open (host->host, major_version, minor_version);
if (host->srv == NULL) {
ERROR ("netapp plugin: na_server_open (%s) failed.", host->host);
return (-1);
na_server_adminuser(host->srv, host->username, host->password);
na_server_set_timeout(host->srv, 5 /* seconds */);
+ if (host->vfiler != NULL) {
+ if (! na_server_set_vfiler (host->srv, host->vfiler)) {
+ ERROR ("netapp plugin: Failed to connect to VFiler '%s' on host '%s'.",
+ host->vfiler, host->host);
+ return (-1);
+ }
+ else {
+ INFO ("netapp plugin: Connected to VFiler '%s' on host '%s'.",
+ host->vfiler, host->host);
+ }
+ }
+
return (0);
} /* }}} int cna_init_host */
if (status != 0)
return (status);
+ status = cna_query_quota (host);
+ if (status != 0)
+ return (status);
+
+ status = cna_query_snapvault (host);
+ if (status != 0)
+ return (status);
+
status = cna_query_system (host);
if (status != 0)
return (status);
if (strcasecmp(item->key, "Host") == 0)
{
host_config_t *host;
- char cb_name[256];
- struct timespec interval;
- user_data_t ud;
- host = cna_config_host (item);
- if (host == NULL)
+ host = cna_alloc_host ();
+ if (host == NULL) {
+ ERROR ("netapp plugin: Failed to allocate host object.");
continue;
+ }
- ssnprintf (cb_name, sizeof (cb_name), "netapp-%s", host->name);
-
- CDTIME_T_TO_TIMESPEC (host->interval, &interval);
-
- memset (&ud, 0, sizeof (ud));
- ud.data = host;
- ud.free_func = (void (*) (void *)) free_host_config;
+ if (cna_config_host (host, item) != 0) {
+ free_host_config (host);
+ continue;
+ }
- plugin_register_complex_read (/* group = */ NULL, cb_name,
- /* callback = */ cna_read,
- /* interval = */ (host->interval > 0) ? &interval : NULL,
- /* user data = */ &ud);
- continue;
+ cna_register_host (host);
}
else /* if (item->key != "Host") */
{
}
}
- plugin_dispatch_values_secure (vl);
+ plugin_dispatch_values (vl);
stats_values_dispatched++;
meta_data_destroy (vl->meta);
vl.values[0].derive = (derive_t) copy_octets_rx;
vl.values[1].derive = (derive_t) copy_octets_tx;
sstrncpy (vl.type, "if_octets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Packets received / send */
vl.values[0].derive = (derive_t) copy_packets_rx;
vl.values[1].derive = (derive_t) copy_packets_tx;
sstrncpy (vl.type, "if_packets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Values (not) dispatched and (not) send */
sstrncpy (vl.type, "total_values", sizeof (vl.type));
vl.values[0].derive = (derive_t) copy_values_dispatched;
sstrncpy (vl.type_instance, "dispatch-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_dispatched;
sstrncpy (vl.type_instance, "dispatch-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_sent;
sstrncpy (vl.type_instance, "send-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_sent;
sstrncpy (vl.type_instance, "send-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Receive queue length */
vl.values[0].gauge = (gauge_t) copy_receive_list_length;
sstrncpy (vl.type, "queue_length", sizeof (vl.type));
vl.type_instance[0] = 0;
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
return (0);
} /* }}} int network_stats_read */
vl.values = values + i;
sstrncpy (vl.type_instance, type_instances[i],
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
}
} /* void nfs_procedures_submit */
/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2011 Florian octo Forster
+ * Copyright (C) 2005-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
};
typedef struct read_func_s read_func_t;
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+ value_list_t *vl;
+ plugin_ctx_t ctx;
+ write_queue_t *next;
+};
+
/*
* Private variables
*/
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static write_queue_t *write_queue_head;
+static write_queue_t *write_queue_tail;
+static _Bool write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t *write_threads = NULL;
+static size_t write_threads_num = 0;
+
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
/*
* Static functions
*/
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
static const char *plugin_get_dir (void)
{
if (plugindir == NULL)
read_threads_num = 0;
} /* void stop_read_threads */
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+ if (vl == NULL)
+ return;
+
+ meta_data_destroy (vl->meta);
+ sfree (vl->values);
+ sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+ value_list_t *vl;
+
+ if (vl_orig == NULL)
+ return (NULL);
+
+ vl = malloc (sizeof (*vl));
+ if (vl == NULL)
+ return (NULL);
+ memcpy (vl, vl_orig, sizeof (*vl));
+
+ vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+ if (vl->values == NULL)
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+ memcpy (vl->values, vl_orig->values,
+ vl_orig->values_len * sizeof (*vl->values));
+
+ vl->meta = meta_data_clone (vl->meta);
+ if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+
+ if (vl->time == 0)
+ vl->time = cdtime ();
+
+ /* Fill in the interval from the thread context, if it is zero. */
+ if (vl->interval == 0)
+ {
+ plugin_ctx_t ctx = plugin_get_ctx ();
+
+ if (ctx.interval != 0)
+ vl->interval = ctx.interval;
+ else
+ {
+ char name[6 * DATA_MAX_NAME_LEN];
+ FORMAT_VL (name, sizeof (name), vl);
+ ERROR ("plugin_value_list_clone: Unable to determine "
+ "interval from context for "
+ "value list \"%s\". "
+ "This indicates a broken plugin. "
+ "Please report this problem to the "
+ "collectd mailing list or at "
+ "<http://collectd.org/bugs/>.", name);
+ vl->interval = cf_get_default_interval ();
+ }
+ }
+
+ return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+ write_queue_t *q;
+
+ q = malloc (sizeof (*q));
+ if (q == NULL)
+ return (ENOMEM);
+ q->next = NULL;
+
+ q->vl = plugin_value_list_clone (vl);
+ if (q->vl == NULL)
+ {
+ sfree (q);
+ return (ENOMEM);
+ }
+
+ /* Store context of caller (read plugin); otherwise, it would not be
+ * available to the write plugins when actually dispatching the
+ * value-list later on. */
+ q->ctx = plugin_get_ctx ();
+
+ pthread_mutex_lock (&write_lock);
+
+ if (write_queue_tail == NULL)
+ {
+ write_queue_head = q;
+ write_queue_tail = q;
+ }
+ else
+ {
+ write_queue_tail->next = q;
+ write_queue_tail = q;
+ }
+
+ pthread_cond_signal (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+ write_queue_t *q;
+ value_list_t *vl;
+
+ pthread_mutex_lock (&write_lock);
+
+ while (write_loop && (write_queue_head == NULL))
+ pthread_cond_wait (&write_cond, &write_lock);
+
+ if (write_queue_head == NULL)
+ {
+ pthread_mutex_unlock (&write_lock);
+ return (NULL);
+ }
+
+ q = write_queue_head;
+ write_queue_head = q->next;
+ if (write_queue_head == NULL)
+ write_queue_tail = NULL;
+
+ pthread_mutex_unlock (&write_lock);
+
+ (void) plugin_set_ctx (q->ctx);
+
+ vl = q->vl;
+ sfree (q);
+ return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+ while (write_loop)
+ {
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
+ continue;
+
+ plugin_dispatch_values_internal (vl);
+
+ plugin_value_list_free (vl);
+ }
+
+ pthread_exit (NULL);
+ return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+ size_t i;
+
+ if (write_threads != NULL)
+ return;
+
+ write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (write_threads == NULL)
+ {
+ ERROR ("plugin: start_write_threads: calloc failed.");
+ return;
+ }
+
+ write_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ int status;
+
+ status = pthread_create (write_threads + write_threads_num,
+ /* attr = */ NULL,
+ plugin_write_thread,
+ /* arg = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin: start_write_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ write_threads_num++;
+ } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+ write_queue_t *q;
+ int i;
+
+ if (write_threads == NULL)
+ return;
+
+ INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+ pthread_mutex_lock (&write_lock);
+ write_loop = 0;
+ DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+ pthread_cond_broadcast (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ for (i = 0; i < write_threads_num; i++)
+ {
+ if (pthread_join (write_threads[i], NULL) != 0)
+ {
+ ERROR ("plugin: stop_write_threads: pthread_join failed.");
+ }
+ write_threads[i] = (pthread_t) 0;
+ }
+ sfree (write_threads);
+ write_threads_num = 0;
+
+ pthread_mutex_lock (&write_lock);
+ i = 0;
+ for (q = write_queue_head; q != NULL; q = q->next)
+ {
+ plugin_value_list_free (q->vl);
+ sfree (q);
+ i++;
+ }
+ write_queue_head = NULL;
+ write_queue_tail = NULL;
+ pthread_mutex_unlock (&write_lock);
+
+ if (i > 0)
+ {
+ WARNING ("plugin: %i value list%s left after shutting down "
+ "the write threads.",
+ i, (i == 1) ? " was" : "s were");
+ }
+} /* }}} void stop_write_threads */
+
/*
* Public functions
*/
struct dirent *de;
int status;
- DEBUG ("type = %s", type);
-
dir = plugin_get_dir ();
ret = 1;
status = ssnprintf (typename, sizeof (typename), "%s.so", type);
if ((status < 0) || ((size_t) status >= sizeof (typename)))
{
- WARNING ("snprintf: truncated: `%s.so'", type);
+ WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
return (-1);
}
typename_len = strlen (typename);
if ((dh = opendir (dir)) == NULL)
{
char errbuf[1024];
- ERROR ("opendir (%s): %s", dir,
+ ERROR ("plugin_load: opendir (%s) failed: %s", dir,
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
"%s/%s", dir, de->d_name);
if ((status < 0) || ((size_t) status >= sizeof (filename)))
{
- WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
+ WARNING ("plugin_load: Filename too long: \"%s/%s\"",
+ dir, de->d_name);
continue;
}
if (lstat (filename, &statbuf) == -1)
{
char errbuf[1024];
- WARNING ("stat %s: %s", filename,
+ WARNING ("plugin_load: stat (\"%s\") failed: %s",
+ filename,
sstrerror (errno, errbuf, sizeof (errbuf)));
continue;
}
else if (!S_ISREG (statbuf.st_mode))
{
/* don't follow symlinks */
- WARNING ("stat %s: not a regular file", filename);
+ WARNING ("plugin_load: %s is not a regular file.",
+ filename);
continue;
}
- if (plugin_load_file (filename, flags) == 0)
+ status = plugin_load_file (filename, flags);
+ if (status == 0)
{
/* success */
ret = 0;
}
else
{
- fprintf (stderr, "Unable to load plugin %s.\n", type);
+ ERROR ("plugin_load: Load plugin \"%s\" failed with "
+ "status %i.", type, status);
}
}
closedir (dh);
- if (filename[0] == '\0')
- fprintf (stderr, "Could not find plugin %s.\n", type);
+ if (filename[0] == 0)
+ ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
+ type, dir);
return (ret);
}
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ {
+ char const *tmp = global_option_get ("WriteThreads");
+ int num = atoi (tmp);
+
+ if (num < 1)
+ num = 5;
+
+ start_write_threads ((size_t) num);
+ }
if ((list_init == NULL) && (read_heap == NULL))
return;
plugin_set_ctx (old_ctx);
}
+ stop_write_threads ();
+
/* Write plugins which use the `user_data' pointer usually need the
* same data available to the flush callback. If this is the case, set
* the free_function to NULL when registering the flush callback and to
return (0);
} /* int }}} plugin_dispatch_missing */
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
{
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
return (-1);
}
- if (vl->time == 0)
- vl->time = cdtime ();
-
- if (vl->interval <= 0)
- {
- plugin_ctx_t ctx = plugin_get_ctx ();
-
- if (ctx.interval != 0)
- vl->interval = ctx.interval;
- else
- {
- char name[6 * DATA_MAX_NAME_LEN];
- FORMAT_VL (name, sizeof (name), vl);
- ERROR ("plugin_dispatch_values: Unable to determine "
- "interval from context for "
- "value list \"%s\". "
- "This indicates a broken plugin. "
- "Please report this problem to the "
- "collectd mailing list or at "
- "<http://collectd.org/bugs/>.", name);
- vl->interval = cf_get_default_interval ();
- }
- }
+ /* Assured by plugin_value_list_clone(). The time is determined at
+ * _enqueue_ time. */
+ assert (vl->time != 0);
+ assert (vl->interval != 0);
DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
"host = %s; "
}
return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
{
- value_list_t vl_copy;
- int status;
-
- if (vl == NULL)
- return EINVAL;
-
- memcpy (&vl_copy, vl, sizeof (vl_copy));
-
- /* Write callbacks must not change the values and meta pointers, so we can
- * savely skip copying those and make this more efficient. */
- if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
- return (plugin_dispatch_values (&vl_copy));
-
- /* Set pointers to NULL, just to be on the save side. */
- vl_copy.values = NULL;
- vl_copy.meta = NULL;
-
- vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
- if (vl_copy.values == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: malloc failed.");
- return (ENOMEM);
- }
- memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
- if (vl->meta != NULL)
- {
- vl_copy.meta = meta_data_clone (vl->meta);
- if (vl_copy.meta == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
- free (vl_copy.values);
- return (ENOMEM);
- }
- } /* if (vl->meta) */
-
- status = plugin_dispatch_values (&vl_copy);
+ int status;
- meta_data_destroy (vl_copy.meta);
- free (vl_copy.values);
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return (status);
+ }
- return (status);
-} /* int plugin_dispatch_values_secure */
+ return (0);
+}
int plugin_dispatch_notification (const notification_t *notif)
{
* `vl' Value list of the values that have been read by a `read'
* function.
*/
-int plugin_dispatch_values (value_list_t *vl);
-int plugin_dispatch_values_secure (const value_list_t *vl);
+int plugin_dispatch_values (value_list_t const *vl);
int plugin_dispatch_missing (const value_list_t *vl);
int plugin_dispatch_notification (const notification_t *notif);
c_psql_connect (db);
}
- /* "ping" */
- PQclear (PQexec (db->conn, "SELECT 42;"));
-
if (CONNECTION_OK != PQstatus (db->conn)) {
PQreset (db->conn);
if (PGRES_TUPLES_OK != PQresultStatus (res)) {
pthread_mutex_lock (&db->db_lock);
+ if ((CONNECTION_OK != PQstatus (db->conn))
+ && (0 == c_psql_check_connection (db))) {
+ PQclear (res);
+ return c_psql_exec_query (db, q, prep_area);
+ }
+
log_err ("Failed to execute SQL query: %s",
PQerrorMessage (db->conn));
log_info ("SQL query was: %s",
--- /dev/null
+option java_package = "com.aphyr.riemann";
+option java_outer_classname = "Proto";
+
+message State {
+ optional int64 time = 1;
+ optional string state = 2;
+ optional string service = 3;
+ optional string host = 4;
+ optional string description = 5;
+ optional bool once = 6;
+ repeated string tags = 7;
+ optional float ttl = 8;
+}
+
+message Event {
+ optional int64 time = 1;
+ optional string state = 2;
+ optional string service = 3;
+ optional string host = 4;
+ optional string description = 5;
+ repeated string tags = 7;
+ optional float ttl = 8;
+
+ optional sint64 metric_sint64 = 13;
+ optional double metric_d = 14;
+ optional float metric_f = 15;
+}
+
+message Query {
+ optional string string = 1;
+}
+
+message Msg {
+ optional bool ok = 2;
+ optional string error = 3;
+ repeated State states = 4;
+ optional Query query = 5;
+ repeated Event events = 6;
+}
\ No newline at end of file
*/
static void csnmp_oid_init (oid_t *dst, oid const *src, size_t n)
{
- assert (n <= STATIC_ARRAY_LEN (dst->oid));
+ assert (n <= STATIC_ARRAY_SIZE (dst->oid));
memcpy (dst->oid, src, sizeof (*src) * n);
dst->oid_len = n;
}
dns_transfer value:DERIVE:0:U
dns_update value:DERIVE:0:U
dns_zops value:DERIVE:0:U
+duration seconds:GAUGE:0:U
email_check value:GAUGE:0:U
email_count value:GAUGE:0:U
email_size value:GAUGE:0:U
if_octets rx:DERIVE:0:U, tx:DERIVE:0:U
if_packets rx:DERIVE:0:U, tx:DERIVE:0:U
if_rx_errors value:DERIVE:0:U
+if_rx_octets value:DERIVE:0:U
if_tx_errors value:DERIVE:0:U
+if_tx_octets value:DERIVE:0:U
invocations value:DERIVE:0:U
io_octets rx:DERIVE:0:U, tx:DERIVE:0:U
io_packets rx:DERIVE:0:U, tx:DERIVE:0:U
**/
#include "collectd.h"
+
+#include <regex.h>
+
#include "common.h"
#include "utils_vl_lookup.h"
#include "utils_avltree.h"
/*
* Types
*/
+struct part_match_s
+{
+ char str[DATA_MAX_NAME_LEN];
+ regex_t regex;
+ _Bool is_regex;
+};
+typedef struct part_match_s part_match_t;
+
+struct identifier_match_s
+{
+ part_match_t host;
+ part_match_t plugin;
+ part_match_t plugin_instance;
+ part_match_t type;
+ part_match_t type_instance;
+
+ unsigned int group_by;
+};
+typedef struct identifier_match_s identifier_match_t;
+
struct lookup_s
{
c_avl_tree_t *by_type_tree;
struct user_class_s
{
void *user_class;
- identifier_t ident;
+ identifier_match_t match;
user_obj_t *user_obj_list; /* list of user_obj */
};
typedef struct user_class_s user_class_t;
/*
* Private functions
*/
+static _Bool lu_part_matches (part_match_t const *match, /* {{{ */
+ char const *str)
+{
+ if (match->is_regex)
+ {
+ /* Short cut popular catch-all regex. */
+ if (strcmp (".*", match->str) == 0)
+ return (1);
+
+ int status = regexec (&match->regex, str,
+ /* nmatch = */ 0, /* pmatch = */ NULL,
+ /* flags = */ 0);
+ if (status == 0)
+ return (1);
+ else
+ return (0);
+ }
+ else if (strcmp (match->str, str) == 0)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool lu_part_matches */
+
+static int lu_copy_ident_to_match_part (part_match_t *match_part, /* {{{ */
+ char const *ident_part)
+{
+ size_t len = strlen (ident_part);
+ int status;
+
+ if ((len < 3) || (ident_part[0] != '/') || (ident_part[len - 1] != '/'))
+ {
+ sstrncpy (match_part->str, ident_part, sizeof (match_part->str));
+ match_part->is_regex = 0;
+ return (0);
+ }
+
+ /* Copy string without the leading slash. */
+ sstrncpy (match_part->str, ident_part + 1, sizeof (match_part->str));
+ assert (sizeof (match_part->str) > len);
+ /* strip trailing slash */
+ match_part->str[len - 2] = 0;
+
+ status = regcomp (&match_part->regex, match_part->str,
+ /* flags = */ REG_EXTENDED);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ regerror (status, &match_part->regex, errbuf, sizeof (errbuf));
+ ERROR ("utils_vl_lookup: Compiling regular expression \"%s\" failed: %s",
+ match_part->str, errbuf);
+ return (EINVAL);
+ }
+ match_part->is_regex = 1;
+
+ return (0);
+} /* }}} int lu_copy_ident_to_match_part */
+
+static int lu_copy_ident_to_match (identifier_match_t *match, /* {{{ */
+ identifier_t const *ident, unsigned int group_by)
+{
+ memset (match, 0, sizeof (*match));
+
+ match->group_by = group_by;
+
+#define COPY_FIELD(field) do { \
+ int status = lu_copy_ident_to_match_part (&match->field, ident->field); \
+ if (status != 0) \
+ return (status); \
+} while (0)
+
+ COPY_FIELD (host);
+ COPY_FIELD (plugin);
+ COPY_FIELD (plugin_instance);
+ COPY_FIELD (type);
+ COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+ return (0);
+} /* }}} int lu_copy_ident_to_match */
+
static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
data_set_t const *ds, value_list_t const *vl,
user_class_t *user_class)
return (NULL);
}
- sstrncpy (user_obj->ident.host,
- LU_IS_ALL (user_class->ident.host) ? "/all/" : vl->host,
- sizeof (user_obj->ident.host));
- sstrncpy (user_obj->ident.plugin,
- LU_IS_ALL (user_class->ident.plugin) ? "/all/" : vl->plugin,
- sizeof (user_obj->ident.plugin));
- sstrncpy (user_obj->ident.plugin_instance,
- LU_IS_ALL (user_class->ident.plugin_instance) ? "/all/" : vl->plugin_instance,
- sizeof (user_obj->ident.plugin_instance));
- sstrncpy (user_obj->ident.type,
- LU_IS_ALL (user_class->ident.type) ? "/all/" : vl->type,
- sizeof (user_obj->ident.type));
- sstrncpy (user_obj->ident.type_instance,
- LU_IS_ALL (user_class->ident.type_instance) ? "/all/" : vl->type_instance,
- sizeof (user_obj->ident.type_instance));
+#define COPY_FIELD(field, group_mask) do { \
+ if (user_class->match.field.is_regex \
+ && ((user_class->match.group_by & group_mask) == 0)) \
+ sstrncpy (user_obj->ident.field, "/.*/", sizeof (user_obj->ident.field)); \
+ else \
+ sstrncpy (user_obj->ident.field, vl->field, sizeof (user_obj->ident.field)); \
+} while (0)
+
+ COPY_FIELD (host, LU_GROUP_BY_HOST);
+ COPY_FIELD (plugin, LU_GROUP_BY_PLUGIN);
+ COPY_FIELD (plugin_instance, LU_GROUP_BY_PLUGIN_INSTANCE);
+ COPY_FIELD (type, 0);
+ COPY_FIELD (type_instance, LU_GROUP_BY_TYPE_INSTANCE);
+
+#undef COPY_FIELD
if (user_class->user_obj_list == NULL)
{
ptr != NULL;
ptr = ptr->next)
{
- if (!LU_IS_ALL (ptr->ident.host)
- && (strcmp (ptr->ident.host, vl->host) != 0))
+ if (user_class->match.host.is_regex
+ && (user_class->match.group_by & LU_GROUP_BY_HOST)
+ && (strcmp (vl->host, ptr->ident.host) != 0))
+ continue;
+ if (user_class->match.plugin.is_regex
+ && (user_class->match.group_by & LU_GROUP_BY_PLUGIN)
+ && (strcmp (vl->plugin, ptr->ident.plugin) != 0))
continue;
- if (!LU_IS_ALL (ptr->ident.plugin_instance)
- && (strcmp (ptr->ident.plugin_instance, vl->plugin_instance) != 0))
+ if (user_class->match.plugin_instance.is_regex
+ && (user_class->match.group_by & LU_GROUP_BY_PLUGIN_INSTANCE)
+ && (strcmp (vl->plugin_instance, ptr->ident.plugin_instance) != 0))
continue;
- if (!LU_IS_ALL (ptr->ident.type_instance)
- && (strcmp (ptr->ident.type_instance, vl->type_instance) != 0))
+ if (user_class->match.type_instance.is_regex
+ && (user_class->match.group_by & LU_GROUP_BY_TYPE_INSTANCE)
+ && (strcmp (vl->type_instance, ptr->ident.type_instance) != 0))
continue;
return (ptr);
user_obj_t *user_obj;
int status;
- assert (strcmp (vl->type, user_class->ident.type) == 0);
- assert (LU_IS_WILDCARD (user_class->ident.plugin)
- || (strcmp (vl->plugin, user_class->ident.plugin) == 0));
+ assert (strcmp (vl->type, user_class->match.type.str) == 0);
+ assert (user_class->match.plugin.is_regex
+ || (strcmp (vl->plugin, user_class->match.plugin.str)) == 0);
- /* When we get here, type and plugin already match the user class. Now check
- * the rest of the fields. */
- if (!LU_IS_WILDCARD (user_class->ident.type_instance)
- && (strcmp (vl->type_instance, user_class->ident.type_instance) != 0))
- return (1);
- if (!LU_IS_WILDCARD (user_class->ident.plugin_instance)
- && (strcmp (vl->plugin_instance,
- user_class->ident.plugin_instance) != 0))
- return (1);
- if (!LU_IS_WILDCARD (user_class->ident.host)
- && (strcmp (vl->host, user_class->ident.host) != 0))
+ if (!lu_part_matches (&user_class->match.type_instance, vl->type_instance)
+ || !lu_part_matches (&user_class->match.plugin_instance, vl->plugin_instance)
+ || !lu_part_matches (&user_class->match.plugin, vl->plugin)
+ || !lu_part_matches (&user_class->match.host, vl->host))
return (1);
user_obj = lu_find_user_obj (user_class, vl);
} /* }}} by_type_entry_t *lu_search_by_type */
static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
- identifier_t const *ident, user_class_list_t *user_class_list)
+ user_class_list_t *user_class_list)
{
user_class_list_t *ptr = NULL;
+ identifier_match_t const *match = &user_class_list->entry.match;
/* Lookup user_class_list from the per-plugin structure. If this is the first
* user_class to be added, the blocks return immediately. Otherwise they will
* set "ptr" to non-NULL. */
- if (LU_IS_WILDCARD (ident->plugin))
+ if (match->plugin.is_regex)
{
if (by_type->wildcard_plugin_list == NULL)
{
int status;
status = c_avl_get (by_type->by_plugin_tree,
- ident->plugin, (void *) &ptr);
+ match->plugin.str, (void *) &ptr);
if (status != 0) /* plugin not yet in tree */
{
- char *plugin_copy = strdup (ident->plugin);
+ char *plugin_copy = strdup (match->plugin.str);
if (plugin_copy == NULL)
{
} /* }}} void lookup_destroy */
int lookup_add (lookup_t *obj, /* {{{ */
- identifier_t const *ident, void *user_class)
+ identifier_t const *ident, unsigned int group_by, void *user_class)
{
by_type_entry_t *by_type = NULL;
user_class_list_t *user_class_obj;
}
memset (user_class_obj, 0, sizeof (*user_class_obj));
user_class_obj->entry.user_class = user_class;
- memmove (&user_class_obj->entry.ident, ident, sizeof (*ident));
+ lu_copy_ident_to_match (&user_class_obj->entry.match, ident, group_by);
user_class_obj->entry.user_obj_list = NULL;
user_class_obj->next = NULL;
- return (lu_add_by_plugin (by_type, ident, user_class_obj));
+ return (lu_add_by_plugin (by_type, user_class_obj));
} /* }}} int lookup_add */
/* returns the number of successful calls to the callback function */
};
typedef struct identifier_s identifier_t;
-#define LU_ANY "/any/"
-#define LU_ALL "/all/"
-
-#define LU_IS_ANY(str) (strcmp (str, LU_ANY) == 0)
-#define LU_IS_ALL(str) (strcmp (str, LU_ALL) == 0)
-#define LU_IS_WILDCARD(str) (LU_IS_ANY(str) || LU_IS_ALL(str))
+#define LU_GROUP_BY_HOST 0x01
+#define LU_GROUP_BY_PLUGIN 0x02
+#define LU_GROUP_BY_PLUGIN_INSTANCE 0x04
+/* #define LU_GROUP_BY_TYPE 0x00 */
+#define LU_GROUP_BY_TYPE_INSTANCE 0x10
/*
* Functions
void lookup_destroy (lookup_t *obj);
int lookup_add (lookup_t *obj,
- identifier_t const *ident, void *user_class);
+ identifier_t const *ident, unsigned int group_by, void *user_class);
/* TODO(octo): Pass lookup_obj_callback_t to lookup_search()? */
int lookup_search (lookup_t *obj,
static void checked_lookup_add (lookup_t *obj, /* {{{ */
char const *host,
char const *plugin, char const *plugin_instance,
- char const *type, char const *type_instance)
+ char const *type, char const *type_instance,
+ unsigned int group_by)
{
identifier_t ident;
void *user_class;
user_class = malloc (sizeof (ident));
memmove (user_class, &ident, sizeof (ident));
- status = lookup_add (obj, &ident, user_class);
+ status = lookup_add (obj, &ident, group_by, user_class);
assert (status == 0);
} /* }}} void test_add */
{
lookup_t *obj = checked_lookup_create ();
- checked_lookup_add (obj, "/any/", "test", "", "test", "/all/");
+ checked_lookup_add (obj, "/.*/", "test", "", "test", "/.*/", LU_GROUP_BY_HOST);
checked_lookup_search (obj, "host0", "test", "", "test", "0",
/* expect new = */ 1);
checked_lookup_search (obj, "host0", "test", "", "test", "1",
{
lookup_t *obj = checked_lookup_create ();
- checked_lookup_add (obj, "/any/", "/all/", "/all/", "test", "/all/");
+ checked_lookup_add (obj, "/.*/", "/.*/", "/.*/", "test", "/.*/", LU_GROUP_BY_HOST);
checked_lookup_search (obj, "host0", "plugin0", "", "test", "0",
/* expect new = */ 1);
checked_lookup_search (obj, "host0", "plugin0", "", "test", "1",
lookup_t *obj = checked_lookup_create ();
int status;
- checked_lookup_add (obj, "/any/", "plugin0", "", "test", "/all/");
- checked_lookup_add (obj, "/any/", "/all/", "", "test", "ti0");
+ checked_lookup_add (obj, "/.*/", "plugin0", "", "test", "/.*/", LU_GROUP_BY_HOST);
+ checked_lookup_add (obj, "/.*/", "/.*/", "", "test", "ti0", LU_GROUP_BY_HOST);
status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "",
/* expect new = */ 0);
lookup_destroy (obj);
}
+static void testcase3 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+
+ checked_lookup_add (obj, "/^db[0-9]\\./", "cpu", "/.*/", "cpu", "/.*/",
+ LU_GROUP_BY_TYPE_INSTANCE);
+ checked_lookup_search (obj, "db0.example.com", "cpu", "0", "cpu", "user",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "db0.example.com", "cpu", "0", "cpu", "idle",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "db0.example.com", "cpu", "1", "cpu", "user",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "db0.example.com", "cpu", "1", "cpu", "idle",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "app0.example.com", "cpu", "0", "cpu", "user",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "app0.example.com", "cpu", "0", "cpu", "idle",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "db1.example.com", "cpu", "0", "cpu", "user",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "db1.example.com", "cpu", "0", "cpu", "idle",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "db1.example.com", "cpu", "0", "cpu", "system",
+ /* expect new = */ 1);
+
+ lookup_destroy (obj);
+}
+
int main (int argc, char **argv) /* {{{ */
{
testcase0 ();
testcase1 ();
testcase2 ();
+ testcase3 ();
return (EXIT_SUCCESS);
} /* }}} int main */
{
int sock_fd;
+ char *name;
+
char *node;
char *service;
char *prefix;
close(cb->sock_fd);
cb->sock_fd = -1;
+ sfree(cb->name);
sfree(cb->node);
sfree(cb->service);
sfree(cb->prefix);
return (0);
}
-static int wg_config_carbon (oconfig_item_t *ci)
+static int wg_config_node (oconfig_item_t *ci)
{
struct wg_callback *cb;
user_data_t user_data;
}
memset (cb, 0, sizeof (*cb));
cb->sock_fd = -1;
+ cb->name = NULL;
cb->node = NULL;
cb->service = NULL;
cb->prefix = NULL;
cb->escape_char = WG_DEFAULT_ESCAPE;
cb->format_flags = GRAPHITE_STORE_RATES;
+ /* FIXME: Legacy configuration syntax. */
+ if (strcasecmp ("Carbon", ci->key) != 0)
+ {
+ int status = cf_util_get_string (ci, &cb->name);
+ if (status != 0)
+ {
+ wg_callback_free (cb);
+ return (status);
+ }
+ }
+
pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
C_COMPLAIN_INIT (&cb->init_complaint);
}
}
- ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s",
- cb->node != NULL ? cb->node : WG_DEFAULT_NODE,
- cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE);
+ /* FIXME: Legacy configuration syntax. */
+ if (cb->name == NULL)
+ ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s",
+ cb->node != NULL ? cb->node : WG_DEFAULT_NODE,
+ cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE);
+ else
+ ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s",
+ cb->name);
memset (&user_data, 0, sizeof (user_data));
user_data.data = cb;
{
oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Carbon", child->key) == 0)
- wg_config_carbon (child);
+ if (strcasecmp ("Node", child->key) == 0)
+ wg_config_node (child);
+ /* FIXME: Remove this legacy mode in version 6. */
+ else if (strcasecmp ("Carbon", child->key) == 0)
+ wg_config_node (child);
else
{
ERROR ("write_graphite plugin: Invalid configuration "
--- /dev/null
+/**
+ * collectd - src/write_riemann.c
+ *
+ * Copyright (C) 2012,2013 Pierre-Yves Ritschard
+ * Copyright (C) 2013 Florian octo Forster
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
+ * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
+ * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *
+ * Authors:
+ * Pierre-Yves Ritschard <pyr at spootnik.org>
+ * Florian octo Forster <octo at collectd.org>
+ */
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "utils_cache.h"
+#include "riemann.pb-c.h"
+
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <netdb.h>
+#include <inttypes.h>
+#include <pthread.h>
+
+#define RIEMANN_HOST "localhost"
+#define RIEMANN_PORT "5555"
+
+struct riemann_host {
+ char *name;
+#define F_CONNECT 0x01
+ uint8_t flags;
+ pthread_mutex_t lock;
+ _Bool store_rates;
+ _Bool always_append_ds;
+ char *node;
+ char *service;
+ _Bool use_tcp;
+ int s;
+
+ int reference_count;
+};
+
+static char **riemann_tags;
+static size_t riemann_tags_num;
+
+static int riemann_send(struct riemann_host *, Msg const *);
+static int riemann_notification(const notification_t *, user_data_t *);
+static int riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
+static int riemann_connect(struct riemann_host *);
+static int riemann_disconnect (struct riemann_host *host);
+static void riemann_free(void *);
+static int riemann_config_node(oconfig_item_t *);
+static int riemann_config(oconfig_item_t *);
+void module_register(void);
+
+static void riemann_event_protobuf_free (Event *event) /* {{{ */
+{
+ if (event == NULL)
+ return;
+
+ sfree (event->state);
+ sfree (event->service);
+ sfree (event->host);
+ sfree (event->description);
+
+ strarray_free (event->tags, event->n_tags);
+ event->tags = NULL;
+ event->n_tags = 0;
+
+ sfree (event);
+} /* }}} void riemann_event_protobuf_free */
+
+static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
+{
+ size_t i;
+
+ if (msg == NULL)
+ return;
+
+ for (i = 0; i < msg->n_events; i++)
+ {
+ riemann_event_protobuf_free (msg->events[i]);
+ msg->events[i] = NULL;
+ }
+
+ sfree (msg->events);
+ msg->n_events = 0;
+
+ sfree (msg);
+} /* }}} void riemann_msg_protobuf_free */
+
+static int
+riemann_send(struct riemann_host *host, Msg const *msg)
+{
+ u_char *buffer;
+ size_t buffer_len;
+ int status;
+
+ pthread_mutex_lock (&host->lock);
+
+ status = riemann_connect (host);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&host->lock);
+ return status;
+ }
+
+ buffer_len = msg__get_packed_size(msg);
+ if (host->use_tcp)
+ buffer_len += 4;
+
+ buffer = malloc (buffer_len);
+ if (buffer == NULL) {
+ pthread_mutex_unlock (&host->lock);
+ ERROR ("write_riemann plugin: malloc failed.");
+ return ENOMEM;
+ }
+ memset (buffer, 0, buffer_len);
+
+ if (host->use_tcp)
+ {
+ uint32_t length = htonl ((uint32_t) (buffer_len - 4));
+ memcpy (buffer, &length, 4);
+ msg__pack(msg, buffer + 4);
+ }
+ else
+ {
+ msg__pack(msg, buffer);
+ }
+
+ status = (int) swrite (host->s, buffer, buffer_len);
+ if (status != 0)
+ {
+ char errbuf[1024];
+
+ riemann_disconnect (host);
+ pthread_mutex_unlock (&host->lock);
+
+ ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
+ (host->node != NULL) ? host->node : RIEMANN_HOST,
+ (host->service != NULL) ? host->service : RIEMANN_PORT,
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ sfree (buffer);
+ return -1;
+ }
+
+ pthread_mutex_unlock (&host->lock);
+ sfree (buffer);
+ return 0;
+}
+
+static int riemann_event_add_tag (Event *event, /* {{{ */
+ char const *format, ...)
+{
+ va_list ap;
+ char buffer[1024];
+ size_t ret;
+
+ va_start (ap, format);
+ ret = vsnprintf (buffer, sizeof (buffer), format, ap);
+ if (ret >= sizeof (buffer))
+ ret = sizeof (buffer) - 1;
+ buffer[ret] = 0;
+ va_end (ap);
+
+ return (strarray_add (&event->tags, &event->n_tags, buffer));
+} /* }}} int riemann_event_add_tag */
+
+static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
+ notification_t const *n)
+{
+ Msg *msg;
+ Event *event;
+ char service_buffer[6 * DATA_MAX_NAME_LEN];
+ char const *severity;
+ notification_meta_t *meta;
+ int i;
+
+ msg = malloc (sizeof (*msg));
+ if (msg == NULL)
+ {
+ ERROR ("write_riemann plugin: malloc failed.");
+ return (NULL);
+ }
+ memset (msg, 0, sizeof (*msg));
+ msg__init (msg);
+
+ msg->events = malloc (sizeof (*msg->events));
+ if (msg->events == NULL)
+ {
+ ERROR ("write_riemann plugin: malloc failed.");
+ sfree (msg);
+ return (NULL);
+ }
+
+ event = malloc (sizeof (*event));
+ if (event == NULL)
+ {
+ ERROR ("write_riemann plugin: malloc failed.");
+ sfree (msg->events);
+ sfree (msg);
+ return (NULL);
+ }
+ memset (event, 0, sizeof (*event));
+ event__init (event);
+
+ msg->events[0] = event;
+ msg->n_events = 1;
+
+ event->host = strdup (n->host);
+ event->time = CDTIME_T_TO_TIME_T (n->time);
+ event->has_time = 1;
+
+ switch (n->severity)
+ {
+ case NOTIF_OKAY: severity = "okay"; break;
+ case NOTIF_WARNING: severity = "warning"; break;
+ case NOTIF_FAILURE: severity = "failure"; break;
+ default: severity = "unknown";
+ }
+ event->state = strdup (severity);
+
+ riemann_event_add_tag (event, "notification");
+ if (n->plugin[0] != 0)
+ riemann_event_add_tag (event, "plugin:%s", n->plugin);
+ if (n->plugin_instance[0] != 0)
+ riemann_event_add_tag (event, "plugin_instance:%s",
+ n->plugin_instance);
+
+ if (n->type[0] != 0)
+ riemann_event_add_tag (event, "type:%s", n->type);
+ if (n->type_instance[0] != 0)
+ riemann_event_add_tag (event, "type_instance:%s",
+ n->type_instance);
+
+ for (i = 0; i < riemann_tags_num; i++)
+ riemann_event_add_tag (event, "%s", riemann_tags[i]);
+
+ format_name (service_buffer, sizeof (service_buffer),
+ /* host = */ "", n->plugin, n->plugin_instance,
+ n->type, n->type_instance);
+ event->service = strdup (&service_buffer[1]);
+
+ /* Pull in values from threshold */
+ for (meta = n->meta; meta != NULL; meta = meta->next)
+ {
+ if (strcasecmp ("CurrentValue", meta->name) != 0)
+ continue;
+
+ event->metric_d = meta->nm_value.nm_double;
+ event->has_metric_d = 1;
+ break;
+ }
+
+ DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
+ "host = \"%s\", service = \"%s\", state = \"%s\"",
+ event->host, event->service, event->state);
+ return (msg);
+} /* }}} Msg *riemann_notification_to_protobuf */
+
+static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl, size_t index,
+ gauge_t const *rates)
+{
+ Event *event;
+ char name_buffer[5 * DATA_MAX_NAME_LEN];
+ char service_buffer[6 * DATA_MAX_NAME_LEN];
+ int i;
+
+ event = malloc (sizeof (*event));
+ if (event == NULL)
+ {
+ ERROR ("write_riemann plugin: malloc failed.");
+ return (NULL);
+ }
+ memset (event, 0, sizeof (*event));
+ event__init (event);
+
+ event->host = strdup (vl->host);
+ event->time = CDTIME_T_TO_TIME_T (vl->time);
+ event->has_time = 1;
+ event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval);
+ event->has_ttl = 1;
+
+ riemann_event_add_tag (event, "plugin:%s", vl->plugin);
+ if (vl->plugin_instance[0] != 0)
+ riemann_event_add_tag (event, "plugin_instance:%s",
+ vl->plugin_instance);
+
+ riemann_event_add_tag (event, "type:%s", vl->type);
+ if (vl->type_instance[0] != 0)
+ riemann_event_add_tag (event, "type_instance:%s",
+ vl->type_instance);
+
+ if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
+ {
+ riemann_event_add_tag (event, "ds_type:%s:rate",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
+ }
+ else
+ {
+ riemann_event_add_tag (event, "ds_type:%s",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
+ }
+ riemann_event_add_tag (event, "ds_name:%s", ds->ds[index].name);
+ riemann_event_add_tag (event, "ds_index:%zu", index);
+
+ for (i = 0; i < riemann_tags_num; i++)
+ riemann_event_add_tag (event, "%s", riemann_tags[i]);
+
+ if (ds->ds[index].type == DS_TYPE_GAUGE)
+ {
+ event->has_metric_d = 1;
+ event->metric_d = (double) vl->values[index].gauge;
+ }
+ else if (rates != NULL)
+ {
+ event->has_metric_d = 1;
+ event->metric_d = (double) rates[index];
+ }
+ else
+ {
+ event->has_metric_sint64 = 1;
+ if (ds->ds[index].type == DS_TYPE_DERIVE)
+ event->metric_sint64 = (int64_t) vl->values[index].derive;
+ else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
+ event->metric_sint64 = (int64_t) vl->values[index].absolute;
+ else
+ event->metric_sint64 = (int64_t) vl->values[index].counter;
+ }
+
+ format_name (name_buffer, sizeof (name_buffer),
+ /* host = */ "", vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+ if (host->always_append_ds || (ds->ds_num > 1))
+ ssnprintf (service_buffer, sizeof (service_buffer),
+ "%s/%s", &name_buffer[1], ds->ds[index].name);
+ else
+ sstrncpy (service_buffer, &name_buffer[1],
+ sizeof (service_buffer));
+
+ event->service = strdup (service_buffer);
+
+ DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
+ "host = \"%s\", service = \"%s\"",
+ event->host, event->service);
+ return (event);
+} /* }}} Event *riemann_value_to_protobuf */
+
+static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl)
+{
+ Msg *msg;
+ size_t i;
+ gauge_t *rates = NULL;
+
+ /* Initialize the Msg structure. */
+ msg = malloc (sizeof (*msg));
+ if (msg == NULL)
+ {
+ ERROR ("write_riemann plugin: malloc failed.");
+ return (NULL);
+ }
+ memset (msg, 0, sizeof (*msg));
+ msg__init (msg);
+
+ /* Set up events. First, the list of pointers. */
+ msg->n_events = (size_t) vl->values_len;
+ msg->events = calloc (msg->n_events, sizeof (*msg->events));
+ if (msg->events == NULL)
+ {
+ ERROR ("write_riemann plugin: calloc failed.");
+ riemann_msg_protobuf_free (msg);
+ return (NULL);
+ }
+
+ if (host->store_rates)
+ {
+ rates = uc_get_rate (ds, vl);
+ if (rates == NULL)
+ {
+ ERROR ("write_riemann plugin: uc_get_rate failed.");
+ riemann_msg_protobuf_free (msg);
+ return (NULL);
+ }
+ }
+
+ for (i = 0; i < msg->n_events; i++)
+ {
+ msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
+ (int) i, rates);
+ if (msg->events[i] == NULL)
+ {
+ riemann_msg_protobuf_free (msg);
+ sfree (rates);
+ return (NULL);
+ }
+ }
+
+ sfree (rates);
+ return (msg);
+} /* }}} Msg *riemann_value_list_to_protobuf */
+
+static int
+riemann_notification(const notification_t *n, user_data_t *ud)
+{
+ int status;
+ struct riemann_host *host = ud->data;
+ Msg *msg;
+
+ msg = riemann_notification_to_protobuf (host, n);
+ if (msg == NULL)
+ return (-1);
+
+ status = riemann_send (host, msg);
+ if (status != 0)
+ ERROR ("write_riemann plugin: riemann_send failed with status %i",
+ status);
+
+ riemann_msg_protobuf_free (msg);
+ return (status);
+} /* }}} int riemann_notification */
+
+static int
+riemann_write(const data_set_t *ds,
+ const value_list_t *vl,
+ user_data_t *ud)
+{
+ int status;
+ struct riemann_host *host = ud->data;
+ Msg *msg;
+
+ msg = riemann_value_list_to_protobuf (host, ds, vl);
+ if (msg == NULL)
+ return (-1);
+
+ status = riemann_send (host, msg);
+ if (status != 0)
+ ERROR ("write_riemann plugin: riemann_send failed with status %i",
+ status);
+
+ riemann_msg_protobuf_free (msg);
+ return status;
+}
+
+/* host->lock must be held when calling this function. */
+static int
+riemann_connect(struct riemann_host *host)
+{
+ int e;
+ struct addrinfo *ai, *res, hints;
+ char const *node;
+ char const *service;
+
+ if (host->flags & F_CONNECT)
+ return 0;
+
+ memset(&hints, 0, sizeof(hints));
+ memset(&service, 0, sizeof(service));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
+#ifdef AI_ADDRCONFIG
+ hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+
+ node = (host->node != NULL) ? host->node : RIEMANN_HOST;
+ service = (host->service != NULL) ? host->service : RIEMANN_PORT;
+
+ if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
+ ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
+ node, gai_strerror(e));
+ return -1;
+ }
+
+ host->s = -1;
+ for (ai = res; ai != NULL; ai = ai->ai_next) {
+ if ((host->s = socket(ai->ai_family,
+ ai->ai_socktype,
+ ai->ai_protocol)) == -1) {
+ continue;
+ }
+
+ if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
+ close(host->s);
+ host->s = -1;
+ continue;
+ }
+
+ host->flags |= F_CONNECT;
+ DEBUG("write_riemann plugin: got a succesful connection for: %s:%s",
+ node, service);
+ break;
+ }
+
+ freeaddrinfo(res);
+
+ if (host->s < 0) {
+ WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
+ node, service);
+ return -1;
+ }
+ return 0;
+}
+
+/* host->lock must be held when calling this function. */
+static int
+riemann_disconnect (struct riemann_host *host)
+{
+ if ((host->flags & F_CONNECT) == 0)
+ return (0);
+
+ close (host->s);
+ host->s = -1;
+ host->flags &= ~F_CONNECT;
+
+ return (0);
+}
+
+static void
+riemann_free(void *p)
+{
+ struct riemann_host *host = p;
+
+ if (host == NULL)
+ return;
+
+ pthread_mutex_lock (&host->lock);
+
+ host->reference_count--;
+ if (host->reference_count > 0)
+ {
+ pthread_mutex_unlock (&host->lock);
+ return;
+ }
+
+ riemann_disconnect (host);
+
+ sfree(host->service);
+ pthread_mutex_destroy (&host->lock);
+ sfree(host);
+}
+
+static int
+riemann_config_node(oconfig_item_t *ci)
+{
+ struct riemann_host *host = NULL;
+ int status = 0;
+ int i;
+ oconfig_item_t *child;
+ char callback_name[DATA_MAX_NAME_LEN];
+ user_data_t ud;
+
+ if ((host = calloc(1, sizeof (*host))) == NULL) {
+ ERROR ("write_riemann plugin: calloc failed.");
+ return ENOMEM;
+ }
+ pthread_mutex_init (&host->lock, NULL);
+ host->reference_count = 1;
+ host->node = NULL;
+ host->service = NULL;
+ host->store_rates = 1;
+ host->always_append_ds = 0;
+ host->use_tcp = 0;
+
+ status = cf_util_get_string (ci, &host->name);
+ if (status != 0) {
+ WARNING("write_riemann plugin: Required host name is missing.");
+ riemann_free (host);
+ return -1;
+ }
+
+ for (i = 0; i < ci->children_num; i++) {
+ /*
+ * The code here could be simplified but makes room
+ * for easy adding of new options later on.
+ */
+ child = &ci->children[i];
+ status = 0;
+
+ if (strcasecmp ("Host", child->key) == 0) {
+ status = cf_util_get_string (child, &host->node);
+ if (status != 0)
+ break;
+ } else if (strcasecmp ("Port", child->key) == 0) {
+ status = cf_util_get_service (child, &host->service);
+ if (status != 0) {
+ ERROR ("write_riemann plugin: Invalid argument "
+ "configured for the \"Port\" "
+ "option.");
+ break;
+ }
+ } else if (strcasecmp ("Protocol", child->key) == 0) {
+ char tmp[16];
+ status = cf_util_get_string_buffer (child,
+ tmp, sizeof (tmp));
+ if (status != 0)
+ {
+ ERROR ("write_riemann plugin: cf_util_get_"
+ "string_buffer failed with "
+ "status %i.", status);
+ break;
+ }
+
+ if (strcasecmp ("UDP", tmp) == 0)
+ host->use_tcp = 0;
+ else if (strcasecmp ("TCP", tmp) == 0)
+ host->use_tcp = 1;
+ else
+ WARNING ("write_riemann plugin: The value "
+ "\"%s\" is not valid for the "
+ "\"Protocol\" option. Use "
+ "either \"UDP\" or \"TCP\".",
+ tmp);
+ } else if (strcasecmp ("StoreRates", child->key) == 0) {
+ status = cf_util_get_boolean (child, &host->store_rates);
+ if (status != 0)
+ break;
+ } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) {
+ status = cf_util_get_boolean (child,
+ &host->always_append_ds);
+ if (status != 0)
+ break;
+ } else {
+ WARNING("write_riemann plugin: ignoring unknown config "
+ "option: \"%s\"", child->key);
+ }
+ }
+ if (status != 0) {
+ riemann_free (host);
+ return status;
+ }
+
+ ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s",
+ host->name);
+ ud.data = host;
+ ud.free_func = riemann_free;
+
+ pthread_mutex_lock (&host->lock);
+
+ status = plugin_register_write (callback_name, riemann_write, &ud);
+ if (status != 0)
+ WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
+ "failed with status %i.",
+ callback_name, status);
+ else /* success */
+ host->reference_count++;
+
+ status = plugin_register_notification (callback_name,
+ riemann_notification, &ud);
+ if (status != 0)
+ WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") "
+ "failed with status %i.",
+ callback_name, status);
+ else /* success */
+ host->reference_count++;
+
+ if (host->reference_count <= 1)
+ {
+ /* Both callbacks failed => free memory.
+ * We need to unlock here, because riemann_free() will lock.
+ * This is not a race condition, because we're the only one
+ * holding a reference. */
+ pthread_mutex_unlock (&host->lock);
+ riemann_free (host);
+ return (-1);
+ }
+
+ host->reference_count--;
+ pthread_mutex_unlock (&host->lock);
+
+ return status;
+}
+
+static int
+riemann_config(oconfig_item_t *ci)
+{
+ int i;
+ oconfig_item_t *child;
+ int status;
+
+ for (i = 0; i < ci->children_num; i++) {
+ child = &ci->children[i];
+
+ if (strcasecmp("Node", child->key) == 0) {
+ riemann_config_node (child);
+ } else if (strcasecmp(child->key, "tag") == 0) {
+ char *tmp = NULL;
+ status = cf_util_get_string(child, &tmp);
+ if (status != 0)
+ continue;
+
+ strarray_add (&riemann_tags, &riemann_tags_num, tmp);
+ DEBUG("write_riemann plugin: Got tag: %s", tmp);
+ sfree (tmp);
+ } else {
+ WARNING ("write_riemann plugin: Ignoring unknown "
+ "configuration option \"%s\" at top level.",
+ child->key);
+ }
+ }
+ return (0);
+}
+
+void
+module_register(void)
+{
+ plugin_register_complex_config ("write_riemann", riemann_config);
+}
+
+/* vim: set sw=8 sts=8 ts=8 noet : */