Versions
spring-boot: 2.1.0.RELEASE
spring-kafka: 2.2.0.RELEASE
Kafka: kafka_2.12-2.1.0, three broker instances running on a single machine
@Bean
public KafkaConsumer listener() {
    // KafkaConsumer here is the project's own listener class (it holds the
    // @KafkaListener method shown below), not the Apache Kafka client class.
    return new KafkaConsumer();
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> messageListenerContainer() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "zlink");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // Note: max.poll.records (default 500) and max.poll.interval.ms (default 300000 ms)
    // are left at their defaults here.
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public NewTopic topic() {
    // topic "zlink-device-online" with 3 partitions and replication factor 3
    return new NewTopic("zlink-device-online", 3, (short) 3);
}

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> config = new HashMap<>();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, url);
    return new KafkaAdmin(config);
}
- listener
@KafkaListener(topics = ConstantCode.KAFKA_TOPIC_DEVICE_ONLINE)
public void listen(ConsumerRecord<String, String> record) {
    String data = record.value();
    Integer partition = record.partition();
    Long timestamp = record.timestamp();
    Long offset = record.offset();
    log.info("kafka consumer message:{};partition:{};offset:{};timestamp:{}", data, partition, offset, timestamp);
    // Each call below takes roughly 800 ms (see the problem description).
    gatewayService.gatewayOnline(data);
}
Problem:
gatewayService.gatewayOnline(data) takes about 800 ms to process each message.
Step 1: send 500 messages.
Step 2: after ten minutes, send another 500 messages.
After that, a further 500 messages are sent every two or three minutes, for 12 batches of 500 messages in total. The end result is that every message from step 2 is received twice, while all the other batches are received exactly once. What could be causing this, and how should it be fixed?
I haven't used Spring Boot and can't read all of the code, so let me take a rough guess.
With auto-committed offsets, if gatewayService.gatewayOnline(data) throws an exception, could listen be invoked again because the offset was never committed successfully?
The logs show no error messages; if an exception had been thrown, it would have been printed.
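If lost commits around exceptions were the culprit, the usual way to rule it out is to disable auto-commit and acknowledge manually, so the offset is only committed after gatewayOnline() returns. A minimal sketch, assuming spring-kafka 2.2 where the ack mode enum lives on ContainerProperties:

// In consumerFactory(): turn off client auto-commit.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// In messageListenerContainer(): let the listener acknowledge each record itself.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

@KafkaListener(topics = ConstantCode.KAFKA_TOPIC_DEVICE_ONLINE)
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    gatewayService.gatewayOnline(record.value());
    ack.acknowledge(); // commit only after successful processing
}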
Solved. There was a partition-rebalance entry in the logs that I had missed earlier. The cause is that processing the 500 records fetched in a single poll took longer than max.poll.interval.ms: at roughly 800 ms per message, 500 records take about 400 s, which exceeds the 300 s (5 minute) default, so the consumer was kicked out of the group, its partitions were reassigned, and the uncommitted records were delivered again.
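For reference, the usual fix is to size the work per poll so that max.poll.records times the per-record processing time stays below max.poll.interval.ms, either by fetching fewer records or by allowing more time between polls. A minimal sketch against the consumerFactory() props above; the concrete values are illustrative only:

// 100 records x ~800 ms ≈ 80 s per poll loop, well under a 600 s poll interval.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);        // default 500
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // default 300000 (5 minutes)

Raising factory.setConcurrency(...) toward the partition count, or handing the slow gatewayOnline() call off to a separate executor, are other common ways to keep the poll loop fast.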