kafka高级消费,topic里面有数据,但是消费不到

客户端版本0.10.2.1-2.11(本地代码)
服务端版本0.8.2.1-2.10(192.168.137.131)

package com.yu.test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Test harness that reads one Kafka topic through the old ZooKeeper-based
 * high-level consumer API ({@code ConsumerConnector} / {@code KafkaStream}).
 *
 * <p>Messages are buffered and handed to {@link #submitMessage(List)} once
 * {@code BATCH_SIZE} of them have accumulated, or when the stream has been idle
 * for {@code consumer.timeout.ms}. Offsets are committed manually after a batch
 * has been processed ({@code auto.commit.enable=false}).
 *
 * <p>NOTE(review): {@code auto.offset.reset} is not configured. According to the
 * old-consumer documentation the default is {@code largest}, in which case a
 * group without committed offsets would skip messages already in the topic --
 * confirm whether {@code smallest} is intended for this test.
 */
public class HighLevelConsumerTest extends Thread {

    /** Number of buffered messages that triggers a submit. */
    private static final int BATCH_SIZE = 8;

    /** Seconds to wait before re-creating the connector after a failure. */
    private static final int RETRY_DELAY_SECONDS = 60;

    public int sendPersize = 1;
    /** Label used only in console output. */
    public String consumerName = "test_consumer03051";
    /** Topic to consume. */
    public String topic = "ywl_test";
    /** Topic name mapped to the number of streams requested for it. */
    public Map<String, Integer> topicCountMap;
    public ConsumerConfig config;
    public ConsumerConnector consumer;
    public Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = null;
    public KafkaStream<byte[], byte[]> stream = null;
    public ConsumerIterator<byte[], byte[]> it = null;
    /** Messages received but not yet submitted. */
    public List<byte[]> bytes;
    public Properties properties;
    /** Running total of messages successfully submitted. */
    private AtomicInteger sendCount;

    /**
     * Builds the consumer configuration and requests a single stream for
     * {@link #topic}. No connection is opened until {@link #run()} executes.
     */
    public HighLevelConsumerTest() {
        initProperties();
        topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        config = new ConsumerConfig(properties);
        sendCount = new AtomicInteger(0);
    }

    /** Populates {@link #properties} with the consumer settings. */
    public void initProperties() {
        properties = new Properties();
        properties.put("zookeeper.connect", "192.168.137.131:2181");
        properties.put("group.id", "tes11t1");
        properties.put("auto.commit.enable", "false");
        properties.put("zookeeper.session.timeout.ms", "4000");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("consumer.timeout.ms", "10000");
    }

    public static void main(String[] args) throws Exception {
        new HighLevelConsumerTest().start();
    }

    /**
     * Connects, consumes, and on any failure tears the connector down, waits
     * {@code RETRY_DELAY_SECONDS} and starts over. The loop ends only when the
     * running thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println(consumerName + " start to consumer Messages.");
            bytes = new ArrayList<byte[]>();
            try {
                consumer = Consumer.createJavaConsumerConnector(config);
                consumerMap = consumer.createMessageStreams(topicCountMap);
                stream = consumerMap.get(topic).get(0);
                it = stream.iterator();
                consumeUntilInterrupted();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // A blocking read was interrupted, which clears the flag;
                    // restore it so the outer loop can terminate.
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
                close();
                sleepSeconds(RETRY_DELAY_SECONDS);
            }
        }
        close();
    }

    /**
     * Drains the current stream, submitting a batch whenever {@code BATCH_SIZE}
     * messages are buffered and flushing any partial batch on an idle timeout.
     * Exceptions other than {@link ConsumerTimeoutException} propagate to the caller.
     */
    private void consumeUntilInterrupted() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                while (it.hasNext()) {
                    bytes.add(it.next().message());
                    if (bytes.size() >= BATCH_SIZE) {
                        submitMessage(bytes);
                    }
                }
            } catch (ConsumerTimeoutException e) {
                System.out.println("------------------------ConsumerTimeoutException--------------------------------------------");
                // Idle timeout: flush whatever has accumulated, then keep polling.
                if (bytes.size() > 0) {
                    submitMessage(bytes);
                }
                System.out.println(consumerName + " consumer Messages time out for 10 seconds, and will re-connect after 1 minute.");
            }
        }
    }

    /**
     * Processes a batch and, if processing succeeded, commits the consumer
     * offsets, updates the running total and empties the list.
     *
     * @param bytes the buffered message payloads; cleared on success
     */
    public void submitMessage(List<byte[]> bytes) {
        int size = bytes.size();
        if (exceute(bytes)) {
            consumer.commitOffsets();
            sendCount.addAndGet(size);
            System.out.println(sendCount.get());
            bytes.clear();
        }
    }

    /**
     * Prints every payload in the batch.
     *
     * @param bytes the message payloads to print
     * @return always {@code true}
     */
    public boolean exceute(List<byte[]> bytes) {
        for (byte[] payload : bytes) {
            // Printing the array itself only shows its identity ("[B@...").
            // Decoding assumes UTF-8 text payloads -- TODO confirm the producer's encoding.
            System.out.println(new String(payload, java.nio.charset.StandardCharsets.UTF_8));
        }
        return true;
    }

    /**
     * Sleeps the calling thread for the given number of seconds. If the sleep is
     * interrupted, the interrupt flag is restored so callers can observe it.
     *
     * @param seconds how long to sleep
     */
    public void sleepSeconds(int seconds) {
        try {
            // Multiply as long so large values cannot overflow int.
            sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the connector down and drops every reference derived from it so the
     * next {@link #run()} iteration starts from a clean state.
     *
     * <p>The iterator and stream are deliberately not queried here: both
     * {@code hasNext()} and {@code size()} read from the stream, so during
     * teardown they could block or throw and take the consumer thread down
     * with them.
     */
    public void close() {
        if (consumer != null) {
            try {
                consumer.shutdown();
            } catch (RuntimeException e) {
                // Best effort: a failed shutdown must not prevent the retry loop from continuing.
                e.printStackTrace();
            }
            consumer = null;
        }
        it = null;
        stream = null;
        consumerMap = null;
    }
}





发表于: 1年前   最后更新时间: 1年前   浏览量:1378
上一条: 关于kafka集群的配置
下一条: Failed to construct kafka consumer

  • 客户端版本(0.10.2.1)与服务端版本(0.8.2.1)不一致,请使用与服务端版本一致的客户端。