kafka多线程消费取不到数据

ighack 发表于: 2017-06-13   最后更新时间: 2017-06-13 19:19:10   5,782 游览

主类核心代码

  ThreadGroup tg = new ThreadGroup("处理线程组");
        while(true){
            if (tg.activeCount() < threadNum) {
                Thread th = new Thread(tg, new Consumer(topicName,filePath));
                th.start();
            } else {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

Consumer类的核心代码

public void run(){
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", enableAutoCommit);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", maxPollRecords);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        //Thread ch = new Thread(new CheckThread(tg,topic));
        //ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);


                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    DoSomeThings Dos = new DoSomeThings();
                    boolean iResult = Dos.execute(record);
                    if(iResult){
                        consumer.commitSync();
                    }
                    //fixedThreadPool.execute(new DoSomeThings(record));
                }
                consumer.close();
    }
发表于 2017-06-13
添加评论

https://www.orchome.com/5
看完这篇文章 你就明白了,分区数=线程数

半兽人 -> 半兽人 6年前

不严谨,线程数<=分区数

ighack -> 半兽人 6年前

我有三个分区,只开了一个线程

半兽人 -> ighack 6年前

所以最多只能有3个线程消费。

ighack -> 半兽人 6年前
Consumer类的核心代码
consumer
= new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
public void run(){
boolean isBreak = false;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeThings Dos = new DoSomeThings();
boolean iResult = Dos.execute(record);
if (iResult) {
consumer.commitSync();
}
isBreak = true;
//fixedThreadPool.execute(new DoSomeThings(record));
}
if(isBreak){
break;
}
}
consumer.wakeup();
}

主类的核心代码

ExecutorService ConsumerThreadPool = Executors.newFixedThreadPool(threadNum);
System.out.println("Hello World!");
RedisHelper.setRedisURL(redisURL);
RedisHelper.getLogicConfigs();
ThreadGroup tg = new ThreadGroup("处理线程组");
while(true){

ConsumerThreadPool.execute(new Consumer(topicName,filePath));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
我发现这种写法可以取到数据。但取不全我发5条数据。日志里只有3条。而且第次都是相同的三条。如每次发1,2,3,4,5 日志记录里每次记录的都是0,1,4
ighack -> ighack 6年前

我发现这种写法可以取到数据。但取不全我发5条数据。日志里只有3条。而且第次都是相同的三条。如每次发0,1,2,3,4 日志记录里每次记录的都是0,1,4

半兽人 -> ighack 6年前

自动关闭设置为false。

ighack -> 半兽人 6年前

我换了一种写法。现在可以接收全。但问题是自动关闭也高在了false。但没有提交的数据。不重启是收不到数据的

半兽人 -> ighack 6年前

什么叫没有提交的数据。

你的答案

查看kafka相关的其他问题或提一个您自己的问题