提问说明
kafka 消费端 设置了 enable.auto.commit=true 自动提交间隔是 1秒钟, 如果一秒钟 我进行多次poll 会不会出现重复消费 ,目前出现了大量重复消费的数据
kafka-clients-0.9.0.0.jar
public BmsKafkaConsumer(String kafkaUrl, String userName) {
props = new Properties();
props.put("group.id", UUID.nameUUIDFromBytes(userName.getBytes()).toString().replaceAll("-", ""));
props.put("auto.commit.interval.ms", "1000");
props.put("bootstrap.servers", kafkaUrl);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "29000");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
topic.add("BAYONET_VEHICLEPASS");
lisenter = new BmsKafkaLisenter();
}
/**
* 鎺ユ敹鏁版嵁
*/
public void receive() {
close();
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topic);
isReceive = true;
try {
while (isReceive) {
log.info("start get data--------------------------");
ConsumerRecords<String, String> records = consumer.poll(100);
log.info("records size is =======[{}]",records.count());
for (ConsumerRecord<String, String> record : records) {
lisenter.receive(new String(record.value().getBytes(), Variable.KAFKA_CHARSET));
}
}
} catch (Exception e) {
log.error("鎺ユ敹杩囪溅鏁版嵁鏃跺嚭鐜板紓甯革紝鏈兘姝e父鎺ユ敹鏁版嵁", e);
close();
}
}
你的设计是错的,kafka poll是长轮训,也是长链接,批次获取消息。不要关闭,由它来统一给你派发消息。
你的答案