当我开三台服务器去消费消费一个topic的时候,当我断开某一台服务器消费,那么会导致重复消费。
我建立了一个topic,并且建立了3个分区,我循环了1000次发送消息,但是输出却统计了1002条消息:
public static void main(String args[]) {
String topic = "dyq";
if(args.length >0) {
topic = args[0];
}
Properties props = new Properties();
props.put("bootstrap.servers", "xxxxxxx:9092");
props.put("auto.commit.interval.ms", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test");
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
//如果需要订阅多个Topic,则在这里add进去即可
//subscribedTopics.add(topic1);
subscribedTopics.add(topic);
consumer.subscribe(subscribedTopics);
try {
int index = 0;
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
index++;
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//数据库操作操作
//....................
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)));
System.out.println("我现在准备入库操作了 || 累计入库数 【" + index +"】" );
}
}
} finally {
consumer.close();
}
}
你好,你的网站上有没有一个地方,是专门给你留言发问题的?现在都是在某篇文章下面给你留言的
有问答区的呀。
哦哦,我以为那个只是你网站的滚动消息呢。好了,我在那里提了问题,你有空时给我解答下哈,谢谢啦
props.put("enable.auto.commit", "false");
关闭自动提交,
其实我发现一个问题, 我们入库的代码,如果是放在这句
代码之前 ,当刚好执行数据入库,服务器挂掉,那么我们kafka的消息就并未提交,那么其他服务器就会多提交一条消息入库。反之,如果我们把入库代码放在kafka消息提交之后,再进行提交,那么当我们执行完了kafka消息提交之后,还未在我能数据入库这条消息的时候挂掉,那么就会丢失这一条消息。请我我说的这个是对的吗?因为我在并发测试的时候,把入库代码放在kafka消息提交之后处理,就丢失了一条消息。
是这样的,后成功的会跳过这个offset。
那么这个问题是没办法处理的吗?要么会多消费一条数据,要么会少消费一条数据~~
没办法的,原理如此。
我现在能想到的,就是要么在数据库端去做判断,不提交重复数据。要么就同一个group中,不做多台服务器去处理。请问我这个说法是否正确,或者有其他方式呢?
默认都认为是处理成功,如果失败,重新丢回kafka中。
不管你做什么判断,后面的程序一旦提交成功,你的offset就变了,并且你offset提交的顺序不规则,你逻辑全乱了。不仅仅是丢消息,重复消费等问题了。
你的答案