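Question details
I run the producer below on Windows, and Kafka is deployed on a Linux virtual machine, but the console consumer on the Linux side does not receive any messages: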
bin/kafka-console-consumer.sh --bootstrap-server 192.168.93.129:9092 --topic my-tc4 --from-beginning
Producer code:
package com.cetccq.platform.logserver;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class XTKafkaProducer {
    private static Properties kafkaProps;

    private static void initKafka() {
        kafkaProps = new Properties();
        // broker address
        kafkaProps.put("bootstrap.servers", "192.168.93.129:9092");
        // wait for all in-sync replicas to acknowledge each record
        kafkaProps.put("acks", "all");
        // number of retries when a send fails
        kafkaProps.put("retries", 0);
        // maximum size of one batch, in bytes
        kafkaProps.put("batch.size", 16384);
        // wait up to 1 ms for more records before sending a batch
        kafkaProps.put("linger.ms", 1);
        // total memory available for buffering unsent records, in bytes
        kafkaProps.put("buffer.memory", 33554432);
        // serializers for the record key and value
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            // asynchronous send: the returned Future is ignored here
            producer.send(new ProducerRecord<String, String>("my-tc4", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        producer.close();
    }
}
server.properties:
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.93.129:9092
port = 9092
num.network.threads=3
num.io.threads=8
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
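My steps were as follows:
- Start ZooKeeper
- Start the Kafka broker
- Create the topic my-tc4 from the command line
- Run the producer program
- Start the consumer from the command line
Could you take a look at what is wrong? Many thanks!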
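producer.send(new ProducerRecord<String,String>("my-tc4", Integer.toString(i), Integer.toString(i)));
Append .get() to this call so the send becomes synchronous, then check whether the message is actually sent and whether any error is reported, for example:
producer.send(new ProducerRecord<String,String>("my-tc4", Integer.toString(i), Integer.toString(i))).get();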
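To illustrate why appending .get() exposes the problem, here is a minimal sketch that uses only the JDK and no Kafka classes. It assumes a hypothetical fakeSend method standing in for producer.send, and java.util.concurrent.TimeoutException standing in for Kafka's own TimeoutException; sendAndWait is likewise an illustrative name. The point is only that a failed Future stays silent until get() is called on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

class SyncSendSketch {
    // Hypothetical stand-in for producer.send(): returns a Future that has
    // already failed when the "broker" cannot be reached.
    static Future<String> fakeSend(boolean brokerReachable) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (brokerReachable) {
            result.complete("my-tc4-0@0");
        } else {
            result.completeExceptionally(
                    new TimeoutException("Topic my-tc4 not present in metadata"));
        }
        return result;
    }

    // Blocks on the Future, as .get() does, and reports the outcome as text.
    static String sendAndWait(boolean brokerReachable) {
        try {
            return "ok: " + fakeSend(brokerReachable).get();
        } catch (ExecutionException e) {
            // get() wraps the real cause in an ExecutionException
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Fire-and-forget: the Future is dropped, so the failure is never seen
        // and the success message below is printed regardless.
        fakeSend(false);
        System.out.println("Message sent successfully!");

        // Synchronous: the same failure is now reported.
        System.out.println(sendAndWait(false));
    }
}
```

With the real producer, the same pattern means wrapping send(...).get() in a try/catch for ExecutionException and InterruptedException, or declaring them on main.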
It threw an error:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic my-tc4 not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1255)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:917)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:840)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:727)
at com.cetccq.platform.logserver.XTKafkaProducer.main(XTKafkaProducer.java:36)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic my-tc4 not present in metadata after 60000 ms.
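I don't really understand what "metadata" means here T_T
It's a network problem. Try telnet to port 9092; your server.properties configuration is fine.
Check whether a firewall is blocking the port.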
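If telnet is not installed on the Windows machine, the same connectivity test can be approximated from Java. The following is a minimal sketch using only the JDK; the class name PortCheck, the method canConnect, and the 3000 ms timeout are illustrative choices, and the default host and port are taken from the question. It only checks that a TCP connection to the broker port can be opened, which is what the telnet test does; it does not speak the Kafka protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port can be opened
    // within timeoutMillis, false on refusal, timeout, or unknown host.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker address from the question; override via args.
        String host = args.length > 0 ? args[0] : "192.168.93.129";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean reachable = canConnect(host, port, 3000);
        System.out.println(host + ":" + port
                + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```

If this reports the port as not reachable while the broker is running, a firewall on the Linux host is a likely cause.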
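It really is a network problem: telnet cannot connect. So do I now need to enable the telnet service on Linux?
Finally got it working. You're amazing, thank you so much!
On Linux, disable the firewall, or add a rule that allows access to port 9092.
Please mark the thread as resolved.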