First implentation of an amqp output plugin
authorSebastien Pahl <sebastien.pahl@gmail.com>
Tue, 9 Feb 2010 16:15:55 +0000 (17:15 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 13:13:50 +0000 (15:13 +0200)
It's ugly for now but it works.

configure.in
src/Makefile.am
src/amqp.c [new file with mode: 0644]

index fc12c08..28e71b6 100644 (file)
@@ -1279,6 +1279,57 @@ then
 fi
 AM_CONDITIONAL(BUILD_WITH_LIBKVM_OPENFILES, test "x$with_kvm_openfiles" = "xyes")
 
+# --with-librabbitmq {{{
+with_librabbitmq_cppflags=""
+with_librabbitmq_ldflags=""
+AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
+[
+       if test "x$withval" != "xno" && test "x$withval" != "xyes"
+       then
+               with_librabbitmq_cppflags="-I$withval/include"
+               with_librabbitmq_ldflags="-L$withval/lib"
+               with_librabbitmq="yes"
+       else
+               with_librabbitmq="$withval"
+       fi
+],
+[
+       with_librabbitmq="yes"
+])
+if test "x$with_librabbitmq" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+
+       AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       SAVE_LDFLAGS="$LDFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+       LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+
+       AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+       LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+       BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
+       BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
+       BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq"
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
+       AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+# }}}
+
 # --with-libcurl {{{
 with_curl_config="curl-config"
 with_curl_cflags=""
