+ /* encode message */
+ pn_message_t *message = pn_message();
+ pn_message_set_address(message, instance->send_to);
+ pn_data_t *body = pn_message_body(message);
+ pn_data_clear(body);
+ pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
+ pn_data_exit(body);
+
+ /* put_binary copies and stores so ok to use mbuf */
+ cdm->mbuf.size = BUFSIZE;
+
+ int status;
+ while ((status = pn_message_encode(message, cdm->mbuf.start,
+ &cdm->mbuf.size)) == PN_OVERFLOW) {
+ DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
+ cdm->mbuf.size *= 2;
+ cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ }
+
+ if (status != 0) {
+ ERROR("amqp1 plugin: error encoding message: %s",
+ pn_error_text(pn_message_error(message)));
+ pn_message_free(message);
+ cd_message_free(cdm);
+ return -1;
+ }
+
+ pthread_mutex_lock(&send_lock);
+ DEQ_INSERT_TAIL(out_messages, cdm);
+ pthread_mutex_unlock(&send_lock);
+
+ pn_message_free(message);
+
+ /* activate the sender */
+ if (conn) {
+ pn_connection_wake(conn);
+ }
+
+ return 0;
+} /* }}} int encqueue */
+
+static int amqp1_notify(notification_t const *n,
+ user_data_t *user_data) /* {{{ */
+{
+ size_t bfree = BUFSIZE;
+ size_t bfill = 0;
+ size_t bufsize = BUFSIZE;
+
+ if (n == NULL || user_data == NULL)
+ return EINVAL;
+