java客户端kakfa(2.11-0.10.2.1)无法获取服务端kafka(2.10-0.8.2.1)的KafkaConsuemer

  续、 发表于: 2018-03-02   最后更新时间: 2018-03-03 23:24:03   3,969 游览

无法获取kafkaconsumer,相关代码如下

try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        } catch (KafkaException e) {
            throw new KafkaClientError(String.format("can not fetch patitions for topic: %s", topic), e);
        }

properties参数如下:

Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.137.131:2181");
prop.put("group.id", "kafkaconsumer");
prop.put("enable.auto.commit", "false");
prop.put("session.timeout.ms", "20000");
prop.put("max.poll.records", "50");
prop.put("auto.offset.reset", "earliest");
prop.put("fetch.max.bytes", "2097152");
prop.put(TOPIC_NAME, topicName);
prop.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

请问需要怎么更改?

发表于 2018-03-02
  续、 -> 半兽人 6年前

通过高级消费者,使用多线程的情况下,该如何手动提交offset呢?

半兽人 ->   续、 6年前

多线程的情况下是无法保障线程安全的。

  续、 -> 半兽人 6年前

客户端kakfa(2.11-0.10.2.1)应该怎么向服务端kafka(2.10-0.8.2.1)发送(生产)数据?

半兽人 ->   续、 6年前

https://www.orchome.com/303
建议生产和消费者都用最新的

  续、 -> 半兽人 6年前

出现了下面的异常

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

半兽人 ->   续、 6年前

网络不通,你集群配置外网访问了吗?

listeners=

  续、 -> 半兽人 6年前

配置了,服务端kafka配置如下:

log.segment.bytes=1073741824
socket.send.buffer.bytes=1048576
num.network.threads=3
log.flush.scheduler.interval.ms=3000
message.max.bytes=1048576
replica.lag.time.max.ms=10000
num.io.threads=8
fetch.purgatory.purge.interval.requests=10000
replica.lag.max.messages=4000
host.name=192.168.137.131
port=6667
advertised.host.name=192.168.137.131
advertised.port=6667
listeners=PLAINTEXT://192.168.137.131:6667
advertised.listeners=PLAINTEXT://192.168.137.131:6667
producer.purgatory.purge.interval.requests=10000
default.replication.factor=1
replica.high.watermark.checkpoint.interval.ms=5000
controlled.shutdown.retry.backoff.ms=5000
num.partitions=1
log.flush.interval.messages=10000
controller.socket.timeout.ms=30000
queued.max.requests=1000
controlled.shutdown.max.retries=3
replica.fetch.wait.max.ms=500
controlled.shutdown.enable=false
log.roll.hours=168
replica.socket.receive.buffer.bytes=1048576
log.retention.bytes=-1
zookeeper.connection.timeout.ms=10000
replica.fetch.max.bytes=1048576
num.replica.fetchers=1
socket.request.max.bytes=104857600
log.cleanup.interval.mins=10
zookeeper.sync.time.ms=2000
log.index.interval.bytes=4096
broker.id=0
controller.message.queue.size=10
log.flush.interval.ms=3000
replica.fetch.min.bytes=1
replica.socket.timeout.ms=30000
zookeeper.session.timeout.ms=15000
auto.create.topics.enable=false
log.index.size.max.bytes=10485760
socket.receive.buffer.bytes=1048576
log.retention.hours=168
log.cleaner.enable=true
auto.leader.rebalance.enable=false
log.dirs=/home/kafkaData/kafka
num.recovery.threads.per.data.dir=1

zookeeper.connect=192.168.137.131:2181

local.zookeeper.enable=true
local.zookeeper.dataDir=/home/kafkaData/zookeeper
local.zookeeper.clientPort=2181
local.zookeeper.maxClientCnxns=0

autopurge.purgeInterval=24
autopurge.snapRetainCount=10
半兽人 ->   续、 6年前

telnet 192.168.137.131 6667
通吗

  续、 -> 半兽人 6年前

通的,而且,我用2.11-0.8.2.1的客户端也可以发送数据到服务端

  续、 -> 半兽人 6年前

如果2.11-0.10.2.1的客户端使用kafka.javaapi.producer.Producer,也是可以发送数据成功的

  续、 -> 半兽人 6年前

可以用qq交流么,最近快被这个问题搞死了,我的qq:357271682

  续、 -> 半兽人 6年前

这个有尝试,但还是不行,目前能确定的是可以使用kafka.javaapi.producer.Producer发送数据
然后在消费数据的时候,使用的是高级消费,topic里面有数据,但是消费不到,代码在:https://www.orchome.com/846

  续、 -> 半兽人 6年前

通的,而且,我用2.11-0.8.2.1的客户端也可以发送数据到服务端

半兽人 ->   续、 6年前

客户端低版本可以向高版本发的。

  续、 -> 半兽人 6年前

出现了下面的异常

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

半兽人 ->   续、 6年前

你这个是网络问题那,超时。

通过高级消费者,使用多线程的情况下,该如何手动提交offset呢?

客户端版本不能高于服务端。一般是向下兼容。

你的答案

查看kafka相关的其他问题或提一个您自己的问题