今天在尝试自己写一个消费者时,遇到了如下问题:
消费者代码基本和官网一样如下:
Properties props = new Properties();
props.put("bootstrap.servers", "host-129-152:9092");
props.put("group.id", "test1");
props.put("client.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset","earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try{
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic11"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}catch(Exception e){
logger.error(e.getMessage());
}
运行过程中无法消费消息,并报如下错误:
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Received GroupCoordinator response ClientResponse(receivedTimeMs=1525675045460, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=12,client_id=test1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group test1
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Group coordinator lookup for group test1 failed: The coordinator is not available.
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Coordinator discovery failed for group test1, refreshing metadata
请问可能是什么原因造成这个错误的呢?我在客户端命令可以正常消费信息,所用kakfa版本是0.11
1、去掉 props.put("client.id", "test1");
2、telnet host-129-152 9092 看看通不通
可以通的,但还是不行,一样报错。不知道会不会是跟我使用了CDH有关?但是我程序写的生产者是可以正常跑通生产消息的
换个消费者名字先。
换了呀,还是不可以
你把host-129-159换成ip试试。
程序和集群是同一台机器吗?
是同一台机器,换了ip也不行
还真是神奇了,看看集群对应的主题状态。
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
显示所有topic都是正常的,我把这个程序放到仅装了kafka的机器上是可以成功拉取消息的,感觉可能是和CDH有关,CDH最新只支持到kafka0.11,这方面它的包好像还有问题。我现在打算把CDH上的kakfa回退一个版本到0.10试试看,您感觉这样可以不?还有一个问题想请教一下大神,在很多demo中都会用到这个maven依赖:
<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version>
但是官网上并没有写它,我想请问一下它是提供了哪些功能?
这个是引用的所有模块,平常我们都是引单个模块,如:client,streams。
谢谢谢谢。上面的第一个问题通过降低CDH中的kafka版本为0.10解决了
我也遇到了一模一样的问题,我是用的是CDH5.15,kafka装的3.1.0,同样也是可以生产,但是不能消费。切到别的机器上的原生kafka就正常了。
问下这个问题的产生原因你有继续探究吗,除了降低版本?
我最终还是降低了版本,好像是CDH还是没有支持到0.10之后的版本
resolved
我这边没有降低版本,最后发现是用于偏移量的__consumer_offsets这个topic不知道为什么没了。我手动创建后,重启可以用了。供你参考
谢谢谢谢~
这个错误是由于 topic consumer_offsets 不正常造成的,可以查kafka/data/下有有没有consumer_offsets的数据文件(所有broker),及 __consumer_offsets 的znode 是否正常 ,可以将这两者都清掉,然后直接消费即可,前两都会自动重建
你的答案