static int encqueue(cd_message_t *cdm,
amqp1_config_instance_t *instance) /* {{{ */
{
- size_t bufsize = BUFSIZE;
- pn_data_t *body;
- pn_message_t *message;
- int status = 0;
-
/* encode message */
- message = pn_message();
+ pn_message_t *message = pn_message();
pn_message_set_address(message, instance->send_to);
- body = pn_message_body(message);
+ pn_data_t *body = pn_message_body(message);
pn_data_clear(body);
pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
pn_data_exit(body);
/* put_binary copies and stores so ok to use mbuf */
- cdm->mbuf.size = bufsize;
- while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+ cdm->mbuf.size = BUFSIZE;
+
+ int status;
+ while ((status = pn_message_encode(message, cdm->mbuf.start,
&cdm->mbuf.size)) == PN_OVERFLOW) {
- DEBUG("amqp1 plugin: increasing message buffer size %i",
- (int)cdm->mbuf.size);
+ DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
cdm->mbuf.size *= 2;
cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
}
pn_message_free(message);
/* activate the sender */
- if (conn != NULL) {
+ if (conn) {
pn_connection_wake(conn);
}
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
{
- amqp1_config_instance_t *instance;
- int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
- cd_message_t *cdm;
size_t bufsize = BUFSIZE;
- if ((n == NULL) || (user_data == NULL))
+ if (n == NULL || user_data == NULL)
return EINVAL;
- instance = user_data->data;
+ amqp1_config_instance_t *instance = user_data->data;
if (instance->notify != true) {
ERROR("amqp1 plugin: write notification failed");
}
- cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+ cd_message_t *cdm = malloc(sizeof(*cdm));
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
case AMQP1_FORMAT_JSON:
- format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
- status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+ format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+ int status = format_json_notification(cdm->mbuf.start, bufsize, n);
if (status != 0) {
ERROR("amqp1 plugin: formatting notification failed");
return status;
}
/* encode message and place on outbound queue */
- status = encqueue(cdm, instance);
+ return encqueue(cdm, instance);
- return status;
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data) {
- amqp1_config_instance_t *instance;
int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
- cd_message_t *cdm;
size_t bufsize = BUFSIZE;
- if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
- (user_data == NULL))
+ if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
return EINVAL;
- instance = user_data->data;
+ amqp1_config_instance_t *instance = user_data->data;
if (instance->notify != false) {
ERROR("amqp1 plugin: write failed");
}
- cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+ cd_message_t *cdm = malloc(sizeof(*cdm));
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
return -1;
}
- /* encode message and place on outboud queue */
- encqueue(cdm, instance);
+ /* encode message and place on outbound queue */
+ return encqueue(cdm, instance);
- return 0;
} /* }}} int amqp1_write */
static void amqp1_config_transport_free(void *ptr) /* {{{ */
static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
{
- int status = 0;
- char *key = NULL;
- amqp1_config_instance_t *instance;
-
- instance = calloc(1, sizeof(*instance));
+ amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
if (instance == NULL) {
ERROR("amqp1 plugin: calloc failed.");
return ENOMEM;
}
- status = cf_util_get_string(ci, &instance->name);
+ int status = cf_util_get_string(ci, &instance->name);
if (status != 0) {
sfree(instance);
return status;
else if (strcasecmp("Notify", child->key) == 0)
status = cf_util_get_boolean(child, &instance->notify);
else if (strcasecmp("Format", child->key) == 0) {
+ char *key = NULL;
status = cf_util_get_string(child, &key);
if (status != 0)
return status;
- /* TODO: goto errout */
- // goto errout;
assert(key != NULL);
if (strcasecmp(key, "Command") == 0) {
instance->format = AMQP1_FORMAT_COMMAND;
ERROR("amqp1 plugin: send_to address would have been truncated.");
return -1;
}
- if (instance->notify == true) {
+ if (instance->notify) {
status = plugin_register_notification(
tpname, amqp1_notify,
&(user_data_t){
static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
{
- int status = 0;
-
transport = calloc(1, sizeof(*transport));
if (transport == NULL) {
ERROR("amqp1 plugin: calloc failed.");
/* Initialize transport configuration {{{ */
transport->retry_delay = 1;
- status = cf_util_get_string(ci, &transport->name);
+ int status = cf_util_get_string(ci, &transport->name);
if (status != 0) {
sfree(transport);
return status;
static int amqp1_init(void) /* {{{ */
{
- int status;
- char errbuf[1024];
-
if (transport == NULL) {
ERROR("amqp1: init failed, no transport configured");
return -1;
if (proactor == NULL) {
pthread_mutex_init(&send_lock, /* attr = */ NULL);
/* start_thread */
- status =
+ int status =
plugin_thread_create(&event_thread_id, NULL /* no attributes */,
event_thread, NULL /* no argument */, "handle");
if (status != 0) {
- ERROR("amqp1 plugin: pthread_create failed: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
} else {
event_thread_running = true;
}
DEBUG("amqp1 plugin: proactor thread exited.");
- if (transport != NULL) {
+ if (transport) {
amqp1_config_transport_free(transport);
}