很早就想要这个功能了每次出了问题都是换一下消费者组id,然后数据又全部重新消费一遍,很痛苦 几个场景 从当前时间往前推2个小时开始接着消费,消费完这2个小时断开 从当前时间往前推2个小时开始接着持续消费 指定个时间戳开始持续消费 指定个开始时间与结束时间消费,消费完断开
使用
KafkaConsumer.offsetsForTimes
,但要确认集群已开启log.message.timestamp.type
参数,并且clien要使用0.10.*
的客户端发送数据,数据格式和0.9
不同了。谢老哥提示。
好人一生平安
props.put("log.message.timestamp.type","LogAppendTime");
为什么设置了没反应
使用
KafkaConsumer.offsetsForTimes
。参考:kafka通过开始和结束时间,重新消费消息 - java
谢老哥提示,已经找到源码了。
可以撸代码了
源码可以发个地址吗,刚好需要
你的答案