From fd958eb99c09d819a51dd147db7c189366e9382a Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Sun, 6 May 2018 14:00:03 -0400 Subject: [PATCH] Updates for rpm spec and review feedback --- Makefile.am | 4 ++- contrib/redhat/collectd.spec | 30 ++++++++++++++++++--- src/amqp1.c | 64 ++++++++++++++++++++++++++++++-------------- src/utils_deq.h | 45 ++++++++++++++++++------------- 4 files changed, 100 insertions(+), 43 deletions(-) diff --git a/Makefile.am b/Makefile.am index facbc976..95a0369f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -544,7 +544,9 @@ endif if BUILD_PLUGIN_AMQP1 pkglib_LTLIBRARIES += amqp1.la -amqp1_la_SOURCES = src/amqp1.c +amqp1_la_SOURCES = \ + src/amqp1.c \ + src/utils_deq.h amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS) amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS) amqp1_la_LIBADD = \ diff --git a/contrib/redhat/collectd.spec b/contrib/redhat/collectd.spec index d84b457b..6fc8e515 100644 --- a/contrib/redhat/collectd.spec +++ b/contrib/redhat/collectd.spec @@ -41,6 +41,7 @@ # plugins enabled by default %define with_aggregation 0%{!?_without_aggregation:1} %define with_amqp 0%{!?_without_amqp:1} +%define with_amqp1 0%{!?_without_amqp1:1} %define with_apache 0%{!?_without_apache:1} %define with_apcups 0%{!?_without_apcups:1} %define with_ascent 0%{!?_without_ascent:1} @@ -277,13 +278,24 @@ every 10 seconds by default. %if %{with_amqp} %package amqp -Summary: AMQP plugin for collectd +Summary: AMQP 0.9 plugin for collectd Group: System Environment/Daemons Requires: %{name}%{?_isa} = %{version}-%{release} BuildRequires: librabbitmq-devel %description amqp -The AMQP plugin transmits or receives values collected by collectd via the -Advanced Message Queuing Protocol (AMQP). +The AMQP 0.9 plugin transmits or receives values collected by collectd via the +Advanced Message Queuing Protocol v0.9 (AMQP). +%endif + +%if %{with_amqp1} +%package amqp1 +Summary: AMQP 1.0 plugin for collectd +Group: System Environment/Daemons +Requires: %{name}%{?_isa} = %{version}-%{release} +BuildRequires: qpid-proton-c-devel +%description amqp1 +The AMQP 1.0 plugin transmits or receives values collected by collectd via the +Advanced Message Queuing Protocol v1.0 (AMQP1). %endif %if %{with_apache} @@ -1015,6 +1027,12 @@ Collectd utilities %define _with_amqp --disable-amqp %endif +%if %{with_amqp1} +%define _with_amqp1 --enable-amqp1 +%else +%define _with_amqp1 --disable-amqp1 +%endif + %if %{with_apache} %define _with_apache --enable-apache %else @@ -1888,6 +1906,7 @@ Collectd utilities --enable-target_v5upgrade \ %{?_with_aggregation} \ %{?_with_amqp} \ + %{?_with_amqp1} \ %{?_with_apache} \ %{?_with_apcups} \ %{?_with_apple_sensors} \ @@ -2399,6 +2418,11 @@ fi %{_libdir}/%{name}/amqp.so %endif +%if %{with_amqp1} +%files amqp1 +%{_libdir}/%{name}/amqp1.so +%endif + %if %{with_apache} %files apache %{_libdir}/%{name}/apache.so diff --git a/src/amqp1.c b/src/amqp1.c index 3397f525..5a5d2b8e 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -62,7 +62,7 @@ typedef struct amqp1_config_transport_t { char *user; char *password; char *address; - int retry_delay; + int retry_delay; } amqp1_config_transport_t; typedef struct amqp1_config_instance_t { @@ -76,14 +76,14 @@ typedef struct amqp1_config_instance_t { char *postfix; char escape_char; _Bool pre_settle; - char send_to[128]; + char send_to[1024]; } amqp1_config_instance_t; DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); typedef struct cd_message_t { DEQ_LINKS(struct cd_message_t); - pn_bytes_t mbuf; + pn_rwbytes_t mbuf; amqp1_config_instance_t *instance; } cd_message_t; @@ -124,7 +124,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ int event_count = 0; pn_delivery_t *dlv; - if (stopping){ + if (stopping) { return 0; } @@ -166,7 +166,6 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ return event_count; } /* }}} int amqp1_send_out_messages */ - static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ { if (pn_condition_is_set(cond)) { @@ -184,7 +183,7 @@ static bool handle(pn_event_t *event) /* {{{ */ case PN_CONNECTION_INIT: { conn = pn_event_connection(event); - pn_connection_set_container(conn, transport->address); + pn_connection_set_container(conn, transport->name); pn_connection_open(conn); pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); @@ -275,7 +274,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ while (engine_running && !stopping) { pn_event_batch_t *events = pn_proactor_wait(proactor); pn_event_t *e; - while (( e = pn_event_batch_next(events))){ + while ((e = pn_event_batch_next(events))) { engine_running = handle(e); if (!engine_running) { break; @@ -308,24 +307,39 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ return NULL; } /* }}} void event_thread */ -static void encqueue(cd_message_t *cdm, - amqp1_config_instance_t *instance) /* {{{ */ +static int encqueue(cd_message_t *cdm, + amqp1_config_instance_t *instance) /* {{{ */ { size_t bufsize = BUFSIZE; pn_data_t *body; pn_message_t *message; + int status = 0; /* encode message */ message = pn_message(); pn_message_set_address(message, instance->send_to); body = pn_message_body(message); pn_data_clear(body); - pn_data_put_binary(body, cdm->mbuf); + pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); pn_data_exit(body); /* put_binary copies and stores so ok to use mbuf */ cdm->mbuf.size = bufsize; - pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); + while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + &cdm->mbuf.size)) == PN_OVERFLOW) { + DEBUG("amqp1 plugin: increasing message buffer size %i", + (int)cdm->mbuf.size); + cdm->mbuf.size *= 2; + cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + } + + if (status != 0) { + ERROR("amqp1 plugin: error encoding message: %s", + pn_error_text(pn_message_error(message))); + pn_message_free(message); + cd_message_free(cdm); + return -1; + } pthread_mutex_lock(&send_lock); DEQ_INSERT_TAIL(out_messages, cdm); @@ -338,7 +352,8 @@ static void encqueue(cd_message_t *cdm, pn_connection_wake(conn); } -} /* }}} void encqueue */ + return 0; +} /* }}} int encqueue */ static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ @@ -361,7 +376,7 @@ static int amqp1_notify(notification_t const *n, cdm = NEW(cd_message_t); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -380,9 +395,9 @@ static int amqp1_notify(notification_t const *n, } /* encode message and place on outbound queue */ - encqueue(cdm, instance); + status = encqueue(cdm, instance); - return 0; + return status; } /* }}} int amqp1_notify */ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -406,7 +421,7 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ cdm = NEW(cd_message_t); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -557,10 +572,19 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ amqp1_config_instance_free(instance); return status; } else { - char tpname[128]; - snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); - snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address, instance->name); + char tpname[1024]; + int status; + status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + if ((status < 0) || (size_t)status >= sizeof(tpname)) { + ERROR("amqp1 plugin: Instance name would have been truncated."); + return -1; + } + status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); + if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { + ERROR("amqp1 plugin: send_to address would have been truncated."); + return -1; + } if (instance->notify == true) { status = plugin_register_notification( tpname, amqp1_notify, diff --git a/src/utils_deq.h b/src/utils_deq.h index 150864b5..3182baae 100644 --- a/src/utils_deq.h +++ b/src/utils_deq.h @@ -1,3 +1,29 @@ +/** + * collectd - src/utils_deq.h + * Copyright(c) 2017 Red Hat Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Andy Smith + */ + #ifndef utils_deq_h #define utils_deq_h 1 @@ -12,25 +38,6 @@ #define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n)) #define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n)) -// -// If available, use aligned_alloc for cache-line-aligned allocations. Otherwise -// fall back to plain malloc. -// -#define NEW_CACHE_ALIGNED(t, p) \ - do { \ - if (posix_memalign( \ - (void *)&(p), 64, \ - (sizeof(t) + (sizeof(t) % 64 ? 64 - (sizeof(t) % 64) : 0))) != 0) \ - (p) = 0; \ - } while (0) - -#define ALLOC_CACHE_ALIGNED(s, p) \ - do { \ - if (posix_memalign((void *)&(p), 64, \ - (s + (s % 64 ? 64 - (s % 64) : 0))) != 0) \ - (p) = 0; \ - } while (0) - #define ZERO(p) memset(p, 0, sizeof(*p)) #define DEQ_DECLARE(i, d) \ -- 2.11.0