大神,若是手动提交offset,如果consumer一次性pull了100条消息出来,那么consumer.commitSync()这个方法一次性提交的是多少条消息的offset呢?
没看出来差异,我给你看下我们的。
/*
从[生产者连接池]获取一条通道,然后发送[message]到broker集群
@param topicName 主题名称
@param message 消息
@param <T> 消息类型
*/
public <T> void sendMessage(String topicName, T message) throws Exception {
// 连接
Producer producer = null;
// 阻塞调用消息发送
try {
producer = producerConnPool.getProducerConn();
producer.send(new ProducerRecord<String, T>(topicName, message)).get();
} catch (InterruptedException e) {
logger.debug("send a message InterruptedException:", e);
throw e;
} catch (ExecutionException e) {
logger.debug("send a message ExecutionException:", e);
throw e;
} catch (Exception e) {
logger.error("fetch a producer connection Exception:", e);
} finally {
// 释放连接
producerConnPool.releaseConn(producer);
}
}