kafka 请教关于Group coordinator lookup for group test1 failed: The coordinator is not available.的问题

桑代克 发表于: 2018-05-07   最后更新时间: 2018-05-15 07:44:49   10,208 游览

今天在尝试自己写一个消费者时,遇到了如下问题:
消费者代码基本和官网一样如下:

Properties props = new Properties();
        props.put("bootstrap.servers", "host-129-152:9092");
        props.put("group.id", "test1");
        props.put("client.id", "test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try{
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic11"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }catch(Exception e){
            logger.error(e.getMessage());
        }

运行过程中无法消费消息,并报如下错误:

[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Received GroupCoordinator response ClientResponse(receivedTimeMs=1525675045460, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=12,client_id=test1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group test1
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Group coordinator lookup for group test1 failed: The coordinator is not available.
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Coordinator discovery failed for group test1, refreshing metadata

请问可能是什么原因造成这个错误的呢?我在客户端命令可以正常消费信息,所用kakfa版本是0.11

发表于 2018-05-07
添加评论

1、去掉 props.put("client.id", "test1");
2、telnet host-129-152 9092 看看通不通

桑代克 -> 半兽人 6年前

可以通的,但还是不行,一样报错。不知道会不会是跟我使用了CDH有关?但是我程序写的生产者是可以正常跑通生产消息的

半兽人 -> 桑代克 6年前

换个消费者名字先。

桑代克 -> 半兽人 6年前

换了呀,还是不可以

半兽人 -> 桑代克 6年前

你把host-129-159换成ip试试。
程序和集群是同一台机器吗?

桑代克 -> 半兽人 6年前

是同一台机器,换了ip也不行

半兽人 -> 桑代克 6年前

还真是神奇了,看看集群对应的主题状态。

bin/kafka-topics.sh --zookeeper localhost:2181 --describe
桑代克 -> 半兽人 6年前

显示所有topic都是正常的,我把这个程序放到仅装了kafka的机器上是可以成功拉取消息的,感觉可能是和CDH有关,CDH最新只支持到kafka0.11,这方面它的包好像还有问题。我现在打算把CDH上的kakfa回退一个版本到0.10试试看,您感觉这样可以不?还有一个问题想请教一下大神,在很多demo中都会用到这个maven依赖:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>

但是官网上并没有写它,我想请问一下它是提供了哪些功能?

半兽人 -> 桑代克 6年前

这个是引用的所有模块,平常我们都是引单个模块,如:client,streams。

桑代克 -> 半兽人 6年前

谢谢谢谢。上面的第一个问题通过降低CDH中的kafka版本为0.10解决了

-> 桑代克 6年前

我也遇到了一模一样的问题,我是用的是CDH5.15,kafka装的3.1.0,同样也是可以生产,但是不能消费。切到别的机器上的原生kafka就正常了。
问下这个问题的产生原因你有继续探究吗,除了降低版本?

桑代克 -> 6年前

我最终还是降低了版本,好像是CDH还是没有支持到0.10之后的版本

-> 6年前

resolved

-> 桑代克 6年前

我这边没有降低版本,最后发现是用于偏移量的__consumer_offsets这个topic不知道为什么没了。我手动创建后,重启可以用了。供你参考

桑代克 -> 6年前

谢谢谢谢~

这个错误是由于 topic consumer_offsets 不正常造成的,可以查kafka/data/下有有没有consumer_offsets的数据文件(所有broker),及 __consumer_offsets 的znode 是否正常 ,可以将这两者都清掉,然后直接消费即可,前两都会自动重建 

你的答案

查看kafka相关的其他问题或提一个您自己的问题