多线程消费 KafkaConsumer is not safe for multi-threaded access

一个副本、一个分区生产信息。

消费的时候,开启了3个线程,并发消费同一主题,同一个组。

启动后瞬间就出现:

Exception in thread "Thread-10" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1674)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1031)
at com.oristartech.kafka.core.consumer.ConsumerHandler$1.run(ConsumerHandler.java:100)
at java.lang.Thread.run(Thread.java:748)

翻看了一下其他问题发问,好像是说消费者是非线程安全的,而且线程 和 分区也有着限定的关系(线程 ≤ 分区)。

如果要满足多线程消费 同主题+同分组,该如何处理才好







发表于: 17天前   最后更新时间: 17天前   游览量:89
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 一个消费者对应一个分区,所以你多个消费者,其实永远只有一个消费者拿到消息.
    kafka拉取消息是一批一批的,大概一次拉取2000-4000条。你应该在拉取后进行多线程处理.
    • 好的。
      但是,poll的时候,测试发现大概每轮只有500条,就停顿0.5-1秒。并不像您说的“大概一次拉取2000-4000条”。
      消费者拉取主函数:
      public void consume(int threadNumber) {

        if (ConsumerThreadTools.mapListeners.isEmpty()) {
         logger.error("消息处理对象为空,不进行消费.........返回");
         return;
        }

        if(executors == null) {
         
         executors = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
           new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        }else {
         
        }
        
        try {
         while (true) {
          ConsumerRecords<String, String> records = consumer.poll(500);
          System.out.println(JSONObject.toJSONString(records));
          if (!records.isEmpty()) {
           executors.submit(new ConsumerThreadWorker(records, offsets));
          }
          commitOffsets();
         }
        } catch (WakeupException e) {
         // swallow this exception
        } finally {
         commitOffsets();
         consumer.close();
        }
       }


      • 评论…
        • in this conversation