@KafkaListener(topics = "kafka-jt808-0200")
public void handle0200(ConsumerRecord<String, String> record) {
System.out.println("000 " + record.value());
}
1、去掉线程池,只打印,你在试试,线程池会一直创建线程,影响速度。
2、缩小每条记录的大小,如果你只测试条数的话。
另外,你的kafka部署在阿里云,你的程序如果在本地,那么也会因为带宽问题,影响的,毕竟是走公网的。
我只打印一下消息,一分钟才七八千,按理说一个system.out.print不应该有太大的消耗的。
我的配置如下:
kafka:
bootstrap-servers: 39.105.175.129:9092 # kafka集群信息
producer: # 生产者配置
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: hujunjie
# group-id: zhTestGroup # 消费者组
enable-auto-commit: true # 关闭自动提交
auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: RECORD
@KafkaListener(topics = "kafka-jt808-0200")
public void handle0200(ConsumerRecord<String, String> record) {
ThreadPoolExecutorFactory.executor.execute(() -> {
String value = record.value();
System.out.println("000 " + value);
});
}
netstat -unltpa|grep 9092
查看你端口的绑定情况,贴一下。