KafkaConsumer 调用poll,进入死循环

沉默行僧 发表于: 2018-06-11   最后更新时间: 2018-06-11 09:56:56   11,015 游览

kafka版本1.0.0 用老的api:ConsumerConnector操作和 命令行操作都能正常接收kafka服务器上的消息。

日志循环打印以下内容:

09:28:31.219 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Coordinator discovery failed, refreshing metadata
09:28:31.311 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test] Sending metadata request (type=MetadataRequest, topics=demo) to node [hostname和谐]:6667 (id: 1001 rack: /default-rack)
09:28:31.317 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 22 to Cluster(id = YuZpqgKWQV6atlnwp2aB4A, nodes = [[hostname和谐]:6667 (id: 1001 rack: /default-rack)], partitions = [Partition(topic = demo, partition = 0, leader = 1001, replicas = [1001], isr = [1001], offlineReplicas = [])])
09:28:31.317 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Sending GroupCoordinator request to broker [hostname和谐]:6667 (id: 1001 rack: /default-rack)
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Received GroupCoordinator response ClientResponse(receivedTimeMs=1528680511323, latencyMs=6, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=43), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Group coordinator lookup failed: The coordinator is not available.
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Coordinator discovery failed, refreshing metadata
发表于 2018-06-11

poll(long)是长轮询,向kafka拉取消息的。

沉默行僧 -> 半兽人 6年前

我了解poll的功能,但是日志很明确的提示“The coordinator is not available.”coordinator 不可用,不知道为什么,其他方式都可以

半兽人 -> 沉默行僧 6年前

新api正常吗?

沉默行僧 -> 半兽人 6年前

旧的正常,新的producer 也是正常的 就是consumer调用poll方法不正常,收不到数据。同时用新api的producer 加上旧api的ConsumerConnector也是可以正常运作的

半兽人 -> 沉默行僧 6年前

https://www.orchome.com/454
用新消费者和生产者命令测试一下。还有消费者活跃列表,集群状态等

沉默行僧 -> 半兽人 6年前

解决了,之前我用的ambari,有三个服务器,但是默认建了一个broker。刚才我增加到三个就ok了。非常感谢你的解答,困惑了一两天

你的答案

查看kafka相关的其他问题或提一个您自己的问题