新版kafka的consumer接收消息时会丢失消息

匿名 发表于: 2017-08-21   最后更新时间: 2017-08-21 11:08:49   6,322 游览

新版的kafka,发送端能正常发送,我在做测试时循环发送10次,用客户端能收到10条,但是在程序中poll(100)时,小于10条,请问各位大牛,这是什么原因造成的?

发表于 2017-08-21
添加评论

首先确保你发送成功了10条吗?用命令来确认是否全部成功发送了。

匿名 -> 半兽人 7年前

对的,我通过命令

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties 查看接收数量和发送数量是一直的,但在java程序中总是会少

半兽人 -> 匿名 7年前

一般都是生产者少发了,你程序最后 休眠1秒。试试

匿名 -> 半兽人 7年前

生成者没少发,因为我在控制台能收到,而且生产者发送之后我都是sleep(2)秒了

半兽人 -> 匿名 7年前

消费者少收这个很少见,是用官网的例子跑的吗?

匿名 -> 半兽人 7年前

是的。while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.info("kafka ================  current topic :" + this.topic + ",  receive message:" + record.value());
}
}

半兽人 -> 匿名 7年前

恩,这样没问题,少了几条消息?

半兽人 -> 匿名 7年前

有没有规律。

匿名 -> 半兽人 7年前

我循环6次,每次发送后停2秒钟,而我consumer接收时,有时会少2条,有时会少1条。有没有可能是配置问题?

匿名 -> 半兽人 7年前

貌似没有规律,我的consumer的配置

bootstrap.servers=192.168.1.35:9092,192.168.1.36:9092
group.id=aaa
auto.commit.interval.ms=2000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=3000
max.poll.records=10
session.timeout.ms=30000
heartbeat.interval.ms=1000
request.timeout.ms=40000

半兽人 -> 匿名 7年前

什么叫循环,你把代码贴出来看下。
或者用官网提供的例子,什么都不要动。

匿名 -> 半兽人 7年前

for (i=0;i<6;i++) {
  send(msg);   //这个方法就是官网发送消息的例子,只不过抽取出来了
}

匿名 -> 半兽人 7年前

问题应该出在consumer中吧,发送没问题,接收少了

半兽人 -> 匿名 7年前

如果消费者也是官网的例子的话,消费者不是多线程,并且有休眠,消费者组名没有被其他线程同时启动,那我实在想不出来为何会丢消息了。抱歉

匿名 -> 半兽人 7年前

多线程?呃,在消费者代码我我倒是集成了线程,难道是这个原因?

匿名 -> 半兽人 7年前

继承,不是集成,写错了

半兽人 -> 匿名 7年前

你的代码不清楚,所以建议求你消费者程序还原官网的,什么都不要动,如果不丢消息,那你在检查自己的。

你的答案

查看kafka相关的其他问题或提一个您自己的问题