问题
kafka 数据拉取错误版本、环境、场景等上下文信息
假设生产者生产的数据是:1,2,3,4,5,6,7,8,9.
如果第一次拉取到:1,2,3,4 提交offset
第二次拉取到:5,6,7,8 不提交offset
希望,第三次拉取到:5,6,7,8相关代码
def kafka_con(): consumer = KafkaConsumer(group_id='alg', bootstrap_servers=['10.100.1.**:9092','10.100.1.**:9092','10.100.1.**:9092'], value_deserializer=lambda x: json.loads(x.decode('utf-8')), auto_offset_reset='earliest', enable_auto_commit=False) partition_a = [TopicPartition('dT', 0),TopicPartition('dT',1), TopicPartition('dT',2), TopicPartition('dT',3)] consumer.assign(partition_a) f,k=open('./data.json','w',encoding='utf-8'),0 while k<3: msg = consumer.poll(timeout_ms=10000, max_records=10000, update_offsets=False) for keys, values in msg.items(): # 不同partition的结果 dic = {} for val in values: # 每个partition拉取的每行 dic['number'] = val.key.decode('utf-8') j_data = json.dumps(dic) f.write(j_data + '\n') if k==0: consumer.commit_async() else: pass f.write('一次执行完毕{}\n'.format(k)) k+=1 f.close()
报错信息
但是现在按照代码运行,结果显示为:
1,2,3,4,5,6,7,8,1,2,3,4已经尝试过哪些方法仍然没解决,操作步骤等
刚学习kafka,想解决实时拉取数据,poll数据报错:则不提交,挂起进程2s后再次拉取原数据重新处理,正常处理完数据没报错:则提交offset
昵称
0 声望
这家伙太懒,什么都没留下
java出身,大概猜测,错误的地方应该是这个:
改成同步提交。
你好,谢谢回答~但我改成consumer.commit(), 还是显示1,2,3,4, 5,6,7,8, 1,2,3,4
补充回复:抱歉,现在可以了,就是按你说的改了就欧克了,谢谢~
你的答案