kafka接口项目 consumer堆外内存管理

G1-JVM 发表于: 2022-01-07   最后更新时间: 2022-01-07 17:54:35   1,225 游览

之前有过一次提问,见:https://www.orchome.com/10576 (kafka频繁报 OutOfMemoryError: Direct buffer memory 异常)

后来发现可能不是写入端的问题,而是consumer消费造成的问题

[http-nio-10020-exec-404] ERROR o.a.c.c.C.[.[localhost].[/].[dispatcherServlet] | Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Direct buffer memory] with root cause
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1247)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at com.xd.newkafka.utils.KafkaConsumerUtils.consumerAssin(KafkaConsumerUtils.java:105)
    at com.xd.newkafka.controller.KafkaController.kafkaRead(KafkaController.java:200)

想请教一下:

1、在当前的模式下,每从controller层进来一个请求,就根据其所传参数创建一个consumer进行数据拉取,拉取完毕后关闭consumer,这样的模式是否合理?

2、接口项目的consumer需要保持长连接么?


public static List<KafkaBasePojo> consumerAssin(String topicName, String hostName, String consumerName, int partition,Integer consumerNum, long offset, int autoOffset) {

        List<KafkaBasePojo> kafkaBasePojoList = new ArrayList<>();
        Properties props = new Properties();
        props.put("bootstrap.servers", hostName);
        props.put("group.id", consumerName);
        props.put("enable.auto.commit", isAutoCommitBool);
        props.put("auto.commit.interval.ms", "1000");
        Integer maxSize = Integer.valueOf(GetCommon.getBusinessConfigMapValue("kafka.max.fetch.bytes", "2097152"));
        props.put("fetch.max.bytes", maxSize);
        props.put("fetch.message.max.bytes", maxSize);
        props.put("max.partition.fetch.bytes", maxSize);
        props.put("max.poll.records", MAX_POLLSIZE);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 50 * 1000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 50 * 1000);

        try  {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            ...
            ...
            try {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(700));
                ...
                ...
                consumer.close(Duration.ofMillis(5000));
            }


        } 
        return ...;
    }
发表于 2022-01-07
添加评论

kafka consumer是持续拉取消息的,拉到消息你就处理,永远不要关闭,本来就是持续批量拉取的(除了停机时候调用一下,否则与程序共存亡的)。

G1-JVM -> 半兽人 3年前

如果不能关闭的话,接口项目该如何设计来为其他项目提供数据拉取服务呢?因为这个项目是设计多方使用者的。另外,在学习kafka的过程中一直对生产者、消费者的使用找不到很明确的学习资料,如果能推荐一些,就非常感激了!

半兽人 -> G1-JVM 3年前

那你们设计有问题,kafka本来就是个中心,为何在来一个中心做中转?想想如果是我来实现这种设计,逻辑一定非常复杂。

网站只有:

G1-JVM -> 半兽人 3年前

😓 感谢解答,这样设计大概是为了将kafka资源统一管理、统一维护吧。

半兽人 -> G1-JVM 3年前

多此一举,画蛇添足。

你的答案

查看kafka相关的其他问题或提一个您自己的问题