kafka用生产者客户端JAVA_API向kafka添加消息时失败,但是发现topic创建成功了,但topic中没有任何消息

板面 发表于: 2018-06-28   最后更新时间: 2018-06-28 23:13:36   5,538 游览

环境腾讯云,

版本:kafka_2.11-1.1.0

zookeeper用的kafka自带的

config/server.properties 只改变了 listeners=PLAINTEXT://10.2.0.7:9092

用kafka自带的命令行工具测试可以写入数据

java代码

public static void p1(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "211.159.171.173:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<String, String>("onesdk2", "aaaaa", "aaaaa"));

        producer.flush();
        producer.close();
    }

运行时会停一阵,然后发现topic会创建成功,但是没有数据
发现zookeeper中的日志信息为:

[2018-06-28 19:03:52,899] INFO Got user-level KeeperException when processing sessionid:0x16445fbcfb60012 type:create cxid:0x2 zxid:0xc9 txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-06-28 19:03:53,031] INFO Got user-level KeeperException when processing sessionid:0x16445fbcfb60012 type:create cxid:0x19 zxid:0xcd txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-40505/owners/onesdk2 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-40505/owners/onesdk2 (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-06-28 19:03:53,034] INFO Got user-level KeeperException when processing sessionid:0x16445fbcfb60012 type:create cxid:0x1a zxid:0xce txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-40505/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-40505/owners (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-06-28 19:04:52,698] INFO Got user-level KeeperException when processing sessionid:0x16445fbcfb60012 type:setData cxid:0x24 zxid:0xd2 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-40505/offsets/onesdk2/0 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-40505/offsets/onesdk2/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-06-28 19:04:52,701] INFO Got user-level KeeperException when processing sessionid:0x16445fbcfb60012 type:create cxid:0x25 zxid:0xd3 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-40505/offsets Error:KeeperErrorCode = NoNode for /consumers/console-consumer-40505/offsets (org.apache.zookeeper.server.PrepRequestProcessor)

kafka日志:

[2018-06-28 18:52:47,750] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2018-06-28 18:52:47,816] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions onesdk2-0 (kafka.server.ReplicaFetcherManager)
[2018-06-28 18:52:47,829] INFO Replica loaded for partition onesdk2-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-06-28 18:52:47,831] INFO [Partition onesdk2-0 broker=0] onesdk2-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-06-28 18:52:47,848] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-06-28 19:02:47,642] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-06-28 19:03:42,894] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: onesdk2-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

我在本地虚拟机测试代码是没问题的, 腾讯云的ip端口用telnet测试也可以连上..

卡了一天了 跪求指导

发表于 2018-06-28
添加评论

ping计算机名的时候,是你的内部ip。
如果不是,配置host。

你写的没问题。
producer.send(new ProducerRecord<String, String>("onesdk2", "aaaaa", "aaaaa")).get();
加个.get(),看看是否有异常打。

板面 -> 半兽人 6年前

感谢大佬回复,已经改好了.配置如下:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://211.159.171.173:9092
这样就可以访问到了.
但还是有一点不明白, 值没有存进去,为啥topic却创建成功了呢?
希望大佬解答下

半兽人 -> 板面 6年前

没懂额,你这是超时吧。

你的答案

查看kafka相关的其他问题或提一个您自己的问题