+static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {{{ */
+{
+ size_t bufsize = BUFSIZE;
+ pn_data_t *body;
+ pn_message_t *message;
+
+ /* encode message */
+ message = pn_message();
+ pn_message_set_address(message, instance->send_to);
+ body = pn_message_body(message);
+ pn_data_clear(body);
+ pn_data_put_binary(body, cdm->mbuf);
+ pn_data_exit(body);
+
+ /* put_binary copies and stores so ok to use mbuf */
+ cdm->mbuf.size = bufsize;
+ pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+
+ pthread_mutex_lock(&send_lock);
+ DEQ_INSERT_TAIL(out_messages, cdm);
+ pthread_mutex_unlock(&send_lock);
+
+ pn_message_free(message);
+
+ /* activate the sender */
+ if (conn != NULL) {
+ pn_connection_wake(conn);
+ }
+
+} /* }}} void encqueue */
+
+static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */
+{
+ amqp1_config_instance_t *instance;
+ int status = 0;
+ size_t bfree = BUFSIZE;
+ size_t bfill = 0;
+ cd_message_t *cdm;
+ size_t bufsize = BUFSIZE;
+
+ if ((n == NULL) || (user_data == NULL))
+ return EINVAL;
+
+ instance = user_data->data;
+
+ if (instance->notify != true) {
+ ERROR("amqp1 plugin: write notification failed");
+ }
+
+ cdm = NEW(cd_message_t);
+ DEQ_ITEM_INIT(cdm);
+ cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
+ cdm->instance = instance;
+
+ switch (instance->format) {
+ case AMQP1_FORMAT_JSON:
+ format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
+ status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+ if (status != 0) {
+ ERROR("amqp1 plugin: formatting notification failed");
+ return status;
+ }
+ cdm->mbuf.size = strlen(cdm->mbuf.start);
+ break;
+ default:
+ ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+ return -1;
+ }
+
+ /* encode message and place on outbound queue */
+ encqueue(cdm, instance);
+
+ return 0;
+} /* }}} int amqp1_notify */
+