kafka版本1.0.0 用老的api:ConsumerConnector操作和 命令行操作都能正常接收kafka服务器上的消息。
日志循环打印以下内容:
09:28:31.219 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Coordinator discovery failed, refreshing metadata
09:28:31.311 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test] Sending metadata request (type=MetadataRequest, topics=demo) to node [hostname和谐]:6667 (id: 1001 rack: /default-rack)
09:28:31.317 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 22 to Cluster(id = YuZpqgKWQV6atlnwp2aB4A, nodes = [[hostname和谐]:6667 (id: 1001 rack: /default-rack)], partitions = [Partition(topic = demo, partition = 0, leader = 1001, replicas = [1001], isr = [1001], offlineReplicas = [])])
09:28:31.317 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Sending GroupCoordinator request to broker [hostname和谐]:6667 (id: 1001 rack: /default-rack)
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Received GroupCoordinator response ClientResponse(receivedTimeMs=1528680511323, latencyMs=6, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=43), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Group coordinator lookup failed: The coordinator is not available.
09:28:31.323 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test] Coordinator discovery failed, refreshing metadata
poll(long)
是长轮询,向kafka拉取消息的。我了解poll的功能,但是日志很明确的提示“The coordinator is not available.”coordinator 不可用,不知道为什么,其他方式都可以
新api正常吗?
旧的正常,新的producer 也是正常的 就是consumer调用poll方法不正常,收不到数据。同时用新api的producer 加上旧api的ConsumerConnector也是可以正常运作的
https://www.orchome.com/454
用新消费者和生产者命令测试一下。还有消费者活跃列表,集群状态等
解决了,之前我用的ambari,有三个服务器,但是默认建了一个broker。刚才我增加到三个就ok了。非常感谢你的解答,困惑了一两天
你的答案