amqp plugin: add support for Graphite output
authorThomas Meson <zllak@hycik.org>
Fri, 10 Aug 2012 22:11:54 +0000 (00:11 +0200)
committerFlorian Forster <octo@collectd.org>
Thu, 23 Aug 2012 07:46:24 +0000 (09:46 +0200)
This commit implements "Graphite format" for AMQP Plugin.
The AMQP plugin will be able to directly output a valid
Graphite metric format (<metric name> <value> <timestamp>\n).
This is very useful when the Graphite server is directly reading
from an AMQP broker. You can then avoid having a proxy somewhere
doing the conversion between PUTVAL or JSON metrics into Graphite
format.

Signed-off-by: Florian Forster <octo@collectd.org>
src/Makefile.am
src/amqp.c
src/collectd.conf.pod
src/utils_format_graphite.c [new file with mode: 0644]
src/utils_format_graphite.h [new file with mode: 0644]

index f106fa1..f8c1f79 100644 (file)
@@ -124,7 +124,8 @@ if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
                  utils_cmd_putval.c utils_cmd_putval.h \
-                 utils_format_json.c utils_format_json.h
+                 utils_format_json.c utils_format_json.h \
+                 utils_format_graphite.c utils_format_graphite.h
 amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
 amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
 amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
index 55d2a2c..c9929dc 100644 (file)
@@ -31,6 +31,7 @@
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
+#include "utils_format_graphite.h"
 
 #include <pthread.h>
 
@@ -42,8 +43,9 @@
 #define CAMQP_DM_VOLATILE   1
 #define CAMQP_DM_PERSISTENT 2
 
-#define CAMQP_FORMAT_COMMAND 1
-#define CAMQP_FORMAT_JSON    2
+#define CAMQP_FORMAT_COMMAND    1
+#define CAMQP_FORMAT_JSON       2
+#define CAMQP_FORMAT_GRAPHITE   3
 
 #define CAMQP_CHANNEL 1
 
@@ -68,6 +70,10 @@ struct camqp_config_s
     uint8_t delivery_mode;
     _Bool   store_rates;
     int     format;
+    /* publish & graphite format only */
+    char    *prefix;
+    char    *postfix;
+    char    escape_char;
 
     /* subscribe only */
     char   *exchange_type;
@@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */
     sfree (conf->exchange_type);
     sfree (conf->queue);
     sfree (conf->routing_key);
+    sfree (conf->prefix);
+    sfree (conf->postfix);
+
 
     sfree (conf);
 } /* }}} void camqp_config_free */
@@ -698,6 +707,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         props.content_type = amqp_cstring_bytes("text/collectd");
     else if (conf->format == CAMQP_FORMAT_JSON)
         props.content_type = amqp_cstring_bytes("application/json");
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+        props.content_type = amqp_cstring_bytes("text/graphite");
     else
         assert (23 == 42);
     props.delivery_mode = conf->delivery_mode;
@@ -776,6 +787,17 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
         format_json_finalize (buffer, &bfill, &bfree);
     }
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+    {
+        status = format_graphite (buffer, sizeof (buffer), ds, vl,
+                    conf->prefix, conf->postfix, conf->escape_char);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: format_graphite failed with status %i.",
+                    status);
+            return (status);
+        }
+    }
     else
     {
         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
@@ -808,6 +830,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
         conf->format = CAMQP_FORMAT_COMMAND;
     else if (strcasecmp ("JSON", string) == 0)
         conf->format = CAMQP_FORMAT_JSON;
+    else if (strcasecmp ("Graphite", string) == 0)
+        conf->format = CAMQP_FORMAT_GRAPHITE;
     else
     {
         WARNING ("amqp plugin: Invalid format string: %s",
@@ -848,6 +872,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
+    /* publish & graphite only */
+    conf->prefix = NULL;
+    conf->postfix = NULL;
+    conf->escape_char = '_';
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
@@ -905,6 +933,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_boolean (child, &conf->store_rates);
         else if ((strcasecmp ("Format", child->key) == 0) && publish)
             status = camqp_config_set_format (child, conf);
+        else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->prefix);
+        else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->postfix);
+        else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
+        {
+            char *tmp_buff = NULL;
+            status = cf_util_get_string (child, &tmp_buff);
+            if (strlen (tmp_buff) > 1)
+                WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
+                        "only one character. Others will be ignored.");
+            conf->escape_char = tmp_buff[0];
+            sfree (tmp_buff);
+        }
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);
index e275112..5eb3af4 100644 (file)
@@ -210,6 +210,8 @@ possibly filtering or messages.
  #   Persistent false
  #   Format "command"
  #   StoreRates false
