From 3ae42434d74ae3c815b252f393bd0cfe59be83b9 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Wed, 4 Aug 2010 23:08:16 +0200 Subject: [PATCH] amqp plugin: Put the connecting code into a separate function. --- src/amqp.c | 112 +++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 61 insertions(+), 51 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index a4324253..488fbac7 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -119,69 +119,79 @@ static int config(const char *key, const char *value) return (-1); } -static int amqp_write_locked (const char *buffer) +static int amqp_connect (void) { amqp_rpc_reply_t reply; - amqp_basic_properties_t props; + int sockfd; int status; + if (amqp_conn != NULL) + return (0); + + amqp_conn = amqp_new_connection (); if (amqp_conn == NULL) { - int sockfd; + ERROR ("amqp plugin: amqp_new_connection failed."); + return (ENOMEM); + } - amqp_conn = amqp_new_connection (); - if (amqp_conn == NULL) - { - ERROR ("amqp plugin: amqp_new_connection failed."); - return (ENOMEM); - } + sockfd = amqp_open_socket (host, port); + if (sockfd < 0) + { + char errbuf[1024]; + status = (-1) * sockfd; + ERROR ("amqp plugin: amqp_open_socket failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection(amqp_conn); + amqp_conn = NULL; + return (status); + } - sockfd = amqp_open_socket (host, port); - if (sockfd < 0) - { - char errbuf[1024]; - status = (-1) * sockfd; - ERROR ("amqp plugin: amqp_open_socket failed: %s", - sstrerror (status, errbuf, sizeof (errbuf))); - amqp_destroy_connection(amqp_conn); - amqp_conn = NULL; - return (status); - } + amqp_set_sockfd (amqp_conn, sockfd); - amqp_set_sockfd (amqp_conn, sockfd); + reply = amqp_login(amqp_conn, vhost, + /* channel max = */ 0, + /* frame max = */ 131072, + /* heartbeat = */ 0, + /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", + vhost, user); + amqp_destroy_connection(amqp_conn); + close(sockfd); + amqp_conn = NULL; + return (1); + } - reply = amqp_login(amqp_conn, vhost, - /* channel max = */ 0, - /* frame max = */ 131072, - /* heartbeat = */ 0, - /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password); - if (reply.reply_type != AMQP_RESPONSE_NORMAL) - { - ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", - vhost, user); - amqp_destroy_connection(amqp_conn); - close(sockfd); - amqp_conn = NULL; - return (1); - } + amqp_channel_open (amqp_conn, /* channel = */ 1); + /* FIXME: Is checking "reply.reply_type" really correct here? How does + * it get set? --octo */ + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_channel_open failed."); + amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(amqp_conn); + close(sockfd); + amqp_conn = NULL; + return (1); + } - amqp_channel_open (amqp_conn, /* channel = */ 1); - /* FIXME: Is checking "reply.reply_type" really correct here? How does - * it get set? --octo */ - if (reply.reply_type != AMQP_RESPONSE_NORMAL) - { - ERROR ("amqp plugin: amqp_channel_open failed."); - amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(amqp_conn); - close(sockfd); - amqp_conn = NULL; - return (1); - } + INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " + "on %s:%i.", vhost, host, port); + return (0); +} /* int amqp_connect */ - INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " - "on %s:%i.", vhost, host, port); - } /* if (amqp_conn == NULL) */ +static int amqp_write_locked (const char *buffer) +{ + amqp_basic_properties_t props; + int status; + + status = amqp_connect (); + if (status != 0) + return (status); + memset (&props, 0, sizeof (props)); props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("application/json"); props.delivery_mode = delivery_mode; @@ -205,7 +215,7 @@ static int amqp_write_locked (const char *buffer) amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection (amqp_conn); - close(sockfd); + close (sockfd); amqp_conn = NULL; } -- 2.11.0