return (-1);
}
-static int amqp_write_locked (const char *buffer)
+static int amqp_connect (void)
{
amqp_rpc_reply_t reply;
- amqp_basic_properties_t props;
+ int sockfd;
int status;
+ if (amqp_conn != NULL)
+ return (0);
+
+ amqp_conn = amqp_new_connection ();
if (amqp_conn == NULL)
{
- int sockfd;
+ ERROR ("amqp plugin: amqp_new_connection failed.");
+ return (ENOMEM);
+ }
- amqp_conn = amqp_new_connection ();
- if (amqp_conn == NULL)
- {
- ERROR ("amqp plugin: amqp_new_connection failed.");
- return (ENOMEM);
- }
+ sockfd = amqp_open_socket (host, port);
+ if (sockfd < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * sockfd;
+ ERROR ("amqp plugin: amqp_open_socket failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ amqp_destroy_connection(amqp_conn);
+ amqp_conn = NULL;
+ return (status);
+ }
- sockfd = amqp_open_socket (host, port);
- if (sockfd < 0)
- {
- char errbuf[1024];
- status = (-1) * sockfd;
- ERROR ("amqp plugin: amqp_open_socket failed: %s",
- sstrerror (status, errbuf, sizeof (errbuf)));
- amqp_destroy_connection(amqp_conn);
- amqp_conn = NULL;
- return (status);
- }
+ amqp_set_sockfd (amqp_conn, sockfd);
- amqp_set_sockfd (amqp_conn, sockfd);
+ reply = amqp_login(amqp_conn, vhost,
+ /* channel max = */ 0,
+ /* frame max = */ 131072,
+ /* heartbeat = */ 0,
+ /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+ vhost, user);
+ amqp_destroy_connection(amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
+ return (1);
+ }
- reply = amqp_login(amqp_conn, vhost,
- /* channel max = */ 0,
- /* frame max = */ 131072,
- /* heartbeat = */ 0,
- /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
- {
- ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
- vhost, user);
- amqp_destroy_connection(amqp_conn);
- close(sockfd);
- amqp_conn = NULL;
- return (1);
- }
+ amqp_channel_open (amqp_conn, /* channel = */ 1);
+ /* FIXME: Is checking "reply.reply_type" really correct here? How does
+ * it get set? --octo */
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_channel_open failed.");
+ amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection(amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
+ return (1);
+ }
- amqp_channel_open (amqp_conn, /* channel = */ 1);
- /* FIXME: Is checking "reply.reply_type" really correct here? How does
- * it get set? --octo */
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
- {
- ERROR ("amqp plugin: amqp_channel_open failed.");
- amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection(amqp_conn);
- close(sockfd);
- amqp_conn = NULL;
- return (1);
- }
+ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+ "on %s:%i.", vhost, host, port);
+ return (0);
+} /* int amqp_connect */
- INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
- "on %s:%i.", vhost, host, port);
- } /* if (amqp_conn == NULL) */
+static int amqp_write_locked (const char *buffer)
+{
+ amqp_basic_properties_t props;
+ int status;
+
+ status = amqp_connect ();
+ if (status != 0)
+ return (status);
+ memset (&props, 0, sizeof (props));
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = delivery_mode;
amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (amqp_conn);
- close(sockfd);
+ close (sockfd);
amqp_conn = NULL;
}