[
with_librabbitmq="yes"
])
+SAVE_CPPFLAGS="$CPPFLAGS"
+SAVE_LDFLAGS="$LDFLAGS"
+CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
if test "x$with_librabbitmq" = "xyes"
then
- SAVE_CPPFLAGS="$CPPFLAGS"
- CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
-
AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
-
- CPPFLAGS="$SAVE_CPPFLAGS"
fi
if test "x$with_librabbitmq" = "xyes"
then
- SAVE_CPPFLAGS="$CPPFLAGS"
- SAVE_LDFLAGS="$LDFLAGS"
- CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
- LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
-
+ # librabbitmq up to version 0.9.1 provides "library_errno", later
+ # versions use "library_error". The library does not provide a version
+ # macro :( Use "AC_CHECK_MEMBERS" (plural) for automatic defines.
+ AC_CHECK_MEMBERS([amqp_rpc_reply_t.library_errno],,,
+ [
+#if HAVE_STDLIB_H
+# include <stdlib.h>
+#endif
+#if HAVE_STDIO_H
+# include <stdio.h>
+#endif
+#if HAVE_STDINT_H
+# include <stdint.h>
+#endif
+#if HAVE_INTTYPES_H
+# include <inttypes.h>
+#endif
+#include <amqp.h>
+ ])
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
-
- CPPFLAGS="$SAVE_CPPFLAGS"
- LDFLAGS="$SAVE_LDFLAGS"
fi
if test "x$with_librabbitmq" = "xyes"
then
AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
fi
+CPPFLAGS="$SAVE_CPPFLAGS"
+LDFLAGS="$SAVE_LDFLAGS"
AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
# }}}
/**
* collectd - src/amqp.c
- * Copyright (C) 2009 Sebastien Pahl
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2009 Sebastien Pahl
+ * Copyright (C) 2010-2012 Florian Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
if (r.library_errno)
return (sstrerror (r.library_errno, buffer, buffer_size));
+#else
+ if (r.library_error)
+ return (sstrerror (r.library_error, buffer, buffer_size));
+#endif
else
sstrncpy (buffer, "End of stream", sizeof (buffer));
break;
return (buffer);
} /* }}} char *camqp_strerror */
+#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
{
amqp_exchange_declare_ok_t *ed_ret;
return (0);
} /* }}} int camqp_create_exchange */
+#else
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+ amqp_exchange_declare_ok_t *ed_ret;
+ amqp_table_t argument_table;
+ struct amqp_table_entry_t_ argument_table_entries[1];
+
+ if (conf->exchange_type == NULL)
+ return (0);
+
+ /* Valid arguments: "auto_delete", "internal" */
+ argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries);
+ argument_table.entries = argument_table_entries;
+ argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete");
+ argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
+ argument_table_entries[0].value.value.boolean = 1;
+
+ ed_ret = amqp_exchange_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* type = */ amqp_cstring_bytes (conf->exchange_type),
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* arguments = */ argument_table);
+ if ((ed_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Successfully created exchange \"%s\" "
+ "with type \"%s\".",
+ conf->exchange, conf->exchange_type);
+
+ return (0);
+} /* }}} int camqp_create_exchange */
+#endif
static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
{
/* consumer_tag = */ AMQP_EMPTY_BYTES,
/* no_local = */ 0,
/* no_ack = */ 1,
- /* exclusive = */ 0);
+ /* exclusive = */ 0,
+ /* arguments = */ AMQP_EMPTY_TABLE
+ );
if ((cm_ret == NULL) && camqp_is_error (conf))
{
char errbuf[1024];