提问说明
1、在server.properties中num.partitions=2,配置了两个分区,我存入数据的时候,kafka应该会把数据分别存入到这两个分区里,然后我建了两个线程对应两个消费者,分别指定0分区,和1分区,但是消费不到数据?如果我把这个时间kafkaConsumer.poll(100000)加长就可以消费到500条,最多就是500条,如果设置100的话,就会消费不到数据
2、贴上相关代码(请勿用图片代替代码)
public class MyConsumer extends Thread{
int partition=0;
public MyConsumer(int partition) {
this.partition=partition;
}
@Override
public void run() {
// Logger log=LoggerFactory.getLogger(MyConsumer.class);
long startTime=System.currentTimeMillis()/1000;
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9093");
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
TopicPartition topicPartition=new TopicPartition(KafkaProperties.topic, partition);
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 0);
// kafkaConsumer.subscribe(Arrays.asList(KafkaProperties.topic));
int total=0;
// while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100000);
total+=records.count();
// for (ConsumerRecord<String, String> record : records) {
// System.out.printf("offset = %d, value = %s", record.offset(), record.value(),record.topic());
// log.info("offset = %d, value = %s"+record.offset()+","+record.value()+","+record.topic());
// System.out.println();
// }
long endTime=System.currentTimeMillis()/1000;
System.out.println("查询"+total+"条,时间:"+(endTime-startTime)+"秒");
// }
}
public static void main(String[] args){
MyConsumer thread1 = new MyConsumer(0);
MyConsumer thread2 = new MyConsumer(1);
thread1.start();
System.out.println("测试");
thread2.start();
}
}
查询500条,时间:1秒
查询500条,时间:1秒
刚开始只设置了一个消费者,每次都是只能消费500条,写上while(true)循环,会一直消费,是时间的问题吗
我存了4万多条在这个topic里
https://www.orchome.com/451
有现成的官网例子哦。
博主,我有两个消费群组,消费不同的主题,其中有一个消费群组获取不到消息。这是什么原因,应该怎么排查。
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID RYDW 0 1325133 1331302 6169 consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124 consumer-4 RYDW 1 1485265 1493372 8107 consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124 consumer-4 RYDW 2 1307520 1310157 2637 consumer-4-fe32519c-dfc7-42db-8fc1-69a0d7b1ad1d /172.16.9.124 consumer-4 TEST 0 19060 131421 112361 - - - TEST 1 19326 132127 112801 - - - TEST 2 18007 128207 110200 -
第二个你的消费者组没有注册上去,你得详细描述一下,另起一个问题吧。
为什么第一个线程和第二个线程消费的offset是一样的呢
你的答案