虚拟机CentOS 7下的docker(CentOS 7)内的kafka消费者无法消费到来自windows下Java客户端(IDE)生产者生产的消息

ぶぽめつせ 发表于: 2020-11-17   最后更新时间: 2020-11-17 22:26:29   2,454 游览

环境

位置关系如下:

                        |----------------------------------------|
                        |                                        |
                        |   |--------------------------------|   |
                        |   |                                |   |
                        |   |    docker(CentOS) 172.18.0.2   |   |
                        |   |--------------------------------|   |
                        |                                        |
                        | Virtual Machine(CentOS) 192.168.27.143 |
                        |----------------------------------------|

                                 Windows 192.168.137.123
  1. 我在虚拟机里创建了3个docker容器并分别部署了kafka和ZooKeeper(172.18.0.2,172.18.0.3,172.18.0.4 在同一个network)

  2. windows下hosts文件已做IP映射(172.18.0.2 master,172.18.0.3 slave1,172.18.0.4 slave2)

  3. 两两可以互相ping通

  4. 虚拟机2181,9092端口已开启

  5. docker 172.18.0.2 的2181,9092端口已映射

  6. 虚拟机防火墙已关,docker没有安装防火墙

出现问题

我在windows下Java客户端用代码模拟生产者向kafka发消息,kafka里面的消费者没有收到消息。

尝试一

我按照网上说的将advertised.listeners设为PLAINTEXT://<容器的hostname或IP>:9092,依旧不行

尝试二

listeners=INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://master:29094,EXTERNAL://192.168.27.143:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL

命令行

bin/kafka-server-start.sh -daemon /opt/kafka-2.6.0/config/server.properties

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic one-kafka

bin/kafka-console-producer.sh --broker-list master:9092 --topic one-kafka

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic one-kafka

命令行生产者消费者没问题,Java客户端生产的消息依然不能被kafka消费

生产者代码

package com.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MyProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.27.143:9092");// 这里写master也不行
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 10; i++) {
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("one-kafka", "666"));
            try {
                RecordMetadata metadata = future.get();
                System.out.println(metadata.topic());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

}

客户端报错

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1314)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:970)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:758)
    at com.kafka.MyProducer.main(MyProducer.java:22)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.
发表于 2020-11-17
添加评论

