if test "x$with_librdkafka" = "xyes"
then
AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"])
- AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log="yes"], [with_librdkafka_log="no (Symbol 'rd_kafka_conf_set_log_cb not found)"])
+ AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log_cb="yes"], [with_librdkafka_log_cb="no"])
+ AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_logger, [with_librdkafka_logger="yes"], [with_librdkafka_logger="no"])
fi
if test "x$with_librdkafka" = "xyes"
then
AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS)
AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.])
- if test "x$with_librdkafka_log" = "xyes"
+ if test "x$with_librdkafka_log_cb" = "xyes"
then
- AC_DEFINE(HAVE_LIBRDKAFKA_LOG, 1, [Define if librdkafka log facility is present and usable.])
+ AC_DEFINE(HAVE_LIBRDKAFKA_LOG_CB, 1, [Define if librdkafka log facility is present and usable.])
+ fi
+ if test "x$with_librdkafka_logger" = "xyes"
+ then
+ AC_DEFINE(HAVE_LIBRDKAFKA_LOGGER, 1, [Define if librdkafka log facility is present and usable.])
fi
fi
CPPFLAGS="$SAVE_CPPFLAGS"
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
-#ifdef HAVE_LIBRDKAFKA_LOG
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
static void kafka_log(const rd_kafka_t *rkt, int level,
plugin_log(level, "%s", msg);
}
-#endif
-
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
tctx->store_rates = 1;
tctx->format = KAFKA_FORMAT_JSON;
-#ifdef HAVE_LIBRDKAFKA_LOG
- /*
- * Some versions of rdkafka do not allow setting a log callback.
- */
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
rd_kafka_conf_set_log_cb(conf, kafka_log);
#endif
if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
ERROR("write_kafka plugin: cannot create kafka handle.");
return;
}
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+ rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
conf = NULL;
if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {