--- /dev/null
+/**
+ * collectd - src/amqp1.c
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Andy Smith <ansmith@redhat.com>
+ */
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_format_graphite.h"
+#include "utils_format_json.h"
+#include "utils_random.h"
+#include "utils_deq.h"
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdint.h>
+
+#define BUFSIZE 8192
+#define AMQP1_FORMAT_JSON 0
+#define AMQP1_FORMAT_COMMAND 1
+#define AMQP1_FORMAT_GRAPHITE 2
+
+typedef struct amqp1_config_transport_t {
+ DEQ_LINKS(struct amqp1_config_transport_t);
+ char *name;
+ char *host;
+ char *port;
+ char *user;
+ char *password;
+ char *address;
+} amqp1_config_transport_t;
+
+typedef struct amqp1_config_instance_t {
+ DEQ_LINKS(struct amqp1_config_instance_t);
+ char *name;
+ uint8_t format;
+ unsigned int graphite_flags;
+ _Bool store_rates;
+ char *prefix;
+ char *postfix;
+ char escape_char;
+ _Bool pre_settle;
+ char send_to[128];
+} amqp1_config_instance_t;
+
+DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
+
+typedef struct cd_message_t {
+ DEQ_LINKS(struct cd_message_t);
+ pn_bytes_t mbuf;
+ amqp1_config_instance_t *instance;
+} cd_message_t;
+
+DEQ_DECLARE(cd_message_t, cd_message_list_t);
+
+/*
+ * Globals
+ */
+pn_connection_t *conn = NULL;
+pn_session_t *ssn = NULL;
+pn_link_t *sender = NULL;
+pn_proactor_t *proactor = NULL;
+pthread_mutex_t send_lock;
+cd_message_list_t out_messages;
+uint64_t cd_tag = 1;
+uint64_t acknowledged = 0;
+amqp1_config_transport_t *transport = NULL;
+bool finished = false;
+
+static int event_thread_running = 0;
+static pthread_t event_thread_id;
+
+/*
+ * Functions
+ */
+static void cd_message_free(cd_message_t *cdm)
+{
+ if (cdm->mbuf.start) {
+ free((void *)cdm->mbuf.start);
+ }
+ free(cdm);
+} /* }}} void cd_message_free */
+
+static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
+{
+ uint64_t dtag;
+ cd_message_list_t to_send;
+ cd_message_t *cdm;
+ int link_credit = pn_link_credit(link);
+ int event_count = 0;
+ pn_delivery_t *dlv;
+
+ DEQ_INIT(to_send);
+
+ pthread_mutex_lock(&send_lock);
+
+ if (link_credit > 0) {
+ dtag = cd_tag;
+ cdm = DEQ_HEAD(out_messages);
+ while (cdm) {
+ DEQ_REMOVE_HEAD(out_messages);
+ DEQ_INSERT_TAIL(to_send, cdm);
+ if (DEQ_SIZE(to_send) == link_credit)
+ break;
+ cdm = DEQ_HEAD(out_messages);
+ }
+ cd_tag += DEQ_SIZE(to_send);
+ }
+
+ pthread_mutex_unlock(&send_lock);
+
+ /* message is already formatted and encoded */
+ cdm = DEQ_HEAD(to_send);
+ while (cdm) {
+ DEQ_REMOVE_HEAD(to_send);
+ dtag++;
+ dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag)));
+ pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
+ pn_link_advance(link);
+ if (cdm->instance->pre_settle == true) {
+ pn_delivery_settle(dlv);
+ }
+ event_count++;
+ cd_message_free(cdm);
+ cdm = DEQ_HEAD(to_send);
+ }
+
+ return event_count;
+} /* }}} int amqp1_send_out_messages */
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
+{
+ if (pn_condition_is_set(cond)) {
+ ERROR("amqp1 plugin: %s: %s: %s",
+ pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond),
+ pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ conn = NULL;
+ }
+} /* }}} void check_condition */
+
+static bool handle(pn_event_t *event) /* {{{ */
+{
+
+ switch (pn_event_type(event)) {
+
+ case PN_CONNECTION_INIT:{
+ conn = pn_event_connection(event);
+ pn_connection_set_container(conn, transport->address);
+ pn_connection_open(conn);
+ ssn = pn_session(conn);
+ pn_session_open(ssn);
+ sender = pn_sender(ssn, "cd-sender");
+ pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
+ pn_link_open(sender);
+ break;
+ }
+
+ case PN_LINK_FLOW: {
+ /* peer has given us credit, send outbound messages */
+ amqp1_send_out_messages(sender);
+ break;
+ }
+
+ case PN_DELIVERY: {
+ /* acknowledgement from peer that a message was delivered */
+ pn_delivery_t * dlv = pn_event_delivery(event);
+ if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
+ acknowledged++;
+ }
+ break;
+ }
+
+ case PN_CONNECTION_WAKE: {
+ if (!finished) {
+ amqp1_send_out_messages(sender);
+ }
+ break;
+ }
+
+ case PN_TRANSPORT_CLOSED: {
+ check_condition(event, pn_transport_condition(pn_event_transport(event)));
+ break;
+ }
+
+ case PN_CONNECTION_REMOTE_CLOSE: {
+ check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+ }
+
+ case PN_SESSION_REMOTE_CLOSE: {
+ check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+ }
+
+ case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH: {
+ check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+ }
+
+ case PN_PROACTOR_INACTIVE: {
+ return false;
+ }
+
+ default: break;
+ }
+ return true;
+} /* }}} bool handle */
+
+static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
+{
+
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_t *e;
+ for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(e)) {
+ finished = true;
+ }
+ }
+ pn_proactor_done(proactor, events);
+ } while (!finished);
+
+ event_thread_running = 0;
+
+ return NULL;
+} /* }}} void event_thread */
+
+static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ user_data_t *user_data)
+{
+ amqp1_config_instance_t *instance = user_data->data;
+ int status = 0;
+ size_t bfree = BUFSIZE;
+ size_t bfill = 0;
+ cd_message_t *cdm;
+ size_t bufsize = BUFSIZE;
+ pn_data_t *body;
+ pn_message_t *message;
+
+ if ((ds == NULL) || (vl == NULL) || (transport == NULL))
+ return EINVAL;
+
+ cdm = NEW(cd_message_t);
+ DEQ_ITEM_INIT(cdm);
+ cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
+ cdm->instance = instance;
+
+ switch (instance->format) {
+ case AMQP1_FORMAT_COMMAND:
+ status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
+ if (status != 0) {
+ ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+ return status;
+ }
+ cdm->mbuf.size = strlen(cdm->mbuf.start);
+ break;
+ case AMQP1_FORMAT_JSON:
+ format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
+ format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
+ instance->store_rates);
+ format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ cdm->mbuf.size = strlen(cdm->mbuf.start);
+ break;
+ case AMQP1_FORMAT_GRAPHITE:
+ status =
+ format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix,
+ instance->postfix, instance->escape_char, instance->graphite_flags);
+ if (status != 0) {
+ ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+ return status;
+ }
+ cdm->mbuf.size = strlen(cdm->mbuf.start);
+ break;
+ default:
+ ERROR("amqp1 plugin: Invalid format (%i).", instance->format);
+ return -1;
+ }
+
+ /* encode message */
+ message = pn_message();
+ pn_message_set_address(message, instance->send_to);
+ body = pn_message_body(message);
+ pn_data_clear(body);
+ pn_data_put_binary(body, cdm->mbuf);
+ pn_data_exit(body);
+
+ /* put_binary copies and stores so ok to use mbuf */
+ cdm->mbuf.size = bufsize;
+ pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+
+ pthread_mutex_lock(&send_lock);
+ DEQ_INSERT_TAIL(out_messages, cdm);
+ pthread_mutex_unlock(&send_lock);
+
+ pn_message_free(message);
+
+ /* activate the sender */
+ if (conn != NULL) {
+ pn_connection_wake(conn);
+ }
+
+ return 0;
+} /* }}} int amqp_write1 */
+
+static void amqp1_config_transport_free(void *ptr) /* {{{ */
+{
+ amqp1_config_transport_t *transport = ptr;
+
+ if (transport == NULL)
+ return;
+
+ sfree(transport->name);
+ sfree(transport->host);
+ sfree(transport->user);
+ sfree(transport->password);
+ sfree(transport->address);
+
+ sfree(transport);
+} /* }}} void amqp1_config_transport_free */
+
+static void amqp1_config_instance_free(void *ptr) /* {{{ */
+{
+ amqp1_config_instance_t *instance = ptr;
+
+ if (instance == NULL)
+ return;
+
+ sfree(instance->name);
+ sfree(instance->prefix);
+ sfree(instance->postfix);
+
+ sfree(instance);
+} /* }}} void amqp1_config_instance_free */
+
+static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
+{
+ int status=0;
+ char *key = NULL;
+ amqp1_config_instance_t *instance;
+
+ instance = calloc(1, sizeof(*instance));
+ if (instance == NULL) {
+ ERROR("amqp1 plugin: calloc failed.");
+ return ENOMEM;
+ }
+
+ /* Initialize instance configuration {{{ */
+ instance->name = NULL;
+
+ status = cf_util_get_string(ci, &instance->name);
+ if (status != 0) {
+ sfree(instance);
+ return status;
+ }
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("PreSettle", child->key) == 0)
+ status = cf_util_get_boolean(child, &instance->pre_settle);
+ else if (strcasecmp("Format", child->key) == 0) {
+ status = cf_util_get_string(child, &key);
+ if (status != 0)
+ return status;
+ /* TODO: goto errout */
+ // goto errout;
+ assert(key != NULL);
+ if (strcasecmp(key, "Command") == 0) {
+ instance->format = AMQP1_FORMAT_COMMAND;
+ } else if (strcasecmp(key, "Graphite") == 0) {
+ instance->format = AMQP1_FORMAT_GRAPHITE;
+ } else if (strcasecmp(key, "JSON") == 0) {
+ instance->format = AMQP1_FORMAT_JSON;
+ } else {
+ WARNING("amqp1 plugin: Invalid format string: %s", key);
+ }
+ sfree(key);
+ }
+ else if (strcasecmp("StoreRates", child->key) == 0)
+ status = cf_util_get_boolean(child, &instance->store_rates);
+ else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
+ status = cf_util_get_flag(child, &instance->graphite_flags,
+ GRAPHITE_SEPARATE_INSTANCES);
+ else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
+ status = cf_util_get_flag(child, &instance->graphite_flags,
+ GRAPHITE_ALWAYS_APPEND_DS);
+ else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
+ status = cf_util_get_flag(child, &instance->graphite_flags,
+ GRAPHITE_PRESERVE_SEPARATOR);
+ else if (strcasecmp("GraphitePrefix", child->key) == 0)
+ status = cf_util_get_string(child, &instance->prefix);
+ else if (strcasecmp("GraphitePostfix", child->key) == 0)
+ status = cf_util_get_string(child, &instance->postfix);
+ else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
+ char *tmp_buff = NULL;
+ status = cf_util_get_string(child, &tmp_buff);
+ if (strlen(tmp_buff) > 1)
+ WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+ "only one character. Others will be ignored.");
+ instance->escape_char = tmp_buff[0];
+ sfree(tmp_buff);
+ }
+ else
+ WARNING("amqp1 plugin: Ignoring unknown "
+ "instance configuration option "
+ "\%s\".", child->key);
+ if (status != 0)
+ break;
+ }
+
+ if (status != 0) {
+ amqp1_config_instance_free(instance);
+ return status;
+ } else {
+ char tpname[128];
+ snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+ snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+ transport->address,instance->name);
+ status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
+ .data = instance, .free_func = amqp1_config_instance_free, });
+ if (status != 0) {
+ amqp1_config_instance_free(instance);
+ }
+ }
+
+ return status;
+} /* }}} int amqp1_config_instance */
+
+static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
+{
+ int status=0;
+
+ transport = calloc(1, sizeof(*transport));
+ if (transport == NULL) {
+ ERROR("amqp1 plugin: calloc failed.");
+ return ENOMEM;
+ }
+
+ /* Initialize transport configuration {{{ */
+ transport->name = NULL;
+
+ status = cf_util_get_string(ci, &transport->name);
+ if (status != 0) {
+ sfree(transport);
+ return status;
+ }
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Host", child->key) == 0)
+ status = cf_util_get_string(child, &transport->host);
+ else if (strcasecmp("Port", child->key) == 0)
+ status = cf_util_get_string(child, &transport->port);
+ else if (strcasecmp("User", child->key) == 0)
+ status = cf_util_get_string(child, &transport->user);
+ else if (strcasecmp("Password", child->key) == 0)
+ status = cf_util_get_string(child, &transport->password);
+ else if (strcasecmp("Address", child->key) == 0)
+ status = cf_util_get_string(child, &transport->address);
+ else if (strcasecmp("Instance",child->key) == 0)
+ amqp1_config_instance(child);
+ else
+ WARNING("amqp1 plugin: Ignoring unknown "
+ "transport configuration option "
+ "\%s\".", child->key);
+
+ if (status != 0)
+ break;
+ }
+
+ if (status != 0) {
+ amqp1_config_transport_free(transport);
+ }
+ return status;
+} /* }}} int amqp1_config_transport */
+
+static int amqp1_config(oconfig_item_t *ci) /* {{{ */
+{
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Transport", child->key) == 0)
+ amqp1_config_transport(child);
+ else
+ WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
+ child->key);
+ }
+
+ return 0;
+} /* }}} int amqp1_config */
+
+static int amqp1_init(void) /* {{{ */
+{
+ char addr[PN_MAX_ADDR];
+ int status;
+ char errbuf[1024];
+
+ if (transport == NULL) {
+ ERROR("amqp1: init failed, no transport configured");
+ return -1;
+ }
+
+ if (proactor == NULL) {
+ pthread_mutex_init(&send_lock, /* attr = */ NULL);
+ proactor = pn_proactor();
+ pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port);
+ conn = pn_connection();
+ if (transport->user != NULL) {
+ pn_connection_set_user(conn, transport->user);
+ pn_connection_set_password(conn, transport->password);
+ }
+ pn_proactor_connect(proactor, conn, addr);
+ /* start_thread */
+ status = plugin_thread_create(&event_thread_id, NULL /* no attributes */,
+ event_thread, NULL /* no argument */,
+ "handle");
+ if (status != 0) {
+ ERROR("amqp1: pthread_create failed: %s",
+ sstrerror(errno, errbuf, sizeof(errbuf)));
+ } else {
+ event_thread_running = 1;
+ }
+ }
+ return 0;
+} /* }}} int amqp1_init */
+
+static int amqp1_shutdown
+(void) /* {{{ */
+{
+ cd_message_t *cdm;
+
+ /* Stop the proactor thread */
+ if (event_thread_running != 0) {
+ finished=true;
+ /* activate the event thread */
+ pn_connection_wake(conn);
+ pthread_join(event_thread_id, NULL /* no return value */);
+ memset(&event_thread_id, 0, sizeof(event_thread_id));
+ }
+
+ /* Free the remaining out_messages */
+ cdm = DEQ_HEAD(out_messages);
+ while (cdm) {
+ DEQ_REMOVE_HEAD(out_messages);
+ cd_message_free(cdm);
+ cdm = DEQ_HEAD(out_messages);
+ }
+
+ if (proactor != NULL) {
+ pn_proactor_free(proactor);
+ }
+
+ if (transport != NULL) {
+ amqp1_config_transport_free(transport);
+ }
+
+ return 0;
+} /* }}} int amqp1_shutdown */
+
+void module_register(void)
+{
+ plugin_register_complex_config("amqp1", amqp1_config);
+ plugin_register_init("amqp1", amqp1_init);
+ plugin_register_shutdown("amqp1",amqp1_shutdown);
+} /* void module_register */
=head2 Plugin C<amqp>
The I<AMQP plugin> can be used to communicate with other instances of
-I<collectd> or third party applications using an AMQP message broker. Values
-are sent to or received from the broker, which handles routing, queueing and
-possibly filtering out messages.
+I<collectd> or third party applications using an AMQP 0.9.1 message broker.
+Values are sent to or received from the broker, which handles routing,
+queueing and possibly filtering out messages.
B<Synopsis:>
=back
+=head2 Plugin C<amqp1>
+
+The I<AMQP1 plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP 1.0 message
+intermediary. Values are sent to the messaging intermediary which
+may handle direct messaging or queue based transfer.
+
+B<Synopsis:>
+
+ <Plugin "amqp1">
+ # Send values to an AMQP 1.0 intermediary
+ <Transport "name">
+ Host "localhost"
+ Port "5672"
+ User "guest"
+ Password "guest"
+ Address "collectd"
+ <Instance "some_name">
+ Format "command"
+ PreSettle false
+ # StoreRates false
+ # GraphitePrefix "collectd."
+ # GraphiteEscapeChar "_"
+ # GraphiteSeparateInstances false
+ # GraphiteAlwaysAppendDS false
+ # GraphitePreserveSeparator false
+ </Instance>
+ </Transport>
+ </Plugin>
+
+The plugin's configuration consists of a I<Transport> that configures
+communications to the AMQP 1.0 messaging bus and one or more I<Instance>
+corresponding to publishers to the messaging system. The address in
+the I<Transport> block concatenated with the name given int the
+I<Instance> block starting tag will be used as the send-to address for
+communications over the messaging link.
+
+The following options are accepted within each I<Transport> block:
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP 1.0 intermediary. Defaults to the
+default behavior of the underlying communications library,
+I<libqpid-proton>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP 1.0 intermediary accepts
+connections. This argument must be a string, even if the numeric form
+is used. Defaults to "5672".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP 1.0 intermediary. By
+default "guest"/"guest" is used.
+
+=item B<Address> I<Address>
+
+This option specifies the prefix for the send-to value in the message.
+By default, "collectd" will be used.
+
+=back
+
+The following options are accepted within each I<Instance> block:
+
+=over 4
+
+=item B<Format> B<Command>|B<JSON>|B<Graphite>
+
+Selects the format in which messages are sent to the intermediary. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values.
+
+=item B<PreSettle> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will wait for a message
+acknowledgement from the messaging bus before sending the next
+message. This indicates transfer of ownership to the messaging
+system. If set to B<true>, the plugin will not wait for a message
+acknowledgement and the message may be dropped prior to transfer of
+ownership.
+
+=item B<StoreRates> B<true>|B<false>
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=item B<GraphitePrefix>
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix>
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar>
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
+=item B<GraphiteSeparateInstances> B<true>|B<false>
+
+If set to B<true>, the plugin instance and type instance will be in their own
+path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+default), the plugin and plugin instance (and likewise the type and type
+instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=item B<GraphitePreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<GraphiteEscapeChar>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
+=back
+
=head2 Plugin C<apache>
To configure the C<apache>-plugin you first need to configure the Apache
--- /dev/null
+#ifndef utils_deq_h
+#define utils_deq_h 1
+
+#include <stdlib.h>
+#include <assert.h>
+#include <memory.h>
+
+#define CT_ASSERT(exp) { assert(exp); }
+
+#define NEW(t) (t*) malloc(sizeof(t))
+#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n))
+#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n))
+
+//
+// If available, use aligned_alloc for cache-line-aligned allocations. Otherwise
+// fall back to plain malloc.
+//
+#define NEW_CACHE_ALIGNED(t,p) \
+do { \
+ if (posix_memalign((void*) &(p), 64, (sizeof(t) + (sizeof(t) % 64 ? 64 - (sizeof(t) % 64) : 0))) != 0) (p) = 0; \
+} while (0)
+
+#define ALLOC_CACHE_ALIGNED(s,p) \
+do { \
+ if (posix_memalign((void*) &(p), 64, (s + (s % 64 ? 64 - (s % 64) : 0))) != 0) (p) = 0; \
+} while (0)
+
+#define ZERO(p) memset(p, 0, sizeof(*p))
+
+#define DEQ_DECLARE(i,d) typedef struct { \
+ i *head; \
+ i *tail; \
+ i *scratch; \
+ size_t size; \
+ } d
+
+#define DEQ_LINKS_N(n,t) t *prev##n; t *next##n
+#define DEQ_LINKS(t) DEQ_LINKS_N(,t)
+#define DEQ_EMPTY {0,0,0,0}
+
+#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
+#define DEQ_IS_EMPTY(d) ((d).head == 0)
+#define DEQ_ITEM_INIT_N(n,i) do { (i)->next##n = 0; (i)->prev##n = 0; } while(0)
+#define DEQ_ITEM_INIT(i) DEQ_ITEM_INIT_N(,i)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
+#define DEQ_NEXT_N(n,i) (i)->next##n
+#define DEQ_NEXT(i) DEQ_NEXT_N(,i)
+#define DEQ_PREV_N(n,i) (i)->prev##n
+#define DEQ_PREV(i) DEQ_PREV_N(,i)
+#define DEQ_MOVE(d1,d2) do {d2 = d1; DEQ_INIT(d1);} while (0)
+/**
+ *@pre ptr points to first element of deq
+ *@post ptr points to first element of deq that passes test, or 0. Test should involve ptr.
+ */
+#define DEQ_FIND_N(n,ptr,test) while((ptr) && !(test)) ptr = DEQ_NEXT_N(n,ptr);
+#define DEQ_FIND(ptr,test) DEQ_FIND_N(,ptr,test)
+
+#define DEQ_INSERT_HEAD_N(n,d,i) \
+do { \
+ CT_ASSERT((i)->next##n == 0); \
+ CT_ASSERT((i)->prev##n == 0); \
+ if ((d).head) { \
+ (i)->next##n = (d).head; \
+ (d).head->prev##n = i; \
+ } else { \
+ (d).tail = i; \
+ (i)->next##n = 0; \
+ CT_ASSERT((d).size == 0); \
+ } \
+ (i)->prev##n = 0; \
+ (d).head = i; \
+ (d).size++; \
+} while (0)
+#define DEQ_INSERT_HEAD(d,i) DEQ_INSERT_HEAD_N(,d,i)
+
+#define DEQ_INSERT_TAIL_N(n,d,i) \
+do { \
+ CT_ASSERT((i)->next##n == 0); \
+ CT_ASSERT((i)->prev##n == 0); \
+ if ((d).tail) { \
+ (i)->prev##n = (d).tail; \
+ (d).tail->next##n = i; \
+ } else { \
+ (d).head = i; \
+ (i)->prev##n = 0; \
+ CT_ASSERT((d).size == 0); \
+ } \
+ (i)->next##n = 0; \
+ (d).tail = i; \
+ (d).size++; \
+} while (0)
+#define DEQ_INSERT_TAIL(d,i) DEQ_INSERT_TAIL_N(,d,i)
+
+#define DEQ_REMOVE_HEAD_N(n,d) \
+do { \
+ CT_ASSERT((d).head); \
+ if ((d).head) { \
+ (d).scratch = (d).head; \
+ (d).head = (d).head->next##n; \
+ if ((d).head == 0) { \
+ (d).tail = 0; \
+ CT_ASSERT((d).size == 1); \
+ } else \
+ (d).head->prev##n = 0; \
+ (d).size--; \
+ (d).scratch->next##n = 0; \
+ (d).scratch->prev##n = 0; \
+ } \
+} while (0)
+#define DEQ_REMOVE_HEAD(d) DEQ_REMOVE_HEAD_N(,d)
+
+#define DEQ_REMOVE_TAIL_N(n,d) \
+do { \
+ CT_ASSERT((d).tail); \
+ if ((d).tail) { \
+ (d).scratch = (d).tail; \
+ (d).tail = (d).tail->prev##n; \
+ if ((d).tail == 0) { \
+ (d).head = 0; \
+ CT_ASSERT((d).size == 1); \
+ } else \
+ (d).tail->next##n = 0; \
+ (d).size--; \
+ (d).scratch->next##n = 0; \
+ (d).scratch->prev##n = 0; \
+ } \
+} while (0)
+#define DEQ_REMOVE_TAIL(d) DEQ_REMOVE_TAIL_N(,d)
+
+#define DEQ_INSERT_AFTER_N(n,d,i,a) \
+do { \
+ CT_ASSERT((i)->next##n == 0); \
+ CT_ASSERT((i)->prev##n == 0); \
+ CT_ASSERT(a); \
+ if ((a)->next##n) \
+ (a)->next##n->prev##n = (i); \
+ else \
+ (d).tail = (i); \
+ (i)->next##n = (a)->next##n; \
+ (i)->prev##n = (a); \
+ (a)->next##n = (i); \
+ (d).size++; \
+} while (0)
+#define DEQ_INSERT_AFTER(d,i,a) DEQ_INSERT_AFTER_N(,d,i,a)
+
+#define DEQ_REMOVE_N(n,d,i) \
+do { \
+ if ((i)->next##n) \
+ (i)->next##n->prev##n = (i)->prev##n; \
+ else \
+ (d).tail = (i)->prev##n; \
+ if ((i)->prev##n) \
+ (i)->prev##n->next##n = (i)->next##n; \
+ else \
+ (d).head = (i)->next##n; \
+ CT_ASSERT((d).size > 0); \
+ (d).size--; \
+ (i)->next##n = 0; \
+ (i)->prev##n = 0; \
+ CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
+} while (0)
+#define DEQ_REMOVE(d,i) DEQ_REMOVE_N(,d,i)
+
+#define DEQ_APPEND_N(n,d1,d2) \
+do { \
+ if (!(d1).head) \
+ (d1) = (d2); \
+ else if ((d2).head) { \
+ (d1).tail->next##n = (d2).head; \
+ (d2).head->prev##n = (d1).tail; \
+ (d1).tail = (d2).tail; \
+ (d1).size += (d2).size; \
+ } \
+ DEQ_INIT(d2); \
+} while (0)
+#define DEQ_APPEND(d1,d2) DEQ_APPEND_N(,d1,d2)
+
+#endif