kafka docker网关转发后的ip无法正常生产消费

L-Yming 发表于: 2021-09-23   最后更新时间: 2021-09-23 14:24:37   1,450 游览

1、已有一个

kafka broker 10.10.3.1:9092 
zk 10.10.3.1:2181

2、现需要新增一个端口给其他网络的设备使用

3、路由网关转发对应

10.10.3.1:9888 -> 192.168.1.110:12340

4、新增一个broken 采用docker启动方式

docker run -d --restart on-failure:3 -p 9888:9888 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=10.10.3.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka.com:12340 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9888 -e AUTO_CREATE_TOPICS=false -t wurstmeister/kafka

5、更改新增broken容器的hosts

10.10.3.1 kafka.com

6、更改客户端hosts

192.168.1.110 kafka.com

7、客户端python访问

from kafka import KafkaProducer
kafka_topic = "test"
kafka_bootstrap_servers = "kafka.com:12340"
kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
print("begin send")
print(kafka_producer._metadata)
kafka_producer.send(kafka_topic,value=b"aaaaaaaaaaaaaaaaaaa")

8、显示连通但无法正常获取broken相关信息

begin send
ClusterMetadata(brokers: 0, topics: 0, groups: 0)
Traceback (most recent call last):
  File "kafka_test.py", line 9, in <module>
    kafka_producer.send(kafka_topic,value=b"aaaaaaaaaaaaaaaaaaa")
  File "/usr/local/lib/python3.6/dist-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.6/dist-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

这样新增broken的方式是否不正确???

发表于 2021-09-23
添加评论

1、确保容器内可正常的生产和消费。
2、你这个配置挺麻烦的,容器内也要配置hosts(因为你用的域名),因为kafka客户端(你的python)获取的是KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9888这个,所以你要配置成KAFKA_LISTENERS=PLAINTEXT://kafka.com:12340
3、端口最好保持一致,不然要做2次路由,比较绕。

你读读这篇文章,核心一定要理解,因为你这个问题很绕,你必须理解不能访问的核心。
kafka外网转发

L-Yming -> 半兽人 2年前

启动新的broken容器内已经增加host 10.10.3.1 kafka.com
设置KAFKA_LISTENERS=PLAINTEXT://kafka.com:12340 启动kafka失败

org.apache.kafka.common.KafkaException: Socket server failed to bind to kafka.com:12340: Address not available.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:671)
        at kafka.network.Acceptor.<init>(SocketServer.scala:539)
        at kafka.network.SocketServer.createAcceptor(SocketServer.scala:280)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
        at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
        at kafka.network.SocketServer.startup(SocketServer.scala:125)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address not available
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:461)
        at sun.nio.ch.Net.bind(Net.java:453)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:667)

设置KAFKA_LISTENERS=PLAINTEXT://kafka.com:9888 同样失败

半兽人 -> L-Yming 2年前

地址不认,容器的地址。

你的答案

查看kafka相关的其他问题或提一个您自己的问题