From: Florian Forster Date: Wed, 17 Nov 2010 14:18:03 +0000 (+0100) Subject: Merge branch 'ff/highres' X-Git-Tag: collectd-5.0.0-beta0~19 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=83077c18c3e78739c2d2d18debf99875944eaa72;hp=e94e1ca83a8ac255f045259ec898f944f9118430;p=collectd.git Merge branch 'ff/highres' Conflicts: src/netapp.c --- diff --git a/AUTHORS b/AUTHORS index e83c2f84..c57f90b2 100644 --- a/AUTHORS +++ b/AUTHORS @@ -34,6 +34,10 @@ Anthony Gialluca Antony Dovgal - memcached plugin. +Aurélien Reynaud + - LPAR plugin. + - Various fixes for AIX, HP-UX and Solaris. + Bruno Prémont - BIND plugin. - Many bugreports and -fixes in various plugins, @@ -162,6 +166,9 @@ Rodolphe Quiédeville Scott Garrett - tape plugin. +Sebastien Pahl + - AMQP plugin. + Simon Kuhnle - OpenBSD code for the cpu and memory plugins. diff --git a/README b/README index 0c7a4221..2ed8934f 100644 --- a/README +++ b/README @@ -125,6 +125,10 @@ Features - load System load average over the last 1, 5 and 15 minutes. + - lpar + Detailed CPU statistics of the “Logical Partitions” virtualization + technique built into IBM's POWER processors. + - libvirt CPU, disk and network I/O statistics from virtual machines. @@ -315,6 +319,10 @@ Features * Output can be written or sent to various destinations by the following plugins: + - amqp + Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) + server, such as RabbitMQ. + - csv Write to comma separated values (CSV) files. This needs lots of diskspace but is extremely portable and can be analysed with almost @@ -597,6 +605,9 @@ Prerequisites Used to capture packets by the `dns' plugin. + * libperfstat (optional) + Used by various plugins to gather statistics under AIX. + * libperl (optional) Obviously used by the `perl' plugin. The library has to be compiled with ithread support (introduced in Perl 5.6.0). @@ -615,6 +626,10 @@ Prerequisites Used by the `python' plugin. Currently, only 2.3 ≦ Python < 3 is supported. + * librabbitmq (optional; also called “rabbitmq-c”) + Used by the AMQP plugin for AMQP connections, for example to RabbitMQ. + + * librouteros (optional) Used by the `routeros' plugin to connect to a device running `RouterOS'. diff --git a/configure.in b/configure.in index 48942b48..fd6a2570 100644 --- a/configure.in +++ b/configure.in @@ -91,6 +91,10 @@ if test "x$ac_system" = "xSolaris" then AC_DEFINE(_POSIX_PTHREAD_SEMANTICS, 1, [Define to enforce POSIX thread semantics under Solaris.]) fi +if test "x$ac_system" = "xAIX" +then + AC_DEFINE(_THREAD_SAFE_ERRNO, 1, [Define to use the thread-safe version of errno under AIX.]) +fi # Where to install .pc files. pkgconfigdir="${libdir}/pkgconfig" @@ -1204,6 +1208,12 @@ fi if test "x$with_perfstat" = "xyes" then AC_DEFINE(HAVE_PERFSTAT, 1, [Define to 1 if you have the 'perfstat' library (-lperfstat)]) + # struct members pertaining to donation have been added to libperfstat somewhere between AIX5.3ML5 and AIX5.3ML9 + AC_CHECK_MEMBER([perfstat_partition_type_t.b.donate_enabled], [], [], [[#include @], [Path to librabbitmq.])], +[ + if test "x$withval" != "xno" && test "x$withval" != "xyes" + then + with_librabbitmq_cppflags="-I$withval/include" + with_librabbitmq_ldflags="-L$withval/lib" + with_librabbitmq="yes" + else + with_librabbitmq="$withval" + fi +], +[ + with_librabbitmq="yes" +]) +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + + AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags" + + AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags" + BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags" + BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq" + AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS) + AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.]) +fi +AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") +# }}} + # --with-librouteros {{{ AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])], [ @@ -4473,6 +4534,7 @@ AC_ARG_ENABLE([all-plugins], m4_divert_once([HELP_ENABLE], []) +AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple's hardware sensors]) @@ -4507,6 +4569,7 @@ AC_PLUGIN([java], [$with_java], [Embed the Java Virtual Machine]) AC_PLUGIN([libvirt], [$plugin_libvirt], [Virtual machine statistics]) AC_PLUGIN([load], [$plugin_load], [System load]) AC_PLUGIN([logfile], [yes], [File logging plugin]) +AC_PLUGIN([lpar], [$with_perfstat], [AIX logical partitions statistics]) AC_PLUGIN([madwifi], [$have_linux_wireless_h], [Madwifi wireless statistics]) AC_PLUGIN([match_empty_counter], [yes], [The empty counter match]) AC_PLUGIN([match_hashed], [yes], [The hashed match]) @@ -4772,6 +4835,7 @@ Configuration: libperl . . . . . . . $with_libperl libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread + librabbitmq . . . . . $with_librabbitmq librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors @@ -4796,6 +4860,7 @@ Configuration: perl . . . . . . . . $with_perl_bindings Modules: + amqp . . . . . . . $enable_amqp apache . . . . . . . $enable_apache apcups . . . . . . . $enable_apcups apple_sensors . . . . $enable_apple_sensors @@ -4830,6 +4895,7 @@ Configuration: libvirt . . . . . . . $enable_libvirt load . . . . . . . . $enable_load logfile . . . . . . . $enable_logfile + lpar... . . . . . . . $enable_lpar madwifi . . . . . . . $enable_madwifi match_empty_counter . $enable_match_empty_counter match_hashed . . . . $enable_match_hashed diff --git a/contrib/collection3/etc/collection.conf b/contrib/collection3/etc/collection.conf index 9c5e3d1d..3bb3d8b1 100644 --- a/contrib/collection3/etc/collection.conf +++ b/contrib/collection3/etc/collection.conf @@ -567,6 +567,16 @@ GraphWidth 400 Module PsCputime + + Module GenericIO + DataSources read write + DSName "read Read " + DSName write Written + RRDTitle "Process disk traffic ({instance})" + RRDVerticalLabel "Bytes per second" +# RRDOptions ... + RRDFormat "%5.1lf%s" + DataSources value DSName value RSS diff --git a/contrib/collection3/lib/Collectd/Graph/Common.pm b/contrib/collection3/lib/Collectd/Graph/Common.pm index f88c22b5..c6e25081 100644 --- a/contrib/collection3/lib/Collectd/Graph/Common.pm +++ b/contrib/collection3/lib/Collectd/Graph/Common.pm @@ -106,7 +106,9 @@ sub group_files_by_plugin_instance for (my $i = 0; $i < @files; $i++) { my $file = $files[$i]; - my $key = $file->{'plugin_instance'} || ''; + my $key1 = $file->{'hostname'} || ''; + my $key2 = $file->{'plugin_instance'} || ''; + my $key = "$key1-$key2"; $data->{$key} ||= []; push (@{$data->{$key}}, $file); diff --git a/src/Makefile.am b/src/Makefile.am index 69329bf2..1a7ba5b1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -124,6 +124,18 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if BUILD_PLUGIN_AMQP +pkglib_LTLIBRARIES += amqp.la +amqp_la_SOURCES = amqp.c \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_format_json.c utils_format_json.h +amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) +amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) +amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) +collectd_LDADD += "-dlopen" amqp.la +collectd_DEPENDENCIES += amqp.la +endif + if BUILD_PLUGIN_APACHE pkglib_LTLIBRARIES += apache.la apache_la_SOURCES = apache.c @@ -513,6 +525,15 @@ collectd_LDADD += "-dlopen" logfile.la collectd_DEPENDENCIES += logfile.la endif +if BUILD_PLUGIN_LPAR +pkglib_LTLIBRARIES += lpar.la +lpar_la_SOURCES = lpar.c +lpar_la_LDFLAGS = -module -avoid-version +collectd_LDADD += "-dlopen" lpar.la +collectd_DEPENDENCIES += lpar.la +lpar_la_LIBADD = -lperfstat +endif + if BUILD_PLUGIN_MADWIFI pkglib_LTLIBRARIES += madwifi.la madwifi_la_SOURCES = madwifi.c madwifi.h diff --git a/src/amqp.c b/src/amqp.c new file mode 100644 index 00000000..f0abd44b --- /dev/null +++ b/src/amqp.c @@ -0,0 +1,939 @@ +/** + * collectd - src/amqp.c + * Copyright (C) 2009 Sebastien Pahl + * Copyright (C) 2010 Florian Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastien Pahl + * Florian Forster + **/ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" +#include "utils_cmd_putval.h" +#include "utils_format_json.h" + +#include + +#include +#include + +/* Defines for the delivery mode. I have no idea why they're not defined by the + * library.. */ +#define CAMQP_DM_VOLATILE 1 +#define CAMQP_DM_PERSISTENT 2 + +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 + +#define CAMQP_CHANNEL 1 + +/* + * Data types + */ +struct camqp_config_s +{ + _Bool publish; + char *name; + + char *host; + int port; + char *vhost; + char *user; + char *password; + + char *exchange; + char *routing_key; + + /* publish only */ + uint8_t delivery_mode; + _Bool store_rates; + int format; + + /* subscribe only */ + char *exchange_type; + char *queue; + + amqp_connection_state_t connection; + pthread_mutex_t lock; +}; +typedef struct camqp_config_s camqp_config_t; + +/* + * Global variables + */ +static const char *def_host = "localhost"; +static const char *def_vhost = "/"; +static const char *def_user = "guest"; +static const char *def_password = "guest"; +static const char *def_exchange = "amq.fanout"; + +static pthread_t *subscriber_threads = NULL; +static size_t subscriber_threads_num = 0; +static _Bool subscriber_threads_running = 1; + +#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f) + +/* + * Functions + */ +static void camqp_close_connection (camqp_config_t *conf) /* {{{ */ +{ + int sockfd; + + if ((conf == NULL) || (conf->connection == NULL)) + return; + + sockfd = amqp_get_sockfd (conf->connection); + amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); + close (sockfd); + conf->connection = NULL; +} /* }}} void camqp_close_connection */ + +static void camqp_config_free (void *ptr) /* {{{ */ +{ + camqp_config_t *conf = ptr; + + if (conf == NULL) + return; + + camqp_close_connection (conf); + + sfree (conf->name); + sfree (conf->host); + sfree (conf->vhost); + sfree (conf->user); + sfree (conf->password); + sfree (conf->exchange); + sfree (conf->exchange_type); + sfree (conf->queue); + sfree (conf->routing_key); + + sfree (conf); +} /* }}} void camqp_config_free */ + +static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */ +{ + char *ret; + + if ((in == NULL) || (in->bytes == NULL)) + return (NULL); + + ret = malloc (in->len + 1); + if (ret == NULL) + return (NULL); + + memcpy (ret, in->bytes, in->len); + ret[in->len] = 0; + + return (ret); +} /* }}} char *camqp_bytes_cstring */ + +static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */ +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return (0); + + return (1); +} /* }}} _Bool camqp_is_error */ + +static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ + char *buffer, size_t buffer_size) +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + switch (r.reply_type) + { + case AMQP_RESPONSE_NORMAL: + sstrncpy (buffer, "Success", sizeof (buffer)); + break; + + case AMQP_RESPONSE_NONE: + sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + return (sstrerror (r.library_errno, buffer, buffer_size)); + else + sstrncpy (buffer, "End of stream", sizeof (buffer)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) + { + amqp_connection_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server connection error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) + { + amqp_channel_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server channel error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else + { + ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32, + r.reply.id); + } + break; + + default: + ssnprintf (buffer, buffer_size, "Unknown reply type %i", + (int) r.reply_type); + } + + return (buffer); +} /* }}} char *camqp_strerror */ + +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + + if (conf->exchange_type == NULL) + return (0); + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ + +static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ +{ + amqp_queue_declare_ok_t *qd_ret; + amqp_basic_consume_ok_t *cm_ret; + + qd_ret = amqp_queue_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ (conf->queue != NULL) + ? amqp_cstring_bytes (conf->queue) + : AMQP_EMPTY_BYTES, + /* passive = */ 0, + /* durable = */ 0, + /* exclusive = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if (qd_ret == NULL) + { + ERROR ("amqp plugin: amqp_queue_declare failed."); + camqp_close_connection (conf); + return (-1); + } + + if (conf->queue == NULL) + { + conf->queue = camqp_bytes_cstring (&qd_ret->queue); + if (conf->queue == NULL) + { + ERROR ("amqp plugin: camqp_bytes_cstring failed."); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Created queue \"%s\".", conf->queue); + } + DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue); + + /* bind to an exchange */ + if (conf->exchange != NULL) + { + amqp_queue_bind_ok_t *qb_ret; + + assert (conf->queue != NULL); + qb_ret = amqp_queue_bind (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* routing_key = */ (conf->routing_key != NULL) + ? amqp_cstring_bytes (conf->routing_key) + : AMQP_EMPTY_BYTES, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((qb_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_queue_bind failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".", + conf->queue, conf->exchange); + } /* if (conf->exchange != NULL) */ + + cm_ret = amqp_basic_consume (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* consumer_tag = */ AMQP_EMPTY_BYTES, + /* no_local = */ 0, + /* no_ack = */ 1, + /* exclusive = */ 0); + if ((cm_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_basic_consume failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + return (0); +} /* }}} int camqp_setup_queue */ + +static int camqp_connect (camqp_config_t *conf) /* {{{ */ +{ + amqp_rpc_reply_t reply; + int sockfd; + int status; + + if (conf->connection != NULL) + return (0); + + conf->connection = amqp_new_connection (); + if (conf->connection == NULL) + { + ERROR ("amqp plugin: amqp_new_connection failed."); + return (ENOMEM); + } + + sockfd = amqp_open_socket (CONF(conf, host), conf->port); + if (sockfd < 0) + { + char errbuf[1024]; + status = (-1) * sockfd; + ERROR ("amqp plugin: amqp_open_socket failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } + amqp_set_sockfd (conf->connection, sockfd); + + reply = amqp_login (conf->connection, CONF(conf, vhost), + /* channel max = */ 0, + /* frame max = */ 131072, + /* heartbeat = */ 0, + /* authentication = */ AMQP_SASL_METHOD_PLAIN, + CONF(conf, user), CONF(conf, password)); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", + CONF(conf, vhost), CONF(conf, user)); + amqp_destroy_connection (conf->connection); + close (sockfd); + conf->connection = NULL; + return (1); + } + + amqp_channel_open (conf->connection, /* channel = */ 1); + /* FIXME: Is checking "reply.reply_type" really correct here? How does + * it get set? --octo */ + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_channel_open failed."); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); + close(sockfd); + conf->connection = NULL; + return (1); + } + + INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " + "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); + + status = camqp_create_exchange (conf); + if (status != 0) + return (status); + + if (!conf->publish) + return (camqp_setup_queue (conf)); + return (0); +} /* }}} int camqp_connect */ + +static int camqp_shutdown (void) /* {{{ */ +{ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + + return (0); +} /* }}} int camqp_shutdown */ + +/* + * Subscribing code + */ +static int camqp_read_body (camqp_config_t *conf, /* {{{ */ + size_t body_size, const char *content_type) +{ + char body[body_size + 1]; + char *body_ptr; + size_t received; + amqp_frame_t frame; + int status; + + memset (body, 0, sizeof (body)); + body_ptr = &body[0]; + received = 0; + + while (received < body_size) + { + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_BODY) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + if ((body_size - received) < frame.payload.body_fragment.len) + { + WARNING ("amqp plugin: Body is larger than indicated by header."); + return (-1); + } + + memcpy (body_ptr, frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + body_ptr += frame.payload.body_fragment.len; + received += frame.payload.body_fragment.len; + } /* while (received < body_size) */ + + if (strcasecmp ("text/collectd", content_type) == 0) + { + status = handle_putval (stderr, body); + if (status != 0) + ERROR ("amqp plugin: handle_putval failed with status %i.", + status); + return (status); + } + else if (strcasecmp ("application/json", content_type) == 0) + { + ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not " + "been implemented yet. FIXME!"); + return (0); + } + else + { + ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".", + content_type); + return (EINVAL); + } + + /* not reached */ + return (0); +} /* }}} int camqp_read_body */ + +static int camqp_read_header (camqp_config_t *conf) /* {{{ */ +{ + int status; + amqp_frame_t frame; + amqp_basic_properties_t *properties; + char *content_type; + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_HEADER) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + properties = frame.payload.properties.decoded; + content_type = camqp_bytes_cstring (&properties->content_type); + if (content_type == NULL) + { + ERROR ("amqp plugin: Unable to determine content type."); + return (-1); + } + + status = camqp_read_body (conf, + (size_t) frame.payload.properties.body_size, + content_type); + + sfree (content_type); + return (status); +} /* }}} int camqp_read_header */ + +static void *camqp_subscribe_thread (void *user_data) /* {{{ */ +{ + camqp_config_t *conf = user_data; + int status; + + while (subscriber_threads_running) + { + amqp_frame_t frame; + + status = camqp_connect (conf); + if (status != 0) + { + ERROR ("amqp plugin: camqp_connect failed. " + "Will sleep for %i seconds.", interval_g); + sleep (interval_g); + continue; + } + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + ERROR ("amqp plugin: amqp_simple_wait_frame failed. " + "Will sleep for %i seconds.", interval_g); + camqp_close_connection (conf); + sleep (interval_g); + continue; + } + + if (frame.frame_type != AMQP_FRAME_METHOD) + { + DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + continue; + } + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + { + DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32, + frame.payload.method.id); + continue; + } + + status = camqp_read_header (conf); + + amqp_maybe_release_buffers (conf->connection); + } /* while (subscriber_threads_running) */ + + camqp_config_free (conf); + pthread_exit (NULL); +} /* }}} void *camqp_subscribe_thread */ + +static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ +{ + int status; + pthread_t *tmp; + + tmp = realloc (subscriber_threads, + sizeof (*subscriber_threads) * (subscriber_threads_num + 1)); + if (tmp == NULL) + { + ERROR ("amqp plugin: realloc failed."); + camqp_config_free (conf); + return (ENOMEM); + } + subscriber_threads = tmp; + tmp = subscriber_threads + subscriber_threads_num; + memset (tmp, 0, sizeof (*tmp)); + + status = pthread_create (tmp, /* attr = */ NULL, + camqp_subscribe_thread, conf); + if (status != 0) + { + char errbuf[1024]; + ERROR ("amqp plugin: pthread_create failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_config_free (conf); + return (status); + } + + subscriber_threads_num++; + + return (0); +} /* }}} int camqp_subscribe_init */ + +/* + * Publishing code + */ +/* XXX: You must hold "conf->lock" when calling this function! */ +static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ + const char *buffer, const char *routing_key) +{ + amqp_basic_properties_t props; + int status; + + status = camqp_connect (conf); + if (status != 0) + return (status); + + memset (&props, 0, sizeof (props)); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG + | AMQP_BASIC_APP_ID_FLAG; + if (conf->format == CAMQP_FORMAT_COMMAND) + props.content_type = amqp_cstring_bytes("text/collectd"); + else if (conf->format == CAMQP_FORMAT_JSON) + props.content_type = amqp_cstring_bytes("application/json"); + else + assert (23 == 42); + props.delivery_mode = conf->delivery_mode; + props.app_id = amqp_cstring_bytes("collectd"); + + status = amqp_basic_publish(conf->connection, + /* channel = */ 1, + amqp_cstring_bytes(CONF(conf, exchange)), + amqp_cstring_bytes (routing_key), + /* mandatory = */ 0, + /* immediate = */ 0, + &props, + amqp_cstring_bytes(buffer)); + if (status != 0) + { + ERROR ("amqp plugin: amqp_basic_publish failed with status %i.", + status); + camqp_close_connection (conf); + } + + return (status); +} /* }}} int camqp_write_locked */ + +static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) +{ + camqp_config_t *conf = user_data->data; + char routing_key[6 * DATA_MAX_NAME_LEN]; + char buffer[4096]; + int status; + + if ((ds == NULL) || (vl == NULL) || (conf == NULL)) + return (EINVAL); + + memset (buffer, 0, sizeof (buffer)); + + if (conf->routing_key != NULL) + { + sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); + } + else + { + size_t i; + ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", + vl->host, + vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance); + + /* Switch slashes (the only character forbidden by collectd) and dots + * (the separation character used by AMQP). */ + for (i = 0; routing_key[i] != 0; i++) + { + if (routing_key[i] == '.') + routing_key[i] = '/'; + else if (routing_key[i] == '/') + routing_key[i] = '.'; + } + } + + if (conf->format == CAMQP_FORMAT_COMMAND) + { + status = create_putval (buffer, sizeof (buffer), ds, vl); + if (status != 0) + { + ERROR ("amqp plugin: create_putval failed with status %i.", + status); + return (status); + } + } + else if (conf->format == CAMQP_FORMAT_JSON) + { + size_t bfree = sizeof (buffer); + size_t bfill = 0; + + format_json_initialize (buffer, &bfill, &bfree); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); + format_json_finalize (buffer, &bfill, &bfree); + } + else + { + ERROR ("amqp plugin: Invalid format (%i).", conf->format); + return (-1); + } + + pthread_mutex_lock (&conf->lock); + status = camqp_write_locked (conf, buffer, routing_key); + pthread_mutex_unlock (&conf->lock); + + return (status); +} /* }}} int camqp_write */ + +/* + * Config handling + */ +static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ + camqp_config_t *conf) +{ + char *string; + int status; + + string = NULL; + status = cf_util_get_string (ci, &string); + if (status != 0) + return (status); + + assert (string != NULL); + if (strcasecmp ("Command", string) == 0) + conf->format = CAMQP_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + conf->format = CAMQP_FORMAT_JSON; + else + { + WARNING ("amqp plugin: Invalid format string: %s", + string); + } + + free (string); + + return (0); +} /* }}} int config_set_string */ + +static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ + _Bool publish) +{ + camqp_config_t *conf; + int status; + int i; + + conf = malloc (sizeof (*conf)); + if (conf == NULL) + { + ERROR ("amqp plugin: malloc failed."); + return (ENOMEM); + } + + /* Initialize "conf" {{{ */ + memset (conf, 0, sizeof (*conf)); + conf->publish = publish; + conf->name = NULL; + conf->format = CAMQP_FORMAT_COMMAND; + conf->host = NULL; + conf->port = 5672; + conf->vhost = NULL; + conf->user = NULL; + conf->password = NULL; + conf->exchange = NULL; + conf->routing_key = NULL; + /* publish only */ + conf->delivery_mode = CAMQP_DM_VOLATILE; + conf->store_rates = 0; + /* subscribe only */ + conf->exchange_type = NULL; + conf->queue = NULL; + /* general */ + conf->connection = NULL; + pthread_mutex_init (&conf->lock, /* attr = */ NULL); + /* }}} */ + + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + sfree (conf); + return (status); + } + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) + { + status = cf_util_get_port_number (child); + if (status > 0) + { + conf->port = status; + status = 0; + } + } + else if (strcasecmp ("VHost", child->key) == 0) + status = cf_util_get_string (child, &conf->vhost); + else if (strcasecmp ("User", child->key) == 0) + status = cf_util_get_string (child, &conf->user); + else if (strcasecmp ("Password", child->key) == 0) + status = cf_util_get_string (child, &conf->password); + else if (strcasecmp ("Exchange", child->key) == 0) + status = cf_util_get_string (child, &conf->exchange); + else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish) + status = cf_util_get_string (child, &conf->exchange_type); + else if ((strcasecmp ("Queue", child->key) == 0) && !publish) + status = cf_util_get_string (child, &conf->queue); + else if (strcasecmp ("RoutingKey", child->key) == 0) + status = cf_util_get_string (child, &conf->routing_key); + else if ((strcasecmp ("Persistent", child->key) == 0) && publish) + { + _Bool tmp = 0; + status = cf_util_get_boolean (child, &tmp); + if (tmp) + conf->delivery_mode = CAMQP_DM_PERSISTENT; + else + conf->delivery_mode = CAMQP_DM_VOLATILE; + } + else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) + status = cf_util_get_boolean (child, &conf->store_rates); + else if ((strcasecmp ("Format", child->key) == 0) && publish) + status = camqp_config_set_format (child, conf); + else + WARNING ("amqp plugin: Ignoring unknown " + "configuration option \"%s\".", child->key); + + if (status != 0) + break; + } /* for (i = 0; i < ci->children_num; i++) */ + + if ((status == 0) && (conf->exchange == NULL)) + { + if (conf->exchange_type != NULL) + WARNING ("amqp plugin: The option \"ExchangeType\" was given " + "without the \"Exchange\" option. It will be ignored."); + + if (!publish && (conf->routing_key != NULL)) + WARNING ("amqp plugin: The option \"RoutingKey\" was given " + "without the \"Exchange\" option. It will be ignored."); + + } + + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + + if (conf->exchange != NULL) + { + DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;", + conf->exchange); + } + + if (publish) + { + char cbname[128]; + user_data_t ud = { conf, camqp_config_free }; + + ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name); + + status = plugin_register_write (cbname, camqp_write, &ud); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } + else + { + status = camqp_subscribe_init (conf); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } + + return (0); +} /* }}} int camqp_config_connection */ + +static int camqp_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Publish", child->key) == 0) + camqp_config_connection (child, /* publish = */ 1); + else if (strcasecmp ("Subscribe", child->key) == 0) + camqp_config_connection (child, /* publish = */ 0); + else + WARNING ("amqp plugin: Ignoring unknown config option \"%s\".", + child->key); + } /* for (ci->children_num) */ + + return (0); +} /* }}} int camqp_config */ + +void module_register (void) +{ + plugin_register_complex_config ("amqp", camqp_config); + plugin_register_shutdown ("amqp", camqp_shutdown); +} /* void module_register */ + +/* vim: set sw=4 sts=4 et fdm=marker : */ diff --git a/src/collectd-python.pod b/src/collectd-python.pod index b9408a3d..267296cf 100644 --- a/src/collectd-python.pod +++ b/src/collectd-python.pod @@ -1,3 +1,13 @@ +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + =head1 NAME collectd-python - Documentation of collectd's C @@ -693,14 +703,8 @@ dispatched by the python plugin after upgrades. =item -This plugin is not compatible with python3. Trying to compile it with python3 -will fail because of the ways string, unicode and bytearray behavior was -changed. - -=item - Not all aspects of the collectd API are accessible from python. This includes -but is not limited to meta-data, filters and data sets. +but is not limited to filters and data sets. =back diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 12cec759..e194d142 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -52,6 +52,7 @@ # to missing dependencies or because they have been deactivated explicitly. # ############################################################################## +#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors @@ -150,6 +151,20 @@ # ription of those options is available in the collectd.conf(5) manual page. # ############################################################################## +# +# +# Host "localhost" +# Port "5672" +# VHost "/" +# User "guest" +# Password "guest" +# Exchange "amq.fanout" +# RoutingKey "collectd" +# Persistent false +# StoreRates false +# +# + # # # URL "http://localhost/status?auto" @@ -898,6 +913,7 @@ # SocketFile "@prefix@/var/run/@PACKAGE_NAME@-unixsock" # SocketGroup "collectd" # SocketPerms "0660" +# DeleteSocket false # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 8481a542..1c31d97f 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -181,6 +181,143 @@ A list of all plugins and a short summary for each plugin can be found in the F file shipped with the sourcecode and hopefully binary packets as well. +=head2 Plugin C + +The I can be used to communicate with other instances of +I or third party applications using an AMQP message broker. Values +are sent to or received from the broker, which handles routing, queueing and +possibly filtering or messages. + + + # Send values to an AMQP broker + + Host "localhost" + Port "5672" + VHost "/" + User "guest" + Password "guest" + Exchange "amq.fanout" + # ExchangeType "fanout" + # RoutingKey "collectd" + # Persistent false + # Format "command" + # StoreRates false + + + # Receive values from an AMQP broker + + Host "localhost" + Port "5672" + VHost "/" + User "guest" + Password "guest" + Exchange "amq.fanout" + # ExchangeType "fanout" + # Queue "queue_name" + # RoutingKey "collectd.#" + + + +The plugin's configuration consists of a number of I and I +blocks, which configure sending and receiving of values respectively. The two +blocks are very similar, so unless otherwise noted, an option can be used in +either block. The name given in the blocks starting tag is only used for +reporting messages, but may be used to support I of certain +I blocks in the future. + +=over 4 + +=item B I + +Hostname or IP-address of the AMQP broker. Defaults to the default behavior of +the underlying communications library, I, which is "localhost". + +=item B I + +Service name or port number on which the AMQP broker accepts connections. This +argument must be a string, even if the numeric form is used. Defaults to +"5672". + +=item B I + +Name of the I on the AMQP broker to use. Defaults to "/". + +=item B I + +=item B I + +Credentials used to authenticate to the AMQP broker. By default "guest"/"guest" +is used. + +=item B I + +In I blocks, this option specifies the I to send values to. +By default, "amq.fanout" will be used. + +In I blocks this option is optional. If given, a I between +the given exchange and the I is created, using the I if +configured. See the B and B options below. + +=item B I + +If given, the plugin will try to create the configured I with this +I after connecting. When in a I block, the I will then +be bound to this exchange. + +=item B I (Subscribe only) + +Configures the I name to subscribe to. If no queue name was configures +explicitly, a unique queue name will be created by the broker. + +=item B I + +In I blocks, this configures the routing key to set on all outgoing +messages. If not given, the routing key will be computed from the I +of the value. The host, plugin, type and the two instances are concatenated +together using dots as the separator and all containing dots replaced with +slashes. For example "collectd.host/example/com.cpu.0.cpu.user". This makes it +possible to receive only specific values using a "topic" exchange. + +In I blocks, configures the I used when creating a +I between an I and the I. The usual wildcards can be +used to filter messages when using a "topic" exchange. If you're only +interested in CPU statistics, you could use the routing key "collectd.*.cpu.#" +for example. + +=item B B|B (Publish only) + +Selects the I to use. If set to B, the I +mode will be used, i.e. delivery is guaranteed. If set to B (the +default), the I delivery mode will be used, i.e. messages may be +lost due to high load, overflowing queues or similar issues. + +=item B B|B (Publish only) + +Selects the format in which messages are sent to the broker. If set to +B (the default), values are sent as C commands which are +identical to the syntax used by the I and I. In this +case, the C header field will be set to C. + +If set to B, the values are encoded in the I, +an easy and straight forward exchange format. The C header field +will be set to C. + +A subscribing client I use the C header field to +determine how to decode the values. Currently, the I itself can +only decode the B format. + +=item B B|B (Publish only) + +Determines whether or not C, C and C data sources +are converted to a I (i.e. a C value). If set to B (the +default), no conversion is performed. Otherwise the conversion is performed +using the internal value cache. + +Please note that currently this option is only used if the B option has +been set to B. + +=back + =head2 Plugin C To configure the C-plugin you first need to configure the Apache @@ -4421,6 +4558,13 @@ Change the file permissions of the UNIX-socket after it has been created. The permissions must be given as a numeric, octal value as you would pass to L. Defaults to B<0770>. +=item B B|B + +If set to B, delete the socket file before calling L, if a file +with the given name already exists. If I crashes a socket file may be +left over, preventing the daemon from opening a new socket when restarted. +Since this is potentially dangerous, this defaults to B. + =back =head2 Plugin C diff --git a/src/collectd.h b/src/collectd.h index af0033a5..8dd0f426 100644 --- a/src/collectd.h +++ b/src/collectd.h @@ -212,10 +212,6 @@ # include #endif -#if HAVE_SENSORS_SENSORS_H -# include -#endif - #ifndef PACKAGE_NAME #define PACKAGE_NAME "collectd" #endif diff --git a/src/curl_json.c b/src/curl_json.c index fbac7ad1..433764e2 100644 --- a/src/curl_json.c +++ b/src/curl_json.c @@ -758,6 +758,7 @@ static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */ if (db->yajl == NULL) { ERROR ("curl_json plugin: yajl_alloc failed."); + db->yajl = yprev; return (-1); } diff --git a/src/lpar.c b/src/lpar.c new file mode 100644 index 00000000..4d534476 --- /dev/null +++ b/src/lpar.c @@ -0,0 +1,273 @@ +/** + * collectd - src/lpar.c + * Copyright (C) 2010 Aurélien Reynaud + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Aurélien Reynaud + **/ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" + +#include +#include +#include + +/* XINTFRAC was defined in libperfstat.h somewhere between AIX 5.3 and 6.1 */ +#ifndef XINTFRAC +# include +# define XINTFRAC ((double)(_system_configuration.Xint) / \ + (double)(_system_configuration.Xfrac)) +#endif + +#define CLOCKTICKS_TO_TICKS(cticks) ((cticks) / XINTFRAC) + +static const char *config_keys[] = +{ + "CpuPoolStats", + "ReportBySerial" +}; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + +static _Bool pool_stats = 0; +static _Bool report_by_serial = 0; +#if PERFSTAT_SUPPORTS_DONATION +static _Bool donate_flag = 0; +#endif +static char serial[SYS_NMLN]; + +static perfstat_partition_total_t lparstats_old; + +static int lpar_config (const char *key, const char *value) +{ + if (strcasecmp ("CpuPoolStats", key) == 0) + { + if (IS_TRUE (value)) + pool_stats = 1; + else + pool_stats = 0; + } + else if (strcasecmp ("ReportBySerial", key) == 0) + { + if (IS_TRUE (value)) + report_by_serial = 1; + else + report_by_serial = 0; + } + else + { + return (-1); + } + + return (0); +} /* int lpar_config */ + +static int lpar_init (void) +{ + int status; + + /* Retrieve the initial metrics. Returns the number of structures filled. */ + status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */ + &lparstats_old, sizeof (perfstat_partition_total_t), + /* number = */ 1 /* (must be 1) */); + if (status != 1) + { + char errbuf[1024]; + ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)", + sstrerror (errno, errbuf, sizeof (errbuf)), + status); + return (-1); + } + +#if PERFSTAT_SUPPORTS_DONATION + if (!lparstats_old.type.b.shared_enabled + && lparstats_old.type.b.donate_enabled) + { + donate_flag = 1; + } +#endif + + if (pool_stats && !lparstats_old.type.b.pool_util_authority) + { + WARNING ("lpar plugin: This partition does not have pool authority. " + "Disabling CPU pool statistics collection."); + pool_stats = 0; + } + + return (0); +} /* int lpar_init */ + +static void lpar_submit (const char *type_instance, double value) +{ + value_t values[1]; + value_list_t vl = VALUE_LIST_INIT; + + values[0].gauge = (gauge_t)value; + + vl.values = values; + vl.values_len = 1; + if (report_by_serial) + { + sstrncpy (vl.host, serial, sizeof (vl.host)); + sstrncpy (vl.plugin_instance, hostname_g, sizeof (vl.plugin)); + } + else + { + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + } + sstrncpy (vl.plugin, "lpar", sizeof (vl.plugin)); + sstrncpy (vl.type, "vcpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); + + plugin_dispatch_values (&vl); +} /* void lpar_submit */ + +static int lpar_read (void) +{ + perfstat_partition_total_t lparstats; + int status; + struct utsname name; + u_longlong_t ticks; + u_longlong_t user_ticks, syst_ticks, wait_ticks, idle_ticks; + u_longlong_t consumed_ticks; + double entitled_proc_capacity; + + /* An LPAR has the same serial number as the physical system it is currently + running on. It is a convenient way of tracking LPARs as they are moved + from chassis to chassis through Live Partition Mobility (LPM). */ + if (uname (&name) != 0) + { + ERROR ("lpar plugin: uname failed."); + return (-1); + } + sstrncpy (serial, name.machine, sizeof (serial)); + + /* Retrieve the current metrics. Returns the number of structures filled. */ + status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */ + &lparstats, sizeof (perfstat_partition_total_t), + /* number = */ 1 /* (must be 1) */); + if (status != 1) + { + char errbuf[1024]; + ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)", + sstrerror (errno, errbuf, sizeof (errbuf)), + status); + return (-1); + } + + /* Number of ticks since we last run. */ + ticks = lparstats.timebase_last - lparstats_old.timebase_last; + if (ticks == 0) + { + /* The stats have not been updated. Return now to avoid + * dividing by zero */ + return (0); + } + + /* + * On a shared partition, we're "entitled" to a certain amount of + * processing power, for example 250/100 of a physical CPU. Processing + * capacity not used by the partition may be assigned to a different + * partition by the hypervisor, so "idle" is hopefully a very small + * number. + * + * A dedicated partition may donate its CPUs to another partition and + * may steal ticks from somewhere else (another partition or maybe the + * shared pool, I don't know --octo). + */ + + /* entitled_proc_capacity is in 1/100th of a CPU */ + entitled_proc_capacity = 0.01 * ((double) lparstats.entitled_proc_capacity); + lpar_submit ("entitled", entitled_proc_capacity); + + /* The number of ticks actually spent in the various states */ + user_ticks = lparstats.puser - lparstats_old.puser; + syst_ticks = lparstats.psys - lparstats_old.psys; + wait_ticks = lparstats.pwait - lparstats_old.pwait; + idle_ticks = lparstats.pidle - lparstats_old.pidle; + consumed_ticks = user_ticks + syst_ticks + wait_ticks + idle_ticks; + + lpar_submit ("user", (double) user_ticks / (double) ticks); + lpar_submit ("system", (double) syst_ticks / (double) ticks); + lpar_submit ("wait", (double) wait_ticks / (double) ticks); + lpar_submit ("idle", (double) idle_ticks / (double) ticks); + +#if PERFSTAT_SUPPORTS_DONATION + if (donate_flag) + { + /* donated => ticks given to another partition + * stolen => ticks received from another partition */ + u_longlong_t idle_donated_ticks, busy_donated_ticks; + u_longlong_t idle_stolen_ticks, busy_stolen_ticks; + + /* FYI: PURR == Processor Utilization of Resources Register + * SPURR == Scaled PURR */ + idle_donated_ticks = lparstats.idle_donated_purr - lparstats_old.idle_donated_purr; + busy_donated_ticks = lparstats.busy_donated_purr - lparstats_old.busy_donated_purr; + idle_stolen_ticks = lparstats.idle_stolen_purr - lparstats_old.idle_stolen_purr; + busy_stolen_ticks = lparstats.busy_stolen_purr - lparstats_old.busy_stolen_purr; + + lpar_submit ("idle_donated", (double) idle_donated_ticks / (double) ticks); + lpar_submit ("busy_donated", (double) busy_donated_ticks / (double) ticks); + lpar_submit ("idle_stolen", (double) idle_stolen_ticks / (double) ticks); + lpar_submit ("busy_stolen", (double) busy_stolen_ticks / (double) ticks); + + /* Donated ticks will be accounted for as stolen ticks in other LPARs */ + consumed_ticks += idle_stolen_ticks + busy_stolen_ticks; + } +#endif + + lpar_submit ("consumed", (double) consumed_ticks / (double) ticks); + + if (pool_stats) + { + char typinst[DATA_MAX_NAME_LEN]; + u_longlong_t pool_idle_cticks; + double pool_idle_cpus; + double pool_busy_cpus; + + /* We're calculating "busy" from "idle" and the total number of + * CPUs, because the "busy" member didn't exist in early versions + * of libperfstat. It was added somewhere between AIX 5.3 ML5 and ML9. */ + pool_idle_cticks = lparstats.pool_idle_time - lparstats_old.pool_idle_time; + pool_idle_cpus = CLOCKTICKS_TO_TICKS ((double) pool_idle_cticks) / (double) ticks; + pool_busy_cpus = ((double) lparstats.phys_cpus_pool) - pool_idle_cpus; + if (pool_busy_cpus < 0.0) + pool_busy_cpus = 0.0; + + ssnprintf (typinst, sizeof (typinst), "pool-%X-busy", lparstats.pool_id); + lpar_submit (typinst, pool_busy_cpus); + + ssnprintf (typinst, sizeof (typinst), "pool-%X-idle", lparstats.pool_id); + lpar_submit (typinst, pool_idle_cpus); + } + + memcpy (&lparstats_old, &lparstats, sizeof (lparstats_old)); + + return (0); +} /* int lpar_read */ + +void module_register (void) +{ + plugin_register_config ("lpar", lpar_config, + config_keys, config_keys_num); + plugin_register_init ("lpar", lpar_init); + plugin_register_read ("lpar", lpar_read); +} /* void module_register */ + +/* vim: set sw=8 noet : */ + diff --git a/src/match_value.c b/src/match_value.c index 9f02226b..ae6282c4 100644 --- a/src/match_value.c +++ b/src/match_value.c @@ -53,9 +53,18 @@ struct mv_match_s */ static void mv_free_match (mv_match_t *m) /* {{{ */ { + int i; + if (m == NULL) return; + if (m->data_sources != NULL) + { + for (i = 0; i < m->data_sources_num; ++i) + free(m->data_sources[i]); + free(m->data_sources); + } + free (m); } /* }}} void mv_free_match */ diff --git a/src/netapp.c b/src/netapp.c index 1640cfd2..5c4b6e73 100644 --- a/src/netapp.c +++ b/src/netapp.c @@ -566,7 +566,7 @@ static int submit_values (const char *host, /* {{{ */ const char *plugin_inst, const char *type, const char *type_inst, value_t *values, int values_len, - cdtime_t timestamp) + cdtime_t timestamp, cdtime_t interval) { value_list_t vl = VALUE_LIST_INIT; @@ -576,6 +576,9 @@ static int submit_values (const char *host, /* {{{ */ if (timestamp > 0) vl.time = timestamp; + if (interval > 0) + vl.interval = interval; + if (host != NULL) sstrncpy (vl.host, host, sizeof (vl.host)); else @@ -592,7 +595,7 @@ static int submit_values (const char *host, /* {{{ */ static int submit_two_counters (const char *host, const char *plugin_inst, /* {{{ */ const char *type, const char *type_inst, counter_t val0, counter_t val1, - cdtime_t timestamp) + cdtime_t timestamp, cdtime_t interval) { value_t values[2]; @@ -600,23 +603,24 @@ static int submit_two_counters (const char *host, const char *plugin_inst, /* {{ values[1].counter = val1; return (submit_values (host, plugin_inst, type, type_inst, - values, 2, timestamp)); + values, 2, timestamp, interval)); } /* }}} int submit_two_counters */ static int submit_counter (const char *host, const char *plugin_inst, /* {{{ */ - const char *type, const char *type_inst, counter_t counter, cdtime_t timestamp) + const char *type, const char *type_inst, counter_t counter, + cdtime_t timestamp, cdtime_t interval) { value_t v; v.counter = counter; return (submit_values (host, plugin_inst, type, type_inst, - &v, 1, timestamp)); + &v, 1, timestamp, interval)); } /* }}} int submit_counter */ static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ */ const char *type, const char *type_inst, gauge_t val0, gauge_t val1, - cdtime_t timestamp) + cdtime_t timestamp, cdtime_t interval) { value_t values[2]; @@ -624,18 +628,19 @@ static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ * values[1].gauge = val1; return (submit_values (host, plugin_inst, type, type_inst, - values, 2, timestamp)); + values, 2, timestamp, interval)); } /* }}} int submit_two_gauge */ static int submit_double (const char *host, const char *plugin_inst, /* {{{ */ - const char *type, const char *type_inst, double d, cdtime_t timestamp) + const char *type, const char *type_inst, double d, + cdtime_t timestamp, cdtime_t interval) { value_t v; v.gauge = (gauge_t) d; return (submit_values (host, plugin_inst, type, type_inst, - &v, 1, timestamp)); + &v, 1, timestamp, interval)); } /* }}} int submit_uint64 */ /* Calculate hit ratio from old and new counters and submit the resulting @@ -647,7 +652,8 @@ static int submit_cache_ratio (const char *host, /* {{{ */ uint64_t new_misses, uint64_t old_hits, uint64_t old_misses, - cdtime_t timestamp) + cdtime_t timestamp, + cdtime_t interval) { value_t v; @@ -664,12 +670,12 @@ static int submit_cache_ratio (const char *host, /* {{{ */ } return (submit_values (host, plugin_inst, "cache_ratio", type_inst, - &v, 1, timestamp)); + &v, 1, timestamp, interval)); } /* }}} int submit_cache_ratio */ /* Submits all the caches used by WAFL. Uses "submit_cache_ratio". */ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ */ - cfg_wafl_t *old_data, const cfg_wafl_t *new_data) + cfg_wafl_t *old_data, const cfg_wafl_t *new_data, int interval) { /* Submit requested counters */ if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_NAME_CACHE | HAVE_WAFL_NAME_CACHE) @@ -677,28 +683,28 @@ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ submit_cache_ratio (hostname, instance, "name_cache_hit", new_data->name_cache_hit, new_data->name_cache_miss, old_data->name_cache_hit, old_data->name_cache_miss, - new_data->timestamp); + new_data->timestamp, interval); if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_DIR_CACHE | HAVE_WAFL_FIND_DIR) && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_FIND_DIR)) submit_cache_ratio (hostname, instance, "find_dir_hit", new_data->find_dir_hit, new_data->find_dir_miss, old_data->find_dir_hit, old_data->find_dir_miss, - new_data->timestamp); + new_data->timestamp, interval); if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_BUF_CACHE | HAVE_WAFL_BUF_HASH) && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_BUF_HASH)) submit_cache_ratio (hostname, instance, "buf_hash_hit", new_data->buf_hash_hit, new_data->buf_hash_miss, old_data->buf_hash_hit, old_data->buf_hash_miss, - new_data->timestamp); + new_data->timestamp, interval); if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_INODE_CACHE | HAVE_WAFL_INODE_CACHE) && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_INODE_CACHE)) submit_cache_ratio (hostname, instance, "inode_cache_hit", new_data->inode_cache_hit, new_data->inode_cache_miss, old_data->inode_cache_hit, old_data->inode_cache_miss, - new_data->timestamp); + new_data->timestamp, interval); /* Clear old HAVE_* flags */ old_data->flags &= ~HAVE_WAFL_ALL; @@ -724,7 +730,7 @@ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ * update flags appropriately. */ static int submit_volume_perf_data (const char *hostname, /* {{{ */ data_volume_perf_t *old_data, - const data_volume_perf_t *new_data) + const data_volume_perf_t *new_data, int interval) { char plugin_instance[DATA_MAX_NAME_LEN]; @@ -739,7 +745,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */ && HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_BYTES_READ | HAVE_VOLUME_PERF_BYTES_WRITE)) { submit_two_counters (hostname, plugin_instance, "disk_octets", /* type instance = */ NULL, - (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp); + (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp, interval); } /* Check for and submit disk-operations values */ @@ -747,7 +753,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */ && HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_OPS_READ | HAVE_VOLUME_PERF_OPS_WRITE)) { submit_two_counters (hostname, plugin_instance, "disk_ops", /* type instance = */ NULL, - (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp); + (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp, interval); } /* Check for, calculate and submit disk-latency values */ @@ -791,7 +797,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */ } submit_two_gauge (hostname, plugin_instance, "disk_latency", /* type instance = */ NULL, - latency_per_op_read, latency_per_op_write, new_data->timestamp); + latency_per_op_read, latency_per_op_write, new_data->timestamp, interval); } /* Clear all HAVE_* flags. */ @@ -830,7 +836,7 @@ static cdtime_t cna_child_get_cdtime (na_elem_t *data) /* {{{ */ */ /* Data corresponding to */ static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /* {{{ */ - na_elem_t *data) + na_elem_t *data, int interval) { cfg_wafl_t perf_data; const char *plugin_inst; @@ -909,7 +915,7 @@ static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /* } } - return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data)); + return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data, interval)); } /* }}} void cna_handle_wafl_data */ static int cna_setup_wafl (cfg_wafl_t *cw) /* {{{ */ @@ -983,7 +989,7 @@ static int cna_query_wafl (host_config_t *host) /* {{{ */ return (-1); } - status = cna_handle_wafl_data (host->name, host->cfg_wafl, data); + status = cna_handle_wafl_data (host->name, host->cfg_wafl, data, host->interval); if (status == 0) host->cfg_wafl->interval.last_read = now; @@ -994,7 +1000,7 @@ static int cna_query_wafl (host_config_t *host) /* {{{ */ /* Data corresponding to */ static int cna_handle_disk_data (const char *hostname, /* {{{ */ - cfg_disk_t *cfg_disk, na_elem_t *data) + cfg_disk_t *cfg_disk, na_elem_t *data, cdtime_t interval) { cdtime_t timestamp; na_elem_t *instances; @@ -1108,7 +1114,7 @@ static int cna_handle_disk_data (const char *hostname, /* {{{ */ if ((cfg_disk->flags & CFG_DISK_BUSIEST) && (worst_disk != NULL)) submit_double (hostname, "system", "percent", "disk_busy", - worst_disk->disk_busy_percent, timestamp); + worst_disk->disk_busy_percent, timestamp, interval); return (0); } /* }}} int cna_handle_disk_data */ @@ -1178,7 +1184,7 @@ static int cna_query_disk (host_config_t *host) /* {{{ */ return (-1); } - status = cna_handle_disk_data (host->name, host->cfg_disk, data); + status = cna_handle_disk_data (host->name, host->cfg_disk, data, host->interval); if (status == 0) host->cfg_disk->interval.last_read = now; @@ -1189,7 +1195,7 @@ static int cna_query_disk (host_config_t *host) /* {{{ */ /* Data corresponding to */ static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */ - cfg_volume_perf_t *cvp, na_elem_t *data) + cfg_volume_perf_t *cvp, na_elem_t *data, cdtime_t interval) { cdtime_t timestamp; na_elem_t *elem_instances; @@ -1274,7 +1280,7 @@ static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */ } } /* for (elem_counter) */ - submit_volume_perf_data (hostname, v, &perf_data); + submit_volume_perf_data (hostname, v, &perf_data, interval); } /* for (volume) */ return (0); @@ -1349,7 +1355,7 @@ static int cna_query_volume_perf (host_config_t *host) /* {{{ */ return (-1); } - status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data); + status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data, host->interval); if (status == 0) host->cfg_volume_perf->interval.last_read = now; @@ -1360,7 +1366,7 @@ static int cna_query_volume_perf (host_config_t *host) /* {{{ */ /* Data corresponding to */ static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */ - cfg_volume_usage_t *cfg_volume) + cfg_volume_usage_t *cfg_volume, int interval) { data_volume_usage_t *v; @@ -1408,32 +1414,32 @@ static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */ if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_FREE)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "free", - (double) norm_free, /* timestamp = */ 0); + (double) norm_free, /* timestamp = */ 0, interval); if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SIS_SAVED)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "sis_saved", - (double) sis_saved, /* timestamp = */ 0); + (double) sis_saved, /* timestamp = */ 0, interval); if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_USED)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "used", - (double) norm_used, /* timestamp = */ 0); + (double) norm_used, /* timestamp = */ 0, interval); if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_RSVD)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "snap_reserved", - (double) snap_reserve_free, /* timestamp = */ 0); + (double) snap_reserve_free, /* timestamp = */ 0, interval); if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED | HAVE_VOLUME_USAGE_SNAP_RSVD)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "snap_reserve_used", - (double) snap_reserve_used, /* timestamp = */ 0); + (double) snap_reserve_used, /* timestamp = */ 0, interval); if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED)) submit_double (hostname, /* plugin instance = */ plugin_instance, "df_complex", "snap_normal_used", - (double) snap_norm_used, /* timestamp = */ 0); + (double) snap_norm_used, /* timestamp = */ 0, interval); /* Clear all the HAVE_* flags */ v->flags &= ~HAVE_VOLUME_USAGE_ALL; @@ -1660,7 +1666,7 @@ static int cna_handle_volume_usage_data (const host_config_t *host, /* {{{ */ } /* }}} end of 32-bit workaround */ } /* for (elem_volume) */ - return (cna_submit_volume_usage_data (host->name, cfg_volume)); + return (cna_submit_volume_usage_data (host->name, cfg_volume, host->interval)); } /* }}} int cna_handle_volume_usage_data */ static int cna_setup_volume_usage (cfg_volume_usage_t *cvu) /* {{{ */ @@ -1724,7 +1730,7 @@ static int cna_query_volume_usage (host_config_t *host) /* {{{ */ /* Data corresponding to */ static int cna_handle_system_data (const char *hostname, /* {{{ */ - cfg_system_t *cfg_system, na_elem_t *data) + cfg_system_t *cfg_system, na_elem_t *data, int interval) { na_elem_t *instances; na_elem_t *counter; @@ -1796,27 +1802,27 @@ static int cna_handle_system_data (const char *hostname, /* {{{ */ && (value > 0) && (strlen(name) > 4) && (!strcmp(name + strlen(name) - 4, "_ops"))) { submit_counter (hostname, instance, "disk_ops_complex", name, - (counter_t) value, timestamp); + (counter_t) value, timestamp, interval); } } /* for (counter) */ if ((cfg_system->flags & CFG_SYSTEM_DISK) && (HAS_ALL_FLAGS (counter_flags, 0x01 | 0x02))) submit_two_counters (hostname, instance, "disk_octets", NULL, - disk_read, disk_written, timestamp); + disk_read, disk_written, timestamp, interval); if ((cfg_system->flags & CFG_SYSTEM_NET) && (HAS_ALL_FLAGS (counter_flags, 0x04 | 0x08))) submit_two_counters (hostname, instance, "if_octets", NULL, - net_recv, net_sent, timestamp); + net_recv, net_sent, timestamp, interval); if ((cfg_system->flags & CFG_SYSTEM_CPU) && (HAS_ALL_FLAGS (counter_flags, 0x10 | 0x20))) { submit_counter (hostname, instance, "cpu", "system", - cpu_busy, timestamp); + cpu_busy, timestamp, interval); submit_counter (hostname, instance, "cpu", "idle", - cpu_total - cpu_busy, timestamp); + cpu_total - cpu_busy, timestamp, interval); } return (0); @@ -1872,7 +1878,7 @@ static int cna_query_system (host_config_t *host) /* {{{ */ return (-1); } - status = cna_handle_system_data (host->name, host->cfg_system, data); + status = cna_handle_system_data (host->name, host->cfg_system, data, host->interval); if (status == 0) host->cfg_system->interval.last_read = now; diff --git a/src/network.c b/src/network.c index eb32ad15..7a1fcf7f 100644 --- a/src/network.c +++ b/src/network.c @@ -31,6 +31,7 @@ #include "utils_fbhash.h" #include "utils_avltree.h" #include "utils_cache.h" +#include "utils_complain.h" #include "network.h" @@ -917,6 +918,8 @@ static int parse_packet (sockent_t *se, static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ void **ret_buffer, size_t *ret_buffer_len, int flags) { + static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC; + char *buffer; size_t buffer_len; size_t buffer_offset; @@ -938,8 +941,9 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ if (se->data.server.userdb == NULL) { - NOTICE ("network plugin: Received signed network packet but can't verify " - "it because no user DB has been configured. Will accept it."); + c_complain (LOG_NOTICE, &complain_no_users, + "network plugin: Received signed network packet but can't verify it " + "because no user DB has been configured. Will accept it."); return (0); } diff --git a/src/notify_email.c b/src/notify_email.c index a13b1f9e..da6894a3 100644 --- a/src/notify_email.c +++ b/src/notify_email.c @@ -1,6 +1,7 @@ /** * collectd - src/notify_email.c * Copyright (C) 2008 Oleg King + * Copyright (C) 2010 Florian Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -18,6 +19,7 @@ * * Authors: * Oleg King + * Florian Forster **/ #include "collectd.h" @@ -26,6 +28,7 @@ #include #include +#include #define MAXSTRING 256 @@ -45,6 +48,7 @@ static char **recipients; static int recipients_len = 0; static smtp_session_t session; +static pthread_mutex_t session_lock = PTHREAD_MUTEX_INITIALIZER; static smtp_message_t message; static auth_context_t authctx = NULL; @@ -113,17 +117,23 @@ static int notify_email_init (void) { char server[MAXSTRING]; + ssnprintf(server, sizeof (server), "%s:%i", + (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host, + smtp_port); + + pthread_mutex_lock (&session_lock); + auth_client_init(); - if (!(session = smtp_create_session ())) { + + session = smtp_create_session (); + if (session == NULL) { + pthread_mutex_unlock (&session_lock); ERROR ("notify_email plugin: cannot create SMTP session"); return (-1); } smtp_set_monitorcb (session, monitor_cb, NULL, 1); smtp_set_hostname (session, hostname_g); - ssnprintf(server, sizeof (server), "%s:%i", - (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host, - smtp_port); smtp_set_server (session, server); if (smtp_user && smtp_password) { @@ -133,18 +143,30 @@ static int notify_email_init (void) } if ( !smtp_auth_set_context (session, authctx)) { + pthread_mutex_unlock (&session_lock); ERROR ("notify_email plugin: cannot set SMTP auth context"); return (-1); } + pthread_mutex_unlock (&session_lock); return (0); } /* int notify_email_init */ static int notify_email_shutdown (void) { - smtp_destroy_session (session); - auth_destroy_context (authctx); + pthread_mutex_lock (&session_lock); + + if (session != NULL) + smtp_destroy_session (session); + session = NULL; + + if (authctx != NULL) + auth_destroy_context (authctx); + authctx = NULL; + auth_client_exit(); + + pthread_mutex_unlock (&session_lock); return (0); } /* int notify_email_shutdown */ @@ -250,7 +272,16 @@ static int notify_email_notification (const notification_t *n, n->host, n->message); + pthread_mutex_lock (&session_lock); + + if (session == NULL) { + /* Initialization failed or we're in the process of shutting down. */ + pthread_mutex_unlock (&session_lock); + return (-1); + } + if (!(message = smtp_add_message (session))) { + pthread_mutex_unlock (&session_lock); ERROR ("notify_email plugin: cannot set SMTP message"); return (-1); } @@ -266,6 +297,7 @@ static int notify_email_notification (const notification_t *n, char buf[MAXSTRING]; ERROR ("notify_email plugin: SMTP server problem: %s", smtp_strerror (smtp_errno (), buf, sizeof buf)); + pthread_mutex_unlock (&session_lock); return (-1); } else { const smtp_status_t *status; @@ -276,6 +308,7 @@ static int notify_email_notification (const notification_t *n, smtp_enumerate_recipients (message, print_recipient_status, NULL); } + pthread_mutex_unlock (&session_lock); return (0); } /* int notify_email_notification */ diff --git a/src/python.c b/src/python.c index a3027e0d..eed0591d 100644 --- a/src/python.c +++ b/src/python.c @@ -978,6 +978,7 @@ PyMODINIT_FUNC PyInit_collectd(void) { static int cpy_config(oconfig_item_t *ci) { int i; + char *argv = ""; PyObject *sys, *tb; PyObject *sys_path; PyObject *module; @@ -1017,6 +1018,9 @@ static int cpy_config(oconfig_item_t *ci) { cpy_log_exception("python initialization"); return 1; } + PySys_SetArgv(1, &argv); + PyList_SetSlice(sys_path, 0, 1, NULL); + #ifdef IS_PY3K module = PyImport_ImportModule("collectd"); #else diff --git a/src/target_v5upgrade.c b/src/target_v5upgrade.c index 7fc0d421..25f4637d 100644 --- a/src/target_v5upgrade.c +++ b/src/target_v5upgrade.c @@ -210,6 +210,213 @@ static int v5_mysql_threads (const data_set_t *ds, value_list_t *vl) /* {{{ */ return (FC_TARGET_STOP); } /* }}} int v5_mysql_threads */ +/* + * ZFS ARC hit and miss counters + * + * 4.* uses the flawed "arc_counts" type. In 5.* this has been replaced by the + * more generic "cache_result" type. + */ +static int v5_zfs_arc_counts (const data_set_t *ds, value_list_t *vl) /* {{{ */ +{ + value_list_t new_vl; + value_t new_value; + _Bool is_hits; + + if (vl->values_len != 4) + return (FC_TARGET_STOP); + + if (strcmp ("hits", vl->type_instance) == 0) + is_hits = 1; + else if (strcmp ("misses", vl->type_instance) == 0) + is_hits = 0; + else + return (FC_TARGET_STOP); + + /* Copy everything: Time, interval, host, ... */ + memcpy (&new_vl, vl, sizeof (new_vl)); + + /* Reset data we can't simply copy */ + new_vl.values = &new_value; + new_vl.values_len = 1; + new_vl.meta = NULL; + + /* Change the type to "cache_result" */ + sstrncpy (new_vl.type, "cache_result", sizeof (new_vl.type)); + + /* Dispatch new value lists instead of this one */ + new_vl.values[0].derive = (derive_t) vl->values[0].counter; + ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance), + "demand_data-%s", + is_hits ? "hit" : "miss"); + plugin_dispatch_values (&new_vl); + + new_vl.values[0].derive = (derive_t) vl->values[1].counter; + ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance), + "demand_metadata-%s", + is_hits ? "hit" : "miss"); + plugin_dispatch_values (&new_vl); + + new_vl.values[0].derive = (derive_t) vl->values[2].counter; + ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance), + "prefetch_data-%s", + is_hits ? "hit" : "miss"); + plugin_dispatch_values (&new_vl); + + new_vl.values[0].derive = (derive_t) vl->values[3].counter; + ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance), + "prefetch_metadata-%s", + is_hits ? "hit" : "miss"); + plugin_dispatch_values (&new_vl); + + /* Abort processing */ + return (FC_TARGET_STOP); +} /* }}} int v5_zfs_arc_counts */ + +/* + * ZFS ARC L2 bytes + * + * "arc_l2_bytes" -> "io_octets-L2". + */ +static int v5_zfs_arc_l2_bytes (const data_set_t *ds, value_list_t *vl) /* {{{ */ +{ + value_list_t new_vl; + value_t new_values[2]; + + if (vl->values_len != 2) + return (FC_TARGET_STOP); + + /* Copy everything: Time, interval, host, ... */ + memcpy (&new_vl, vl, sizeof (new_vl)); + + /* Reset data we can't simply copy */ + new_vl.values = new_values; + new_vl.values_len = 2; + new_vl.meta = NULL; + + /* Change the type/-instance to "io_octets-L2" */ + sstrncpy (new_vl.type, "io_octets", sizeof (new_vl.type)); + sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance)); + + /* Copy the actual values. */ + new_vl.values[0].derive = (derive_t) vl->values[0].counter; + new_vl.values[1].derive = (derive_t) vl->values[1].counter; + + /* Dispatch new value lists instead of this one */ + plugin_dispatch_values (&new_vl); + + /* Abort processing */ + return (FC_TARGET_STOP); +} /* }}} int v5_zfs_arc_l2_bytes */ + +/* + * ZFS ARC L2 cache size + * + * 4.* uses a separate type for this. 5.* uses the generic "cache_size" type + * instead. + */ +static int v5_zfs_arc_l2_size (const data_set_t *ds, value_list_t *vl) /* {{{ */ +{ + value_list_t new_vl; + value_t new_value; + + if (vl->values_len != 1) + return (FC_TARGET_STOP); + + /* Copy everything: Time, interval, host, ... */ + memcpy (&new_vl, vl, sizeof (new_vl)); + + /* Reset data we can't simply copy */ + new_vl.values = &new_value; + new_vl.values_len = 1; + new_vl.meta = NULL; + + new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge; + + /* Change the type to "cache_size" */ + sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type)); + + /* Adapt the type instance */ + sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance)); + + /* Dispatch new value lists instead of this one */ + plugin_dispatch_values (&new_vl); + + /* Abort processing */ + return (FC_TARGET_STOP); +} /* }}} int v5_zfs_arc_l2_size */ + +/* + * ZFS ARC ratio + * + * "arc_ratio-L1" -> "cache_ratio-arc" + * "arc_ratio-L2" -> "cache_ratio-L2" + */ +static int v5_zfs_arc_ratio (const data_set_t *ds, value_list_t *vl) /* {{{ */ +{ + value_list_t new_vl; + value_t new_value; + + if (vl->values_len != 1) + return (FC_TARGET_STOP); + + /* Copy everything: Time, interval, host, ... */ + memcpy (&new_vl, vl, sizeof (new_vl)); + + /* Reset data we can't simply copy */ + new_vl.values = &new_value; + new_vl.values_len = 1; + new_vl.meta = NULL; + + new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge; + + /* Change the type to "cache_ratio" */ + sstrncpy (new_vl.type, "cache_ratio", sizeof (new_vl.type)); + + /* Adapt the type instance */ + if (strcmp ("L1", vl->type_instance) == 0) + sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance)); + + /* Dispatch new value lists instead of this one */ + plugin_dispatch_values (&new_vl); + + /* Abort processing */ + return (FC_TARGET_STOP); +} /* }}} int v5_zfs_arc_ratio */ + +/* + * ZFS ARC size + * + * 4.* uses the "arc_size" type with four data sources. In 5.* this has been + * replaces with the "cache_size" type and static data has been removed. + */ +static int v5_zfs_arc_size (const data_set_t *ds, value_list_t *vl) /* {{{ */ +{ + value_list_t new_vl; + value_t new_value; + + if (vl->values_len != 4) + return (FC_TARGET_STOP); + + /* Copy everything: Time, interval, host, ... */ + memcpy (&new_vl, vl, sizeof (new_vl)); + + /* Reset data we can't simply copy */ + new_vl.values = &new_value; + new_vl.values_len = 1; + new_vl.meta = NULL; + + /* Change the type to "cache_size" */ + sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type)); + + /* Dispatch new value lists instead of this one */ + new_vl.values[0].derive = (derive_t) vl->values[0].counter; + sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance)); + plugin_dispatch_values (&new_vl); + + /* Abort processing */ + return (FC_TARGET_STOP); +} /* }}} int v5_zfs_arc_size */ + static int v5_destroy (void **user_data) /* {{{ */ { return (0); @@ -236,6 +443,16 @@ static int v5_invoke (const data_set_t *ds, value_list_t *vl, /* {{{ */ return (v5_mysql_qcache (ds, vl)); else if (strcmp ("mysql_threads", vl->type) == 0) return (v5_mysql_threads (ds, vl)); + else if (strcmp ("arc_counts", vl->type) == 0) + return (v5_zfs_arc_counts (ds, vl)); + else if (strcmp ("arc_l2_bytes", vl->type) == 0) + return (v5_zfs_arc_l2_bytes (ds, vl)); + else if (strcmp ("arc_l2_size", vl->type) == 0) + return (v5_zfs_arc_l2_size (ds, vl)); + else if (strcmp ("arc_ratio", vl->type) == 0) + return (v5_zfs_arc_ratio (ds, vl)); + else if (strcmp ("arc_size", vl->type) == 0) + return (v5_zfs_arc_size (ds, vl)); return (FC_TARGET_CONTINUE); } /* }}} int v5_invoke */ diff --git a/src/types.db b/src/types.db index 1b0020f6..04849832 100644 --- a/src/types.db +++ b/src/types.db @@ -28,6 +28,7 @@ cpufreq value:GAUGE:0:U cpu value:COUNTER:0:4294967295 current value:GAUGE:U:U current_connections value:GAUGE:0:U +current_sessions value:GAUGE:0:U delay seconds:GAUGE:-1000000:1000000 derive value:DERIVE:0:U df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623 @@ -165,6 +166,7 @@ total_time_in_ms value:DERIVE:0:U total_values value:DERIVE:0:U uptime value:GAUGE:0:4294967295 users users:GAUGE:0:65535 +vcpu value:GAUGE:0:U virt_cpu_total ns:COUNTER:0:256000000000 virt_vcpu ns:COUNTER:0:1000000000 vmpage_action value:COUNTER:0:4294967295 @@ -177,4 +179,3 @@ voltage value:GAUGE:U:U vs_memory value:GAUGE:0:9223372036854775807 vs_processes value:GAUGE:0:65535 vs_threads value:GAUGE:0:65535 -pinba_view req_per_sec:GAUGE:0:U, req_time:GAUGE:0:U, ru_utime:GAUGE:0:U, ru_stime:GAUGE:0:U, doc_size:GAUGE:0:U, mem_peak:GAUGE:0:U diff --git a/src/unixsock.c b/src/unixsock.c index 0b897482..6de13956 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -54,7 +54,8 @@ static const char *config_keys[] = { "SocketFile", "SocketGroup", - "SocketPerms" + "SocketPerms", + "DeleteSocket" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); @@ -65,6 +66,7 @@ static int sock_fd = -1; static char *sock_file = NULL; static char *sock_group = NULL; static int sock_perms = S_IRWXU | S_IRWXG; +static _Bool delete_socket = 0; static pthread_t listen_thread = (pthread_t) 0; @@ -89,10 +91,27 @@ static int us_open_socket (void) sa.sun_family = AF_UNIX; sstrncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, sizeof (sa.sun_path)); - /* unlink (sa.sun_path); */ DEBUG ("unixsock plugin: socket path = %s", sa.sun_path); + if (delete_socket) + { + errno = 0; + status = unlink (sa.sun_path); + if ((status != 0) && (errno != ENOENT)) + { + char errbuf[1024]; + WARNING ("unixsock plugin: Deleting socket file \"%s\" failed: %s", + sa.sun_path, + sstrerror (errno, errbuf, sizeof (errbuf))); + } + else if (status == 0) + { + INFO ("unixsock plugin: Successfully deleted socket file \"%s\".", + sa.sun_path); + } + } + status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { @@ -392,6 +411,13 @@ static int us_config (const char *key, const char *val) { sock_perms = (int) strtol (val, NULL, 8); } + else if (strcasecmp (key, "DeleteSocket") == 0) + { + if (IS_TRUE (val)) + delete_socket = 1; + else + delete_socket = 0; + } else { return (-1); diff --git a/src/utils_cache.c b/src/utils_cache.c index 10e8a48e..20b12375 100644 --- a/src/utils_cache.c +++ b/src/utils_cache.c @@ -175,7 +175,7 @@ static int uc_send_notification (const char *name) } /* Check if the entry has been updated in the meantime */ - if ((n.time - ce->last_update) < (2 * ce->interval)) + if ((n.time - ce->last_update) < (timeout_g * ce->interval)) { ce->state = STATE_OKAY; pthread_mutex_unlock (&cache_lock); diff --git a/src/utils_match.c b/src/utils_match.c index 4d4b57d0..062bcfe3 100644 --- a/src/utils_match.c +++ b/src/utils_match.c @@ -228,7 +228,7 @@ cu_match_t *match_create_callback (const char *regex, const char *excluderegex, return (NULL); memset (obj, '\0', sizeof (cu_match_t)); - status = regcomp (&obj->regex, regex, REG_EXTENDED); + status = regcomp (&obj->regex, regex, REG_EXTENDED | REG_NEWLINE); if (status != 0) { ERROR ("Compiling the regular expression \"%s\" failed.", regex); diff --git a/src/utils_tail.c b/src/utils_tail.c index 904a5212..5b7551d3 100644 --- a/src/utils_tail.c +++ b/src/utils_tail.c @@ -220,6 +220,8 @@ int cu_tail_read (cu_tail_t *obj, char *buf, int buflen, tailfunc_t *callback, while (42) { + size_t len; + status = cu_tail_readline (obj, buf, buflen); if (status != 0) { @@ -232,6 +234,13 @@ int cu_tail_read (cu_tail_t *obj, char *buf, int buflen, tailfunc_t *callback, if (buf[0] == 0) break; + len = strlen (buf); + while (len > 0) { + if (buf[len - 1] != '\n') + break; + buf[len - 1] = '\0'; + } + status = callback (data, buf, buflen); if (status != 0) { diff --git a/src/zfs_arc.c b/src/zfs_arc.c index 5f14e909..bc2b0d2b 100644 --- a/src/zfs_arc.c +++ b/src/zfs_arc.c @@ -46,54 +46,48 @@ static void za_submit (const char* type, const char* type_instance, value_t* val static void za_submit_gauge (const char* type, const char* type_instance, gauge_t value) { - value_t values[1]; + value_t vv; - values[0].gauge = value; - - za_submit (type, type_instance, values, STATIC_ARRAY_SIZE(values)); + vv.gauge = value; + za_submit (type, type_instance, &vv, 1); } -static void za_submit_size (gauge_t size, gauge_t size_target, gauge_t limit_min, gauge_t limit_max) +static void za_submit_derive (const char* type, const char* type_instance, derive_t dv) { - value_t values[4]; - - values[0].gauge = size; - values[1].gauge = size_target; - values[2].gauge = limit_min; - values[3].gauge = limit_max; + value_t vv; - za_submit ("arc_size", "", values, STATIC_ARRAY_SIZE(values)); + vv.derive = dv; + za_submit (type, type_instance, &vv, 1); } -static void za_submit_bytes (counter_t read, counter_t write) +static void za_submit_ratio (const char* type_instance, gauge_t hits, gauge_t misses) { - value_t values[2]; + gauge_t ratio = NAN; - values[0].counter = read; - values[1].counter = write; + if (!isfinite (hits) || (hits < 0.0)) + hits = 0.0; + if (!isfinite (misses) || (misses < 0.0)) + misses = 0.0; - za_submit ("arc_l2_bytes", "", values, STATIC_ARRAY_SIZE(values)); -} - -static void za_submit_counts (char *type_instance, counter_t demand_data, counter_t demand_metadata, - counter_t prefetch_data, counter_t prefetch_metadata) -{ - value_t values[4]; + if ((hits != 0.0) || (misses != 0.0)) + ratio = hits / (hits + misses); - values[0].counter = demand_data; - values[1].counter = demand_metadata; - values[2].counter = prefetch_data; - values[3].counter = prefetch_metadata; - - za_submit ("arc_counts", type_instance, values, STATIC_ARRAY_SIZE(values)); + za_submit_gauge ("cache_ratio", type_instance, ratio); } static int za_read (void) { - gauge_t arcsize, targetsize, minlimit, maxlimit, hits, misses, l2_size, l2_hits, l2_misses; - counter_t demand_data_hits, demand_metadata_hits, prefetch_data_hits, prefetch_metadata_hits; - counter_t demand_data_misses, demand_metadata_misses, prefetch_data_misses, prefetch_metadata_misses; - counter_t l2_read_bytes, l2_write_bytes; + gauge_t arc_size, l2_size; + derive_t demand_data_hits, + demand_metadata_hits, + prefetch_data_hits, + prefetch_metadata_hits, + demand_data_misses, + demand_metadata_misses, + prefetch_data_misses, + prefetch_metadata_misses; + gauge_t arc_hits, arc_misses, arc_ratio, l2_hits, l2_misses, l2_ratio; + value_t l2_io[2]; get_kstat (&ksp, "zfs", 0, "arcstats"); if (ksp == NULL) @@ -102,11 +96,14 @@ static int za_read (void) return (-1); } - arcsize = get_kstat_value(ksp, "size"); - targetsize = get_kstat_value(ksp, "c"); - minlimit = get_kstat_value(ksp, "c_min"); - maxlimit = get_kstat_value(ksp, "c_max"); + /* Sizes */ + arc_size = get_kstat_value(ksp, "size"); + l2_size = get_kstat_value(ksp, "l2_size"); + + za_submit_gauge ("cache_size", "arc", arc_size); + za_submit_gauge ("cache_size", "L2", l2_size); + /* Hits / misses */ demand_data_hits = get_kstat_value(ksp, "demand_data_hits"); demand_metadata_hits = get_kstat_value(ksp, "demand_metadata_hits"); prefetch_data_hits = get_kstat_value(ksp, "prefetch_data_hits"); @@ -117,31 +114,33 @@ static int za_read (void) prefetch_data_misses = get_kstat_value(ksp, "prefetch_data_misses"); prefetch_metadata_misses = get_kstat_value(ksp, "prefetch_metadata_misses"); - hits = get_kstat_value(ksp, "hits"); - misses = get_kstat_value(ksp, "misses"); + za_submit_derive ("cache_result", "demand_data-hit", demand_data_hits); + za_submit_derive ("cache_result", "demand_metadata-hit", demand_metadata_hits); + za_submit_derive ("cache_result", "prefetch_data-hit", prefetch_data_hits); + za_submit_derive ("cache_result", "prefetch_metadata-hit", prefetch_metadata_hits); - l2_size = get_kstat_value(ksp, "l2_size"); - l2_read_bytes = get_kstat_value(ksp, "l2_read_bytes"); - l2_write_bytes = get_kstat_value(ksp, "l2_write_bytes"); - l2_hits = get_kstat_value(ksp, "l2_hits"); - l2_misses = get_kstat_value(ksp, "l2_misses"); + za_submit_derive ("cache_result", "demand_data-miss", demand_data_misses); + za_submit_derive ("cache_result", "demand_metadata-miss", demand_metadata_misses); + za_submit_derive ("cache_result", "prefetch_data-miss", prefetch_data_misses); + za_submit_derive ("cache_result", "prefetch_metadata-miss", prefetch_metadata_misses); + /* Ratios */ + arc_hits = (gauge_t) get_kstat_value(ksp, "hits"); + arc_misses = (gauge_t) get_kstat_value(ksp, "misses"); + l2_hits = (gauge_t) get_kstat_value(ksp, "l2_hits"); + l2_misses = (gauge_t) get_kstat_value(ksp, "l2_misses"); - za_submit_size (arcsize, targetsize, minlimit, maxlimit); - za_submit_gauge ("arc_l2_size", "", l2_size); + za_submit_ratio ("arc", arc_hits, arc_misses); + za_submit_ratio ("L2", l2_hits, l2_misses); - za_submit_counts ("hits", demand_data_hits, demand_metadata_hits, - prefetch_data_hits, prefetch_metadata_hits); - za_submit_counts ("misses", demand_data_misses, demand_metadata_misses, - prefetch_data_misses, prefetch_metadata_misses); + /* I/O */ + l2_io[0].derive = get_kstat_value(ksp, "l2_read_bytes"); + l2_io[1].derive = get_kstat_value(ksp, "l2_write_bytes"); - za_submit_gauge ("arc_ratio", "L1", hits / (hits + misses)); - za_submit_gauge ("arc_ratio", "L2", l2_hits / (l2_hits + l2_misses)); - - za_submit_bytes (l2_read_bytes, l2_write_bytes); + za_submit ("io_octets", "L2", l2_io, /* num values = */ 2); return (0); -} +} /* int za_read */ static int za_init (void) /* {{{ */ {