#define AMQP1_FORMAT_COMMAND 1
#define AMQP1_FORMAT_GRAPHITE 2
-typedef struct amqp1_config_transport_t {
- DEQ_LINKS(struct amqp1_config_transport_t);
+typedef struct amqp1_config_transport_s {
+ DEQ_LINKS(struct amqp1_config_transport_s);
char *name;
char *host;
char *port;
int retry_delay;
} amqp1_config_transport_t;
-typedef struct amqp1_config_instance_t {
- DEQ_LINKS(struct amqp1_config_instance_t);
+typedef struct amqp1_config_instance_s {
+ DEQ_LINKS(struct amqp1_config_instance_s);
char *name;
- _Bool notify;
+ bool notify;
uint8_t format;
unsigned int graphite_flags;
- _Bool store_rates;
+ bool store_rates;
char *prefix;
char *postfix;
char escape_char;
- _Bool pre_settle;
+ bool pre_settle;
char send_to[1024];
} amqp1_config_instance_t;
DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
-typedef struct cd_message_t {
- DEQ_LINKS(struct cd_message_t);
+typedef struct cd_message_s {
+ DEQ_LINKS(struct cd_message_s);
pn_rwbytes_t mbuf;
amqp1_config_instance_t *instance;
} cd_message_t;
/*
* Globals
*/
-pn_connection_t *conn = NULL;
-pn_link_t *sender = NULL;
-pn_proactor_t *proactor = NULL;
-pthread_mutex_t send_lock;
-cd_message_list_t out_messages;
-uint64_t cd_tag = 1;
-uint64_t acknowledged = 0;
-amqp1_config_transport_t *transport = NULL;
-
+static pn_connection_t *conn = NULL;
+static pn_link_t *sender = NULL;
+static pn_proactor_t *proactor = NULL;
+static pthread_mutex_t send_lock;
+static cd_message_list_t out_messages;
+static uint64_t cd_tag = 1;
+static uint64_t acknowledged = 0;
+static amqp1_config_transport_t *transport = NULL;
static bool stopping = false;
static int event_thread_running = 0;
static pthread_t event_thread_id;
* Functions
*/
static void cd_message_free(cd_message_t *cdm) {
- if (cdm->mbuf.start) {
- free((void *)cdm->mbuf.start);
- }
+ free(cdm->mbuf.start);
free(cdm);
} /* }}} void cd_message_free */
ERROR("amqp1 plugin: write notification failed");
}
- cdm = NEW(cd_message_t);
+ cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
DEQ_ITEM_INIT(cdm);
cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
ERROR("amqp1 plugin: write failed");
}
- cdm = NEW(cd_message_t);
+ cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
DEQ_ITEM_INIT(cdm);
cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
return ENOMEM;
}
- /* Initialize instance configuration {{{ */
- instance->name = NULL;
-
status = cf_util_get_string(ci, &instance->name);
if (status != 0) {
sfree(instance);
else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
char *tmp_buff = NULL;
status = cf_util_get_string(child, &tmp_buff);
- if (strlen(tmp_buff) > 1)
- WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
- "only one character. Others will be ignored.");
- instance->escape_char = tmp_buff[0];
+ if (status == 0) {
+ if (strlen(tmp_buff) > 1)
+ WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+ "only one character. Others will be ignored.");
+ instance->escape_char = tmp_buff[0];
+ }
sfree(tmp_buff);
} else
WARNING("amqp1 plugin: Ignoring unknown "
amqp1_config_instance_free(instance);
return status;
} else {
- char tpname[1024];
- int status;
+ char tpname[DATA_MAX_NAME_LEN];
status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
if ((status < 0) || (size_t)status >= sizeof(tpname)) {
ERROR("amqp1 plugin: Instance name would have been truncated.");
}
/* Initialize transport configuration {{{ */
- transport->name = NULL;
transport->retry_delay = 1;
status = cf_util_get_string(ci, &transport->name);
if (strcasecmp("Transport", child->key) == 0)
amqp1_config_transport(child);
else
- WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
+ WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
child->key);
}