//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
消费者组列表里,显示的名为ConcurrencyConsumer1
的消费者组有2台,
1台ip是192.168.4.11
和192.168.15.2
,所以每个占2个分区,是正常的。
看你的问题,你是想消费全部分区,
所以必须消费者组名不同,否则就会认定为同一个消费者组,消息就会被平均分配。