kafka的消费进程存在,但阻塞不消费数据,查到一些分区无消费者

SmallHeart 发表于: 2019-05-10   最后更新时间: 2019-05-10 19:04:06   4,788 游览

提问说明

1.kafka的消费进程存在,但阻塞不消费数据
2.去kafka服务器上查询某些分区没有消费者
3.使用高级消费api

消费代码

 public class KafkaConsumerThread implements Runnable {
    private final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);
    private ThreadPoolExecutor executor;
    private ConsumerConnector connector = null;
    private KafkaStream<byte[], byte[]> kafkaStream = null;
    private int handlerType;

    private volatile boolean run = true;

    public KafkaConsumerThread(String topic, int partitions, ThreadPoolExecutor executor, int handlerType) {
        this.executor = executor;
        ConsumerConfig config = createConsumerConfig(topic);
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, partitions); // 每个分区设置一个线程消费消息
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = connector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        kafkaStream = kafkaStreams.get(0);
        this.handlerType = handlerType;

    }

    private ConsumerConfig createConsumerConfig(String topic) {
        Properties props = new Properties();
        // 连接集群,可以填多个,以","分隔
        props.put("zookeeper.connect", Globals.CONSUMER_HOST);
        props.put("zookeeper.connection.timeout.ms", Globals.CON_TIMEOUT);
        props.put("group.id", "GROUP_" + topic);
        props.put("zookeeper.session.timeout.ms", "30000");
        props.put("zookeeper.sync.time.ms", "4000");
        props.put("socket.receive.buffer.bytes", String.valueOf(10 * 1024 * 1024));
        props.put("fetch.message.max.bytes", String.valueOf(10 * 1024 * 1024));
        props.put("queued.max.message.chunks", "5");
        props.put("auto.commit.interval.ms", "5000");
        props.put("rebalance.backoff.ms", "5000");
        props.put("rebalance.max.retries", "60");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> msgIte;
        MessageAndMetadata<byte[], byte[]> mam;
        while (run) {
            try {
                msgIte = kafkaStream.iterator();
            } catch (Exception ex) {
                log.error("Consumer kafka biz data Error !!", ex);
                continue;
            }
            try {
                while (msgIte.hasNext()) {
                    mam = msgIte.next();// 接收到的数据
                    Runnable runnable = null;
                    if (handlerType == 1) {
                        runnable = new AHandler(mam.message());
                    } else if (handlerType == 2) {
                        runnable = new BHandler(mam.message());
                    } else if (handlerType == 3){
                        runnable = new CHandler(mam.message());
                    } else{
                        return;
                    }
                    executor.execute(runnable);
                    log.info("Consumer data success!  topic:{}, partition:{}, offset:{}", mam.topic(), mam.partition(), mam.offset());
                    if (log.isDebugEnabled()) {
                        log.debug("pool size:{}, queue size : {}", executor.getPoolSize(), executor.getQueue().size());
                    }
                }
            } catch (Exception e) {
                log.error("Parse biz kafka data Error !!", e);
            }
        }
    }

    public void stop() {
        run = false;
        if (connector != null) {
            connector.shutdown();
        }
    }
}

目前采用的解决方案

重启消费者进程就能恢复消费

希望得到帮助

1.问题原因和复现步骤?
2.如何根源解决?
3.该消费代码是否有优化建议?

发表于 2019-05-10
添加评论

你这个是0.8版本的kafka吧。很古老。

你使用的线程池是什么类型的?是不是线程池提交任务的时候阻塞了 把代码贴完整一点吧

你的答案

查看kafka相关的其他问题或提一个您自己的问题