这种也不是报错,为什么重复fetch不存在的offset呢
/**
 * Batch Kafka listener: parses the JSON value of every record into a
 * {@code HuaweiDRSVO} and acknowledges the batch once the whole loop completed.
 *
 * <p>Any exception raised while handling the batch is caught and logged rather
 * than rethrown; in that case {@code ack.acknowledge()} is not called.
 *
 * @param records the records delivered for this poll; an empty batch is rejected
 *                by the assertion and logged as a consume failure
 * @param ack     manual acknowledgment handle, invoked only after all records
 *                were processed without an exception
 */
@KafkaListener(
        id = "${spring.kafka.consumer.listener-id}", idIsGroup = false,
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "#{'${srm.xdp.topics}'.split(',')}",
        containerFactory = "consumerFactory"
)
public void onDealMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    StopWatch stopWatch = new StopWatch();
    try {
        // NOTE(review): not used in this snippet; presumably consumed by the
        // business logic that was omitted below - confirm before removing.
        List<HuaweiDRSVO> insertList = new ArrayList<>();
        List<HuaweiDRSVO> tempList = new ArrayList<>();
        Assert.notEmpty(records, "本次拉取消息为空");
        LOGGER.info("[start consume]开始本次消费.....");
        stopWatch.start();
        for (ConsumerRecord<String, String> record : records) {
            HuaweiDRSVO huaweiDRS = JacksonUtils.json2Obj(record.value(), HuaweiDRSVO.class);
            // Business logic skipped: fail on purpose to test the error path.
            throw new Exception();
        }
        ack.acknowledge();
    } catch (Exception e) {
        // Pass the throwable as the last argument so the logger records the full
        // stack trace (assuming an SLF4J-style logger); this replaces
        // printStackTrace(), which bypassed the logging configuration.
        LOGGER.error("消费异常信息:{}", e.getMessage(), e);
    } finally {
        // The assertion above can fail before start() is reached; stopping a
        // StopWatch that is not running throws IllegalStateException, which would
        // escape the listener and hide the original failure.
        if (stopWatch.isRunning()) {
            stopWatch.stop();
        }
        LOGGER.info("[end consume]本次处理总耗时:{}ms", stopWatch.getTotalTimeMillis());
    }
}
2021-12-08 20:58:27.734 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:27.751 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:27.793 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:27.808 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:29.167 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:29.199 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:29.786 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 7389 for partition DRS-test_drs_insert-1 returned fetch data (error=NONE, highWaterMark=7389, lastStableOffset = -1, logStartOffset = 7389, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:29.786 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-1 at offset 7389 to node 127.0.0.1:9092 (id: 1 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:29.786 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-1] to broker 127.0.0.1:9092 (id: 1 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:29.798 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 505684 for partition DRS-test_drs_insert-2 returned fetch data (error=NONE, highWaterMark=505684, lastStableOffset = -1, logStartOffset = 7507, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:29.798 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-2 at offset 505684 to node 127.0.0.1:9092 (id: 2 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:29.798 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-2] to broker 127.0.0.1:9092 (id: 2 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:32.332 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 2684419 for partition DRS-test_drs_insert-0 returned fetch data (error=NONE, highWaterMark=2684419, lastStableOffset = -1, logStartOffset = 2609646, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:32.333 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-0 at offset 2684419 to node 127.0.0.1:9092 (id: 0 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:32.333 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-0] to broker 127.0.0.1:9092 (id: 0 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:34.402 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:34.418 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:34.463 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:34.486 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:34.819 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 7389 for partition DRS-test_drs_insert-1 returned fetch data (error=NONE, highWaterMark=7389, lastStableOffset = -1, logStartOffset = 7389, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:34.819 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-1 at offset 7389 to node 127.0.0.1:9092 (id: 1 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:34.819 DEBUG 10423 --- [insert-11-1-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-1] to broker 127.0.0.1:9092 (id: 1 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:34.850 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 505684 for partition DRS-test_drs_insert-2 returned fetch data (error=NONE, highWaterMark=505684, lastStableOffset = -1, logStartOffset = 7507, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:34.851 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-2 at offset 505684 to node 127.0.0.1:9092 (id: 2 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:34.851 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-2] to broker 127.0.0.1:9092 (id: 2 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:35.836 DEBUG 10423 --- [st_drs_insert-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Sending Heartbeat request to coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)
2021-12-08 20:58:35.862 DEBUG 10423 --- [insert-11-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=srm-xdp-test_drs_insert-9] Received successful Heartbeat response
2021-12-08 20:58:37.365 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Fetch READ_UNCOMMITTED at offset 2684419 for partition DRS-test_drs_insert-0 returned fetch data (error=NONE, highWaterMark=2684419, lastStableOffset = -1, logStartOffset = 2609646, abortedTransactions = null, recordsSizeInBytes=0)
2021-12-08 20:58:37.365 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Added READ_UNCOMMITTED fetch request for partition DRS-test_drs_insert-0 at offset 2684419 to node 127.0.0.1:9092 (id: 0 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:37.365 DEBUG 10423 --- [insert-11-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=srm-xdp-test_drs_insert-9] Sending READ_UNCOMMITTED fetch for partitions [DRS-test_drs_insert-0] to broker 127.0.0.1:9092 (id: 0 rack: cn-east-2c###5547fd6bf8f84bb5a7f9db062ad3d015)
2021-12-08 20:58:38.635 INFO 10423 --- [ Thread-15] ConfigServletWebServerApplicationContext : Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@6989da5e: startup date [Wed Dec 08 20:46:34 CST 2021]; root of context hierarchy
2021-12-08 20:58:38.636 INFO 10423 --- [ Thread-15] ConfigServletWebServerApplicationContext : Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@56f730b2: startup date [Wed Dec 08 20:46:39 CST 2021]; parent: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@6989da5e
2021-12
这种信息是什么情况呢?
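Kafka 消费者采用主动拉取(poll)模式,所以会持续向 broker 发送 fetch 请求。
日志中请求的 offset 与 highWaterMark 相同(例如 7389),说明消费者已经追上分区末尾、暂时没有新消息,因此返回 error=NONE、recordsSizeInBytes=0;这并不是在拉取“不存在的 offset”。这些只是 DEBUG 级别的日志,属于正常现象。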
好的,谢谢
你的答案