无法获取kafkaconsumer,相关代码如下
try {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
} catch (KafkaException e) {
throw new KafkaClientError(String.format("can not fetch patitions for topic: %s", topic), e);
}
properties参数如下:
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.137.131:2181");
prop.put("group.id", "kafkaconsumer");
prop.put("enable.auto.commit", "false");
prop.put("session.timeout.ms", "20000");
prop.put("max.poll.records", "50");
prop.put("auto.offset.reset", "earliest");
prop.put("fetch.max.bytes", "2097152");
prop.put(TOPIC_NAME, topicName);
prop.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
请问需要怎么更改?
https://www.orchome.com/10
通过高级消费者,使用多线程的情况下,该如何手动提交offset呢?
多线程的情况下是无法保障线程安全的。
客户端kakfa(2.11-0.10.2.1)应该怎么向服务端kafka(2.10-0.8.2.1)发送(生产)数据?
https://www.orchome.com/303
建议生产和消费者都用最新的
出现了下面的异常
网络不通,你集群配置外网访问了吗?
配置了,服务端kafka配置如下:
log.segment.bytes=1073741824 socket.send.buffer.bytes=1048576 num.network.threads=3 log.flush.scheduler.interval.ms=3000 message.max.bytes=1048576 replica.lag.time.max.ms=10000 num.io.threads=8 fetch.purgatory.purge.interval.requests=10000 replica.lag.max.messages=4000 host.name=192.168.137.131 port=6667 advertised.host.name=192.168.137.131 advertised.port=6667 listeners=PLAINTEXT://192.168.137.131:6667 advertised.listeners=PLAINTEXT://192.168.137.131:6667 producer.purgatory.purge.interval.requests=10000 default.replication.factor=1 replica.high.watermark.checkpoint.interval.ms=5000 controlled.shutdown.retry.backoff.ms=5000 num.partitions=1 log.flush.interval.messages=10000 controller.socket.timeout.ms=30000 queued.max.requests=1000 controlled.shutdown.max.retries=3 replica.fetch.wait.max.ms=500 controlled.shutdown.enable=false log.roll.hours=168 replica.socket.receive.buffer.bytes=1048576 log.retention.bytes=-1 zookeeper.connection.timeout.ms=10000 replica.fetch.max.bytes=1048576 num.replica.fetchers=1 socket.request.max.bytes=104857600 log.cleanup.interval.mins=10 zookeeper.sync.time.ms=2000 log.index.interval.bytes=4096 broker.id=0 controller.message.queue.size=10 log.flush.interval.ms=3000 replica.fetch.min.bytes=1 replica.socket.timeout.ms=30000 zookeeper.session.timeout.ms=15000 auto.create.topics.enable=false log.index.size.max.bytes=10485760 socket.receive.buffer.bytes=1048576 log.retention.hours=168 log.cleaner.enable=true auto.leader.rebalance.enable=false log.dirs=/home/kafkaData/kafka num.recovery.threads.per.data.dir=1 zookeeper.connect=192.168.137.131:2181 local.zookeeper.enable=true local.zookeeper.dataDir=/home/kafkaData/zookeeper local.zookeeper.clientPort=2181 local.zookeeper.maxClientCnxns=0 autopurge.purgeInterval=24 autopurge.snapRetainCount=10
telnet 192.168.137.131 6667
通吗
通的,而且,我用2.11-0.8.2.1的客户端也可以发送数据到服务端
如果2.11-0.10.2.1的客户端使用kafka.javaapi.producer.Producer,也是可以发送数据成功的
可以用qq交流么,最近快被这个问题搞死了,我的qq:357271682
https://www.orchome.com/342
看看这个
这个有尝试,但还是不行,目前能确定的是可以使用kafka.javaapi.producer.Producer发送数据
然后在消费数据的时候,使用的是高级消费,topic里面有数据,但是消费不到,代码在:https://www.orchome.com/846
通的,而且,我用2.11-0.8.2.1的客户端也可以发送数据到服务端
客户端低版本可以向高版本发的。
出现了下面的异常
你这个是网络问题那,超时。
通过高级消费者,使用多线程的情况下,该如何手动提交offset呢?
客户端版本不能高于服务端。一般是向下兼容。
你的答案