From 480d66bbe1970d6cbb68765878f2ee6187bbd5b2 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Ritschard Date: Thu, 31 Jul 2014 09:58:06 +0200 Subject: [PATCH] conditionally use rd_kafka_conf_set_log_cb, fixes #686 --- configure.ac | 5 +++++ src/write_kafka.c | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/configure.ac b/configure.ac index 59f05710..6e04af16 100644 --- a/configure.ac +++ b/configure.ac @@ -3623,6 +3623,7 @@ fi if test "x$with_librdkafka" = "xyes" then AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"]) + AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log="yes"], [with_librdkafka_log="no (Symbol 'rd_kafka_conf_set_log_cb not found)"]) fi if test "x$with_librdkafka" = "xyes" then @@ -3633,6 +3634,10 @@ then AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS) AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS) AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.]) + if test "x$with_librdkafka_log" = "xyes" + then + AC_DEFINE(HAVE_LIBRDKAFKA_LOG, 1, [Define if librdkafka log facility is present and usable.]) + fi fi CPPFLAGS="$SAVE_CPPFLAGS" LDFLAGS="$SAVE_LDFLAGS" diff --git a/src/write_kafka.c b/src/write_kafka.c index b74fe97d..ff3176dd 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -60,6 +60,8 @@ struct kafka_topic_context { static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, int32_t, void *, void *); + +#ifdef HAVE_LIBRDKAFKA_LOG static void kafka_log(const rd_kafka_t *, int, const char *, const char *); static void kafka_log(const rd_kafka_t *rkt, int level, @@ -68,6 +70,8 @@ static void kafka_log(const rd_kafka_t *rkt, int level, plugin_log(level, "%s", msg); } +#endif + static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *p, void *m) @@ -182,7 +186,12 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ tctx->store_rates = 1; tctx->format = KAFKA_FORMAT_JSON; +#ifdef HAVE_LIBRDKAFKA_LOG + /* + * Some versions of rdkafka do not allow setting a log callback. + */ rd_kafka_conf_set_log_cb(conf, kafka_log); +#endif if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, sizeof(errbuf))) == NULL) { sfree(tctx); -- 2.11.0