kafka集群搭建好了。
./bin/kafka-console-producer.sh --broker-list 192.168.42.xx:9095 --topic topic12
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.xx:9095 --from-beginning --topic topic12
用命令的客户端测试。都能收到消息。但是用项目程序连接集群。消费数据,一直卡着不动。
版本是kafka_2.12-0.11.0.1
上面的日志是server.log的。
项目DUBG日志为:
2017-12-21 17:41:39-[DEBUG]-(org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler:615) Group coordinator lookup for group DemoConsumer failed: The coordinator is not available.
2017-12-21 17:41:39-[DEBUG]-(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:225) Coordinator discovery failed for group DemoConsumer, refreshing metadata
telnet一下,地址通吗?
telnet是通的,项目的生产者可以生产消息,用kafka消费端命令工具。可以消费数据。但就是项目的消费者,不能消费数据
卡着不动是正常的,判断下主题是否有消息。
项目生产者已经生产消息到队列中。消息者不能进行消费。队列有消息的。下面是翻阅源码,找到debug报错的位置
AbstractCoordinator 类的内部类
private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
Errors error = findCoordinatorResponse.error();
clearFindCoordinatorFuture();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.coordinator = new Node(
Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
findCoordinatorResponse.node().host(),
findCoordinatorResponse.node().port());
log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
client.tryConnect(coordinator);
heartbeat.resetTimeouts(time.milliseconds());
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
//这个地方
log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
future.raise(error);
}
}
集群不正常吧,这个错写的是leader不可用。
集群不正常,具体是什么?项目可以写消息,但不能进行消费
https://www.orchome.com/454
--describe
多谢,上面的问题,重新安装了kafka和zookeeper得到解决。只有感觉奇怪。项目能进行生产消息,但不能消费消息。具体的原因是什么到目前都还不知道。
客气,我刚才我是让另外一位的重装,没想到你先重装了。
你这个错误明显是集群不正常。
你的答案