kafka根据时间戳消费或者消费指定时间内的数据

在名 发表于: 2018-02-23   最后更新时间: 2021-08-29 23:56:57   22,889 游览

很早就想要这个功能了
每次出了问题都是换一下消费者组id,然后数据又全部重新消费一遍,很痛苦

几个场景
  1. 从当前时间往前推2个小时开始接着消费,消费完这2个小时断开
  2. 从当前时间往前推2个小时开始接着持续消费
  3. 指定个时间戳开始持续消费
  4. 指定个开始时间与结束时间消费,消费完断开
发表于 2018-02-23
添加评论

使用KafkaConsumer.offsetsForTimes,但要确认集群已开启log.message.timestamp.type参数,并且clien要使用0.10.*的客户端发送数据,数据格式和0.9不同了。

在名 -> 漂泊的美好 6年前

谢老哥提示。
好人一生平安

 props.put("log.message.timestamp.type","LogAppendTime");

为什么设置了没反应

使用KafkaConsumer.offsetsForTimes

参考:kafka通过开始和结束时间,重新消费消息 - java

在名 -> 半兽人 6年前

谢老哥提示,已经找到源码了。
可以撸代码了

天南地北 -> 在名 6年前

源码可以发个地址吗,刚好需要

你的答案

查看kafka相关的其他问题或提一个您自己的问题