消费端在消费消息时,有时一批消息开头几条消息消费端感觉像没有拉取到,猜测会不会是上一次拉去消息的时候把后面几个偏移量消费了?但是这不应该把。不知道具体是什么问题,该如何找问题呢?我是用的同步+异步的方式提交的偏移量。新建了一个servlet项目启动时初始化servlet的init方法来启动的线程。servlet的destroy调用时也就是项目关闭时去结束while循环,然后调用consumer.commitSync();同步提交的方法。
代码如下
public Consumer(String servers, String topicName, boolean flag) {
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");//获取记录数
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//处理消息最大间隔时间
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.flag=flag;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
logger.info("---------kafka消费端开始消费---------");
Gson gson = new Gson();
try {
while(flag){
msgList = consumer.poll(1000);
if(null!=msgList&&msgList.count()>0){
for (ConsumerRecord<String, String> record : msgList) {
String reqJson = record.value();
ProposalSaveFor3105RequestDto baseRequestDto = null;
try {
baseRequestDto = gson.fromJson(reqJson,ProposalSaveFor3105RequestDto.class);
String proposalNo = baseRequestDto.getPolicyDataDto().getInsuredItemProcutDtoList().get(0).getInsuredDto().getProposalNo();
logger.info("kafka拉取了单号:"+proposalNo);
}catch (Exception e){
e.printStackTrace();
logger.error("kafka消费端异常---转换报文报错---报文内容:"+reqJson+",异常信息:",e);
}
if(baseRequestDto!=null){
try {
KafkaProposalSaveFor3105ServiceImpl service = new KafkaProposalSaveFor3105ServiceImpl();
service.saveProposal(baseRequestDto);
}catch (Exception e){
e.printStackTrace();
logger.error("kafka消费端业务逻辑执行异常---异常信息:",e);
}
}
}
consumer.commitAsync();
}
}
} catch (Exception e) {
logger.error("kafka消费端消费信息时异常---异常信息:",e);
e.printStackTrace();
} finally {
try {
consumer.commitSync();
}catch (Exception e1){
e1.printStackTrace();
}finally {
consumer.close();
}
}
}
程序没有报错。
偶尔会出现这样的问题,不知道是不是我消费端写的哪里有什么问题。求大神分析分析,谢谢。
各位大佬支支招,感谢
消费者是定时拉起来消费,消费完成后就关闭?等待下次消费?
是一个死循环一直在拉取消息,项目服务停止的时候会结束循环
消费者是批量拉取消息,也就是说在你停止项目的时候,如果此时刚拉取了消息,提交offsr,但程序只处理一部分,那剩下的消息就会漏掉。
我设置的是一次只拉去一条消息,服务没有停,也出现了这样的问题
commitAsync()
或commitSync()
是批量行为,你如果一条一条的消费,那就一条一条的提交。比如:public <T> void poll(String topicName, Object obj, Class<T> clas) { // 订阅一个主题 consumer.subscribe(Arrays.asList(topicName)); while (ConsumerService.flag) { ConsumerRecords<String, String> records = consumer.poll(100); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { long start = System.currentTimeMillis(); try { consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1))); } catch (Exception e) { logger.warn("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},error:{}", topicName, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e); break; } catch (Throwable e) { logger.warn("fatal Error:kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},message:{},error:{}", topicName, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e); break; } // 调用业务逻辑 try { BizClassUtils.get(obj).doBiz(JSON.parseObject(record.value(), clas)); } catch (Exception e) { logger.error("a message Exception: message:{},topicName:{},error:{}", record.value(), e); } catch (Throwable e) { logger.error("a message throwable: message:{},topicName:{},error:{}", record.value(), e); } finally { long endTime = (System.currentTimeMillis() - start); if (endTime > 20000) logger.debug("Business processed single a message used time:{}ms,message total:{}", (System.currentTimeMillis() - start), partitionRecords.size()); } } } } if (ConsumerService.flag == false) logger.info("【kafka消费者线程结束】...."); }
参考:https://www.orchome.com/1056
我一次只拉取一条消息,用异步+同步的方式提交可能会出现问题吗?现在程序运行不是每次都会出现问题
想找出问题出现在哪里
你的代码看了,没问题,你是没有停止的情况下也有消息丢失。
但是只有在强杀的情况下,就会跟上面我说的场景一样,程序只处理一部分,剩下的消息就会漏掉。下次启动跳过了这些消息。
好的,感谢
重新验证了一下,可能不是消费端丢失消息。分区只有一个,而且消费端打出的日志中偏移量并没有中断过,偏移量是连续正确的,但是生产端发送了5条消息,但消费端只消费了3条消息,说明应该是生产端发送消息时丢失了2条消息,可以得出这样的结论吗?
可以,生产者会停止吗 还是一直发送?还是发够5条停?
生产者是在被触发了某种操作后会发送消息,生产者不会停,发送条数每次也可能不同
问题找到了吗(ps:注意如果有问题的点,尽量详细一点,或者贴一下相关代码,不然我只能猜)
谢谢。有个新问题想咨询一下,昨天我查看日志发现我的主题所有分区都从头开始重新消费了一次,因为kafka服务不是我在维护,怀疑可能是谁操作了kafka服务,请问知道可能做了什么操作吗?
新开个问题吧
__consumer_offsets
,如果它清空了,有可能会导致。你的答案