你kafka什么版本呢,默认情况下,不需要任何配置,时间戳是可以获取到的,除非你的kafka版本过低。
你可以关注我的微信公众号,里面分享了一个完整的例子。
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.30.79.134:6667,172.30.79.135:6667,192.168.1.219:6667,192.168.1.220:6667,192.168.1.221:6667");
//props.put("bootstrap.servers", "47.107.148.97:9092");
//props.put("bootstrap.servers", "106.12.73.246:9092");
//props.put("config","message.timestamp.type=LogAppendTime");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer consumer = new KafkaConsumer
//TopicPartition partition01 = new TopicPartition("gcgTest7", 0);
//consumer.assign(Arrays.asList(new TopicPartition[]{partition01}));
//consumer.seek(partition01,10);
consumer.subscribe(Arrays.asList(new String[]{"testGcg1"}));
while (true) {
//--消费100ms内的数据
ConsumerRecords
//--遍历得到的结果集
for (ConsumerRecord
//--得到信息 打印
System.out.println("--topic:[" + r.topic() + "]"
+ "--partition:[" + r.partition() + "]"
+ "--offset:[" + r.offset() + "]"
+ "--timestamp:[" + r.timestamp() + "]"
+ "--key:[" + r.key() + "]"
+ "--value:[" + r.value() + "]--" +
"type="+r.timestampType());}
}
}</string,></string,></string,>
请问,我在server.properties中已经加入了message.timestamp.type=LogAppendTime这个配置,重启后新建的topic中消费出来的时间还是-1 这个是咋回事