问题如标题
代码如下:
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100000");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("consumer信息:" + "value:" + record.value() + "-" + "partition:" + record.partition() + "-" + "offset:" + record.offset());
}
}
consumer已启动,并且也已经接收到消息了
可以看见broker节点的信息了,但是没有group信息的
https://www.orchome.com/454
kafka0.8之前默认是存在zk里的,之后存在kafka topic中的,可通过上面命令获取。
你的答案