Flink 1.10.1 pulls data from Kafka for computation. Producing and consuming both work fine, but the consumer group member-risk-flink cannot be found.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaServers);
properties.setProperty("group.id", "member-risk-flink");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("max.poll.records", kafkaOffset);
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer011<>(calculateInputTopic, new SimpleStringSchema(), properties));
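One thing worth checking alongside the snippet above: the Kafka connector only writes offsets back to the broker's __consumer_offsets topic either on checkpoints (when checkpointing is enabled) or via the consumer's auto-commit (when it is not). A sketch of making that explicit, assuming Flink 1.10 with the flink-connector-kafka-0.11 dependency; `calculateInputTopic` and `properties` are the same variables as above:

```java
// Sketch: ensure offsets are actually committed back to Kafka so that
// kafka-consumer-groups.sh can at least show CURRENT-OFFSET/LAG for the group.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // commit offsets on each checkpoint (every 60 s)

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>(calculateInputTopic, new SimpleStringSchema(), properties);
consumer.setCommitOffsetsOnCheckpoints(true); // the default; shown for clarity

DataStreamSource<String> dataStream = env.addSource(consumer);
```

Note this only affects the offsets visible to the tooling; it does not make the group show active members, for the reason explained further down the thread.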
MacBook-Pro-3:kafka_2.12-0.11.0.1 zc$ bin/kafka-consumer-groups.sh --bootstrap-server 10.9.253.80:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
MacBook-Pro-3:kafka_2.12-0.11.0.1 zc$ bin/kafka-consumer-groups.sh --bootstrap-server 10.9.253.80:9092 --group member-risk-flink --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'member-risk-flink' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
member-risk-abstraction-calculate-input 3 5 5 0 - - -
member-risk-abstraction-calculate-input 0 4 4 0 - - -
member-risk-abstraction-calculate-input 2 4 4 0 - - -
member-risk-abstraction-calculate-input 1 2 2 0 - - -
MacBook-Pro-3:kafka_2.12-0.11.0.1 zc$ bin/kafka-consumer-groups.sh --bootstrap-server 10.9.253.80:9092 --delete --group member-risk-flink
Option '[delete]' is only valid with '[zookeeper]'. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.
MacBook-Pro-3:kafka_2.12-0.11.0.1 zc$ bin/kafka-consumer-groups.sh --zookeeper 10.9.253.80:2181 --delete --group member-risk-flink
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
Error: Delete for group 'member-risk-flink' failed because group does not exist.
Did you (OP) ever solve this? I'm hitting the same situation: I consume Kafka data with both Spark Streaming and Flink, but I can't find the consumer group info either (tried both the zk and the bootstrap-server approach). Kafka version is 1.1.0.
Don't stop your consumer, then run the query again and see.
It wasn't stopped.
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
Try this.
I ran into this problem too. The cause I found: when a Flink job consumes Kafka, the protocol for the group id in GroupOverview is empty. That's because Flink does not call the Kafka consumer client's group-subscription API directly; it consumes with its own logic and only uses Kafka's __consumer_offsets topic to store offsets. As a result the group doesn't show up in --list, and --describe looks off as well: no owner is displayed and it reports that the group has no active members.
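The behavior described above can be illustrated with the plain kafka-clients API. The key distinction is subscribe() versus assign(): subscribe() joins the group through the broker's group coordinator, so the group gets active members; assign() pins partitions directly and never registers a member, which matches what the thread observes with Flink. A minimal sketch (the broker address and topic are taken from the transcript above):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.9.253.80:9092");
        props.setProperty("group.id", "member-risk-flink");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Group-managed subscription: the member shows up under
        // kafka-consumer-groups.sh --describe with CONSUMER-ID/HOST/CLIENT-ID.
        // consumer.subscribe(Collections.singletonList("member-risk-abstraction-calculate-input"));

        // Manual assignment (effectively what Flink's connector does):
        // offsets can still be committed to __consumer_offsets, but the
        // coordinator never sees a member, so --describe reports
        // "has no active members" and dashes in the consumer columns.
        consumer.assign(Collections.singletonList(
                new TopicPartition("member-risk-abstraction-calculate-input", 0)));
    }
}
```

So "no active members" here is expected rather than a failure: the group exists only as committed offsets, not as a coordinator-managed membership.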