#include "collectd.h"
#include "common.h"
#include "plugin.h"
+#include "utils_cmd_putval.h"
#include "utils_format_json.h"
#include <amqp.h>
#define CAMQP_DM_VOLATILE 1
#define CAMQP_DM_PERSISTENT 2
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON 2
+
#define CAMQP_CHANNEL 1
/*
{
_Bool publish;
char *name;
+ int format;
char *host;
int port;
return (0);
} /* }}} int camqp_connect */
+static int shutdown (void) /* {{{ */
+{
+ size_t i;
+
+ DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+ subscriber_threads_num);
+
+ subscriber_threads_running = 0;
+ for (i = 0; i < subscriber_threads_num; i++)
+ {
+ /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+ * to use a timeout in the thread and check for the variable in regular
+ * intervals. */
+ pthread_kill (subscriber_threads[i], SIGTERM);
+ pthread_join (subscriber_threads[i], /* retval = */ NULL);
+ }
+
+ subscriber_threads_num = 0;
+ sfree (subscriber_threads);
+
+ DEBUG ("amqp plugin: All subscriber threads exited.");
+
+ return (0);
+} /* }}} int shutdown */
+
+/*
+ * Subscribing code
+ */
static int camqp_read_body (camqp_config_t *conf, /* {{{ */
size_t body_size)
{
return (0);
} /* }}} int camqp_subscribe_init */
+/*
+ * Publishing code
+ */
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
const char *buffer)
{
bfree = sizeof (buffer);
bfill = 0;
- format_json_initialize (buffer, &bfill, &bfree);
- format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
- format_json_finalize (buffer, &bfill, &bfree);
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ {
+ status = create_putval (buffer, sizeof (buffer), ds, vl);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: create_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ }
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ {
+ format_json_initialize (buffer, &bfill, &bfree);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+ format_json_finalize (buffer, &bfill, &bfree);
+ }
+ else
+ {
+ ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+ return (-1);
+ }
pthread_mutex_lock (&conf->lock);
status = camqp_write_locked (conf, buffer);
return (status);
} /* }}} int camqp_write */
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+ camqp_config_t *conf)
+{
+ char *string;
+ int status;
+
+ string = NULL;
+ status = cf_util_get_string (ci, &string);
+ if (status != 0)
+ return (status);
+
+ assert (string != NULL);
+ if (strcasecmp ("Command", string) == 0)
+ conf->format = CAMQP_FORMAT_COMMAND;
+ else if (strcasecmp ("JSON", string) == 0)
+ conf->format = CAMQP_FORMAT_JSON;
+ else
+ {
+ WARNING ("amqp plugin: Invalid format string: %s",
+ string);
+ }
+
+ free (string);
+
+ return (0);
+} /* }}} int config_set_string */
+
static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
_Bool publish)
{
memset (conf, 0, sizeof (*conf));
conf->publish = publish;
conf->name = NULL;
+ conf->format = CAMQP_FORMAT_COMMAND;
conf->host = NULL;
conf->port = 5672;
conf->vhost = NULL;
{
oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Host", child->key) == 0)
+ if (strcasecmp ("Format", child->key) == 0)
+ status = camqp_config_set_format (child, conf);
+ else if (strcasecmp ("Host", child->key) == 0)
status = cf_util_get_string (child, &conf->host);
else if (strcasecmp ("Port", child->key) == 0)
{
return (0);
} /* }}} int camqp_config */
-static int shutdown (void) /* {{{ */
-{
- size_t i;
-
- DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
- subscriber_threads_num);
-
- subscriber_threads_running = 0;
- for (i = 0; i < subscriber_threads_num; i++)
- {
- /* FIXME: Sending a signal is not very elegant here. Maybe find out how
- * to use a timeout in the thread and check for the variable in regular
- * intervals. */
- pthread_kill (subscriber_threads[i], SIGTERM);
- pthread_join (subscriber_threads[i], /* retval = */ NULL);
- }
-
- subscriber_threads_num = 0;
- sfree (subscriber_threads);
-
- DEBUG ("amqp plugin: All subscriber threads exited.");
-
- return (0);
-} /* }}} int shutdown */
-
void module_register (void)
{
plugin_register_complex_config ("amqp", camqp_config);