kafka消费者数据丢失. 手动提交方式未提交,消费者未打开情况下数据已被消费

发表于: 2018-06-25   最后更新时间: 2018-06-25 16:18:21   5,456 游览

zookeeper版本3.4.10, kafka版本1.0.1

问题主要是, 手动提交方式的情况下,拉取到几条数据,未提交, 下次打开拉取不到数据。

关闭消费者,生产者发送几条数据,再打开消费者,拉取不到数据.

消费者打开的情况下生产者发送消息能拉取到,从偏移量能看出关闭消费者时发送的那几条数据

异常信息

  1. 打开zookeeper和kafka后zookeeper的异常信息

     Established session 0x16435b8d5a50000 with negotiated timeout 6000 for client /127.0.0.1:10017
    

    这个之前问过,不确定是长链接sesison问题,10017端口外部地址是2181.而且重启电脑也没用

  2.  [ProcessThread(sid:0 cport:2181)::PrepRequestProcess@648] - Got user-level KeeperException when processing sessionid:0x16435b8d5a50000 type:create cxid:0x5 zxid:0x3 txntype:-1 reqpath:n/a Error Path:/brokers Errors:KeeperErrorCode = NoNode for /brokers
    

    大同小异几个错误提示, Error Path 不一样, 现在有brokers,config,admin,cluster,constroller_epoch,还有两个稍微有点不一样的错,Error Path是/brokers和/brokers/ids, KeeperErrorCode是NodeExists for XXX

    网上查这个问题解决方法是删除zookeeper version-2文件夹,试过,没用,能清的一系列文件夹log,dir等都删过.

    目前kafka无异常, 不过有一个地方让人在意

     Creating /brokers/ids/1001 (is it secure? false)
    
  3. 创建topic时又会报2上面的错,

       Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0
    

    因为是两个分区所以来了两遍

  4. 生产者发送消息,没啥问题,一打开消费者,__consumer-offsets就会被创建

    并生成50个分区..然后每个分区都报2的错,再往后就是开始说的问题了.

ps.之前测试时都是一个分区

发表于 2018-06-25
添加评论

自动提交设置为false了吗?

-> 半兽人 6年前

自动提交设置的是false, 然后现在测试又发现一个问题, 发送者发送十条消息, 偏移量不连续, 消费者接受数据时也不按偏移量顺序来

-> 6年前

哦, 这个偏移量顺序不是问题, 因为两个分区

-> 半兽人 6年前

生产者和消费者配置基本和你写的例子一样, 我先打开消费者用KafkaConsumer.position 查看偏移量, 值为150, 关闭消费者后生产者发送20条数据, 再打开消费者, 偏移量直接为170, 也没拉取到数据

半兽人 -> 6年前

你先确保消息成功发送到topic里了,先开个命令窗口去消费。

-> 半兽人 6年前

生产者发送消息有用回调方法,能看到偏移量, 消费者接收消息时也输出了偏移量, 消息都是全部发送成功的, 而且topic是一个分区的,现在问题就像是, 消息发送成功后就被消费了一样

半兽人 -> 6年前

你把手动提交关了,换成自动提交。有可能消息跳过了。
比如:10条消息,0,1,2,3,4,5,6....10。如果你提交3的偏移量,则0,1,2也认为是消费了。  

-> 半兽人 6年前

可以了...取消手动提交就能拉取到数据了, 也就是在 先关闭消费者的情况下发送消息再打开,现在可以拉取到数据了... 可是这个消息跳过的问题该怎么解决, (因为项目需求用手动提交会比较好,防止丢失消息)

半兽人 -> 6年前

没办法的,kafka的机制如此,是按照下标来的。

-> 半兽人 6年前

手动提交模式,并没有执行提交相关方法...是怎么导致消息跳过的...

半兽人 -> 6年前

程序是你写的额。你得检查了,如果有后面的消息调用了提交,就会导致之前报错的消息也跟着被提交。

消费者配置
bootstrap.servers 192.168.0.222:9092
group.id 11
enable.auto.commit false
auto.commit.interval.ms 1000
session.timeout.ms 3000
key/value.deserializer  org.apache.kafka.common.serialization.StringDeserializer

你的答案

查看kafka相关的其他问题或提一个您自己的问题