先说明下,有问题的kafka 是3个partition。
每次隔个三天五天的,lag监控就会报警,每次去看lag的消费情况,基本都是单partition的lag上升,去看业务日志,发现消费改partition的消费者不消费了(日志不继续打了,感觉是僵死了),消费的进程还在(机器资源没有明显上升,cpu、内存、io等都没)。
这时重启一下消费者脚本就可继续消费,lag会慢慢下降,想问下这个是什么问题,困扰很久了,一直报警。辛苦各位大佬帮忙解答下。
业务线用的kafka集群版本是kafka_2.11-0.10.1.0,代码引入的客户端版本是
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
kafka消费者线程已经从你的java进程里结束了,所以会导致这个进程永远不会重连kafka进行消费了。
解决:kafka获取到消息之后,业务处理时,捕获异常,不要向上抛
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { try{ // 你的业务 } catch (Exception e) { // 忽视异常,不要向上抛 } } }
先谢大佬回答。除了这种情况,还有额外其他可能的情况吗?因为项目代码就是这样写的,而且数据量应该是比较大的,最起码不小,应该不会存在因partition没有数据而终止。此外,在lag升高时,看日志并没有明显的异常日志,应该也不会存在因为异常导致的中断情况。再次感谢大佬回复!
下附业务消费者代码,仔细看了下,业务逻辑内部有捕获异常代码。
// 消费者代码 public boolean waitTeminal() { try { check(); final ConsumerConnector consumer = createConsumer(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topicFrom, threadnum); // 一次从主题中获取一个数据 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer .createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> msgStreamList = messageStreams .get(topicFrom); logger.debug(String.format("The Kafka Stream List @size: %s", msgStreamList.size())); for (int i = 0; i < msgStreamList.size(); i++) { final KafkaStream<byte[], byte[]> stream = msgStreamList.get(i); ConsumerPool.execute(new Runnable() { @Override public void run() { logger.info(String.format( "kafka consumer started! commitBuffer: %s", String.valueOf(commitBuffer))); ConsumerIterator<byte[], byte[]> iterator = stream .iterator(); int commitCount = 0;// 批量提交计数器 List<E> events = new ArrayList<>(); while (iterator.hasNext()) { long startTime = System.currentTimeMillis(); // 消息 byte[] msg = iterator.next().message(); if (msg == null || msg.length <= 0) { continue; } // 事件校验 E e = null; try { e = parse(new String(msg).trim()); } catch (Exception except) { logger.error("json格式不合法:" + Arrays.toString(msg), except); continue; } if (!validate(e)) { continue; } if (events.size() < 5) { events.add(e); } if (events.size() == 5){ List<O> os = transferList(events); for (O out : os){ if (null != out) { BusinessLogger.sendEvent(topicTo, kafkaPartitionBy(out), getBytes(out)); } } events.clear(); } // O out = transfer(e); // commit计数累加 commitCount++; if (commitCount >= commitBuffer) { consumer.commitOffsets(); commitCount = 0; logger.info("@batchCommitSize:" + commitBuffer + "\t@times:" + (System.currentTimeMillis() - startTime)); } } } }); } return true; } catch (Exception e) { e.printStackTrace(); logger.info(String .format("the committing was wrong cause top reason")); return false; } }
// 业务逻辑代码,这里这样做,是因为之前认为是单线程消费导致处理不过来导致的,之后想采用多线程的方式处理,但是offse不好控制,就简单这样写了,效果是有的,报警确实比较少了,但是还会有。 @Override public List<Event> transferList(List<Event> events) { List<Event> results = new ArrayList<>(); Future<Event> eventFuture0 = addTask(events.get(0)); Future<Event> eventFuture1 = addTask(events.get(1)); Future<Event> eventFuture2 = addTask(events.get(2)); Future<Event> eventFuture3 = addTask(events.get(3)); Future<Event> eventFuture4 = addTask(events.get(4)); try { results.add(eventFuture0.get()); results.add(eventFuture1.get()); results.add(eventFuture2.get()); results.add(eventFuture3.get()); results.add(eventFuture4.get()); logger.info("eventResult = " + JSON.toJSONString(results)); return results; } catch (Exception e) { e.printStackTrace(); } finally { eventFuture0.isCancelled(); eventFuture1.isCancelled(); eventFuture2.isCancelled(); eventFuture3.isCancelled(); eventFuture4.isCancelled(); } return null; }
首先,你这个客户端版本太老了,已经弃用很久了,不排除版本兼容性问题。
参考新版的消费者方式:kafka消费者Java客户端
或者我之前写的例子:kafka客户端 - java
其次,多线程消费导致的,这块非常容易出问题,线程池必须是阻塞式的,就是你多线程池子虽然设置的上限是5个,但是其实你可以无限往里面丢(比如默认有1万的队列),但是在里面排队,真正处理的是5个,也会导致这个问题。
再次感谢大佬回答,
1、我也担心客户端版本的影响,所以贴出了版本信息,有没有办法能确定客户端的版本是否有影响哇。
2、多线程消费在写这块代码的时候,考虑到了无线往里面丢的情况,但是我认为这里的代码是不会发生的。因为处理业务逻辑之前是同步分批次拿五条数据,一块丢进去,多线程处理完毕再出来,循环取后五个(外层是同步的,同步取数据赛给业务逻辑多线程处理)。对于整体的处理速度确实有提升,而且跑了差不多一个多月了,并未发现数据上的异常(应该是可行的)。
验证不了哎,这个太底层了。
jdk版本,客户端版本与服务端不匹配都会造成这种现象。
我给你说的是最常见的2种情况导致的,据我所知目前是没有其他的原因会导致这个现象了。
公司的kafka版本还不能乱升,我再调研调研,再试试,谢谢大佬了。
你的答案