cdm->mbuf.size = BUFSIZE;
int status;
+ char *start;
while ((status = pn_message_encode(message, cdm->mbuf.start,
&cdm->mbuf.size)) == PN_OVERFLOW) {
DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
cdm->mbuf.size *= 2;
- cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ if (start == NULL) {
+ status = -1;
+ break;
+ } else {
+ cdm->mbuf.start = start;
+ }
}
if (status != 0) {
ERROR("amqp1 plugin: error encoding message: %s",
pn_error_text(pn_message_error(message)));
pn_message_free(message);
- cd_message_free(cdm);
return -1;
}
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
{
+ int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
size_t bufsize = BUFSIZE;
}
cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL ){
+ ERROR("amqp1 plugin: notify failed");
+ return -1;
+ }
+
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
case AMQP1_FORMAT_JSON:
format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
- int status = format_json_notification(cdm->mbuf.start, bufsize, n);
+ status = format_json_notification(cdm->mbuf.start, bufsize, n);
if (status != 0) {
ERROR("amqp1 plugin: formatting notification failed");
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: notify format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
/* encode message and place on outbound queue */
- return encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: notify enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
} /* }}} int amqp1_notify */
}
cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ return -1;
+ }
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
if (status != 0) {
ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format cmd failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
instance->store_rates);
- format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ status= format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ if (status != 0) {
+ ERROR("amqp1 plugin: format_json_finalize failed with status %i.", status);
+ cd_message_free(cdm);
+ return(status);
+ }
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format graphite failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_GRAPHITE:
status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
instance->escape_char, instance->graphite_flags);
if (status != 0) {
ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format graphite failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
/* encode message and place on outbound queue */
- return encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: write enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
} /* }}} int amqp1_write */
sfree(transport->name);
sfree(transport->host);
+ sfree(transport->port);
sfree(transport->user);
sfree(transport->password);
sfree(transport->address);
} else
WARNING("amqp1 plugin: Ignoring unknown "
"instance configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
break;
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
if (strcasecmp("Transport", child->key) == 0)
amqp1_config_transport(child);
else
- WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+ WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
child->key);
}