#include "plugin.h"
#include "common.h"
#include "configfile.h"
-#include "utils_cache.h"
#include "utils_cmd_putval.h"
#include "utils_format_graphite.h"
#include "utils_format_json.h"
-#include "utils_crc32.h"
#include <stdint.h>
#include <librdkafka/rdkafka.h>
-#include <pthread.h>
-#include <zlib.h>
#include <errno.h>
struct kafka_topic_context {
}
#endif
+static uint32_t kafka_hash(const char *keydata, size_t keylen)
+{
+ uint32_t hash = 5381;
+ for (; keylen > 0; keylen--)
+ hash = ((hash << 5) + hash) + keydata[keylen - 1];
+ return hash;
+}
+
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
{
- uint32_t key = *((uint32_t *)keydata );
+ uint32_t key = kafka_hash(keydata, keylen);
uint32_t target = key % partition_cnt;
int32_t i = partition_cnt;