你好,我在 consumer 订阅 topic 时使用了正则表达式,先启动consumer订阅数据,之后动态增加新的topic,并使用producer向新的topic发送数据。
我发现 consumer 要在 producer 发送数据几分钟之后才能接收到新的 topic 里的数据,前面几分钟的数据没有订阅到。
延迟大概5分钟。
请问有什么办法能解决这个延迟问题吗?
部分代码如下:
kafkaProps.put("bootstrap.servers", brokers);
kafkaProps.put("group.id", groupID);
kafkaProps.put("client.id", clientID);
kafkaProps.put("auto.offset.reset", "latest");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("enable.auto.commit", "false");
String topic = "org.*.datatype";
Pattern pattern = Pattern.compile(topic);
consumer.subscribe(pattern);
//轮询
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
......
}
非常感谢!
你可以试试从最早开始消费“ earliest”。
延迟发现目前官方没有提供相应的说明。
使用 “earliest” 也是个办法,可以获取到前面几分钟错过的数据,解决了数据丢失的问题
谢谢回复
但是注意,在重启后,也会重新消费之前的消息...
配置下这个额外参数
metadata.max.age.ms 1000
你的答案