X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=775e2e0934f55613d69e5521a3829a495a8a8c4a;hb=dc2eb041159b967838a2eb658cb256bc846c5264;hp=c5c7e3df9b3bac47b78b3fd216b13655b308b2c1;hpb=585e552936cc92e437ce1f58695a6d1b3caf785e;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index c5c7e3df..775e2e09 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -34,7 +34,7 @@ #include "utils_format_json.h" #include "utils_crc32.h" -#include +#include #include #include #include @@ -44,7 +44,7 @@ struct kafka_topic_context { #define KAFKA_FORMAT_JSON 0 #define KAFKA_FORMAT_COMMAND 1 #define KAFKA_FORMAT_GRAPHITE 2 - u_int8_t format; + uint8_t format; unsigned int graphite_flags; _Bool store_rates; rd_kafka_topic_conf_t *conf; @@ -52,7 +52,7 @@ struct kafka_topic_context { rd_kafka_conf_t *kafka_conf; rd_kafka_t *kafka; int has_key; - u_int32_t key; + uint32_t key; char *prefix; char *postfix; char escape_char; @@ -79,8 +79,8 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *p, void *m) { - u_int32_t key = *((u_int32_t *)keydata ); - u_int32_t target = key % partition_cnt; + uint32_t key = *((uint32_t *)keydata ); + uint32_t target = key % partition_cnt; int32_t i = partition_cnt; while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { @@ -111,6 +111,8 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */ } rd_kafka_conf_destroy(ctx->kafka_conf); + ctx->kafka_conf = NULL; + INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka)); #ifdef HAVE_LIBRDKAFKA_LOGGER @@ -132,6 +134,8 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */ } rd_kafka_topic_conf_destroy(ctx->conf); + ctx->conf = NULL; + INFO ("write_kafka plugin: handle created for topic : %s", rd_kafka_topic_name(ctx->topic)); } @@ -144,7 +148,7 @@ static int kafka_write(const data_set_t *ds, /* {{{ */ user_data_t *ud) { int status = 0; - u_int32_t key; + uint32_t key; char buffer[8192]; size_t bfree = sizeof(buffer); size_t bfill = 0;