From 46fdf1fbedc20bff90fc85555e0afba193ab0a31 Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Mon, 16 Apr 2018 09:50:21 -0400 Subject: [PATCH] Add connection retry --- src/amqp1.c | 102 +++++++++++++++++++++++++++++++------------------- src/collectd.conf.in | 1 + src/collectd.conf.pod | 7 ++++ 3 files changed, 71 insertions(+), 39 deletions(-) diff --git a/src/amqp1.c b/src/amqp1.c index d7be877b..3397f525 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -62,6 +62,7 @@ typedef struct amqp1_config_transport_t { char *user; char *password; char *address; + int retry_delay; } amqp1_config_transport_t; typedef struct amqp1_config_instance_t { @@ -92,7 +93,6 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t); * Globals */ pn_connection_t *conn = NULL; -pn_session_t *ssn = NULL; pn_link_t *sender = NULL; pn_proactor_t *proactor = NULL; pthread_mutex_t send_lock; @@ -100,8 +100,8 @@ cd_message_list_t out_messages; uint64_t cd_tag = 1; uint64_t acknowledged = 0; amqp1_config_transport_t *transport = NULL; -bool finished = false; +static bool stopping = false; static int event_thread_running = 0; static pthread_t event_thread_id; @@ -124,6 +124,10 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ int event_count = 0; pn_delivery_t *dlv; + if (stopping){ + return 0; + } + DEQ_INIT(to_send); pthread_mutex_lock(&send_lock); @@ -162,6 +166,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ return event_count; } /* }}} int amqp1_send_out_messages */ + static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ { if (pn_condition_is_set(cond)) { @@ -181,7 +186,7 @@ static bool handle(pn_event_t *event) /* {{{ */ conn = pn_event_connection(event); pn_connection_set_container(conn, transport->address); pn_connection_open(conn); - ssn = pn_session(conn); + pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); sender = pn_sender(ssn, "cd-sender"); pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); @@ -206,7 +211,7 @@ static bool handle(pn_event_t *event) /* {{{ */ } case PN_CONNECTION_WAKE: { - if (!finished) { + if (!stopping) { amqp1_send_out_messages(sender); } break; @@ -250,17 +255,53 @@ static bool handle(pn_event_t *event) /* {{{ */ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ { + char addr[PN_MAX_ADDR]; + cd_message_t *cdm; + + /* setup proactor */ + proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); + + while (!stopping) { + /* make connection */ + conn = pn_connection(); + if (transport->user != NULL) { + pn_connection_set_user(conn, transport->user); + pn_connection_set_password(conn, transport->password); + } + pn_proactor_connect(proactor, conn, addr); - do { - pn_event_batch_t *events = pn_proactor_wait(proactor); - pn_event_t *e; - for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { - if (!handle(e)) { - finished = true; + bool engine_running = true; + while (engine_running && !stopping) { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_t *e; + while (( e = pn_event_batch_next(events))){ + engine_running = handle(e); + if (!engine_running) { + break; + } } + pn_proactor_done(proactor, events); + } + + pn_proactor_release_connection(conn); + + DEBUG("amqp1 plugin: retrying connection"); + int delay = transport->retry_delay; + while (delay-- > 0 && !stopping) { + sleep(1.0); } - pn_proactor_done(proactor, events); - } while (!finished); + } + + pn_proactor_disconnect(proactor, NULL); + + /* Free the remaining out_messages */ + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(cdm); + cdm = DEQ_HEAD(out_messages); + } event_thread_running = 0; @@ -554,6 +595,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ /* Initialize transport configuration {{{ */ transport->name = NULL; + transport->retry_delay = 1; status = cf_util_get_string(ci, &transport->name); if (status != 0) { @@ -574,6 +616,8 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_string(child, &transport->password); else if (strcasecmp("Address", child->key) == 0) status = cf_util_get_string(child, &transport->address); + else if (strcasecmp("RetryDelay", child->key) == 0) + status = cf_util_get_int(child, &transport->retry_delay); else if (strcasecmp("Instance", child->key) == 0) amqp1_config_instance(child); else @@ -610,7 +654,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ static int amqp1_init(void) /* {{{ */ { - char addr[PN_MAX_ADDR]; int status; char errbuf[1024]; @@ -621,20 +664,12 @@ static int amqp1_init(void) /* {{{ */ if (proactor == NULL) { pthread_mutex_init(&send_lock, /* attr = */ NULL); - proactor = pn_proactor(); - pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); - conn = pn_connection(); - if (transport->user != NULL) { - pn_connection_set_user(conn, transport->user); - pn_connection_set_password(conn, transport->password); - } - pn_proactor_connect(proactor, conn, addr); /* start_thread */ status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, event_thread, NULL /* no argument */, "handle"); if (status != 0) { - ERROR("amqp1: pthread_create failed: %s", + ERROR("amqp1 plugin: pthread_create failed: %s", sstrerror(errno, errbuf, sizeof(errbuf))); } else { event_thread_running = 1; @@ -645,28 +680,17 @@ static int amqp1_init(void) /* {{{ */ static int amqp1_shutdown(void) /* {{{ */ { - cd_message_t *cdm; + stopping = true; /* Stop the proactor thread */ - if (event_thread_running != 0) { - finished = true; - /* activate the event thread */ + if (event_thread_running == 1) { + DEBUG("amqp1 plugin: Shutting down proactor thread."); pn_connection_wake(conn); - pthread_join(event_thread_id, NULL /* no return value */); - memset(&event_thread_id, 0, sizeof(event_thread_id)); - } - - /* Free the remaining out_messages */ - cdm = DEQ_HEAD(out_messages); - while (cdm) { - DEQ_REMOVE_HEAD(out_messages); - cd_message_free(cdm); - cdm = DEQ_HEAD(out_messages); } + pthread_join(event_thread_id, NULL /* no return value */); + memset(&event_thread_id, 0, sizeof(event_thread_id)); - if (proactor != NULL) { - pn_proactor_free(proactor); - } + DEBUG("amqp1 plugin: proactor thread exited."); if (transport != NULL) { amqp1_config_transport_free(transport); diff --git a/src/collectd.conf.in b/src/collectd.conf.in index ce1c00bb..4d8403a0 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -277,6 +277,7 @@ # User "guest" # Password "guest" # Address "collectd" +# RetryDelay 1 # # Format JSON # PreSettle false diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 4d6c38e4..f0f51da7 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -756,6 +756,7 @@ B User "guest" Password "guest" Address "collectd" +# RetryDelay 1 Format "command" PreSettle false @@ -806,6 +807,12 @@ default "guest"/"guest" is used. This option specifies the prefix for the send-to value in the message. By default, "collectd" will be used. +=item B I + +When the AMQP1 connection is lost, defines the time in seconds to wait +before attempting to reconnect. Defaults to 1, which implies attempt +to reconnect at 1 second intervals. + =back The following options are accepted within each I block: -- 2.11.0