+ #   GraphitePrefix "collectd."
+ #   GraphiteEscapeChar "_"
    </Publish>
    
    # Receive values from an AMQP broker
@@ -310,6 +312,10 @@ If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
 an easy and straight forward exchange format. The C<Content-Type> header field
 will be set to C<application/json>.
 
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
 A subscribing client I<should> use the C<Content-Type> header field to
 determine how to decode the values. Currently, the I<AMQP plugin> itself can
 only decode the B<Command> format.
@@ -324,6 +330,25 @@ using the internal value cache.
 Please note that currently this option is only used if the B<Format> option has
 been set to B<JSON>.
 
+=item B<GraphitePrefix> (Publish and B<Format>=I<Graphite> only)
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix> (Publish and B<Format>=I<Graphite> only)
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar> (Publish and B<Format>=I<Graphite> only)
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
 =back
 
 =head2 Plugin C<apache>
diff --git a/src/utils_format_graphite.c b/src/utils_format_graphite.c
new file mode 100644 (file)
index 0000000..b0ebc1e
--- /dev/null
@@ -0,0 +1,225 @@
+/**
+ * collectd - src/utils_format_graphite.c
+ * Copyright (C) 2012  Thomas Meson
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Thomas Meson <zllak at hycik.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+
+#include "utils_cache.h"
+#include "utils_format_json.h"
+#include "utils_parse_option.h"
+
+/* Utils functions to format data sets in graphite format.
+ * Largely taken from write_graphite.c as it remains the same formatting */
+
+static int gr_format_values (char *ret, size_t ret_len,
+        int ds_num, const data_set_t *ds, const value_list_t *vl)
+{
+    size_t offset = 0;
+    int status;
+
+    assert (0 == strcmp (ds->type, vl->type));
+
+    memset (ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+    status = ssnprintf (ret + offset, ret_len - offset, \
+            __VA_ARGS__); \
+    if (status < 1) \
+    { \
+        return (-1); \
+    } \
+    else if (((size_t) status) >= (ret_len - offset)) \
+    { \
+        return (-1); \
+    } \
+    else \
+    offset += ((size_t) status); \
+} while (0)
+
+    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+        BUFFER_ADD ("%f", vl->values[ds_num].gauge);
+    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+        BUFFER_ADD ("%llu", vl->values[ds_num].counter);
+    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+        BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
+    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+        BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
+    else
+    {
+        ERROR ("gr_format_values plugin: Unknown data source type: %i",
+                ds->ds[ds_num].type);
+        return (-1);
+    }
+
+#undef BUFFER_ADD
+
+    return (0);
+}
+
+static void copy_escape_part (char *dst, const char *src, size_t dst_len,
+    char escape_char)
+{
+    size_t i;
+
+    memset (dst, 0, dst_len);
+
+    if (src == NULL)
+        return;
+
+    for (i = 0; i < dst_len; i++)
+    {
+        if (src[i] == 0)
+        {
+            dst[i] = 0;
+            break;
+        }
+
+        if ((src[i] == '.')
+                || isspace ((int) src[i])
+                || iscntrl ((int) src[i]))
+            dst[i] = escape_char;
+        else
+            dst[i] = src[i];
+    }
+}
+
+static int gr_format_name (char *ret, int ret_len,
+        const value_list_t *vl,
+        const char *ds_name,
+        char *prefix,
+        char *postfix,
+        char escape_char)
+{
+    char n_host[DATA_MAX_NAME_LEN];
+    char n_plugin[DATA_MAX_NAME_LEN];
+    char n_plugin_instance[DATA_MAX_NAME_LEN];
+    char n_type[DATA_MAX_NAME_LEN];
+    char n_type_instance[DATA_MAX_NAME_LEN];
+
+    char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
+    char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
+
+    if (prefix == NULL)
+        prefix = "";
+
+    if (postfix == NULL)
+        postfix = "";
+
+    copy_escape_part (n_host, vl->host,
+            sizeof (n_host), escape_char);
+    copy_escape_part (n_plugin, vl->plugin,
+            sizeof (n_plugin), escape_char);
+    copy_escape_part (n_plugin_instance, vl->plugin_instance,
+            sizeof (n_plugin_instance), escape_char);
+    copy_escape_part (n_type, vl->type,
+            sizeof (n_type), escape_char);
+    copy_escape_part (n_type_instance, vl->type_instance,
+            sizeof (n_type_instance), escape_char);
+
+    if (n_plugin_instance[0] != '\0')
+        ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
+            n_plugin,
+            '-',
+            n_plugin_instance);
+    else
+        sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
+
+    if (n_type_instance[0] != '\0')
+        ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
+            n_type,
+            '-',
+            n_type_instance);
+    else
+        sstrncpy (tmp_type, n_type, sizeof (tmp_type));
+
+    if (ds_name != NULL)
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
+    else
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type);
+
+    return (0);
+}
+
+int format_graphite (char *buffer, size_t buffer_size,
+    const data_set_t *ds, const value_list_t *vl, char *prefix,
+    char *postfix, char escape_char)
+{
+    int status = 0;
+    int i;
+    int buffer_pos = 0;
+
+    for (i = 0; i < ds->ds_num; i++)
+    {
+        const char *ds_name = NULL;
+        char        key[10*DATA_MAX_NAME_LEN];
+        char        values[512];
+        size_t      message_len;
+        char        message[1024];
+
+        ds_name = ds->ds[i].name;
+
+        /* Copy the identifier to `key' and escape it. */
+        status = gr_format_name (key, sizeof (key), vl, ds_name,
+                    prefix, postfix, escape_char);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: error with gr_format_name");
+            return (status);
+        }
+
+        escape_string (key, sizeof (key));
+        /* Convert the values to an ASCII representation and put that into
+         * `values'. */
+        status = gr_format_values (values, sizeof (values), i, ds, vl);
+        if (status != 0)
+        {
+            ERROR ("format_graphite: error with gr_format_values");
+            return (status);
+        }
+
+        /* Compute the graphite command */
+        message_len = (size_t) ssnprintf (message, sizeof (message),
+                "%s %s %u\r\n",
+                key,
+                values,
+                (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+        if (message_len >= sizeof (message)) {
+            ERROR ("format_graphite: message buffer too small: "
+                    "Need %zu bytes.", message_len + 1);
+            return (-ENOMEM);
+        }
+
+        /* Append it in case we got multiple data set */
+        if ((buffer_pos + message_len) >= buffer_size)
+        {
+            ERROR ("format_graphite: target buffer too small");
+            return (-ENOMEM);
+        }
+        memcpy((void *) (buffer + buffer_pos), message, message_len);
+        buffer_pos += message_len;
+    }
+    return (status);
+} /* int format_graphite */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
diff --git a/src/utils_format_graphite.h b/src/utils_format_graphite.h
new file mode 100644 (file)
index 0000000..a3c4d85
--- /dev/null
@@ -0,0 +1,33 @@
+/**
+ * collectd - src/utils_format_graphite.h
+ * Copyright (C) 2012  Thomas Meson
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Thomas Meson <zllak at hycik.org>
+ **/
+
+#ifndef UTILS_FORMAT_GRAPHITE_H
+#define UTILS_FORMAT_GRAPHITE_H 1
+
+#include "collectd.h"
+#include "plugin.h"
+
+int format_graphite (char *buffer,
+    size_t buffer_size, const data_set_t *ds,
+    const value_list_t *vl, const char *prefix,
+    const char *postfix, const char escape_char);
+
+#endif /* UTILS_FORMAT_GRAPHITE_H */