# plugins enabled by default
%define with_aggregation 0%{!?_without_aggregation:1}
%define with_amqp 0%{!?_without_amqp:1}
+%define with_amqp1 0%{!?_without_amqp1:1}
%define with_apache 0%{!?_without_apache:1}
%define with_apcups 0%{!?_without_apcups:1}
%define with_ascent 0%{!?_without_ascent:1}
%if %{with_amqp}
%package amqp
-Summary: AMQP plugin for collectd
+Summary: AMQP 0.9 plugin for collectd
Group: System Environment/Daemons
Requires: %{name}%{?_isa} = %{version}-%{release}
BuildRequires: librabbitmq-devel
%description amqp
-The AMQP plugin transmits or receives values collected by collectd via the
-Advanced Message Queuing Protocol (AMQP).
+The AMQP 0.9 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v0.9 (AMQP).
+%endif
+
+%if %{with_amqp1}
+%package amqp1
+Summary: AMQP 1.0 plugin for collectd
+Group: System Environment/Daemons
+Requires: %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: qpid-proton-c-devel
+%description amqp1
+The AMQP 1.0 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v1.0 (AMQP1).
%endif
%if %{with_apache}
%define _with_amqp --disable-amqp
%endif
+%if %{with_amqp1}
+%define _with_amqp1 --enable-amqp1
+%else
+%define _with_amqp1 --disable-amqp1
+%endif
+
%if %{with_apache}
%define _with_apache --enable-apache
%else
--enable-target_v5upgrade \
%{?_with_aggregation} \
%{?_with_amqp} \
+ %{?_with_amqp1} \
%{?_with_apache} \
%{?_with_apcups} \
%{?_with_apple_sensors} \
%{_libdir}/%{name}/amqp.so
%endif
+%if %{with_amqp1}
+%files amqp1
+%{_libdir}/%{name}/amqp1.so
+%endif
+
%if %{with_apache}
%files apache
%{_libdir}/%{name}/apache.so
char *user;
char *password;
char *address;
- int retry_delay;
+ int retry_delay;
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_t {
char *postfix;
char escape_char;
_Bool pre_settle;
- char send_to[128];
+ char send_to[1024];
} amqp1_config_instance_t;
DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
typedef struct cd_message_t {
DEQ_LINKS(struct cd_message_t);
- pn_bytes_t mbuf;
+ pn_rwbytes_t mbuf;
amqp1_config_instance_t *instance;
} cd_message_t;
int event_count = 0;
pn_delivery_t *dlv;
- if (stopping){
+ if (stopping) {
return 0;
}
return event_count;
} /* }}} int amqp1_send_out_messages */
-
static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
{
if (pn_condition_is_set(cond)) {
case PN_CONNECTION_INIT: {
conn = pn_event_connection(event);
- pn_connection_set_container(conn, transport->address);
+ pn_connection_set_container(conn, transport->name);
pn_connection_open(conn);
pn_session_t *ssn = pn_session(conn);
pn_session_open(ssn);
while (engine_running && !stopping) {
pn_event_batch_t *events = pn_proactor_wait(proactor);
pn_event_t *e;
- while (( e = pn_event_batch_next(events))){
+ while ((e = pn_event_batch_next(events))) {
engine_running = handle(e);
if (!engine_running) {
break;
return NULL;
} /* }}} void event_thread */
-static void encqueue(cd_message_t *cdm,
- amqp1_config_instance_t *instance) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+ amqp1_config_instance_t *instance) /* {{{ */
{
size_t bufsize = BUFSIZE;
pn_data_t *body;
pn_message_t *message;
+ int status = 0;
/* encode message */
message = pn_message();
pn_message_set_address(message, instance->send_to);
body = pn_message_body(message);
pn_data_clear(body);
- pn_data_put_binary(body, cdm->mbuf);
+ pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
pn_data_exit(body);
/* put_binary copies and stores so ok to use mbuf */
cdm->mbuf.size = bufsize;
- pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+ while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+ &cdm->mbuf.size)) == PN_OVERFLOW) {
+ DEBUG("amqp1 plugin: increasing message buffer size %i",
+ (int)cdm->mbuf.size);
+ cdm->mbuf.size *= 2;
+ cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+ }
+
+ if (status != 0) {
+ ERROR("amqp1 plugin: error encoding message: %s",
+ pn_error_text(pn_message_error(message)));
+ pn_message_free(message);
+ cd_message_free(cdm);
+ return -1;
+ }
pthread_mutex_lock(&send_lock);
DEQ_INSERT_TAIL(out_messages, cdm);
pn_connection_wake(conn);
}
-} /* }}} void encqueue */
+ return 0;
+} /* }}} int encqueue */
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
cdm = NEW(cd_message_t);
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
}
/* encode message and place on outbound queue */
- encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
- return 0;
+ return status;
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
cdm = NEW(cd_message_t);
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
amqp1_config_instance_free(instance);
return status;
} else {
- char tpname[128];
- snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
- snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
- transport->address, instance->name);
+ char tpname[1024];
+ int status;
+ status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+ if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+ ERROR("amqp1 plugin: Instance name would have been truncated.");
+ return -1;
+ }
+ status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+ transport->address, instance->name);
+ if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+ ERROR("amqp1 plugin: send_to address would have been truncated.");
+ return -1;
+ }
if (instance->notify == true) {
status = plugin_register_notification(
tpname, amqp1_notify,
+/**
+ * collectd - src/utils_deq.h
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Andy Smith <ansmith@redhat.com>
+ */
+
#ifndef utils_deq_h
#define utils_deq_h 1
#define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n))
#define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n))
-//
-// If available, use aligned_alloc for cache-line-aligned allocations. Otherwise
-// fall back to plain malloc.
-//
-#define NEW_CACHE_ALIGNED(t, p) \
- do { \
- if (posix_memalign( \
- (void *)&(p), 64, \
- (sizeof(t) + (sizeof(t) % 64 ? 64 - (sizeof(t) % 64) : 0))) != 0) \
- (p) = 0; \
- } while (0)
-
-#define ALLOC_CACHE_ALIGNED(s, p) \
- do { \
- if (posix_memalign((void *)&(p), 64, \
- (s + (s % 64 ? 64 - (s % 64) : 0))) != 0) \
- (p) = 0; \
- } while (0)
-
#define ZERO(p) memset(p, 0, sizeof(*p))
#define DEQ_DECLARE(i, d) \