kafka启动多个消费者,为啥只有第一个能消费到数据。

✎ @#* 发表于: 2021-12-16   最后更新时间: 2021-12-16 11:06:18   1,776 游览
0
0

问题:我一个kafka集群里有很多topic。我创建了很多consumer去订阅每一个topic,一对一的消费,但是问题是:我启动第一个消费者没问题,后面再启动其他的消费者就消费不到数据了,而且也没报错。不知道啥原因呢?我consumer写的是多例模式的。

@Component
@Scope("prototype")
public class KafkaMqConsumer {

    @Resource(name = "receiveExecutor")
    private Executor receiveExecutor;

    public void consumer(Integer dataSourceId,
            String dataSourceName,
            String metaDataCode,
            String metaDataCnName,
            KafkaConfig kafkaConfig,
            String format,
            String formatType) {
        JaasConfig jaasConfig = kafkaConfig.getJaasConfig();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, jaasConfig.getAutoOffsetResetConfig());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, jaasConfig.getClientIdConfig());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, jaasConfig.getMaxPollRecordsConfig());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, jaasConfig.getEnableAutoCommitConfig());

        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(
                    Stream.of(kafkaConfig.getTopic().split(",")).collect(Collectors.toList()));
            Runnable runnable =
                    () -> {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofSeconds(1L));
                            log.info(
                                    "kafka : The number of records for all topics :{}",
                                    records.count());
                            records.forEach(
                                    record -> {……

调用方式:

@Resource private ObjectFactory<KafkaMqConsumer> objectFactory;
void method(){
 objectFactory.getObject().consumer(dataSourceId,
                dataSource.getName(),
                dataSource.getMetaDataCode(),
                metaDataMapper.selectByPrimaryKey(dataSource.getMetaDataCode()).getCnName(),
                kafkaConfig,
                dataSource.getFormat(),
                dataSource.getFormatType());
}

2.最开始报错,

WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629

然后我百度,改了clientid。每一个consumer分别去订阅不同topic,就出现不报错,也没法消费到数据。

心态崩了啊老铁!

发表于 2021-12-16
添加评论
0

虽然你这个对象工厂的逻辑我不知道是什么。

javax.management.InstanceAlreadyExistsException

但是基于实例已存在的异常来看,你必须要解决的是

  1. 对象是new的。
  2. 你这个对象工厂,区别“存在”的依据是什么?比如new出来的对象名不能相同。
✎ @#* -> 半兽人 3年前

我大概理解了,您说的对象是KafkaConsumer,KafkaConsumer这个东西,我只需要创建一次,然后下次需要订阅其他topic的时候再使用这个,不需要再new。谢谢

2021-12-16 15:48

直接又报错了。我是尝试再次订阅的时候。

KafkaConsumer is not safe for multi-threaded access

半兽人 -> ✎ @#* 3年前

多线程不安全,你最好还是先了解一下。
可以试试:SpringBoot和kafka集成

✎ @#* -> 半兽人 3年前

问题解决了。原来的多例,线程池方式都没问题。把核心线程数设置的大一点解决了。原来设置的是一。不过还是不清楚为啥,这个跟核心线程有关?为啥非核心线程的就不行了?用的是那个task的excuter。有空研究下要不要换个。

✎ @#* -> 半兽人 3年前

我不是多线程去new,我是线程池里面跑的订阅方法

你的答案

查看kafka相关的其他问题或提一个您自己的问题