尝试使用教程中的代码去访问kafka,进行信息发布却没有成功。
1、部署kafka环境:
# kafka是使用docker-compose部署的
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
2、使用kafka的生产者和消费者能正常生产和消费(这里用的是apache最新版的kafka3.1.0的tar包中bin的sh文件):
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3、使用java去生产消息的时候,kafka-console-consumer.sh
这个消费者却接收不到(maven使用的是kafka-client:3.1.0):
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的
props.put("bootstrap.servers", "192.168.38.134:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
System.out.println(i);
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
求大神赐教
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
你配置的是
127.0.0.1
,所以只有localhost
和127.0.0.1
才可以访问。理论上,这个地址要变成你的容器ip地址,就可以了,以下有几种方式,你可以尝试一下以下方法:
方式一:
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
方式二:
删除这个配置
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
方式三:
environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094 KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
哇!!!感谢大神!!!膜拜
大神,我试了,不行😂
你补充下报错信息。
大神,我的image是bitnami的,是bitnami优化过的,有一些配置可能不一样。但是能测试通过说明是能用的。
这里是我的测试结果:
方法一:
# 这个命令行报错 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning # 报错内容: [2022-04-28 00:14:52,525] WARN [Consumer clientId=console-consumer, groupId=console-consumer-11293] Error connecting to node b3f45b5d91ff:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: b3f45b5d91ff: 未知的名称或服务 at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1519) at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367) at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301) at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:984) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1157) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1045) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:456) at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:101) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
方法二:错误同上
方法三:
因为是bitnami的版本,所以没有这个配置。但是我在github上找到了一模一样的配置。
# docker-compose.ymlversion: "3" services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' - '9093:9093' environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper
使用同一指令,错误同上。
bitnami版使用9093端口可以使用命令行进行生产和接收:
./kafka-console-producer.sh --broker-list localhost:9093 --topic test ./kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning
bitnami版使用java进行消息生产:
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); // 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的 props.put("bootstrap.servers", "192.168.38.134:9093"); props.put("acks", "all"); //重试次数 props.put("retries", 1); //批次大小 props.put("batch.size", 16384); //等待时间 props.put("linger.ms", 1); //RecordAccumulator 缓冲区大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { System.out.println(i); producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))).get(); } producer.close(); } }
无限卡死在0号信息
192.168.38.134是虚拟机的ip?你应该访问的是docker容器的ip吧,如:
# 查看容器ip docker inspect --format='{{.NetworkSettings.IPAddress}}' mycentos3
问题的核心,就是要想办法将容器ip配置到
KAFKA_CFG_ADVERTISED_LISTENERS
上,如:- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://容器ip:9092
方法一和二就是因为无法解析hostname报的错,是因为kafka需要通过hostname获取到容器内部ip。
方法三的问题是你配置了一个
CLIENT://kafka:9092
,其中kafka是域名,你解析不开,自然会报错的。就是说“CLIENT://kafka:9092”要设置为“CLIENT://0.0.0.0:9092”?
我的docker是在虚拟机里面安装的,就是docker的地址还在虚拟机的下一层~
但我在虚拟机里面用kafka的sh文件能直接访问docker的kafka,在主机用java却访问不了,ufw已经打开端口了
你看看这篇文章,有介绍为什么会导致客户端超时的原因:kafka外网转发
震惊!原来是如此原理,即刻尝试
等你的结果。
你的答案