提问说明 创建topic时用--config指定creataTime或者LogApppendTime获取timeStamp都为-1 server.proerties中已加入message.timestamp.type配置。获取到的timeStamp仍为-1
log.message.timestamp.type配置好后,你是怎么获取的?
public static void main(String[] args) {(props); rs = consumer.poll(100); r : rs) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.30.79.134:6667,172.30.79.135:6667,192.168.1.219:6667,192.168.1.220:6667,192.168.1.221:6667");
//props.put("bootstrap.servers", "47.107.148.97:9092");
//props.put("bootstrap.servers", "106.12.73.246:9092");
//props.put("config","message.timestamp.type=LogAppendTime");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer consumer = new KafkaConsumer
//TopicPartition partition01 = new TopicPartition("gcgTest7", 0);
//consumer.assign(Arrays.asList(new TopicPartition[]{partition01}));
//consumer.seek(partition01,10);
consumer.subscribe(Arrays.asList(new String[]{"testGcg1"}));
while (true) {
//--消费100ms内的数据
ConsumerRecords
//--遍历得到的结果集
for (ConsumerRecord
//--得到信息 打印
System.out.println("--topic:[" + r.topic() + "]"
+ "--partition:[" + r.partition() + "]" + "--offset:[" + r.offset() + "]" + "--timestamp:[" + r.timestamp() + "]" + "--key:[" + r.key() + "]" + "--value:[" + r.value() + "]--" + "type="+r.timestampType());} } }</string,></string,></string,>
你kafka什么版本呢,默认情况下,不需要任何配置,时间戳是可以获取到的,除非你的kafka版本过低。
你可以关注我的微信公众号,里面分享了一个完整的例子。
使用的是2.10-0.10.1.2的版本 时间索引文件是有的就是数据进不去
你要不要换个新版的试试,我忘了这个有没有时间戳。
你的答案