char *user;
char *password;
char *address;
+ int retry_delay;
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_t {
* Globals
*/
pn_connection_t *conn = NULL;
-pn_session_t *ssn = NULL;
pn_link_t *sender = NULL;
pn_proactor_t *proactor = NULL;
pthread_mutex_t send_lock;
uint64_t cd_tag = 1;
uint64_t acknowledged = 0;
amqp1_config_transport_t *transport = NULL;
-bool finished = false;
+static bool stopping = false;
static int event_thread_running = 0;
static pthread_t event_thread_id;
int event_count = 0;
pn_delivery_t *dlv;
+ if (stopping){
+ return 0;
+ }
+
DEQ_INIT(to_send);
pthread_mutex_lock(&send_lock);
return event_count;
} /* }}} int amqp1_send_out_messages */
+
static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
{
if (pn_condition_is_set(cond)) {
conn = pn_event_connection(event);
pn_connection_set_container(conn, transport->address);
pn_connection_open(conn);
- ssn = pn_session(conn);
+ pn_session_t *ssn = pn_session(conn);
pn_session_open(ssn);
sender = pn_sender(ssn, "cd-sender");
pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
}
case PN_CONNECTION_WAKE: {
- if (!finished) {
+ if (!stopping) {
amqp1_send_out_messages(sender);
}
break;
static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
{
+ char addr[PN_MAX_ADDR];
+ cd_message_t *cdm;
+
+ /* setup proactor */
+ proactor = pn_proactor();
+ pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
+
+ while (!stopping) {
+ /* make connection */
+ conn = pn_connection();
+ if (transport->user != NULL) {
+ pn_connection_set_user(conn, transport->user);
+ pn_connection_set_password(conn, transport->password);
+ }
+ pn_proactor_connect(proactor, conn, addr);
- do {
- pn_event_batch_t *events = pn_proactor_wait(proactor);
- pn_event_t *e;
- for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
- if (!handle(e)) {
- finished = true;
+ bool engine_running = true;
+ while (engine_running && !stopping) {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_t *e;
+ while (( e = pn_event_batch_next(events))){
+ engine_running = handle(e);
+ if (!engine_running) {
+ break;
+ }
}
+ pn_proactor_done(proactor, events);
+ }
+
+ pn_proactor_release_connection(conn);
+
+ DEBUG("amqp1 plugin: retrying connection");
+ int delay = transport->retry_delay;
+ while (delay-- > 0 && !stopping) {
+ sleep(1.0);
}
- pn_proactor_done(proactor, events);
- } while (!finished);
+ }
+
+ pn_proactor_disconnect(proactor, NULL);
+
+ /* Free the remaining out_messages */
+ cdm = DEQ_HEAD(out_messages);
+ while (cdm) {
+ DEQ_REMOVE_HEAD(out_messages);
+ cd_message_free(cdm);
+ cdm = DEQ_HEAD(out_messages);
+ }
event_thread_running = 0;
/* Initialize transport configuration {{{ */
transport->name = NULL;
+ transport->retry_delay = 1;
status = cf_util_get_string(ci, &transport->name);
if (status != 0) {
status = cf_util_get_string(child, &transport->password);
else if (strcasecmp("Address", child->key) == 0)
status = cf_util_get_string(child, &transport->address);
+ else if (strcasecmp("RetryDelay", child->key) == 0)
+ status = cf_util_get_int(child, &transport->retry_delay);
else if (strcasecmp("Instance", child->key) == 0)
amqp1_config_instance(child);
else
static int amqp1_init(void) /* {{{ */
{
- char addr[PN_MAX_ADDR];
int status;
char errbuf[1024];
if (proactor == NULL) {
pthread_mutex_init(&send_lock, /* attr = */ NULL);
- proactor = pn_proactor();
- pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
- conn = pn_connection();
- if (transport->user != NULL) {
- pn_connection_set_user(conn, transport->user);
- pn_connection_set_password(conn, transport->password);
- }
- pn_proactor_connect(proactor, conn, addr);
/* start_thread */
status =
plugin_thread_create(&event_thread_id, NULL /* no attributes */,
event_thread, NULL /* no argument */, "handle");
if (status != 0) {
- ERROR("amqp1: pthread_create failed: %s",
+ ERROR("amqp1 plugin: pthread_create failed: %s",
sstrerror(errno, errbuf, sizeof(errbuf)));
} else {
event_thread_running = 1;
static int amqp1_shutdown(void) /* {{{ */
{
- cd_message_t *cdm;
+ stopping = true;
/* Stop the proactor thread */
- if (event_thread_running != 0) {
- finished = true;
- /* activate the event thread */
+ if (event_thread_running == 1) {
+ DEBUG("amqp1 plugin: Shutting down proactor thread.");
pn_connection_wake(conn);
- pthread_join(event_thread_id, NULL /* no return value */);
- memset(&event_thread_id, 0, sizeof(event_thread_id));
- }
-
- /* Free the remaining out_messages */
- cdm = DEQ_HEAD(out_messages);
- while (cdm) {
- DEQ_REMOVE_HEAD(out_messages);
- cd_message_free(cdm);
- cdm = DEQ_HEAD(out_messages);
}
+ pthread_join(event_thread_id, NULL /* no return value */);
+ memset(&event_thread_id, 0, sizeof(event_thread_id));
- if (proactor != NULL) {
- pn_proactor_free(proactor);
- }
+ DEBUG("amqp1 plugin: proactor thread exited.");
if (transport != NULL) {
amqp1_config_transport_free(transport);