我一个topic分了4个区(Partition),生产者(Producer)发送消息时,现在没指定分区,所以消息都存在一个区了。请问,能自动的平均或随机分配到不同区吗?
版本是kafka_2.12-2.4.0;场景是我topic生产的消息,想让几个消费者同时并发消费这些消息,现在我的消费者有多个,但是只有一个消费者在消费(消费者group id一样),groupid相同是不想要广播似的消费。
相关代码
未指定分区的生产消息:
public void sendNoPartition(String topic, String key, MessageEntity entity) {
ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(
topic,
key,
entity);
long startTime = System.currentTimeMillis();
ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
future.addCallback(new ProducerCallback(startTime, key, entity));
}
我如下修改代码,生产时带上分区号。如此,消息会随机的到不同分区,各个消费者会并发消费。我的问题是,在生产时有什么设置吗?会自动分配到不同区。或者在消费者端,怎么设置 让这些groupid相同的消费者去并发的消费这个topic的消息。
(正常)指定了分区的生产消息:
public void sendRandomPartition(String topic, String key, MessageEntity entity) {
List listThePartions = kafkaTemplate.partitionsFor(topic);
for (int i = 0; i < listThePartions.size(); i++) {
System.out.println(listThePartions.get(i));
}
Random rand = new Random();
int fenpeiPartition = rand.nextInt(listThePartions.size());
System.out.println("将分配的分区为: " + fenpeiPartition);
ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(
topic,
*fenpeiPartition*,
key,
entity);
long startTime = System.currentTimeMillis();
ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
future.addCallback(new ProducerCallback(startTime, key, entity));
}
未指定分区的生产消息的代码我看是没问题的,客户端默认是轮询消息到各个分区的,你的新分区是提前创建好的,还是在生产者发送中创建的?
谢谢你,原来客户端默认是轮询消息到各个分区的。
我的新分区是通过kafka-topics.bat --alter --zookeeper 127.0.0.1:2181 --partitions 2 这种命令行提前更改好的。更改后,才生产消息,但是发现生产的消息 只到其中一个区了,其他区没有消息。
多找了一下资料,kafka生产者消息如何分区:默认是你说的轮询策略,还有随机策略,按key分区策略,其他分区策略,比如按ip地址。我现在改了一下,把key值改为随机:
String key = "key_"+getRandomString(5); simpleProducer.sendNoPartition(topic, key, message);
如此,生产的消息也会分到各个区。
key你写死不会变的嘛。。。。。
既然你已经查了资料,key的作用可不是你加了随机数这么用的(那还不如不传,null),kafka设计的时候有几个核心。
谢谢!
你的答案