List<String> buffer = new ArrayList<>();
Properties consumerProp = getConsumerProp();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProp);
try{
consumer.subscribe(topicList);//topicList这个是传输过来的
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if(records.isEmpty()){
break;
}
for (ConsumerRecord<String, String> record : records){
buffer.add(record.value());
}
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
}finally {
try{
consumer.commitSync();
}finally {
consumer.close();
}
}
return buffer; //返回出去的数据
昵称
0 声望
这家伙太懒,什么都没留下
你这个程序问题一大堆。。你仔细读一读消费者类那篇文章。
https://www.orchome.com/451
你的答案