我调用API的producer写发送消息,但是kafka控制台提示创建topic成功,却没有消息?

nirvana 发表于: 2018-05-13   最后更新时间: 2018-05-13  
  •   0 订阅,651 游览
public void sendMessage(){

  Properties props = new Properties();
  props.put("bootstrap.servers", "192.168.31.96:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaProducer<String,String> producer = new KafkaProducer<>(props);

  for (int i = 0; i <10 ; i++) {
  String mesg = Integer.toString(i);
  ProducerRecord<String, String> records = new      ProducerRecord<String,String>("test4",mesg);
  producer.send(records);
  }
  producer.close();
}






发表于: 3月前   最后更新时间: 3月前   游览量:651
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 这一行改成  producer.send(records).get();试试。
    • ProducerRecord<String, String> records = new ProducerRecord<String,String>("test4", "+++++++++++" + mesg);

                  Future<RecordMetadata> future = producer.send(records);
                  System.out.println(future);
      我这样写,在kafka服务可以查询到创建 topic =test4
      启动 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test4
      执行java代码,发送好多次,也看不到kafak控制台输出我发送的消息
        • java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test4-0: 30005 ms has passed since batch creation plus linger time
           at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
           at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
           at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
           at com.example.producers.KafkaProducerDemo.sendMessage(KafkaProducerDemo.java:31)
           at com.example.producers.ProducersApplication.main(ProducersApplication.java:13)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:497)
           at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
          Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test4-0: 30005 ms has passed since batch creation plus linger time

          这是异常,我不太能看懂..............
            • 这个是超时异常,是网络问题。
              你telnet 192.168.31.96 9092 看看通不通。
              如果不通,则在服务器server.properteis增加linstener。
                • ############################# Socket Server Settings #############################

                  # The address the socket server listens on. It will get the value returned from
                  # java.net.InetAddress.getCanonicalHostName() if not configured.
                  #   FORMAT:
                  #     listeners = listener_name://host_name:port
                  #   EXAMPLE:
                  #     listeners = PLAINTEXT://your.host.name:9092
                  #listeners=PLAINTEXT://:9092

                  我单机版的,没有集群,这里都没配置
                    • 嘿嘿,这次没异常了,这个lisreners的作用是什么呢?
                      kafka的控制台没看到发送的信息,我应该怎么才能知道发送成功了。
                      • 评论…
                        • in this conversation
                          提问