amqp_connection_state_t conn;
amqp_basic_properties_t props;
+ /* TODO: Don't create a new connection for each value that is to be dispatched. */
conn = amqp_new_connection();
if ((sockfd = amqp_open_socket(host, port)) < 0)
{
+ ERROR ("amqp plugin: amqp_open_socket failed.");
amqp_destroy_connection(conn);
return (1);
}
/* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
+ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+ vhost, user);
amqp_destroy_connection(conn);
close(sockfd);
return (1);
amqp_channel_open(conn, 1);
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
+ ERROR ("amqp plugin: amqp_channel_open failed.");
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
close(sockfd);
/* immediate = */ 0,
&props,
amqp_cstring_bytes(buffer));
+ if (error != 0)
+ {
+ ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
+ error);
+ }
+
reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
error = 1;