Enforce JSON as the default format on a topic
authorPierre-Yves Ritschard <pyr@spootnik.org>
Wed, 30 Jul 2014 05:20:25 +0000 (07:20 +0200)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Wed, 30 Jul 2014 05:20:25 +0000 (07:20 +0200)
src/write_kafka.c

index 5dc4364..b74fe97 100644 (file)
@@ -40,9 +40,9 @@
 #include <zlib.h>
 
 struct kafka_topic_context {
+#define KAFKA_FORMAT_JSON        0
 #define KAFKA_FORMAT_COMMAND     1
 #define KAFKA_FORMAT_GRAPHITE    2
-#define KAFKA_FORMAT_JSON        3
     u_int8_t                     format;
     unsigned int                 graphite_flags;
     _Bool                        store_rates;
@@ -165,7 +165,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     int                          status;
     int                          i;
     struct kafka_topic_context  *tctx;
-    char                        *key;
+    char                        *key = NULL;
     char                        *val;
     char                         callback_name[DATA_MAX_NAME_LEN];
     char                         errbuf[1024];
@@ -180,6 +180,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
 
     tctx->escape_char = '.';
     tctx->store_rates = 1;
+    tctx->format = KAFKA_FORMAT_JSON;
 
     rd_kafka_conf_set_log_cb(conf, kafka_log);
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
@@ -262,7 +263,6 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             assert(key != NULL);
 
             if (strcasecmp(key, "Command") == 0) {
-
                 tctx->format = KAFKA_FORMAT_COMMAND;
 
             } else if (strcasecmp(key, "Graphite") == 0) {
@@ -275,6 +275,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
                 WARNING ("write_kafka plugin: Invalid format string: %s",
                          key);
             }
+
             sfree(key);
 
         } else if (strcasecmp ("StoreRates", child->key) == 0) {