把listeners的0.0.0.0改成${HOSTNAME##*-}试试。我记得kafka 0.0.0.0 这种一直不可用。
参考来自:https://www.kubebiz.com/KubeBiz/kafka

半兽人 -> 半兽人 4年前

${HOSTNAME##*-}这个是获取当前容器的ip,如果这个转化正确,你的问题应该就解决了。

是改成本容器的主机名吗?这样还是不行

错了错了,我复制错了,这个,获取当前容器的ip。

listeners=PLAINTEXT://$(hostname -i):9093

advertised.listeners已经弃用了,直接配置listeners就可以了。

我把原来的覆盖了,然后用这个,kafka启动不了了

你把java代码里的,ip地址192.168.27.143:9092,也全部改成master:9092,用host映射就可以了。
参考:https://www.orchome.com/1903
这里有写原因。

配置文件我写成了listeners=PLAINTEXT://master:9092
listeners=PLAINTEXT://$(hostname -i):9092这个不行,解析错误,Error creating broker listeners from 'PLAINTEXT://$(hostname -i):909
2': Unable to parse PLAINTEXT://$(hostname -i):9092 to a broker endpoint
我代码也写成了"172.18.0.2:9092"
客户端还是连不上。

只改java代码里的改成host映射,不改listeners了。 我看你命令用的就是master成功了 外部java也全用host,原因看上面的链接

配置文件我写成了listeners=PLAINTEXT://master:9092,代码也是写host,还是不行,我枯了。。

命令行可以?java客户端不行?

两个都不行了。。

check下host配置,容器内的和外部的host映射都不一样。
容器内的 master slave -> 172.18.0.x
外面的 master slave -> 192.168.27.143

3个容器的ip(windows已做映射):
master 172.18.0.2
slave1 172.18.0.3
slave2 172.18.0.4
3个容器放在虚拟机内
虚拟机 192.168.27.143
那应该怎么配置host🤔

你的java在哪里运行的

虚拟机外面的windows下的IDE

你在windows上,telnet 192.168.27.143 2181 或 9092 能通吗

我telnet不了,一进去就卡住了,不过我换了个工具,测得端口是开的

C:\Users\HP&gt;tcping 192.168.27.143 2181
Probing 192.168.27.143:2181/tcp - Port is open - time=14.415ms
Probing 192.168.27.143:2181/tcp - Port is open - time=77.710ms
Probing 192.168.27.143:2181/tcp - Port is open - time=0.690ms
Probing 192.168.27.143:2181/tcp - Port is open - time=0.541ms
Ping statistics for 192.168.27.143:2181
     4 probes sent.
     4 successful, 0 failed.  (0.00% fail)
Approximate trip times in milli-seconds:
     Minimum = 0.541ms, Maximum = 77.710ms, Average = 23.339ms
C:\Users\HP&gt;tcping 192.168.27.143 9092
Probing 192.168.27.143:9092/tcp - Port is open - time=2.055ms
Probing 192.168.27.143:9092/tcp - Port is open - time=0.686ms
Probing 192.168.27.143:9092/tcp - Port is open - time=3.479ms
Probing 192.168.27.143:9092/tcp - Port is open - time=0.502ms
Ping statistics for 192.168.27.143:9092
     4 probes sent.
     4 successful, 0 failed.  (0.00% fail)
Approximate trip times in milli-seconds:
     Minimum = 0.502ms, Maximum = 3.479ms, Average = 1.681ms
C:\Users\HP&gt;tcping 192.168.27.143 9093
Probing 192.168.27.143:9093/tcp - No response - time=2001.959ms
Probing 192.168.27.143:9093/tcp - No response - time=2000.190ms
Probing 192.168.27.143:9093/tcp - No response - time=2000.843ms
Probing 192.168.27.143:9093/tcp - No response - time=2000.144ms
Ping statistics for 192.168.27.143:9093
     4 probes sent.
     0 successful, 4 failed.  (100.00% fail)
Was unable to connect, cannot provide trip statistics.
C:\Users\HP&gt;tcping 192.168.27.143 22
Probing 192.168.27.143:22/tcp - Port is open - time=2.345ms
Probing 192.168.27.143:22/tcp - Port is open - time=1.384ms
Probing 192.168.27.143:22/tcp - Port is open - time=0.525ms
Probing 192.168.27.143:22/tcp - Port is open - time=1.177ms
Ping statistics for 192.168.27.143:22
     4 probes sent.
     4 successful, 0 failed.  (0.00% fail)
Approximate trip times in milli-seconds:
     Minimum = 0.525ms, Maximum = 2.345ms, Average = 1.358ms

看着都是通的,
windows上配置host为 master slave -> 192.168.27.143
(ps: 9093你如果不是这个端口,不要配错了,我看你的是9092)

windows hosts做了映射master -> 192.168.27.143
windows下IDE代码 master:9092
docker下kafka配置文件 listeners=PLAINTEXT://master:9092
命令行可以,客户端还是不行

我认为是你本机与虚拟机网络有问题,你把java程序放到vm里跑,肯定没问题。

那如果解决呢?在虚拟机跑有点麻烦😂😂😂

这个操作比较麻烦,
理论上的话,简单一点,是vm和docker之间需要路由。
你请求vm地址,到了vm,9092转发给谁,你要路由的呀。

这是不是有了呀?br-1d17b5a61c8d是我自定义的docker网络

[root@docker ~]# route -n
Kernel IP routing table
Destination     Gateway         Genmask         Flags Metric Ref    Use Iface
0.0.0.0         192.168.27.2    0.0.0.0         UG    100    0        0 ens33
172.17.0.0      0.0.0.0         255.255.0.0     U     0      0        0 docker0
172.18.0.0      0.0.0.0         255.255.0.0     U     0      0        0 br-1d17b5a61c8d
192.168.27.0    0.0.0.0         255.255.255.0   U     100    0        0 ens33
root@docker ~]# route -n
Kernel IP routing table
Destination   |  Gateway    |     Genmask     |    Flags | Metric | Ref  |  Use | Iface
0.0.0.0      |   192.168.27.2 |   0.0.0.0    |     UG    |100  |  0    |    0 |  ens33
172.17.0.0   |   0.0.0.0      |   255.255.0.0  |   U  |   0   |   0    |    0 | docker0
172.18.0.0   |   0.0.0.0     |    255.255.0.0  |   U  |   0  |    0   |     0  | br-1d17b5a61c8d
192.168.27.0 |   0.0.0.0    |     255.255.255.0 |  U |    100  |  0    |    0 | ens33

你还记得你之前telnet测试的时候,就卡住了,就说明路由没配置好。

我虚拟机是使用NAT模式的,是不是要改一下呢?

我分别在虚拟机和master容器用nc -lk 9092接收数据,用java发消息,都可以收到,路由应该没问题?

    try {
        socket = new Socket("master", 9092);
        String message = "\n你叫搜集的备份节点开发板的百分比时刻的部分看都不看分不开的";
        OutputStream os = socket.getOutputStream();
        os.write(message.getBytes("GBK"));
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
你的答案

查看kafka相关的其他问题或提一个您自己的问题