Code:
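import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Class name is assumed; the original post shows only the class body.
public class ConsumerDemo {
    public static final String brokerList = "172.16.15.89:9092";
    public static final String topic = "topic-demo2";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    // Build the consumer configuration.
    public static Properties ininConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // CompnayDeserializer is the poster's own class; its source is not shown.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompnayDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = ininConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));
        try {
            // Poll until isRunning is cleared, printing each record's metadata and payload.
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                    System.out.println("message: key = " + record.key()
                            + ", value = " + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}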
Log:
[2019-07-04 18:01:11,573] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 14 (__consumer_offsets-12) (reason: Adding new member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:01:11,573] INFO [GroupCoordinator 0]: Stabilized group group.demo generation 15 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:01:11,579] INFO [GroupCoordinator 0]: Assignment received from leader for group group.demo for generation 15 (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8 in group group.demo has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 15 (__consumer_offsets-12) (reason: removing member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Group group.demo with generation 16 is now empty (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:04:09,237] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 16 (__consumer_offsets-12) (reason: Adding new member consumer-1-6b5e9036-a820-4775-b891-4ca11e8a4ed3) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:04:09,238] INFO [GroupCoordinator 0]: Stabilized group group.demo generation 17 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2019-07-04 18:04:09,243] INFO [GroupCoordinator 0]: Assignment received from leader for group group.demo for generation 17 (kafka.coordinator.group.GroupCoordinator)
Could someone please take a look and tell me why this is failing? The consumer client cannot read any messages.