With seek, my Kafka consumer can start reading from a historical position. After it had been consuming for a while, the server restarted. If I rerun the program, it seeks back to the same old position and re-consumes everything. How can I resume consuming from the offset where the program was interrupted?
package com.example.demo.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class Consumer {

    public static void main(String[] args) throws ParseException {
        // hello();
        hi();
    }

    private static long timeToTimestamp(String time) throws ParseException {
        SimpleDateFormat fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dt = fm.parse(time);
        return dt.getTime();
    }
    private static void hello() throws ParseException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tstest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "tstest";
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        long fetchDataTime = timeToTimestamp("2021-03-16 17:16:00");
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
        }
        consumer.assign(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
        OffsetAndTimestamp offsetTimestamp = null;
        System.out.println("Setting the initial offset for each partition...");
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
            // If the query timestamp is later than the latest indexed record time, the value is null
            offsetTimestamp = entry.getValue();
            if (offsetTimestamp != null) {
                int partition = entry.getKey().partition();
                long timestamp = offsetTimestamp.timestamp();
                long offset = offsetTimestamp.offset();
                System.out.println("partition = " + partition +
                        ", time = " + df.format(new Date(timestamp)) +
                        ", offset = " + offset);
                // Set the offset to start reading from
                consumer.seek(entry.getKey(), offset);
            }
        }
        System.out.println("Finished setting the initial offset for each partition.");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
    private static void hi() throws ParseException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tstest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "tstest";
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        long fetchDataTime = timeToTimestamp("2021-03-16 17:16:00");
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            // timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
        }
        consumer.assign(topicPartitions);
        /*
        Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
        OffsetAndTimestamp offsetTimestamp = null;
        System.out.println("Setting the initial offset for each partition...");
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
            // If the query timestamp is later than the latest indexed record time, the value is null
            offsetTimestamp = entry.getValue();
            if (offsetTimestamp != null) {
                int partition = entry.getKey().partition();
                long timestamp = offsetTimestamp.timestamp();
                long offset = offsetTimestamp.offset();
                System.out.println("partition = " + partition +
                        ", time = " + df.format(new Date(timestamp)) +
                        ", offset = " + offset);
                // Set the offset to start reading from
                consumer.seek(entry.getKey(), offset);
            }
        }
        System.out.println("Finished setting the initial offset for each partition.");
        */
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
Don't seek. If you committed the offsets, the consumer group will resume from the last committed position when it restarts.
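For example, something along these lines. This is only a sketch: it reuses your bootstrap server and group.id, switches to manual commits instead of auto-commit, and uses subscribe() for brevity (assign() without seek() resumes from committed offsets the same way). It relies on the same imports as the code above.

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tstest");
        props.put("enable.auto.commit", "false");   // commit explicitly below instead of auto-commit
        props.put("auto.offset.reset", "earliest"); // only applies when the group has no committed offset yet
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("tstest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // process the record here
            }
            // Persist the position so a restart resumes exactly here
            consumer.commitSync();
        }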
That's what I thought too, but in practice it didn't resume from where it left off. Did I get a parameter wrong somewhere?
I've pasted the consumer code above. I first ran hello() to start consuming from the 17:16 data, then stopped it and switched to running hi(). It seems that because auto.offset.reset defaults to latest, hi() didn't consume any data.
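(For reference, auto.offset.reset only kicks in when the group has no committed offset for a partition; once an offset is committed, the setting is ignored. Overriding the default would look like the line below; this is an assumption for illustration, not something in the original code.)

        // Assumption: only matters on the very first run of the group,
        // before any offset has been committed for the partition
        props.put("auto.offset.reset", "earliest");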
I suspect your offsets were never committed successfully. Run hello() first, then check the consumer group's lag:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group tstest
GROUP   TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
tstest  ts      0          3350064         3350064         0    -            -     -
tstest  tstest  2          3333998         3333998         0    -            -     -
tstest  tstest  1          3333398         3333398         0    -            -     -
tstest  tstest  0          3332604         3332604         0    -            -     -
tstest  ts      2          3350248         3350248         0    -            -     -
tstest  ts      1          3349688         3349688         0    -            -     -
Your committed offsets are already at the latest position.
My code is pasted in the question above. The group.id should be the same, so why didn't it pick up from the committed offsets?
Maybe the data consumed after the first seek needs to be committed manually; it looks like that commit never went through.
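Something like this, perhaps. It's just a sketch that keeps the seek-by-timestamp setup from hello() and only adds an explicit commit after each poll; it assumes enable.auto.commit is set to false.

        // After seeking each partition to the offset returned by offsetsForTimes(),
        // commit explicitly so the position survives a restart.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            // Synchronously commit the offsets of the records just processed;
            // on the next run, skip seek() and the group resumes from here.
            consumer.commitSync();
        }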
Give it a try.