请教个问题,使用spring kafka 本地调试循环发送1W+消息时,就会会出现producer超时被强制关闭异常,但是部署到服务器循环发送10W+消息都不会出现这种异常,两者使用一样的producer 配置,能帮忙分析下是什么原因吗?
java.lang.IllegalStateException: Producer is closed forcefully.
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:696) [kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:683) [kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:185) [kafka-clients-1.0.1.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
程序不是优雅停机导致的吧。
还真是停机导致的,我加多个@kafkalistener,spring容器不会关闭,就不会出现producer被强制关闭这个问题了;
但是,我实现CommandLineRunner接口,在run方法内部循环发送,理论上也要执行完发送消息逻辑才会停机吧?,除非CommandLineRunner有时间限制?能帮忙解答下吗??
代码如下:
@SpringBootApplication @EnableKafka public class KafkaExampleApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(KafkaExampleApplication.class, args); } @Autowired private KafkaProducerExample kafkaProducerExample; @Autowired private KafkaProducerUtil kafkaProducerUtil; @Override public void run(String... args) { for (int i=0; i<50000; i++) { kafkaProducerExample.sendMsg(); } } }
1、可以在main方法里,增加几秒的休眠。
2、在for方法会面追加
producer.close();
可参考:https://www.orchome.com/303
你的答案