write_kafka plugin: Use the user-provided key verbatim, not its CRC32.
authorFlorian Forster <octo@collectd.org>
Tue, 1 Dec 2015 20:08:49 +0000 (21:08 +0100)
committerFlorian Forster <octo@collectd.org>
Tue, 1 Dec 2015 20:09:32 +0000 (21:09 +0100)
Fixes: #1283

src/write_kafka.c

index a5977ab..9bc958f 100644 (file)
@@ -51,8 +51,7 @@ struct kafka_topic_context {
     rd_kafka_topic_t            *topic;
     rd_kafka_conf_t             *kafka_conf;
     rd_kafka_t                  *kafka;
-    int                          has_key;
-    uint32_t                     key;
+    char                        *key;
     char                        *prefix;
     char                        *postfix;
     char                         escape_char;
@@ -148,7 +147,8 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
           user_data_t *ud)
 {
     int      status = 0;
-    uint32_t key;
+    void    *key;
+    size_t   keylen = 0;
     char     buffer[8192];
     size_t   bfree = sizeof(buffer);
     size_t   bfill = 0;
@@ -199,17 +199,13 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
         return -1;
     }
 
-    /*
-     * We partition our stream by metric name
-     */
-    if (ctx->has_key)
-        key = ctx->key;
-    else
-        key = rand();
+    key = ctx->key;
+    if (key != NULL)
+        keylen = strlen (key);
 
     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
                      RD_KAFKA_MSG_F_COPY, buffer, blen,
-                     &key, sizeof(key), NULL);
+                     key, keylen, NULL);
 
     return status;
 } /* }}} int kafka_write */
@@ -318,19 +314,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             }
 
         } else if (strcasecmp ("Key", child->key) == 0)  {
-            char *tmp_buf = NULL;
-            status = cf_util_get_string(child, &tmp_buf);
-            if (status != 0) {
-                WARNING("write_kafka plugin: invalid key supplied");
-                break;
-            }
-
-            if (strcasecmp(tmp_buf, "Random") != 0) {
-                tctx->has_key = 1;
-                tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
-            }
-            sfree(tmp_buf);
-
+            cf_util_get_string (child, &tctx->key);
         } else if (strcasecmp ("Format", child->key) == 0) {
             status = cf_util_get_string(child, &key);
             if (status != 0)