/* Defines for the delivery mode. I have no idea why they're not defined by the
* library.. */
-#define AMQP_DM_VOLATILE 1
-#define AMQP_DM_PERSISTENT 2
+#define CAMQP_DM_VOLATILE 1
+#define CAMQP_DM_PERSISTENT 2
+
+#define CAMQP_CHANNEL 1
+
+/*
+ * Data types
+ */
+struct camqp_config_s
+{
+ _Bool publish;
+ char *name;
+
+ char *host;
+ int port;
+ char *vhost;
+ char *user;
+ char *password;
+
+ char *exchange;
+ char *exchange_type;
+ char *queue;
+ char *routingkey;
+ uint8_t delivery_mode;
+
+ _Bool store_rates;
+
+ amqp_connection_state_t connection;
+ pthread_mutex_t lock;
+};
+typedef struct camqp_config_s camqp_config_t;
/*
* Global variables
static const char *def_exchange = "amq.fanout";
static const char *def_routingkey = "collectd";
-static char *conf_host = NULL;
-static char *conf_vhost = NULL;
-static char *conf_user = NULL;
-static char *conf_password = NULL;
-static char *conf_exchange = NULL;
-static char *conf_routingkey = NULL;
-static int conf_port = 5672;
-static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
-static _Bool conf_store_rates = 0;
-
-#define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
-
-static amqp_connection_state_t amqp_conn = NULL;
-static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
-
-static const char *config_keys[] =
-{
- "Host",
- "Port",
- "VHost",
- "User",
- "Password",
- "Exchange",
- "RoutingKey",
- "Persistent",
- "StoreRates"
-};
-static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
/*
* Functions
*/
-static int config_set(char **var, const char *value)
+static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
{
- sfree(*var);
- if ((*var = strdup(value)) == NULL)
- return (1);
- return (0);
-} /* int config_set */
+ int sockfd;
+
+ if ((conf == NULL) || (conf->connection == NULL))
+ return;
-static int config(const char *key, const char *value)
+ sockfd = amqp_get_sockfd (conf->connection);
+ amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
+ close (sockfd);
+ conf->connection = NULL;
+} /* }}} void camqp_close_connection */
+
+static void camqp_config_free (void *ptr) /* {{{ */
{
- if (strcasecmp(key, "host") == 0)
- return (config_set(&conf_host, value));
- else if(strcasecmp(key, "port") == 0)
- {
- int tmp;
+ camqp_config_t *conf = ptr;
- tmp = service_name_to_port_number (value);
- if (tmp <= 0)
- {
- ERROR ("AMQP plugin: Cannot parse `%s' as a "
- "service name (port number).", value);
- return (1);
- }
+ if (conf == NULL)
+ return;
- conf_port = tmp;
- return (0);
- }
- else if (strcasecmp(key, "vhost") == 0)
- return (config_set(&conf_vhost, value));
- else if (strcasecmp(key, "user") == 0)
- return (config_set(&conf_user, value));
- else if (strcasecmp(key, "password") == 0)
- return (config_set(&conf_password, value));
- else if (strcasecmp(key, "exchange") == 0)
- return (config_set(&conf_exchange, value));
- else if (strcasecmp(key, "routingkey") == 0)
- return (config_set(&conf_routingkey, value));
- else if (strcasecmp ("Persistent", key) == 0)
- {
- if (IS_TRUE (value))
- conf_delivery_mode = AMQP_DM_PERSISTENT;
- else
- conf_delivery_mode = AMQP_DM_VOLATILE;
- return (0);
- }
- else if (strcasecmp ("StoreRates", key) == 0)
- {
- if (IS_TRUE (value))
- conf_store_rates = 1;
- else
- conf_store_rates = 0;
- return (0);
- }
- return (-1);
-} /* int config */
+ camqp_close_connection (conf);
+
+ sfree (conf->name);
+ sfree (conf->host);
+ sfree (conf->vhost);
+ sfree (conf->user);
+ sfree (conf->password);
+ sfree (conf->exchange);
+ sfree (conf->exchange_type);
+ sfree (conf->queue);
+ sfree (conf->routingkey);
-static int amqp_connect (void)
+ sfree (conf);
+} /* }}} void camqp_config_free */
+
+static int amqp_connect (camqp_config_t *conf) /* {{{ */
{
amqp_rpc_reply_t reply;
int sockfd;
int status;
- if (amqp_conn != NULL)
+ if (conf->connection != NULL)
return (0);
- amqp_conn = amqp_new_connection ();
- if (amqp_conn == NULL)
+ conf->connection = amqp_new_connection ();
+ if (conf->connection == NULL)
{
ERROR ("amqp plugin: amqp_new_connection failed.");
return (ENOMEM);
}
- sockfd = amqp_open_socket (CONF(host), conf_port);
+ sockfd = amqp_open_socket (CONF(conf, host), conf->port);
if (sockfd < 0)
{
char errbuf[1024];
status = (-1) * sockfd;
ERROR ("amqp plugin: amqp_open_socket failed: %s",
sstrerror (status, errbuf, sizeof (errbuf)));
- amqp_destroy_connection(amqp_conn);
- amqp_conn = NULL;
+ amqp_destroy_connection (conf->connection);
+ conf->connection = NULL;
return (status);
}
+ amqp_set_sockfd (conf->connection, sockfd);
- amqp_set_sockfd (amqp_conn, sockfd);
-
- reply = amqp_login (amqp_conn, CONF(vhost),
+ reply = amqp_login (conf->connection, CONF(conf, vhost),
/* channel max = */ 0,
/* frame max = */ 131072,
/* heartbeat = */ 0,
/* authentication = */ AMQP_SASL_METHOD_PLAIN,
- CONF(user), CONF(password));
+ CONF(conf, user), CONF(conf, password));
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
- CONF(vhost), CONF(user));
- amqp_destroy_connection (amqp_conn);
+ CONF(conf, vhost), CONF(conf, user));
+ amqp_destroy_connection (conf->connection);
close (sockfd);
- amqp_conn = NULL;
+ conf->connection = NULL;
return (1);
}
- amqp_channel_open (amqp_conn, /* channel = */ 1);
+ amqp_channel_open (conf->connection, /* channel = */ 1);
/* FIXME: Is checking "reply.reply_type" really correct here? How does
* it get set? --octo */
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
ERROR ("amqp plugin: amqp_channel_open failed.");
- amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection(amqp_conn);
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
close(sockfd);
- amqp_conn = NULL;
+ conf->connection = NULL;
return (1);
}
INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
- "on %s:%i.", CONF(vhost), CONF(host), conf_port);
+ "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
return (0);
-} /* int amqp_connect */
+} /* }}} int amqp_connect */
-static int amqp_write_locked (const char *buffer)
+static int amqp_write_locked (camqp_config_t *conf, /* {{{ */
+ const char *buffer)
{
amqp_basic_properties_t props;
int status;
- status = amqp_connect ();
+ status = amqp_connect (conf);
if (status != 0)
return (status);
| AMQP_BASIC_DELIVERY_MODE_FLAG
| AMQP_BASIC_APP_ID_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
- props.delivery_mode = conf_delivery_mode;
+ props.delivery_mode = conf->delivery_mode;
props.app_id = amqp_cstring_bytes("collectd");
- status = amqp_basic_publish(amqp_conn,
+ status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
- amqp_cstring_bytes(CONF(exchange)),
- amqp_cstring_bytes(CONF(routingkey)),
+ amqp_cstring_bytes(CONF(conf, exchange)),
+ amqp_cstring_bytes(CONF(conf, routingkey)),
/* mandatory = */ 0,
/* immediate = */ 0,
&props,
amqp_cstring_bytes(buffer));
if (status != 0)
{
- int sockfd;
-
ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
status);
-
- sockfd = amqp_get_sockfd (amqp_conn);
- amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
- amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection (amqp_conn);
- close (sockfd);
- amqp_conn = NULL;
+ camqp_close_connection (conf);
}
return (status);
-} /* int amqp_write_locked */
+} /* }}} int amqp_write_locked */
-static int amqp_write (const data_set_t *ds, const value_list_t *vl,
- __attribute__((unused)) user_data_t *user_data)
+static int amqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ user_data_t *user_data)
{
+ camqp_config_t *conf = user_data->data;
char buffer[4096];
size_t bfree;
size_t bfill;
int status;
- if ((ds == NULL) || (vl == NULL))
+ if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
memset (buffer, 0, sizeof (buffer));
bfill = 0;
format_json_initialize (buffer, &bfill, &bfree);
- format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
format_json_finalize (buffer, &bfill, &bfree);
- pthread_mutex_lock (&amqp_conn_lock);
- status = amqp_write_locked (buffer);
- pthread_mutex_unlock (&amqp_conn_lock);
+ pthread_mutex_lock (&conf->lock);
+ status = amqp_write_locked (conf, buffer);
+ pthread_mutex_unlock (&conf->lock);
return (status);
-} /* int amqp_write */
+} /* }}} int amqp_write */
-static int shutdown (void)
+static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
+ _Bool publish)
{
- pthread_mutex_lock (&amqp_conn_lock);
- if (amqp_conn != NULL)
+ camqp_config_t *conf;
+ int status;
+ int i;
+
+ conf = malloc (sizeof (*conf));
+ if (conf == NULL)
{
- int sockfd;
+ ERROR ("amqp plugin: malloc failed.");
+ return (ENOMEM);
+ }
- sockfd = amqp_get_sockfd (amqp_conn);
- amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
- amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection (amqp_conn);
- close(sockfd);
- amqp_conn = NULL;
+ /* Initialize "conf" {{{ */
+ memset (conf, 0, sizeof (*conf));
+ conf->publish = publish;
+ conf->name = NULL;
+ conf->host = NULL;
+ conf->port = 5672;
+ conf->vhost = NULL;
+ conf->user = NULL;
+ conf->password = NULL;
+ conf->exchange = NULL;
+ conf->exchange_type = NULL;
+ conf->queue = NULL;
+ conf->routingkey = NULL;
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ conf->store_rates = 0;
+ conf->connection = NULL;
+ pthread_mutex_init (&conf->lock, /* attr = */ NULL);
+ /* }}} */
+
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ sfree (conf);
+ return (status);
}
- pthread_mutex_unlock (&amqp_conn_lock);
- sfree(conf_host);
- sfree(conf_vhost);
- sfree(conf_user);
- sfree(conf_password);
- sfree(conf_exchange);
- sfree(conf_routingkey);
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ status = cf_util_get_port_number (child);
+ if (status > 0)
+ {
+ conf->port = status;
+ status = 0;
+ }
+ }
+ else if (strcasecmp ("VHost", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->vhost);
+ else if (strcasecmp ("User", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->user);
+ else if (strcasecmp ("Password", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->password);
+ else if (strcasecmp ("Exchange", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->exchange);
+ else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
+ status = cf_util_get_string (ci, &conf->exchange_type);
+ else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
+ status = cf_util_get_string (ci, &conf->queue);
+ else if (strcasecmp ("RoutingKey", child->key) == 0)
+ status = cf_util_get_string (ci, &conf->routingkey);
+ else if (strcasecmp ("Persistent", child->key) == 0)
+ {
+ _Bool tmp = 0;
+ status = cf_util_get_boolean (ci, &tmp);
+ if (tmp)
+ conf->delivery_mode = CAMQP_DM_PERSISTENT;
+ else
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ }
+ else if (strcasecmp ("StoreRates", child->key) == 0)
+ status = cf_util_get_boolean (ci, &conf->store_rates);
+ else
+ WARNING ("amqp plugin: Ignoring unknown "
+ "configuration option \"%s\".", child->key);
+
+ if (status != 0)
+ break;
+ } /* for (i = 0; i < ci->children_num; i++) */
+
+ if ((status == 0) && !publish && (conf->exchange == NULL))
+ {
+ if (conf->routingkey != NULL)
+ WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+
+ if (conf->exchange_type != NULL)
+ WARNING ("amqp plugin: The option \"ExchangeType\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+ }
+
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+
+ if (publish)
+ {
+ char cbname[128];
+ user_data_t ud = { conf, camqp_config_free };
+
+ ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
+
+ status = plugin_register_write (cbname, amqp_write, &ud);
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+ }
+
+ return (0);
+} /* }}} int camqp_config_connection */
+
+static int camqp_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Publish", child->key) == 0)
+ camqp_config_connection (child, /* publish = */ 1);
+ else
+ WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
+ child->key);
+ } /* for (ci->children_num) */
+
+ return (0);
+} /* }}} int camqp_config */
+
+static int shutdown (void) /* {{{ */
+{
+ /* FIXME: Set a global shutdown variable here. */
return (0);
-} /* int shutdown */
+} /* }}} int shutdown */
void module_register (void)
{
- plugin_register_config ("amqp", config, config_keys, config_keys_num);
- plugin_register_write ("amqp", amqp_write, NULL);
+ plugin_register_complex_config ("amqp", camqp_config);
plugin_register_shutdown ("amqp", shutdown);
} /* void module_register */
-/* vim: set sw=4 sts=4 et : */
+/* vim: set sw=4 sts=4 et fdm=marker : */