一个副本、一个分区生产信息。
消费的时候,开启了3个线程,并发消费同一主题,同一个组。
启动后瞬间就出现:
Exception in thread "Thread-10" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1674)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1031)
at com.oristartech.kafka.core.consumer.ConsumerHandler$1.run(ConsumerHandler.java:100)
at java.lang.Thread.run(Thread.java:748)
翻看了一下其他问题发问,好像是说消费者是非线程安全的,而且线程 和 分区也有着限定的关系(线程 ≤ 分区)。
如果要满足多线程消费 同主题+同分组,该如何处理才好
一个消费者对应一个分区,所以你多个消费者,其实永远只有一个消费者拿到消息.
kafka拉取消息是一批一批的,大概一次拉取2000-4000条。你应该在拉取后进行多线程处理.
好的。
但是,poll的时候,测试发现大概每轮只有500条,就停顿0.5-1秒。并不像您说的“大概一次拉取2000-4000条”。
消费者拉取主函数:
public void consume(int threadNumber) {
if (ConsumerThreadTools.mapListeners.isEmpty()) {
logger.error("消息处理对象为空,不进行消费.........返回");
return;
}
if(executors == null) {
executors = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
}else {
}
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println(JSONObject.toJSONString(records));
if (!records.isEmpty()) {
executors.submit(new ConsumerThreadWorker(records, offsets));
}
commitOffsets();
}
} catch (WakeupException e) {
// swallow this exception
} finally {
commitOffsets();
consumer.close();
}
}
拉取的条数跟你消息大小有关,你把poll时间调成100,提交改成自动试试
你的答案