kafka的consumer.offsetsForTimes根据timestamp查找offset返回结果为空

漂泊的美好 发表于: 2017-12-07   最后更新时间: 2021-07-20 23:56:27   12,728 游览

以下为代码:

KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(properties);
consumer.subscribe(Arrays.asList("kafkaSync"));

boolean flag = true;

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);

if (flag) {
    Set<TopicPartition> assignments = consumer.assignment();
    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition topicPartition : assignments) {
        System.out.println(topicPartition);                    query.put(topicPartition,Long.valueOf(properties.getProperty("startTime")));
    }

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
    for(Entry<TopicPartition,OffsetAndTimestamp> entry:result.entrySet()){
        System.out.println(entry);
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
    flag = false;
}

for (ConsumerRecord<String, String> record : records){
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

返回的result结果为空

发表于 2017-12-07
添加评论

关于时间戳类型的设置,log.message.timestamp.type=LogAppendTime, 设置后没有反应,而且采用这个方法offsetsForTimes 获取出来的集合为null, 该怎么处理

你的数据还在吗?先消费出来吧时间戳打出来

数据是有的, 消费的时候时间戳打印出来了,但是是-1, 时间戳类型是CreateTime

时间戳打印出来是 -1 吗?
你邮箱多少,我把根据时间戳消费的例子私发给你,看你周末努力免费赠送(^__^) 。

好的,谢谢!!!邮箱 1508156481@qq.com 这个问题已经缠了我两天了,时间戳类型设置了也没反应

根据offsetsForTimes 获取的集合还是null, 但是key有值,value是null,怎么办

我的也是key有值,value为null,咋解决的啊

之后我试试。

我现在的问题是,在配置log.message.timestamp.type=LogAppendTime参数后,集群启用了timestamp索引。kafka数据目录会多出了一个 SegmentBaseOffset.timeindex的时间索引文件,但是我加数据后,只有log文件和offset.index文件会有变化(大小在增加),而SegmentBaseOffset.timeindex这个文件的大小始终为0,所以我怀疑consumer.offsetsForTimes返回为空,跟这个有关系。我在这个社区中(https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index) 查到了这么一句话,“For message format v0, the timestamp is always -1, so no time index entry will be inserted when message is appended.”提到了message format v0可能会导致时间索引文件没有任何东西插入。不知道你有何高见?

根据offsetsForTimes 获取的集合还是null, 但是key有值,value是null,怎么办

我记得我当时问题的解决方法是,把客户端也就是kafka-clients包换乘0.10.1.1也就是必须支持offsetsForTimes的版本,才可以。

我这个版本是1.0.1的

我觉得是时间戳的问题, log.message.timestamp.type=LogAppendTime 这个是直接就可以设置的吗? props.put("log.message.timestamp.type","LogAppendTime"); 是这样吗

kafka服务端(你说你的是1.0.1的) broker配置log.message.timestamp.type=LogAppendTime就可以了。客户端也就是你的生产程序也必须要用1.0.1的kafka-client.jar包。这种方式props.put("log.message.timestamp.type","LogAppendTime")在你的生产代码里不需要。

客户端用的也是1.0.1的, 代码里不需要设置,是在哪设置呢?

是在集群配置文件里设置吗

在这个server.properties配置文件里吗?

需要重新启动kafka吗? 里边的数据会丢失吗

更改配置当然要重新启动。数据不会丢失。

在cdh上怎么添加这个配置 log.message.timestamp.type=LogAppendTime ??

你也用cdh啊。这就简单了,在高级配置里找到kafka.properties的kafka broker高级配置代码段,这个选项,配进去就重启就可以了。

这个就是开启时间戳的, 然后用这个方法offsetsForTimes 获取偏移量?

已经出来了,万分感谢,你的指导

Paddy -> 漂泊的美好 5年前

请问,我在server.properties中已经加入了message.timestamp.type=LogAppendTime这个配置,重启后新建的topic中消费出来的时间还是-1 这个是咋回事

请问下,假如kafka集群这个参数是默认的CreateTime,我又不想改集群配置,可以直接消费的时候指定这个参数为LogAppendTime

这个key有值value没有的情况怎么做,有没有具体例子

返回的时间是utc时间×1000+毫秒,转显示时直接转本地时间会溢出,显示为-1
处理成真实utc时间后转本地时间即可

你的答案

查看kafka相关的其他问题或提一个您自己的问题