我调用API的producer写发送消息,但是kafka控制台提示创建topic成功,却没有消息?

nirvana 发表于: 2018-05-13   最后更新时间: 2018-05-13 21:37:46   6,602 游览
public void sendMessage(){

  Properties props = new Properties();
  props.put("bootstrap.servers", "192.168.31.96:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaProducer<String,String> producer = new KafkaProducer<>(props);

  for (int i = 0; i <10 ; i++) {
  String mesg = Integer.toString(i);
  ProducerRecord<String, String> records = new      ProducerRecord<String,String>("test4",mesg);
  producer.send(records);
  }
  producer.close();
}
发表于 2018-05-13
添加评论

这一行改成producer.send(records).get();试试。

nirvana -> 半兽人 6年前

好的,谢谢,我试一下

nirvana -> 半兽人 6年前

加上.get()方法会报异常

nirvana -> 半兽人 6年前
ProducerRecord<String, String> records = new ProducerRecord<String,String>("test4", "+++++++++++" + mesg);
    Future<RecordMetadata> future = producer.send(records);
    System.out.println(future);

我这样写,在kafka服务可以查询到创建 topic =test4

启动

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test4

执行java代码,发送好多次,也看不到kafak控制台输出我发送的消息

半兽人 -> nirvana 6年前

是的,加上.get,就是阻塞式发送,可以看到为什么发送失败。异常就是原因呢。

nirvana -> 半兽人 6年前

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test4-0: 30005 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.example.producers.KafkaProducerDemo.sendMessage(KafkaProducerDemo.java:31)
at com.example.producers.ProducersApplication.main(ProducersApplication.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test4-0: 30005 ms has passed since batch creation plus linger time

这是异常,我不太能看懂..............

半兽人 -> nirvana 6年前

这个是超时异常,是网络问题。
你telnet 192.168.31.96 9092 看看通不通。
如果不通,则在服务器server.properteis增加linstener

nirvana -> 半兽人 6年前

可以通的,我在zookeeper 还可以查到创建的broker节点

半兽人 -> nirvana 6年前

你把kafka配置内容贴一下。主要是listeners和advertised.listeners。

nirvana -> 半兽人 6年前
####################### Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

我单机版的,没有集群,这里都没配置

半兽人 -> nirvana 6年前
listeners=PLAINTEXT://192.168.31.96:9092

放开这个,重启kafka,在试试。

nirvana -> 半兽人 6年前

嘿嘿,这次没异常了,这个lisreners的作用是什么呢?
kafka的控制台没看到发送的信息,我应该怎么才能知道发送成功了。

nirvana -> 半兽人 6年前

我启动consumer可以接收到消息了,太感谢你了,谢谢!

半兽人 -> nirvana 6年前

要么在开个消费者,要么用命令直接进行消费。
命令大全:https://www.orchome.com/454

半兽人 -> nirvana 6年前

不要用localhost,全部用ip。

半兽人 -> nirvana 6年前

listeners监听对应的ip和端口。否则只能用localhost。

nirvana -> 半兽人 6年前

好的,万分感谢。我先看你的教程,有难题了再继续请教,嘿嘿。

你的答案

查看kafka相关的其他问题或提一个您自己的问题