以下为代码:
KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(properties);
consumer.subscribe(Arrays.asList("kafkaSync"));
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
Set<TopicPartition> assignments = consumer.assignment();
Map<TopicPartition, Long> query = new HashMap<>();
for (TopicPartition topicPartition : assignments) {
System.out.println(topicPartition); query.put(topicPartition,Long.valueOf(properties.getProperty("startTime")));
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
for(Entry<TopicPartition,OffsetAndTimestamp> entry:result.entrySet()){
System.out.println(entry);
consumer.seek(entry.getKey(), entry.getValue().offset());
}
flag = false;
}
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
返回的result结果为空
关于时间戳类型的设置,log.message.timestamp.type=LogAppendTime, 设置后没有反应,而且采用这个方法offsetsForTimes 获取出来的集合为null, 该怎么处理
你的数据还在吗?先消费出来吧时间戳打出来
数据是有的, 消费的时候时间戳打印出来了,但是是-1, 时间戳类型是CreateTime
时间戳打印出来是 -1 吗?
你邮箱多少,我把根据时间戳消费的例子私发给你,看你周末努力免费赠送(^__^) 。
好的,谢谢!!!邮箱 1508156481@qq.com 这个问题已经缠了我两天了,时间戳类型设置了也没反应
根据offsetsForTimes 获取的集合还是null, 但是key有值,value是null,怎么办
我的也是key有值,value为null,咋解决的啊
之后我试试。
我现在的问题是,在配置log.message.timestamp.type=LogAppendTime参数后,集群启用了timestamp索引。kafka数据目录会多出了一个 SegmentBaseOffset.timeindex的时间索引文件,但是我加数据后,只有log文件和offset.index文件会有变化(大小在增加),而SegmentBaseOffset.timeindex这个文件的大小始终为0,所以我怀疑consumer.offsetsForTimes返回为空,跟这个有关系。我在这个社区中(https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index) 查到了这么一句话,“For message format v0, the timestamp is always -1, so no time index entry will be inserted when message is appended.”提到了message format v0可能会导致时间索引文件没有任何东西插入。不知道你有何高见?
根据offsetsForTimes 获取的集合还是null, 但是key有值,value是null,怎么办
我记得我当时问题的解决方法是,把客户端也就是kafka-clients包换乘0.10.1.1也就是必须支持offsetsForTimes的版本,才可以。
我这个版本是1.0.1的
我觉得是时间戳的问题, log.message.timestamp.type=LogAppendTime 这个是直接就可以设置的吗? props.put("log.message.timestamp.type","LogAppendTime"); 是这样吗
kafka服务端(你说你的是1.0.1的) broker配置log.message.timestamp.type=LogAppendTime就可以了。客户端也就是你的生产程序也必须要用1.0.1的kafka-client.jar包。这种方式props.put("log.message.timestamp.type","LogAppendTime")在你的生产代码里不需要。
客户端用的也是1.0.1的, 代码里不需要设置,是在哪设置呢?
是在集群配置文件里设置吗
是的
在这个server.properties配置文件里吗?
是的
需要重新启动kafka吗? 里边的数据会丢失吗
更改配置当然要重新启动。数据不会丢失。
在cdh上怎么添加这个配置 log.message.timestamp.type=LogAppendTime ??
你也用cdh啊。这就简单了,在高级配置里找到kafka.properties的kafka broker高级配置代码段,这个选项,配进去就重启就可以了。
这个就是开启时间戳的, 然后用这个方法offsetsForTimes 获取偏移量?
已经出来了,万分感谢,你的指导
请问,我在server.properties中已经加入了message.timestamp.type=LogAppendTime这个配置,重启后新建的topic中消费出来的时间还是-1 这个是咋回事
请问下,假如kafka集群这个参数是默认的CreateTime,我又不想改集群配置,可以直接消费的时候指定这个参数为LogAppendTime
这个key有值value没有的情况怎么做,有没有具体例子
返回的时间是utc时间×1000+毫秒,转显示时直接转本地时间会溢出,显示为-1
处理成真实utc时间后转本地时间即可
你的答案