问题:我一个kafka集群里有很多topic。我创建了很多consumer去订阅每一个topic,一对一的消费,但是问题是:我启动第一个消费者没问题,后面再启动其他的消费者就消费不到数据了,而且也没报错。不知道啥原因呢?我consumer写的是多例模式的。
@Component
@Scope("prototype")
public class KafkaMqConsumer {
@Resource(name = "receiveExecutor")
private Executor receiveExecutor;
public void consumer(Integer dataSourceId,
String dataSourceName,
String metaDataCode,
String metaDataCnName,
KafkaConfig kafkaConfig,
String format,
String formatType) {
JaasConfig jaasConfig = kafkaConfig.getJaasConfig();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, jaasConfig.getAutoOffsetResetConfig());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, jaasConfig.getClientIdConfig());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, jaasConfig.getMaxPollRecordsConfig());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, jaasConfig.getEnableAutoCommitConfig());
try {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(
Stream.of(kafkaConfig.getTopic().split(",")).collect(Collectors.toList()));
Runnable runnable =
() -> {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1L));
log.info(
"kafka : The number of records for all topics :{}",
records.count());
records.forEach(
record -> {……
调用方式:
@Resource private ObjectFactory<KafkaMqConsumer> objectFactory;
void method(){
objectFactory.getObject().consumer(dataSourceId,
dataSource.getName(),
dataSource.getMetaDataCode(),
metaDataMapper.selectByPrimaryKey(dataSource.getMetaDataCode()).getCnName(),
kafkaConfig,
dataSource.getFormat(),
dataSource.getFormatType());
}
2.最开始报错,
WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629
然后我百度,改了clientid。每一个consumer分别去订阅不同topic,就出现不报错,也没法消费到数据。
心态崩了啊老铁!
虽然你这个对象工厂的逻辑我不知道是什么。
但是基于实例已存在的异常来看,你必须要解决的是
我大概理解了,您说的对象是KafkaConsumer,KafkaConsumer这个东西,我只需要创建一次,然后下次需要订阅其他topic的时候再使用这个,不需要再new。谢谢
2021-12-16 15:48
直接又报错了。我是尝试再次订阅的时候。
多线程不安全,你最好还是先了解一下。
可以试试:SpringBoot和kafka集成
问题解决了。原来的多例,线程池方式都没问题。把核心线程数设置的大一点解决了。原来设置的是一。不过还是不清楚为啥,这个跟核心线程有关?为啥非核心线程的就不行了?用的是那个task的excuter。有空研究下要不要换个。
我不是多线程去new,我是线程池里面跑的订阅方法
你的答案