@@ -4403,6 +4454,7 @@ AC_ARG_ENABLE([all-plugins],
 
 m4_divert_once([HELP_ENABLE], [])
 
+AC_PLUGIN([amqp],        [$with_librabbitmq],  [AMQP output plugin])
 AC_PLUGIN([apache],      [$with_libcurl],      [Apache httpd statistics])
 AC_PLUGIN([apcups],      [yes],                [Statistics of UPSes by APC])
 AC_PLUGIN([apple_sensors], [$with_libiokit],   [Apple's hardware sensors])
@@ -4699,6 +4751,7 @@ Configuration:
     libperl . . . . . . . $with_libperl
     libpq . . . . . . . . $with_libpq
     libpthread  . . . . . $with_libpthread
+    librabbitmq . . . . . $with_librabbitmq
     librouteros . . . . . $with_librouteros
     librrd  . . . . . . . $with_librrd
     libsensors  . . . . . $with_libsensors
@@ -4723,6 +4776,7 @@ Configuration:
     perl  . . . . . . . . $with_perl_bindings
 
   Modules:
+    amqp    . . . . . . . $enable_amqp
     apache  . . . . . . . $enable_apache
     apcups  . . . . . . . $enable_apcups
     apple_sensors . . . . $enable_apple_sensors
index 00d0e20..cabdc04 100644 (file)
@@ -110,6 +110,16 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES = 
 CLEANFILES = 
 
+if BUILD_PLUGIN_AMQP
+pkglib_LTLIBRARIES += amqp.la
+amqp_la_SOURCES = amqp.c utils_format_json.c utils_format_json.h
+amqp_la_LDFLAGS = -module -avoid-version
+amqp_la_CFLAGS = $(BUILD_WITH_LIBRABBITMQ_CFLAGS)
+amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
+collectd_LDADD += "-dlopen" amqp.la
+collectd_DEPENDENCIES += amqp.la
+endif
+
 if BUILD_PLUGIN_APACHE
 pkglib_LTLIBRARIES += apache.la
 apache_la_SOURCES = apache.c
diff --git a/src/amqp.c b/src/amqp.c
new file mode 100644 (file)
index 0000000..0b470e1
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+**
+** collectd-amqp
+** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
+**
+** Permission is hereby granted, free of charge, to any person
+** obtaining a copy of this software and associated documentation
+** files (the "Software"), to deal in the Software without
+** restriction, including without limitation the rights to use,
+** copy, modify, merge, publish, distribute, sublicense, and/or sell
+** copies of the Software, and to permit persons to whom the
+** Software is furnished to do so, subject to the following
+** conditions:
+**
+** The above copyright notice and this permission notice shall be
+** included in all copies or substantial portions of the Software.
+**
+** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+** OTHER DEALINGS IN THE SOFTWARE.
+**
+*/
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <strings.h>
+
+#include <collectd.h>
+#include <common.h>
+#include <plugin.h>
+#include <utils_format_json.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+#define PLUGIN_NAME "amqp"
+
+static int  port;
+static char *host       = NULL;
+static char *vhost      = NULL;
+static char *user       = NULL;
+static char *password   = NULL;
+static char *exchange   = NULL;
+static char *routingkey = NULL;
+
+static const char *config_keys[] =
+{
+    "Host",
+    "Port",
+    "VHost",
+    "User",
+    "Password",
+    "Exchange",
+    "RoutingKey"
+};
+
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+
+static void config_free(char *var)
+{
+    if (var != NULL)
+        free(var);
+}
+
+static int config_set(char **var, const char *value)
+{
+    config_free(*var);
+    if ((*var = strdup(value)) == NULL)
+        return (1);
+    return (0);
+}
+
+static int config(const char *key, const char *value)
+{
+    if (strcasecmp(key, "host") == 0)
+        return (config_set(&host, value));
+    else if(strcasecmp(key, "port") == 0)
+    {
+        port = atoi(value);
+        return (0);
+    }
+    else if (strcasecmp(key, "vhost") == 0)
+        return (config_set(&vhost, value));
+    else if (strcasecmp(key, "user") == 0)
+        return (config_set(&user, value));
+    else if (strcasecmp(key, "password") == 0)
+        return (config_set(&password, value));
+    else if (strcasecmp(key, "exchange") == 0)
+        return (config_set(&exchange, value));
+    else if (strcasecmp(key, "routingkey") == 0)
+        return (config_set(&routingkey, value));
+    return (-1);
+}
+
+static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)
+{
+    int error;
+    int sockfd;
+    size_t bfree;
+    size_t bfill;
+    char buffer[4096];
+    amqp_rpc_reply_t reply;
+    amqp_connection_state_t conn;
+    amqp_basic_properties_t props;
+
+    conn = amqp_new_connection();
+    if ((sockfd = amqp_open_socket(host, port)) < 0)
+    {
+        amqp_destroy_connection(conn);
+        return (1);
+    }
+    amqp_set_sockfd(conn, sockfd);
+    reply = amqp_login(conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, password);
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        amqp_destroy_connection(conn);
+        close(sockfd);
+        return (1);
+    }
+    amqp_channel_open(conn, 1);
+    if (amqp_rpc_reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection(conn);
+        close(sockfd);
+        return (1);
+    }
+    error = 0;
+    memset(buffer, 0, sizeof(buffer));
+    bfree = sizeof(buffer);
+    bfill = 0;
+    format_json_initialize(buffer, &bfill, &bfree);
+    format_json_value_list(buffer, &bfill, &bfree, ds, vl);
+    format_json_finalize(buffer, &bfill, &bfree);
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+    props.content_type = amqp_cstring_bytes("application/json");
+    props.delivery_mode = 2; // persistent delivery mode
+    error = amqp_basic_publish(conn,
+                1,
+                amqp_cstring_bytes(exchange),
+                amqp_cstring_bytes(routingkey),
+                0,
+                0,
+                &props,
+                amqp_cstring_bytes(buffer));
+    reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+        error = 1;
+    reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+        error = 1;
+    amqp_destroy_connection(conn);
+    if (close(sockfd) < 0)
+        error = 1;
+    return (error);
+}
+
+static int shutdown(void)
+{
+    config_free(host);
+    config_free(vhost);
+    config_free(user);
+    config_free(password);
+    config_free(exchange);
+    config_free(routingkey);
+    return (0);
+}
+
+void module_register(void)
+{
+    plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num);
+    plugin_register_write(PLUGIN_NAME, amqp_write, NULL);
+    plugin_register_shutdown(PLUGIN_NAME, shutdown);
+}
+