1. There is already an existing setup:
kafka broker 10.10.3.1:9092
zk 10.10.3.1:2181
2. A new port now needs to be exposed for devices on another network.
3. The router/gateway forwarding rule is:
10.10.3.1:9888 -> 192.168.1.110:12340
4. Add a new broker, started with Docker:
docker run -d --restart on-failure:3 -p 9888:9888 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=10.10.3.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka.com:12340 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9888 -e AUTO_CREATE_TOPICS=false -t wurstmeister/kafka
5. Edit the hosts file inside the new broker container:
10.10.3.1 kafka.com
6. Edit the hosts file on the client:
192.168.1.110 kafka.com
7. Access from a Python client:
from kafka import KafkaProducer
kafka_topic = "test"
kafka_bootstrap_servers = "kafka.com:12340"
kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
print("begin send")
print(kafka_producer._metadata)
kafka_producer.send(kafka_topic,value=b"aaaaaaaaaaaaaaaaaaa")
8. The connection appears to be established, but the broker metadata cannot be fetched:
begin send
ClusterMetadata(brokers: 0, topics: 0, groups: 0)
Traceback (most recent call last):
File "kafka_test.py", line 9, in <module>
kafka_producer.send(kafka_topic,value=b"aaaaaaaaaaaaaaaaaaa")
File "/usr/local/lib/python3.6/dist-packages/kafka/producer/kafka.py", line 576, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.6/dist-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Is this way of adding a broker incorrect?
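The KafkaTimeoutError above is raised by kafka-python when topic metadata does not arrive within max_block_ms, so one early step is to separate plain network problems from Kafka configuration problems. The following is a minimal sketch using only the standard library; `can_connect` is a hypothetical helper name, not part of kafka-python, and it only proves that a TCP connection can be opened, not that the broker answers correctly.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name (hosts file included) and connects.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

Run on the client machine, `can_connect("kafka.com", 12340)` checks the address the client is configured with. If it returns True and the producer still times out, the problem is more likely in what the broker advertises than in the forwarding itself.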
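1. First make sure that producing and consuming work correctly from inside the container.
2. Your configuration is rather cumbersome: the container also needs a hosts entry (because you are using a domain name). The Kafka client (your Python script) is handed KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9888, so you need to change it to KAFKA_LISTENERS=PLAINTEXT://kafka.com:12340.
3. Keep the ports the same if you can; otherwise you have to route twice, which is convoluted.
Read this article and make sure you understand the core idea. Your problem is convoluted, and you need to understand the root reason why access fails.
Kafka forwarding over an external network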
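To make the hosts and port points concrete: as far as Kafka's documented behavior goes, `listeners` is the address the broker binds to, while `advertised.listeners` is the address handed to clients in metadata, and each machine then resolves that name through its own hosts file. The sketch below traces where the advertised address from the question ends up on each side. The helper names `parse_listener`, `parse_hosts` and `client_target` are hypothetical; the values are copied from the question.

```python
def parse_listener(listener):
    """Split 'NAME://host:port' into (name, host, port)."""
    name, sep, address = listener.partition("://")
    if not sep:
        raise ValueError(f"not a listener string: {listener!r}")
    host, _, port = address.rpartition(":")
    return name, host, int(port)


def parse_hosts(text):
    """Build a hostname -> IP mapping from hosts-file style text."""
    mapping = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        ip, *names = line.split()
        for hostname in names:
            mapping[hostname] = ip
    return mapping


def client_target(advertised, hosts_text):
    """Return the (ip, port) a machine with this hosts file would connect to."""
    _, host, port = parse_listener(advertised)
    return parse_hosts(hosts_text).get(host, host), port


ADVERTISED = "PLAINTEXT://kafka.com:12340"   # KAFKA_ADVERTISED_LISTENERS in the question
CLIENT_HOSTS = "192.168.1.110 kafka.com"     # hosts entry on the client
BROKER_HOSTS = "10.10.3.1 kafka.com"         # hosts entry inside the container

print(client_target(ADVERTISED, CLIENT_HOSTS))   # ('192.168.1.110', 12340)
print(client_target(ADVERTISED, BROKER_HOSTS))   # ('10.10.3.1', 12340)
```

On the 10.10.3.x side the name resolves to port 12340 on 10.10.3.1, while the docker run command publishes only 9888. Whether anything answers on 10.10.3.1:12340 depends on forwarding rules the question does not fully show, which is one reason keeping the ports identical is simpler.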
The hosts entry 10.10.3.1 kafka.com has already been added inside the newly started broker container.
With KAFKA_LISTENERS=PLAINTEXT://kafka.com:12340, Kafka fails to start:
org.apache.kafka.common.KafkaException: Socket server failed to bind to kafka.com:12340: Address not available.
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:671)
    at kafka.network.Acceptor.<init>(SocketServer.scala:539)
    at kafka.network.SocketServer.createAcceptor(SocketServer.scala:280)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
    at kafka.network.SocketServer.startup(SocketServer.scala:125)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address not available
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:461)
    at sun.nio.ch.Net.bind(Net.java:453)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:667)
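Setting KAFKA_LISTENERS=PLAINTEXT://kafka.com:9888 fails in the same way.
The address is not recognized; the listener has to bind to the container's own address.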
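The BindException in the log is what one would expect when a process tries to bind to an IP that is not assigned to any interface it can see. Inside the container kafka.com resolves to 10.10.3.1, which, assuming Docker's default bridge networking, belongs to the host rather than to the container. The sketch below reproduces the same class of failure with a plain socket. `try_bind` is a hypothetical helper, and 192.0.2.1 is a documentation-range address that is assumed not to be assigned to the machine running the code.

```python
import errno
import socket


def try_bind(ip, port=0):
    """Try to bind a TCP socket to ip:port.

    Returns None on success, or the symbolic errno name on failure.
    Port 0 lets the OS pick any free port.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((ip, port))
        return None
    except OSError as exc:
        return errno.errorcode.get(exc.errno, str(exc.errno))
    finally:
        sock.close()


print(try_bind("0.0.0.0"))     # wildcard address: binds on every local interface
print(try_bind("192.0.2.1"))   # non-local address: typically EADDRNOTAVAIL on Linux
```

This matches the observation that a listener of 0.0.0.0:9888 starts while kafka.com:12340 and kafka.com:9888 do not: the bind address must be local to the container, and only the advertised address needs to be the externally visible name.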