zookeeper版本3.4.10, kafka版本1.0.1
问题主要是, 手动提交方式的情况下,拉取到几条数据,未提交, 下次打开拉取不到数据。
关闭消费者,生产者发送几条数据,再打开消费者,拉取不到数据.
消费者打开的情况下生产者发送消息能拉取到,从偏移量能看出关闭消费者时发送的那几条数据
异常信息
打开zookeeper和kafka后zookeeper的异常信息
Established session 0x16435b8d5a50000 with negotiated timeout 6000 for client /127.0.0.1:10017
这个之前问过,不确定是长链接sesison问题,10017端口外部地址是2181.而且重启电脑也没用
[ProcessThread(sid:0 cport:2181)::PrepRequestProcess@648] - Got user-level KeeperException when processing sessionid:0x16435b8d5a50000 type:create cxid:0x5 zxid:0x3 txntype:-1 reqpath:n/a Error Path:/brokers Errors:KeeperErrorCode = NoNode for /brokers
大同小异几个错误提示, Error Path 不一样, 现在有brokers,config,admin,cluster,constroller_epoch,还有两个稍微有点不一样的错,Error Path是/brokers和/brokers/ids, KeeperErrorCode是NodeExists for XXX
网上查这个问题解决方法是删除
zookeeper version-2
文件夹,试过,没用,能清的一系列文件夹log,dir等都删过.目前kafka无异常, 不过有一个地方让人在意
Creating /brokers/ids/1001 (is it secure? false)
创建topic时又会报2上面的错,
Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0
因为是两个分区所以来了两遍
生产者发送消息,没啥问题,一打开消费者,__consumer-offsets就会被创建
并生成50个分区..然后每个分区都报2的错,再往后就是开始说的问题了.
ps.之前测试时都是一个分区
自动提交设置为false了吗?
自动提交设置的是false, 然后现在测试又发现一个问题, 发送者发送十条消息, 偏移量不连续, 消费者接受数据时也不按偏移量顺序来
哦, 这个偏移量顺序不是问题, 因为两个分区
生产者和消费者配置基本和你写的例子一样, 我先打开消费者用KafkaConsumer.position 查看偏移量, 值为150, 关闭消费者后生产者发送20条数据, 再打开消费者, 偏移量直接为170, 也没拉取到数据
你先确保消息成功发送到topic里了,先开个命令窗口去消费。
生产者发送消息有用回调方法,能看到偏移量, 消费者接收消息时也输出了偏移量, 消息都是全部发送成功的, 而且topic是一个分区的,现在问题就像是, 消息发送成功后就被消费了一样
你把手动提交关了,换成自动提交。有可能消息跳过了。
比如:10条消息,0,1,2,3,4,5,6....10。如果你提交3的偏移量,则0,1,2也认为是消费了。
可以了...取消手动提交就能拉取到数据了, 也就是在 先关闭消费者的情况下发送消息再打开,现在可以拉取到数据了... 可是这个消息跳过的问题该怎么解决, (因为项目需求用手动提交会比较好,防止丢失消息)
没办法的,kafka的机制如此,是按照下标来的。
手动提交模式,并没有执行提交相关方法...是怎么导致消息跳过的...
程序是你写的额。你得检查了,如果有后面的消息调用了提交,就会导致之前报错的消息也跟着被提交。
消费者配置
bootstrap.servers 192.168.0.222:9092
group.id 11
enable.auto.commit false
auto.commit.interval.ms 1000
session.timeout.ms 3000
key/value.deserializer org.apache.kafka.common.serialization.StringDeserializer
你的答案