From 91c1ebab3b54ca32b34c4658dfaa7fa96830db7c Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Mon, 15 Jan 2018 09:09:21 -0500 Subject: [PATCH] Add notify handler --- src/amqp1.c | 153 ++++++++++++++++++++++++++++++++++++-------------- src/collectd.conf.pod | 18 ++++-- 2 files changed, 124 insertions(+), 47 deletions(-) diff --git a/src/amqp1.c b/src/amqp1.c index dcd17dd0..24746644 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -67,6 +67,7 @@ typedef struct amqp1_config_transport_t { typedef struct amqp1_config_instance_t { DEQ_LINKS(struct amqp1_config_instance_t); char *name; + _Bool notify; uint8_t format; unsigned int graphite_flags; _Bool store_rates; @@ -87,7 +88,7 @@ typedef struct cd_message_t { DEQ_DECLARE(cd_message_t, cd_message_list_t); -/* +/* * Globals */ pn_connection_t *conn = NULL; @@ -153,7 +154,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ pn_link_advance(link); if (cdm->instance->pre_settle == true) { pn_delivery_settle(dlv); - } + } event_count++; cd_message_free(cdm); cdm = DEQ_HEAD(to_send); @@ -212,7 +213,7 @@ static bool handle(pn_event_t *event) /* {{{ */ } break; } - + case PN_TRANSPORT_CLOSED: { check_condition(event, pn_transport_condition(pn_event_transport(event))); break; @@ -235,7 +236,7 @@ static bool handle(pn_event_t *event) /* {{{ */ check_condition(event, pn_link_remote_condition(pn_event_link(event))); pn_connection_close(pn_event_connection(event)); break; - } + } case PN_PROACTOR_INACTIVE: { return false; @@ -265,21 +266,100 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ return NULL; } /* }}} void event_thread */ +static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {{{ */ +{ + size_t bufsize = BUFSIZE; + pn_data_t *body; + pn_message_t *message; + + /* encode message */ + message = pn_message(); + pn_message_set_address(message, instance->send_to); + body = pn_message_body(message); + pn_data_clear(body); + pn_data_put_binary(body, cdm->mbuf); + pn_data_exit(body); + + /* put_binary copies and stores so ok to use mbuf */ + cdm->mbuf.size = bufsize; + pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); + + pthread_mutex_lock(&send_lock); + DEQ_INSERT_TAIL(out_messages, cdm); + pthread_mutex_unlock(&send_lock); + + pn_message_free(message); + + /* activate the sender */ + if (conn != NULL) { + pn_connection_wake(conn); + } + +} /* }}} void encqueue */ + +static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ +{ + amqp1_config_instance_t *instance; + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + cd_message_t *cdm; + size_t bufsize = BUFSIZE; + + if ((n == NULL) || (user_data == NULL)) + return EINVAL; + + instance = user_data->data; + + if (instance->notify != true) { + ERROR("amqp1 plugin: write notification failed"); + } + + cdm = NEW(cd_message_t); + DEQ_ITEM_INIT(cdm); + cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize)); + cdm->instance = instance; + + switch (instance->format) { + case AMQP1_FORMAT_JSON: + format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); + status = format_json_notification((char *)cdm->mbuf.start, bufsize, n); + if (status != 0) { + ERROR("amqp1 plugin: formatting notification failed"); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + default: + ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format); + return -1; + } + + /* encode message and place on outbound queue */ + encqueue(cdm, instance); + + return 0; +} /* }}} int amqp1_notify */ + static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ - user_data_t *user_data) + user_data_t *user_data) { - amqp1_config_instance_t *instance = user_data->data; + amqp1_config_instance_t *instance; int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; cd_message_t *cdm; size_t bufsize = BUFSIZE; - pn_data_t *body; - pn_message_t *message; - if ((ds == NULL) || (vl == NULL) || (transport == NULL)) + if ((ds == NULL) || (vl == NULL) || (transport == NULL) || (user_data == NULL)) return EINVAL; + instance = user_data->data; + + if (instance->notify != false) { + ERROR("amqp1 plugin: write failed"); + } + cdm = NEW(cd_message_t); DEQ_ITEM_INIT(cdm); cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize)); @@ -312,40 +392,20 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ cdm->mbuf.size = strlen(cdm->mbuf.start); break; default: - ERROR("amqp1 plugin: Invalid format (%i).", instance->format); + ERROR("amqp1 plugin: Invalid write format (%i).", instance->format); return -1; } - /* encode message */ - message = pn_message(); - pn_message_set_address(message, instance->send_to); - body = pn_message_body(message); - pn_data_clear(body); - pn_data_put_binary(body, cdm->mbuf); - pn_data_exit(body); - - /* put_binary copies and stores so ok to use mbuf */ - cdm->mbuf.size = bufsize; - pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); - - pthread_mutex_lock(&send_lock); - DEQ_INSERT_TAIL(out_messages, cdm); - pthread_mutex_unlock(&send_lock); - - pn_message_free(message); - - /* activate the sender */ - if (conn != NULL) { - pn_connection_wake(conn); - } + /* encode message and place on outboud queue */ + encqueue(cdm, instance); return 0; -} /* }}} int amqp_write1 */ +} /* }}} int amqp1_write */ static void amqp1_config_transport_free(void *ptr) /* {{{ */ { amqp1_config_transport_t *transport = ptr; - + if (transport == NULL) return; @@ -361,7 +421,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */ static void amqp1_config_instance_free(void *ptr) /* {{{ */ { amqp1_config_instance_t *instance = ptr; - + if (instance == NULL) return; @@ -398,6 +458,8 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("PreSettle", child->key) == 0) status = cf_util_get_boolean(child, &instance->pre_settle); + else if (strcasecmp("Notify", child->key) == 0) + status = cf_util_get_boolean(child, &instance->notify); else if (strcasecmp("Format", child->key) == 0) { status = cf_util_get_string(child, &key); if (status != 0) @@ -456,8 +518,14 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", transport->address,instance->name); - status = plugin_register_write(tpname, amqp1_write, &(user_data_t) { - .data = instance, .free_func = amqp1_config_instance_free, }); + if (instance->notify == true) { + status = plugin_register_notification(tpname, amqp1_notify, &(user_data_t) { + .data = instance, .free_func = amqp1_config_instance_free, }); + } else { + status = plugin_register_write(tpname, amqp1_write, &(user_data_t) { + .data = instance, .free_func = amqp1_config_instance_free, }); + } + if (status != 0) { amqp1_config_instance_free(instance); } @@ -465,11 +533,11 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ return status; } /* }}} int amqp1_config_instance */ - + static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ { int status=0; - + transport = calloc(1, sizeof(*transport)); if (transport == NULL) { ERROR("amqp1 plugin: calloc failed."); @@ -484,7 +552,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ sfree(transport); return status; } - + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -504,7 +572,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ WARNING("amqp1 plugin: Ignoring unknown " "transport configuration option " "\%s\".", child->key); - + if (status != 0) break; } @@ -566,8 +634,7 @@ static int amqp1_init(void) /* {{{ */ return 0; } /* }}} int amqp1_init */ -static int amqp1_shutdown -(void) /* {{{ */ +static int amqp1_shutdown(void) /* {{{ */ { cd_message_t *cdm; diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 2dca2f53..4d6c38e4 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -742,8 +742,9 @@ is preserved, i.e. passed through. The I can be used to communicate with other instances of I or third party applications using an AMQP 1.0 message -intermediary. Values are sent to the messaging intermediary which -may handle direct messaging or queue based transfer. +intermediary. Metric values or notifications are sent to the +messaging intermediary which may handle direct messaging or +queue based transfer. B @@ -758,6 +759,7 @@ B Format "command" PreSettle false + Notify false # StoreRates false # GraphitePrefix "collectd." # GraphiteEscapeChar "_" @@ -770,8 +772,9 @@ B The plugin's configuration consists of a I that configures communications to the AMQP 1.0 messaging bus and one or more I -corresponding to publishers to the messaging system. The address in -the I block concatenated with the name given int the +corresponding to metric or event publishers to the messaging system. + +The address in the I block concatenated with the name given in the I block starting tag will be used as the send-to address for communications over the messaging link. @@ -836,6 +839,13 @@ system. If set to B, the plugin will not wait for a message acknowledgement and the message may be dropped prior to transfer of ownership. +=item B B|B + +If set to B (the default), the plugin will service the +instance write call back as a value list. If set to B the +plugin will service the instance as a write notification callback +for alert formatting. + =item B B|B Determines whether or not C, C and C data sources -- 2.11.0