From: Florian Forster Date: Sat, 6 Nov 2010 08:40:05 +0000 (+0100) Subject: Merge branch 'ar/lpar' X-Git-Tag: collectd-5.0.0-beta0~27 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=fbb822b876e877c69be066ceca693769d4a9618a;hp=584b130e51fc0e8214c8b4499b404e8728356fd9;p=collectd.git Merge branch 'ar/lpar' --- diff --git a/AUTHORS b/AUTHORS index e83c2f84..6193a2a5 100644 --- a/AUTHORS +++ b/AUTHORS @@ -162,6 +162,9 @@ Rodolphe Quiédeville Scott Garrett - tape plugin. +Sebastien Pahl + - AMQP plugin. + Simon Kuhnle - OpenBSD code for the cpu and memory plugins. diff --git a/README b/README index 0c7a4221..436f1468 100644 --- a/README +++ b/README @@ -315,6 +315,10 @@ Features * Output can be written or sent to various destinations by the following plugins: + - amqp + Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) + server, such as RabbitMQ. + - csv Write to comma separated values (CSV) files. This needs lots of diskspace but is extremely portable and can be analysed with almost @@ -615,6 +619,10 @@ Prerequisites Used by the `python' plugin. Currently, only 2.3 ≦ Python < 3 is supported. + * librabbitmq (optional; also called “rabbitmq-c”) + Used by the AMQP plugin for AMQP connections, for example to RabbitMQ. + + * librouteros (optional) Used by the `routeros' plugin to connect to a device running `RouterOS'. diff --git a/configure.in b/configure.in index 5abca73a..66785583 100644 --- a/configure.in +++ b/configure.in @@ -3115,6 +3115,57 @@ then fi # }}} --with-python +# --with-librabbitmq {{{ +with_librabbitmq_cppflags="" +with_librabbitmq_ldflags="" +AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])], +[ + if test "x$withval" != "xno" && test "x$withval" != "xyes" + then + with_librabbitmq_cppflags="-I$withval/include" + with_librabbitmq_ldflags="-L$withval/lib" + with_librabbitmq="yes" + else + with_librabbitmq="$withval" + fi +], +[ + with_librabbitmq="yes" +]) +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + + AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags" + + AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags" + BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags" + BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq" + AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS) + AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.]) +fi +AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") +# }}} + # --with-librouteros {{{ AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])], [ @@ -4340,6 +4391,7 @@ then fi if test "x$have_sysctlbyname" = "xyes" then + plugin_contextswitch="yes" plugin_cpu="yes" plugin_memory="yes" plugin_tcpconns="yes" @@ -4467,6 +4519,7 @@ AC_ARG_ENABLE([all-plugins], m4_divert_once([HELP_ENABLE], []) +AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple's hardware sensors]) @@ -4767,6 +4820,7 @@ Configuration: libperl . . . . . . . $with_libperl libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread + librabbitmq . . . . . $with_librabbitmq librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors @@ -4791,6 +4845,7 @@ Configuration: perl . . . . . . . . $with_perl_bindings Modules: + amqp . . . . . . . $enable_amqp apache . . . . . . . $enable_apache apcups . . . . . . . $enable_apcups apple_sensors . . . . $enable_apple_sensors diff --git a/src/Makefile.am b/src/Makefile.am index 74c64305..247892bb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -123,6 +123,18 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if BUILD_PLUGIN_AMQP +pkglib_LTLIBRARIES += amqp.la +amqp_la_SOURCES = amqp.c \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_format_json.c utils_format_json.h +amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) +amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) +amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) +collectd_LDADD += "-dlopen" amqp.la +collectd_DEPENDENCIES += amqp.la +endif + if BUILD_PLUGIN_APACHE pkglib_LTLIBRARIES += apache.la apache_la_SOURCES = apache.c diff --git a/src/amqp.c b/src/amqp.c new file mode 100644 index 00000000..f0abd44b --- /dev/null +++ b/src/amqp.c @@ -0,0 +1,939 @@ +/** + * collectd - src/amqp.c + * Copyright (C) 2009 Sebastien Pahl + * Copyright (C) 2010 Florian Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastien Pahl + * Florian Forster + **/ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" +#include "utils_cmd_putval.h" +#include "utils_format_json.h" + +#include + +#include +#include + +/* Defines for the delivery mode. I have no idea why they're not defined by the + * library.. */ +#define CAMQP_DM_VOLATILE 1 +#define CAMQP_DM_PERSISTENT 2 + +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 + +#define CAMQP_CHANNEL 1 + +/* + * Data types + */ +struct camqp_config_s +{ + _Bool publish; + char *name; + + char *host; + int port; + char *vhost; + char *user; + char *password; + + char *exchange; + char *routing_key; + + /* publish only */ + uint8_t delivery_mode; + _Bool store_rates; + int format; + + /* subscribe only */ + char *exchange_type; + char *queue; + + amqp_connection_state_t connection; + pthread_mutex_t lock; +}; +typedef struct camqp_config_s camqp_config_t; + +/* + * Global variables + */ +static const char *def_host = "localhost"; +static const char *def_vhost = "/"; +static const char *def_user = "guest"; +static const char *def_password = "guest"; +static const char *def_exchange = "amq.fanout"; + +static pthread_t *subscriber_threads = NULL; +static size_t subscriber_threads_num = 0; +static _Bool subscriber_threads_running = 1; + +#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f) + +/* + * Functions + */ +static void camqp_close_connection (camqp_config_t *conf) /* {{{ */ +{ + int sockfd; + + if ((conf == NULL) || (conf->connection == NULL)) + return; + + sockfd = amqp_get_sockfd (conf->connection); + amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); + close (sockfd); + conf->connection = NULL; +} /* }}} void camqp_close_connection */ + +static void camqp_config_free (void *ptr) /* {{{ */ +{ + camqp_config_t *conf = ptr; + + if (conf == NULL) + return; + + camqp_close_connection (conf); + + sfree (conf->name); + sfree (conf->host); + sfree (conf->vhost); + sfree (conf->user); + sfree (conf->password); + sfree (conf->exchange); + sfree (conf->exchange_type); + sfree (conf->queue); + sfree (conf->routing_key); + + sfree (conf); +} /* }}} void camqp_config_free */ + +static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */ +{ + char *ret; + + if ((in == NULL) || (in->bytes == NULL)) + return (NULL); + + ret = malloc (in->len + 1); + if (ret == NULL) + return (NULL); + + memcpy (ret, in->bytes, in->len); + ret[in->len] = 0; + + return (ret); +} /* }}} char *camqp_bytes_cstring */ + +static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */ +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return (0); + + return (1); +} /* }}} _Bool camqp_is_error */ + +static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ + char *buffer, size_t buffer_size) +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + switch (r.reply_type) + { + case AMQP_RESPONSE_NORMAL: + sstrncpy (buffer, "Success", sizeof (buffer)); + break; + + case AMQP_RESPONSE_NONE: + sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + return (sstrerror (r.library_errno, buffer, buffer_size)); + else + sstrncpy (buffer, "End of stream", sizeof (buffer)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) + { + amqp_connection_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server connection error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) + { + amqp_channel_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server channel error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else + { + ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32, + r.reply.id); + } + break; + + default: + ssnprintf (buffer, buffer_size, "Unknown reply type %i", + (int) r.reply_type); + } + + return (buffer); +} /* }}} char *camqp_strerror */ + +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + + if (conf->exchange_type == NULL) + return (0); + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ + +static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ +{ + amqp_queue_declare_ok_t *qd_ret; + amqp_basic_consume_ok_t *cm_ret; + + qd_ret = amqp_queue_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ (conf->queue != NULL) + ? amqp_cstring_bytes (conf->queue) + : AMQP_EMPTY_BYTES, + /* passive = */ 0, + /* durable = */ 0, + /* exclusive = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if (qd_ret == NULL) + { + ERROR ("amqp plugin: amqp_queue_declare failed."); + camqp_close_connection (conf); + return (-1); + } + + if (conf->queue == NULL) + { + conf->queue = camqp_bytes_cstring (&qd_ret->queue); + if (conf->queue == NULL) + { + ERROR ("amqp plugin: camqp_bytes_cstring failed."); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Created queue \"%s\".", conf->queue); + } + DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue); + + /* bind to an exchange */ + if (conf->exchange != NULL) + { + amqp_queue_bind_ok_t *qb_ret; + + assert (conf->queue != NULL); + qb_ret = amqp_queue_bind (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* routing_key = */ (conf->routing_key != NULL) + ? amqp_cstring_bytes (conf->routing_key) + : AMQP_EMPTY_BYTES, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((qb_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_queue_bind failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".", + conf->queue, conf->exchange); + } /* if (conf->exchange != NULL) */ + + cm_ret = amqp_basic_consume (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* consumer_tag = */ AMQP_EMPTY_BYTES, + /* no_local = */ 0, + /* no_ack = */ 1, + /* exclusive = */ 0); + if ((cm_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_basic_consume failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + return (0); +} /* }}} int camqp_setup_queue */ + +static int camqp_connect (camqp_config_t *conf) /* {{{ */ +{ + amqp_rpc_reply_t reply; + int sockfd; + int status; + + if (conf->connection != NULL) + return (0); + + conf->connection = amqp_new_connection (); + if (conf->connection == NULL) + { + ERROR ("amqp plugin: amqp_new_connection failed."); + return (ENOMEM); + } + + sockfd = amqp_open_socket (CONF(conf, host), conf->port); + if (sockfd < 0) + { + char errbuf[1024]; + status = (-1) * sockfd; + ERROR ("amqp plugin: amqp_open_socket failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } + amqp_set_sockfd (conf->connection, sockfd); + + reply = amqp_login (conf->connection, CONF(conf, vhost), + /* channel max = */ 0, + /* frame max = */ 131072, + /* heartbeat = */ 0, + /* authentication = */ AMQP_SASL_METHOD_PLAIN, + CONF(conf, user), CONF(conf, password)); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", + CONF(conf, vhost), CONF(conf, user)); + amqp_destroy_connection (conf->connection); + close (sockfd); + conf->connection = NULL; + return (1); + } + + amqp_channel_open (conf->connection, /* channel = */ 1); + /* FIXME: Is checking "reply.reply_type" really correct here? How does + * it get set? --octo */ + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_channel_open failed."); + amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (conf->connection); + close(sockfd); + conf->connection = NULL; + return (1); + } + + INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " + "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); + + status = camqp_create_exchange (conf); + if (status != 0) + return (status); + + if (!conf->publish) + return (camqp_setup_queue (conf)); + return (0); +} /* }}} int camqp_connect */ + +static int camqp_shutdown (void) /* {{{ */ +{ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + + return (0); +} /* }}} int camqp_shutdown */ + +/* + * Subscribing code + */ +static int camqp_read_body (camqp_config_t *conf, /* {{{ */ + size_t body_size, const char *content_type) +{ + char body[body_size + 1]; + char *body_ptr; + size_t received; + amqp_frame_t frame; + int status; + + memset (body, 0, sizeof (body)); + body_ptr = &body[0]; + received = 0; + + while (received < body_size) + { + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_BODY) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + if ((body_size - received) < frame.payload.body_fragment.len) + { + WARNING ("amqp plugin: Body is larger than indicated by header."); + return (-1); + } + + memcpy (body_ptr, frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + body_ptr += frame.payload.body_fragment.len; + received += frame.payload.body_fragment.len; + } /* while (received < body_size) */ + + if (strcasecmp ("text/collectd", content_type) == 0) + { + status = handle_putval (stderr, body); + if (status != 0) + ERROR ("amqp plugin: handle_putval failed with status %i.", + status); + return (status); + } + else if (strcasecmp ("application/json", content_type) == 0) + { + ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not " + "been implemented yet. FIXME!"); + return (0); + } + else + { + ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".", + content_type); + return (EINVAL); + } + + /* not reached */ + return (0); +} /* }}} int camqp_read_body */ + +static int camqp_read_header (camqp_config_t *conf) /* {{{ */ +{ + int status; + amqp_frame_t frame; + amqp_basic_properties_t *properties; + char *content_type; + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_HEADER) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + properties = frame.payload.properties.decoded; + content_type = camqp_bytes_cstring (&properties->content_type); + if (content_type == NULL) + { + ERROR ("amqp plugin: Unable to determine content type."); + return (-1); + } + + status = camqp_read_body (conf, + (size_t) frame.payload.properties.body_size, + content_type); + + sfree (content_type); + return (status); +} /* }}} int camqp_read_header */ + +static void *camqp_subscribe_thread (void *user_data) /* {{{ */ +{ + camqp_config_t *conf = user_data; + int status; + + while (subscriber_threads_running) + { + amqp_frame_t frame; + + status = camqp_connect (conf); + if (status != 0) + { + ERROR ("amqp plugin: camqp_connect failed. " + "Will sleep for %i seconds.", interval_g); + sleep (interval_g); + continue; + } + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + ERROR ("amqp plugin: amqp_simple_wait_frame failed. " + "Will sleep for %i seconds.", interval_g); + camqp_close_connection (conf); + sleep (interval_g); + continue; + } + + if (frame.frame_type != AMQP_FRAME_METHOD) + { + DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + continue; + } + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + { + DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32, + frame.payload.method.id); + continue; + } + + status = camqp_read_header (conf); + + amqp_maybe_release_buffers (conf->connection); + } /* while (subscriber_threads_running) */ + + camqp_config_free (conf); + pthread_exit (NULL); +} /* }}} void *camqp_subscribe_thread */ + +static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ +{ + int status; + pthread_t *tmp; + + tmp = realloc (subscriber_threads, + sizeof (*subscriber_threads) * (subscriber_threads_num + 1)); + if (tmp == NULL) + { + ERROR ("amqp plugin: realloc failed."); + camqp_config_free (conf); + return (ENOMEM); + } + subscriber_threads = tmp; + tmp = subscriber_threads + subscriber_threads_num; + memset (tmp, 0, sizeof (*tmp)); + + status = pthread_create (tmp, /* attr = */ NULL, + camqp_subscribe_thread, conf); + if (status != 0) + { + char errbuf[1024]; + ERROR ("amqp plugin: pthread_create failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_config_free (conf); + return (status); + } + + subscriber_threads_num++; + + return (0); +} /* }}} int camqp_subscribe_init */ + +/* + * Publishing code + */ +/* XXX: You must hold "conf->lock" when calling this function! */ +static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ + const char *buffer, const char *routing_key) +{ + amqp_basic_properties_t props; + int status; + + status = camqp_connect (conf); + if (status != 0) + return (status); + + memset (&props, 0, sizeof (props)); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG + | AMQP_BASIC_APP_ID_FLAG; + if (conf->format == CAMQP_FORMAT_COMMAND) + props.content_type = amqp_cstring_bytes("text/collectd"); + else if (conf->format == CAMQP_FORMAT_JSON) + props.content_type = amqp_cstring_bytes("application/json"); + else + assert (23 == 42); + props.delivery_mode = conf->delivery_mode; + props.app_id = amqp_cstring_bytes("collectd"); + + status = amqp_basic_publish(conf->connection, + /* channel = */ 1, + amqp_cstring_bytes(CONF(conf, exchange)), + amqp_cstring_bytes (routing_key), + /* mandatory = */ 0, + /* immediate = */ 0, + &props, + amqp_cstring_bytes(buffer)); + if (status != 0) + { + ERROR ("amqp plugin: amqp_basic_publish failed with status %i.", + status); + camqp_close_connection (conf); + } + + return (status); +} /* }}} int camqp_write_locked */ + +static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) +{ + camqp_config_t *conf = user_data->data; + char routing_key[6 * DATA_MAX_NAME_LEN]; + char buffer[4096]; + int status; + + if ((ds == NULL) || (vl == NULL) || (conf == NULL)) + return (EINVAL); + + memset (buffer, 0, sizeof (buffer)); + + if (conf->routing_key != NULL) + { + sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); + } + else + { + size_t i; + ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", + vl->host, + vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance); + + /* Switch slashes (the only character forbidden by collectd) and dots + * (the separation character used by AMQP). */ + for (i = 0; routing_key[i] != 0; i++) + { + if (routing_key[i] == '.') + routing_key[i] = '/'; + else if (routing_key[i] == '/') + routing_key[i] = '.'; + } + } + + if (conf->format == CAMQP_FORMAT_COMMAND) + { + status = create_putval (buffer, sizeof (buffer), ds, vl); + if (status != 0) + { + ERROR ("amqp plugin: create_putval failed with status %i.", + status); + return (status); + } + } + else if (conf->format == CAMQP_FORMAT_JSON) + { + size_t bfree = sizeof (buffer); + size_t bfill = 0; + + format_json_initialize (buffer, &bfill, &bfree); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); + format_json_finalize (buffer, &bfill, &bfree); + } + else + { + ERROR ("amqp plugin: Invalid format (%i).", conf->format); + return (-1); + } + + pthread_mutex_lock (&conf->lock); + status = camqp_write_locked (conf, buffer, routing_key); + pthread_mutex_unlock (&conf->lock); + + return (status); +} /* }}} int camqp_write */ + +/* + * Config handling + */ +static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ + camqp_config_t *conf) +{ + char *string; + int status; + + string = NULL; + status = cf_util_get_string (ci, &string); + if (status != 0) + return (status); + + assert (string != NULL); + if (strcasecmp ("Command", string) == 0) + conf->format = CAMQP_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + conf->format = CAMQP_FORMAT_JSON; + else + { + WARNING ("amqp plugin: Invalid format string: %s", + string); + } + + free (string); + + return (0); +} /* }}} int config_set_string */ + +static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ + _Bool publish) +{ + camqp_config_t *conf; + int status; + int i; + + conf = malloc (sizeof (*conf)); + if (conf == NULL) + { + ERROR ("amqp plugin: malloc failed."); + return (ENOMEM); + } + + /* Initialize "conf" {{{ */ + memset (conf, 0, sizeof (*conf)); + conf->publish = publish; + conf->name = NULL; + conf->format = CAMQP_FORMAT_COMMAND; + conf->host = NULL; + conf->port = 5672; + conf->vhost = NULL; + conf->user = NULL; + conf->password = NULL; + conf->exchange = NULL; + conf->routing_key = NULL; + /* publish only */ + conf->delivery_mode = CAMQP_DM_VOLATILE; + conf->store_rates = 0; + /* subscribe only */ + conf->exchange_type = NULL; + conf->queue = NULL; + /* general */ + conf->connection = NULL; + pthread_mutex_init (&conf->lock, /* attr = */ NULL); + /* }}} */ + + status = cf_util_get_string (ci, &conf->name); + if (status != 0) + { + sfree (conf); + return (status); + } + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &conf->host); + else if (strcasecmp ("Port", child->key) == 0) + { + status = cf_util_get_port_number (child); + if (status > 0) + { + conf->port = status; + status = 0; + } + } + else if (strcasecmp ("VHost", child->key) == 0) + status = cf_util_get_string (child, &conf->vhost); + else if (strcasecmp ("User", child->key) == 0) + status = cf_util_get_string (child, &conf->user); + else if (strcasecmp ("Password", child->key) == 0) + status = cf_util_get_string (child, &conf->password); + else if (strcasecmp ("Exchange", child->key) == 0) + status = cf_util_get_string (child, &conf->exchange); + else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish) + status = cf_util_get_string (child, &conf->exchange_type); + else if ((strcasecmp ("Queue", child->key) == 0) && !publish) + status = cf_util_get_string (child, &conf->queue); + else if (strcasecmp ("RoutingKey", child->key) == 0) + status = cf_util_get_string (child, &conf->routing_key); + else if ((strcasecmp ("Persistent", child->key) == 0) && publish) + { + _Bool tmp = 0; + status = cf_util_get_boolean (child, &tmp); + if (tmp) + conf->delivery_mode = CAMQP_DM_PERSISTENT; + else + conf->delivery_mode = CAMQP_DM_VOLATILE; + } + else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) + status = cf_util_get_boolean (child, &conf->store_rates); + else if ((strcasecmp ("Format", child->key) == 0) && publish) + status = camqp_config_set_format (child, conf); + else + WARNING ("amqp plugin: Ignoring unknown " + "configuration option \"%s\".", child->key); + + if (status != 0) + break; + } /* for (i = 0; i < ci->children_num; i++) */ + + if ((status == 0) && (conf->exchange == NULL)) + { + if (conf->exchange_type != NULL) + WARNING ("amqp plugin: The option \"ExchangeType\" was given " + "without the \"Exchange\" option. It will be ignored."); + + if (!publish && (conf->routing_key != NULL)) + WARNING ("amqp plugin: The option \"RoutingKey\" was given " + "without the \"Exchange\" option. It will be ignored."); + + } + + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + + if (conf->exchange != NULL) + { + DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;", + conf->exchange); + } + + if (publish) + { + char cbname[128]; + user_data_t ud = { conf, camqp_config_free }; + + ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name); + + status = plugin_register_write (cbname, camqp_write, &ud); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } + else + { + status = camqp_subscribe_init (conf); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } + + return (0); +} /* }}} int camqp_config_connection */ + +static int camqp_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Publish", child->key) == 0) + camqp_config_connection (child, /* publish = */ 1); + else if (strcasecmp ("Subscribe", child->key) == 0) + camqp_config_connection (child, /* publish = */ 0); + else + WARNING ("amqp plugin: Ignoring unknown config option \"%s\".", + child->key); + } /* for (ci->children_num) */ + + return (0); +} /* }}} int camqp_config */ + +void module_register (void) +{ + plugin_register_complex_config ("amqp", camqp_config); + plugin_register_shutdown ("amqp", camqp_shutdown); +} /* void module_register */ + +/* vim: set sw=4 sts=4 et fdm=marker : */ diff --git a/src/apache.c b/src/apache.c index 3d6d957c..506ba84e 100644 --- a/src/apache.c +++ b/src/apache.c @@ -1,6 +1,6 @@ /** * collectd - src/apache.c - * Copyright (C) 2006-2009 Florian octo Forster + * Copyright (C) 2006-2010 Florian octo Forster * Copyright (C) 2007 Florent EppO Monbillard * Copyright (C) 2009 Amit Gupta * @@ -144,6 +144,8 @@ static size_t apache_header_callback (void *buf, size_t size, size_t nmemb, st->server_type = APACHE; else if (strstr (buf, "lighttpd") != NULL) st->server_type = LIGHTTPD; + else if (strstr (buf, "IBM_HTTP_Server") != NULL) + st->server_type = APACHE; else { const char *hdr = buf; @@ -333,57 +335,22 @@ static int config (oconfig_item_t *ci) { int status = 0; int i; - oconfig_item_t *lci = NULL; /* legacy config */ for (i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Instance", child->key) == 0 && child->children_num > 0) + if (strcasecmp ("Instance", child->key) == 0) config_add (child); else - { - /* legacy mode - convert to config */ - if (lci == NULL) - { - lci = malloc (sizeof(*lci)); - if (lci == NULL) - { - ERROR ("apache plugin: malloc failed."); - return (-1); - } - memset (lci, '\0', sizeof (*lci)); - } - - lci->children_num++; - lci->children = - realloc (lci->children, - lci->children_num * sizeof (*child)); - if (lci->children == NULL) - { - ERROR ("apache plugin: realloc failed."); - return (-1); - } - memcpy (&lci->children[lci->children_num-1], child, sizeof (*child)); - } + WARNING ("apache plugin: The configuration option " + "\"%s\" is not allowed here. Did you " + "forget to add an block " + "around the configuration?", + child->key); } /* for (ci->children) */ - if (lci) - { - /* create a entry */ - lci->key = "Instance"; - lci->values_num = 1; - lci->values = (oconfig_value_t *) malloc (lci->values_num * sizeof (oconfig_value_t)); - lci->values[0].type = OCONFIG_TYPE_STRING; - lci->values[0].value.string = ""; - - status = config_add (lci); - sfree (lci->values); - sfree (lci->children); - sfree (lci); - } - - return status; + return (status); } /* int config */ /* initialize curl for each host */ @@ -420,6 +387,8 @@ static int init_host (apache_t *st) /* {{{ */ st->server_type = APACHE; else if (strcasecmp(st->server, "lighttpd") == 0) st->server_type = LIGHTTPD; + else if (strcasecmp(st->server, "ibm_http_server") == 0) + st->server_type = APACHE; else WARNING ("apache plugin: Unknown `Server' setting: %s", st->server); diff --git a/src/collectd.conf.in b/src/collectd.conf.in index cc125ddf..42addd27 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -52,6 +52,7 @@ # to missing dependencies or because they have been deactivated explicitly. # ############################################################################## +#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors @@ -150,11 +151,27 @@ # ription of those options is available in the collectd.conf(5) manual page. # ############################################################################## +# +# +# Host "localhost" +# Port "5672" +# VHost "/" +# User "guest" +# Password "guest" +# Exchange "amq.fanout" +# RoutingKey "collectd" +# Persistent false +# StoreRates false +# +# + # -# URL "http://localhost/status?auto" -# User "www-user" -# Password "secret" -# CACert "/etc/ssl/ca.crt" +# +# URL "http://localhost/status?auto" +# User "www-user" +# Password "secret" +# CACert "/etc/ssl/ca.crt" +# # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index af07cdf2..1da35982 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -181,6 +181,143 @@ A list of all plugins and a short summary for each plugin can be found in the F file shipped with the sourcecode and hopefully binary packets as well. +=head2 Plugin C + +The I can be used to communicate with other instances of +I or third party applications using an AMQP message broker. Values +are sent to or received from the broker, which handles routing, queueing and +possibly filtering or messages. + + + # Send values to an AMQP broker + + Host "localhost" + Port "5672" + VHost "/" + User "guest" + Password "guest" + Exchange "amq.fanout" + # ExchangeType "fanout" + # RoutingKey "collectd" + # Persistent false + # Format "command" + # StoreRates false + + + # Receive values from an AMQP broker + + Host "localhost" + Port "5672" + VHost "/" + User "guest" + Password "guest" + Exchange "amq.fanout" + # ExchangeType "fanout" + # Queue "queue_name" + # RoutingKey "collectd.#" + + + +The plugin's configuration consists of a number of I and I +blocks, which configure sending and receiving of values respectively. The two +blocks are very similar, so unless otherwise noted, an option can be used in +either block. The name given in the blocks starting tag is only used for +reporting messages, but may be used to support I of certain +I blocks in the future. + +=over 4 + +=item B I + +Hostname or IP-address of the AMQP broker. Defaults to the default behavior of +the underlying communications library, I, which is "localhost". + +=item B I + +Service name or port number on which the AMQP broker accepts connections. This +argument must be a string, even if the numeric form is used. Defaults to +"5672". + +=item B I + +Name of the I on the AMQP broker to use. Defaults to "/". + +=item B I + +=item B I + +Credentials used to authenticate to the AMQP broker. By default "guest"/"guest" +is used. + +=item B I + +In I blocks, this option specifies the I to send values to. +By default, "amq.fanout" will be used. + +In I blocks this option is optional. If given, a I between +the given exchange and the I is created, using the I if +configured. See the B and B options below. + +=item B I + +If given, the plugin will try to create the configured I with this +I after connecting. When in a I block, the I will then +be bound to this exchange. + +=item B I (Subscribe only) + +Configures the I name to subscribe to. If no queue name was configures +explicitly, a unique queue name will be created by the broker. + +=item B I + +In I blocks, this configures the routing key to set on all outgoing +messages. If not given, the routing key will be computed from the I +of the value. The host, plugin, type and the two instances are concatenated +together using dots as the separator and all containing dots replaced with +slashes. For example "collectd.host/example/com.cpu.0.cpu.user". This makes it +possible to receive only specific values using a "topic" exchange. + +In I blocks, configures the I used when creating a +I between an I and the I. The usual wildcards can be +used to filter messages when using a "topic" exchange. If you're only +interested in CPU statistics, you could use the routing key "collectd.*.cpu.#" +for example. + +=item B B|B (Publish only) + +Selects the I to use. If set to B, the I +mode will be used, i.e. delivery is guaranteed. If set to B (the +default), the I delivery mode will be used, i.e. messages may be +lost due to high load, overflowing queues or similar issues. + +=item B B|B (Publish only) + +Selects the format in which messages are sent to the broker. If set to +B (the default), values are sent as C commands which are +identical to the syntax used by the I and I. In this +case, the C header field will be set to C. + +If set to B, the values are encoded in the I, +an easy and straight forward exchange format. The C header field +will be set to C. + +A subscribing client I use the C header field to +determine how to decode the values. Currently, the I itself can +only decode the B format. + +=item B B|B (Publish only) + +Determines whether or not C, C and C data sources +are converted to a I (i.e. a C value). If set to B (the +default), no conversion is performed. Otherwise the conversion is performed +using the internal value cache. + +Please note that currently this option is only used if the B option has +been set to B. + +=back + =head2 Plugin C To configure the C-plugin you first need to configure the Apache @@ -199,7 +336,25 @@ Since its C module is very similar to Apache's, B is also supported. It introduces a new field, called C, to count the number of currently connected clients. This field is also supported. -The following options are accepted by the C-plugin: +The configuration of the I plugin consists of one or more +CInstanceE/E> blocks. Each block requires one string argument +as the instance name. For example: + + + + URL "http://www1.example.com/mod_status?auto" + + + URL "http://www2.example.com/mod_status?auto" + + + +The instance name will be used as the I. To emulate the old +(versionE4) behavior, you can use an empty string (""). In order for the +plugin to work correctly, each instance name must be unique. This is not +enforced by the plugin and it is your responsibility to ensure it. + +The following options are accepted within each I block: =over 4 @@ -207,7 +362,7 @@ The following options are accepted by the C-plugin: Sets the URL of the C output. This needs to be the output generated by C and it needs to be the machine readable output -generated by appending the C argument. +generated by appending the C argument. This option is I. =item B I @@ -2608,7 +2763,7 @@ operating systems. =item B I<1024-65535> Set the maximum size for datagrams received over the network. Packets larger -than this will be truncated. +than this will be truncated. Defaults to 1452Ebytes. =item B I diff --git a/src/collectd.h b/src/collectd.h index 8849b30b..6faa1a4e 100644 --- a/src/collectd.h +++ b/src/collectd.h @@ -56,21 +56,6 @@ #if HAVE_STDINT_H # include #endif -#if HAVE_STDBOOL_H -# include -#else -# ifndef HAVE__BOOL -# ifdef __cplusplus -typedef bool _Bool; -# else -# define _Bool signed char -# endif -# endif -# define bool _Bool -# define false 0 -# define true 1 -# define __bool_true_false_are_defined 1 -#endif #if HAVE_UNISTD_H # include #endif diff --git a/src/contextswitch.c b/src/contextswitch.c index 06055ca5..c207318f 100644 --- a/src/contextswitch.c +++ b/src/contextswitch.c @@ -1,6 +1,7 @@ /** * collectd - src/contextswitch.c * Copyright (C) 2009 Patrik Weiskircher + * Copyright (C) 2010 Kimo Rosenbaum * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,13 +18,26 @@ * * Authors: * Patrik Weiskircher + * Kimo Rosenbaum **/ #include "collectd.h" #include "common.h" #include "plugin.h" -#if !KERNEL_LINUX +#ifdef HAVE_SYS_SYSCTL_H +# include +#endif + +#if HAVE_SYSCTLBYNAME +/* no global variables */ +/* #endif HAVE_SYSCTLBYNAME */ + +#elif KERNEL_LINUX +/* no global variables */ +/* #endif KERNEL_LINUX */ + +#else # error "No applicable input method." #endif @@ -45,6 +59,25 @@ static void cs_submit (derive_t context_switches) static int cs_read (void) { +#if HAVE_SYSCTLBYNAME + int value = 0; + size_t value_len = sizeof (value); + int status; + + status = sysctlbyname ("vm.stats.sys.v_swtch", + &value, &value_len, + /* new pointer = */ NULL, /* new length = */ 0); + if (status != 0) + { + ERROR("contextswitch plugin: sysctlbyname " + "(vm.stats.sys.v_swtch) failed"); + return (-1); + } + + cs_submit (value); +/* #endif HAVE_SYSCTLBYNAME */ + +#elif KERNEL_LINUX FILE *fh; char buffer[64]; int numfields; @@ -88,6 +121,7 @@ static int cs_read (void) if (status == -2) ERROR ("contextswitch plugin: Unable to find context switch value."); +#endif /* KERNEL_LINUX */ return status; } diff --git a/src/curl.c b/src/curl.c index a533e147..8b95c80f 100644 --- a/src/curl.c +++ b/src/curl.c @@ -577,7 +577,6 @@ static void cc_submit (const web_page_t *wp, const web_match_t *wm, /* {{{ */ vl.values = values; vl.values_len = 1; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "curl", sizeof (vl.plugin)); sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance)); @@ -596,7 +595,6 @@ static void cc_submit_response_time (const web_page_t *wp, double seconds) /* {{ vl.values = values; vl.values_len = 1; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "curl", sizeof (vl.plugin)); sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance)); diff --git a/src/curl_json.c b/src/curl_json.c index 21deed61..fbac7ad1 100644 --- a/src/curl_json.c +++ b/src/curl_json.c @@ -98,18 +98,12 @@ static size_t cj_curl_callback (void *buf, /* {{{ */ return (0); status = yajl_parse(db->yajl, (unsigned char *)buf, len); - if (status == yajl_status_ok) - { - status = yajl_parse_complete(db->yajl); - return (len); - } - else if (status == yajl_status_insufficient_data) - return (len); - - if (status != yajl_status_ok) + if ((status != yajl_status_ok) + && (status != yajl_status_insufficient_data)) { unsigned char *msg = - yajl_get_error(db->yajl, 1, (unsigned char *)buf, len); + yajl_get_error(db->yajl, /* verbose = */ 1, + /* jsonText = */ (unsigned char *) buf, (unsigned int) len); ERROR ("curl_json plugin: yajl_parse failed: %s", msg); yajl_free_error(db->yajl, msg); return (0); /* abort write callback */ @@ -768,9 +762,14 @@ static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */ } status = curl_easy_perform (curl); - - yajl_free (db->yajl); - db->yajl = yprev; + if (status != 0) + { + ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)", + status, db->curl_errbuf, url); + yajl_free (db->yajl); + db->yajl = yprev; + return (-1); + } curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &url); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc); @@ -778,18 +777,30 @@ static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */ /* The response code is zero if a non-HTTP transport was used. */ if ((rc != 0) && (rc != 200)) { - ERROR ("curl_json plugin: curl_easy_perform failed with response code %ld (%s)", - rc, url); + ERROR ("curl_json plugin: curl_easy_perform failed with " + "response code %ld (%s)", rc, url); + yajl_free (db->yajl); + db->yajl = yprev; return (-1); } - if (status != 0) + status = yajl_parse_complete (db->yajl); + if (status != yajl_status_ok) { - ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)", - status, db->curl_errbuf, url); + unsigned char *errmsg; + + errmsg = yajl_get_error (db->yajl, /* verbose = */ 0, + /* jsonText = */ NULL, /* jsonTextLen = */ 0); + ERROR ("curl_json plugin: yajl_parse_complete failed: %s", + (char *) errmsg); + yajl_free_error (db->yajl, errmsg); + yajl_free (db->yajl); + db->yajl = yprev; return (-1); } + yajl_free (db->yajl); + db->yajl = yprev; return (0); } /* }}} int cj_curl_perform */ diff --git a/src/df.c b/src/df.c index b2be8e5e..4b3cba01 100644 --- a/src/df.c +++ b/src/df.c @@ -60,8 +60,8 @@ static ignorelist_t *il_device = NULL; static ignorelist_t *il_mountpoint = NULL; static ignorelist_t *il_fstype = NULL; -static _Bool by_device = false; -static _Bool report_inodes = false; +static _Bool by_device = 0; +static _Bool report_inodes = 0; static int df_init (void) { @@ -116,16 +116,16 @@ static int df_config (const char *key, const char *value) else if (strcasecmp (key, "ReportByDevice") == 0) { if (IS_TRUE (value)) - by_device = true; + by_device = 1; return (0); } else if (strcasecmp (key, "ReportInodes") == 0) { if (IS_TRUE (value)) - report_inodes = true; + report_inodes = 1; else - report_inodes = false; + report_inodes = 0; return (0); } diff --git a/src/memcachec.c b/src/memcachec.c index d066501c..8f51e22f 100644 --- a/src/memcachec.c +++ b/src/memcachec.c @@ -452,7 +452,6 @@ static void cmc_submit (const web_page_t *wp, const web_match_t *wm, /* {{{ */ vl.values = values; vl.values_len = 1; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "memcachec", sizeof (vl.plugin)); sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance)); diff --git a/src/memcached.c b/src/memcached.c index 348591fd..8490bf66 100644 --- a/src/memcached.c +++ b/src/memcached.c @@ -302,7 +302,6 @@ static void submit_counter2 (const char *type, const char *type_inst, vl.values = values; vl.values_len = 2; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); sstrncpy (vl.type, type, sizeof (vl.type)); @@ -323,7 +322,6 @@ static void submit_gauge (const char *type, const char *type_inst, vl.values = values; vl.values_len = 1; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); sstrncpy (vl.type, type, sizeof (vl.type)); @@ -345,7 +343,6 @@ static void submit_gauge2 (const char *type, const char *type_inst, vl.values = values; vl.values_len = 2; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); sstrncpy (vl.type, type, sizeof (vl.type)); diff --git a/src/mysql.c b/src/mysql.c index 48ad528b..a01bbe40 100644 --- a/src/mysql.c +++ b/src/mysql.c @@ -42,7 +42,6 @@ struct mysql_database_s /* {{{ */ { - /* instance == NULL => legacy mode */ char *instance; char *host; char *user; @@ -243,10 +242,10 @@ static MYSQL *getconnection (mysql_database_t *db) int err; if ((err = mysql_ping (db->con)) != 0) { - WARNING ("mysql_ping failed for %s: %s", - (db->instance != NULL) - ? db->instance - : "", + /* Assured by "mysql_config_database" */ + assert (db->instance != NULL); + WARNING ("mysql_ping failed for instance \"%s\": %s", + db->instance, mysql_error (db->con)); db->state = 0; } @@ -290,29 +289,13 @@ static MYSQL *getconnection (mysql_database_t *db) static void set_host (mysql_database_t *db, char *buf, size_t buflen) { - /* XXX legacy mode - use hostname_g */ - if (db->instance == NULL) + if ((db->host == NULL) + || (strcmp ("", db->host) == 0) + || (strcmp ("localhost", db->host) == 0)) sstrncpy (buf, hostname_g, buflen); else - { - if ((db->host == NULL) - || (strcmp ("", db->host) == 0) - || (strcmp ("localhost", db->host) == 0)) - sstrncpy (buf, hostname_g, buflen); - else - sstrncpy (buf, db->host, buflen); - } -} - -static void set_plugin_instance (mysql_database_t *db, - char *buf, size_t buflen) -{ - /* XXX legacy mode - no plugin_instance */ - if (db->instance == NULL) - sstrncpy (buf, "", buflen); - else - sstrncpy (buf, db->instance, buflen); -} + sstrncpy (buf, db->host, buflen); +} /* void set_host */ static void submit (const char *type, const char *type_instance, value_t *values, size_t values_len, mysql_database_t *db) @@ -325,7 +308,10 @@ static void submit (const char *type, const char *type_instance, set_host (db, vl.host, sizeof (vl.host)); sstrncpy (vl.plugin, "mysql", sizeof (vl.plugin)); - set_plugin_instance (db, vl.plugin_instance, sizeof (vl.plugin_instance)); + + /* Assured by "mysql_config_database" */ + assert (db->instance != NULL); + sstrncpy (vl.plugin_instance, db->instance, sizeof (vl.plugin_instance)); sstrncpy (vl.type, type, sizeof (vl.type)); if (type_instance != NULL) @@ -508,8 +494,10 @@ static int mysql_read_slave_stats (mysql_database_t *db, MYSQL *con) sql = row[SLAVE_SQL_RUNNING_IDX]; set_host (db, n.host, sizeof (n.host)); - set_plugin_instance (db, - n.plugin_instance, sizeof (n.plugin_instance)); + + /* Assured by "mysql_config_database" */ + assert (db->instance != NULL); + sstrncpy (n.plugin_instance, db->instance, sizeof (n.plugin_instance)); if (((io == NULL) || (strcasecmp (io, "yes") != 0)) && (db->slave_io_running)) diff --git a/src/network.c b/src/network.c index 73e6d92d..1544ecfb 100644 --- a/src/network.c +++ b/src/network.c @@ -258,7 +258,7 @@ typedef struct receive_list_entry_s receive_list_entry_t; * Private variables */ static int network_config_ttl = 0; -static size_t network_config_packet_size = 1024; +static size_t network_config_packet_size = 1452; static int network_config_forward = 0; static int network_config_stats = 0; @@ -319,30 +319,30 @@ static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */ /* This is a value we already sent. Don't allow it to be received again in * order to avoid looping. */ if ((status == 0) && (time_sent >= ((uint64_t) vl->time))) - return (false); + return (0); - return (true); + return (1); } /* }}} _Bool check_receive_okay */ static _Bool check_send_okay (const value_list_t *vl) /* {{{ */ { - _Bool received = false; + _Bool received = 0; int status; if (network_config_forward != 0) - return (true); + return (1); if (vl->meta == NULL) - return (true); + return (1); status = meta_data_get_boolean (vl->meta, "network:received", &received); if (status == -ENOENT) - return (true); + return (1); else if (status != 0) { ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed " "with status %i.", status); - return (true); + return (1); } /* By default, only *send* value lists that were not *received* by the @@ -383,7 +383,7 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */ return (-ENOMEM); } - status = meta_data_add_boolean (vl->meta, "network:received", true); + status = meta_data_add_boolean (vl->meta, "network:received", 1); if (status != 0) { ERROR ("network plugin: meta_data_add_boolean failed."); @@ -3256,13 +3256,13 @@ static int network_stats_read (void) /* {{{ */ static int network_init (void) { - static _Bool have_init = false; + static _Bool have_init = 0; /* Check if we were already initialized. If so, just return - there's * nothing more to do (for now, that is). */ if (have_init) return (0); - have_init = true; + have_init = 1; #if HAVE_LIBGCRYPT gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); diff --git a/src/nginx.c b/src/nginx.c index 69768427..36d3d8d2 100644 --- a/src/nginx.c +++ b/src/nginx.c @@ -38,10 +38,9 @@ static char *cacert = NULL; static CURL *curl = NULL; -#define ABUFFER_SIZE 16384 -static char nginx_buffer[ABUFFER_SIZE]; -static int nginx_buffer_len = 0; -static char nginx_curl_error[CURL_ERROR_SIZE]; +static char nginx_buffer[16384]; +static size_t nginx_buffer_len = 0; +static char nginx_curl_error[CURL_ERROR_SIZE]; static const char *config_keys[] = { @@ -59,17 +58,19 @@ static size_t nginx_curl_callback (void *buf, size_t size, size_t nmemb, { size_t len = size * nmemb; - if ((nginx_buffer_len + len) >= ABUFFER_SIZE) + /* Check if the data fits into the memory. If not, truncate it. */ + if ((nginx_buffer_len + len) >= sizeof (nginx_buffer)) { - len = (ABUFFER_SIZE - 1) - nginx_buffer_len; + assert (sizeof (nginx_buffer) > nginx_buffer_len); + len = (sizeof (nginx_buffer) - 1) - nginx_buffer_len; } if (len <= 0) return (len); - memcpy (nginx_buffer + nginx_buffer_len, (char *) buf, len); + memcpy (&nginx_buffer[nginx_buffer_len], buf, len); nginx_buffer_len += len; - nginx_buffer[nginx_buffer_len] = '\0'; + nginx_buffer[nginx_buffer_len] = 0; return (len); } diff --git a/src/perl.c b/src/perl.c index afb3ba7e..72605804 100644 --- a/src/perl.c +++ b/src/perl.c @@ -33,6 +33,10 @@ #include "configfile.h" +#if HAVE_STDBOOL_H +# include +#endif + #include #include diff --git a/src/plugin.c b/src/plugin.c index af894d54..65d3875e 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1654,7 +1654,7 @@ static int plugin_notification_meta_add (notification_t *n, } case NM_TYPE_BOOLEAN: { - meta->nm_value.nm_boolean = *((bool *) value); + meta->nm_value.nm_boolean = *((_Bool *) value); break; } default: @@ -1708,7 +1708,7 @@ int plugin_notification_meta_add_double (notification_t *n, int plugin_notification_meta_add_boolean (notification_t *n, const char *name, - bool value) + _Bool value) { return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value)); } diff --git a/src/plugin.h b/src/plugin.h index 8b9449ee..d78aa4f8 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -135,7 +135,7 @@ typedef struct notification_meta_s int64_t nm_signed_int; uint64_t nm_unsigned_int; double nm_double; - bool nm_boolean; + _Bool nm_boolean; } nm_value; struct notification_meta_s *next; } notification_meta_t; @@ -340,7 +340,7 @@ int plugin_notification_meta_add_double (notification_t *n, double value); int plugin_notification_meta_add_boolean (notification_t *n, const char *name, - bool value); + _Bool value); int plugin_notification_meta_copy (notification_t *dst, const notification_t *src); diff --git a/src/rrdtool.c b/src/rrdtool.c index 4655b96e..cb8ad593 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -303,7 +303,7 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data) pthread_mutex_lock (&queue_lock); /* Wait for values to arrive */ - while (true) + while (42) { struct timespec ts_wait; @@ -342,7 +342,7 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data) &ts_wait); if (status == ETIMEDOUT) break; - } /* while (true) */ + } /* while (42) */ /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ diff --git a/src/ted.c b/src/ted.c index 8dc00e5a..bf519bbe 100644 --- a/src/ted.c +++ b/src/ted.c @@ -271,7 +271,6 @@ static void ted_submit (char *type, double value) values[0].gauge = value; - vl.time = time (NULL); vl.values = values; vl.values_len = 1; sstrncpy (vl.host, hostname_g, sizeof (vl.host)); diff --git a/src/types.db b/src/types.db index b2c0b4fe..a994399f 100644 --- a/src/types.db +++ b/src/types.db @@ -28,6 +28,7 @@ cpufreq value:GAUGE:0:U cpu value:COUNTER:0:4294967295 current value:GAUGE:U:U current_connections value:GAUGE:0:U +current_sessions value:GAUGE:0:U delay seconds:GAUGE:-1000000:1000000 derive value:DERIVE:0:U df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623