kafka 拉取数据顺序错误/offset提交问题?

钚瑬銘 发表于: 2021-03-30   最后更新时间: 2021-03-30 21:36:27   1,622 游览
  1. 问题
    kafka 数据拉取错误

  2. 版本、环境、场景等上下文信息

    假设生产者生产的数据是:1,2,3,4,5,6,7,8,9.
    如果第一次拉取到:1,2,3,4 提交offset
    第二次拉取到:5,6,7,8 不提交offset
    希望,第三次拉取到:5,6,7,8

  3. 相关代码

    def  kafka_con():
     consumer = KafkaConsumer(group_id='alg',
                             bootstrap_servers=['10.100.1.**:9092','10.100.1.**:9092','10.100.1.**:9092'],
                              value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                auto_offset_reset='earliest', enable_auto_commit=False)
     partition_a = [TopicPartition('dT', 0),TopicPartition('dT',1),
                    TopicPartition('dT',2), TopicPartition('dT',3)]
     consumer.assign(partition_a)
     f,k=open('./data.json','w',encoding='utf-8'),0
     while k<3:
         msg = consumer.poll(timeout_ms=10000, max_records=10000, update_offsets=False)
         for keys, values in msg.items():  # 不同partition的结果
             dic = {}
             for val in values:  # 每个partition拉取的每行
                 dic['number'] = val.key.decode('utf-8')
                 j_data = json.dumps(dic)
                 f.write(j_data + '\n')
         if k==0:
             consumer.commit_async()
         else:
             pass
         f.write('一次执行完毕{}\n'.format(k))
         k+=1
     f.close()
    
  4. 报错信息

    但是现在按照代码运行,结果显示为:
    1,2,3,4,5,6,7,8,1,2,3,4

  5. 已经尝试过哪些方法仍然没解决,操作步骤等
    刚学习kafka,想解决实时拉取数据,poll数据报错:则不提交,挂起进程2s后再次拉取原数据重新处理,正常处理完数据没报错:则提交offset

发表于 2021-03-30
添加评论

java出身,大概猜测,错误的地方应该是这个:

consumer.commit_async()

改成同步提交。

钚瑬銘 -> 半兽人 3年前

你好,谢谢回答~但我改成consumer.commit(), 还是显示1,2,3,4, 5,6,7,8, 1,2,3,4

钚瑬銘 -> 半兽人 3年前

补充回复:抱歉,现在可以了,就是按你说的改了就欧克了,谢谢~

你的答案

查看kafka相关的其他问题或提一个您自己的问题