新版的kafka,发送端能正常发送,我在做测试时循环发送10次,用客户端能收到10条,但是在程序中poll(100)时,小于10条,请问各位大牛,这是什么原因造成的?
首先确保你发送成功了10条吗?用命令来确认是否全部成功发送了。
对的,我通过命令bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties 查看接收数量和发送数量是一直的,但在java程序中总是会少
一般都是生产者少发了,你程序最后 休眠1秒。试试
生成者没少发,因为我在控制台能收到,而且生产者发送之后我都是sleep(2)秒了
消费者少收这个很少见,是用官网的例子跑的吗?
是的。while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { log.info("kafka ================ current topic :" + this.topic + ", receive message:" + record.value()); } }
恩,这样没问题,少了几条消息?
有没有规律。
我循环6次,每次发送后停2秒钟,而我consumer接收时,有时会少2条,有时会少1条。有没有可能是配置问题?
貌似没有规律,我的consumer的配置bootstrap.servers=192.168.1.35:9092,192.168.1.36:9092group.id=aaaauto.commit.interval.ms=2000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializermax.poll.interval.ms=3000max.poll.records=10session.timeout.ms=30000heartbeat.interval.ms=1000request.timeout.ms=40000
什么叫循环,你把代码贴出来看下。或者用官网提供的例子,什么都不要动。
for (i=0;i<6;i++) { send(msg); //这个方法就是官网发送消息的例子,只不过抽取出来了}
问题应该出在consumer中吧,发送没问题,接收少了
如果消费者也是官网的例子的话,消费者不是多线程,并且有休眠,消费者组名没有被其他线程同时启动,那我实在想不出来为何会丢消息了。抱歉
多线程?呃,在消费者代码我我倒是集成了线程,难道是这个原因?
继承,不是集成,写错了
你的代码不清楚,所以建议求你消费者程序还原官网的,什么都不要动,如果不丢消息,那你在检查自己的。
找不到想要的答案?提一个您自己的问题。
0 声望
这家伙太懒,什么都没留下
首先确保你发送成功了10条吗?用命令来确认是否全部成功发送了。
对的,我通过命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties 查看接收数量和发送数量是一直的,但在java程序中总是会少
一般都是生产者少发了,你程序最后 休眠1秒。试试
生成者没少发,因为我在控制台能收到,而且生产者发送之后我都是sleep(2)秒了
消费者少收这个很少见,是用官网的例子跑的吗?
是的。while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.info("kafka ================ current topic :" + this.topic + ", receive message:" + record.value());
}
}
恩,这样没问题,少了几条消息?
有没有规律。
我循环6次,每次发送后停2秒钟,而我consumer接收时,有时会少2条,有时会少1条。有没有可能是配置问题?
貌似没有规律,我的consumer的配置
bootstrap.servers=192.168.1.35:9092,192.168.1.36:9092
group.id=aaa
auto.commit.interval.ms=2000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=3000
max.poll.records=10
session.timeout.ms=30000
heartbeat.interval.ms=1000
request.timeout.ms=40000
什么叫循环,你把代码贴出来看下。
或者用官网提供的例子,什么都不要动。
for (i=0;i<6;i++) {
send(msg); //这个方法就是官网发送消息的例子,只不过抽取出来了
}
问题应该出在consumer中吧,发送没问题,接收少了
如果消费者也是官网的例子的话,消费者不是多线程,并且有休眠,消费者组名没有被其他线程同时启动,那我实在想不出来为何会丢消息了。抱歉
多线程?呃,在消费者代码我我倒是集成了线程,难道是这个原因?
继承,不是集成,写错了
你的代码不清楚,所以建议求你消费者程序还原官网的,什么都不要动,如果不丢消息,那你在检查自己的。
你的答案