kafka 消费者消费不到数据

Tpcy~ 发表于: 2019-01-24   最后更新时间: 2019-01-24 10:58:34   14,147 游览

提问说明

1、在server.properties中num.partitions=2,配置了两个分区,我存入数据的时候,kafka应该会把数据分别存入到这两个分区里,然后我建了两个线程对应两个消费者,分别指定0分区,和1分区,但是消费不到数据?如果我把这个时间kafkaConsumer.poll(100000)加长就可以消费到500条,最多就是500条,如果设置100的话,就会消费不到数据

2、贴上相关代码(请勿用图片代替代码)

public class MyConsumer extends Thread{

    int partition=0;

    public MyConsumer(int partition) {
        this.partition=partition;
    }

    @Override
    public void run() {
//        Logger log=LoggerFactory.getLogger(MyConsumer.class);
        long startTime=System.currentTimeMillis()/1000;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9093");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        TopicPartition topicPartition=new TopicPartition(KafkaProperties.topic, partition);
        kafkaConsumer.assign(Arrays.asList(topicPartition));
        kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition));
        kafkaConsumer.seek(topicPartition, 0);
//        kafkaConsumer.subscribe(Arrays.asList(KafkaProperties.topic));
        int total=0;
//        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100000);
            total+=records.count();
//            for (ConsumerRecord<String, String> record : records) {
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value(),record.topic());
//                log.info("offset = %d, value = %s"+record.offset()+","+record.value()+","+record.topic());
//                System.out.println();
//            }
            long endTime=System.currentTimeMillis()/1000;
            System.out.println("查询"+total+"条,时间:"+(endTime-startTime)+"秒");
//        }
    }

    public static void main(String[] args){
        MyConsumer thread1 = new MyConsumer(0);
        MyConsumer thread2 = new MyConsumer(1);
        thread1.start();
        System.out.println("测试");
        thread2.start();
        }
 }

查询500条,时间:1秒
查询500条,时间:1秒

刚开始只设置了一个消费者,每次都是只能消费500条,写上while(true)循环,会一直消费,是时间的问题吗

发表于 2019-01-24
添加评论

我存了4万多条在这个topic里

半兽人 -> Tpcy~ 5年前

https://www.orchome.com/451
有现成的官网例子哦。

BLue -> 半兽人 3年前

博主,我有两个消费群组,消费不同的主题,其中有一个消费群组获取不到消息。这是什么原因,应该怎么排查。

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
RYDW            0          1325133         1331302         6169            consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124   consumer-4
RYDW            1          1485265         1493372         8107            consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124   consumer-4
RYDW            2          1307520         1310157         2637            consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124   consumer-4
TEST            0          19060           131421          112361          -                                               -               -
TEST            1          19326           132127          112801          -                                               -               -
TEST            2          18007           128207          110200          -
半兽人 -> BLue 3年前

第二个你的消费者组没有注册上去,你得详细描述一下,另起一个问题吧。

为什么第一个线程和第二个线程消费的offset是一样的呢

你的答案

查看kafka相关的其他问题或提一个您自己的问题