Java Kafka client (2.11-0.10.2.1) cannot get a KafkaConsumer from a server-side Kafka (2.10-0.8.2.1)

  续、 posted on 2018-03-02, last updated 2018-03-03

I cannot get a KafkaConsumer; the relevant code is as follows:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;

try {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
} catch (KafkaException e) {
    // KafkaClientError is the poster's own exception type
    throw new KafkaClientError(String.format("can not fetch partitions for topic: %s", topic), e);
}

The properties argument is as follows:

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.137.131:2181"); // note: 2181 is the ZooKeeper client port (the broker config below listens on 6667)
properties.put("group.id", "kafkaconsumer");
properties.put("enable.auto.commit", "false");
properties.put("session.timeout.ms", "20000");
properties.put("max.poll.records", "50");
properties.put("auto.offset.reset", "earliest");
properties.put("fetch.max.bytes", "2097152");
properties.put(TOPIC_NAME, topicName); // TOPIC_NAME/topicName are the poster's own constants; the topic is not a consumer config key
properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

What do I need to change?








Comments:


  • The client version must not be higher than the server's; Kafka is generally only backward compatible in that direction (a newer broker supports older clients, not the reverse).
    For how to manually commit offsets with the high-level consumer when using multiple threads, see http://orchome.com/10 (a sketch follows below).
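    A minimal sketch of manual commits with the 0.8 high-level consumer, assuming an 0.8.2.1
    client on the classpath; the topic name "mytopic" and the class name are placeholders:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ManualCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "192.168.137.131:2181"); // the old consumer bootstraps via ZooKeeper
            props.put("group.id", "kafkaconsumer");
            props.put("auto.commit.enable", "false");               // disable auto commit; commit manually instead

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // One stream here; with N threads you would request N streams and hand one to each thread.
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("mytopic", 1));

            ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // ... process the message, then commit ...
                // commitOffsets() commits the position of ALL streams owned by this connector,
                // which is exactly why multi-threaded manual commits need coordination.
                connector.commitOffsets();
            }
        }
    }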
    • It is configured; the server-side Kafka configuration is as follows:
      log.segment.bytes=1073741824
      socket.send.buffer.bytes=1048576
      num.network.threads=3
      log.flush.scheduler.interval.ms=3000
      message.max.bytes=1048576
      replica.lag.time.max.ms=10000
      num.io.threads=8
      fetch.purgatory.purge.interval.requests=10000
      replica.lag.max.messages=4000
      host.name=192.168.137.131
      port=6667
      advertised.host.name=192.168.137.131
      advertised.port=6667
      listeners=PLAINTEXT://192.168.137.131:6667
      advertised.listeners=PLAINTEXT://192.168.137.131:6667
      producer.purgatory.purge.interval.requests=10000
      default.replication.factor=1
      replica.high.watermark.checkpoint.interval.ms=5000
      controlled.shutdown.retry.backoff.ms=5000
      num.partitions=1
      log.flush.interval.messages=10000
      controller.socket.timeout.ms=30000
      queued.max.requests=1000
      controlled.shutdown.max.retries=3
      replica.fetch.wait.max.ms=500
      controlled.shutdown.enable=false
      log.roll.hours=168
      replica.socket.receive.buffer.bytes=1048576
      log.retention.bytes=-1
      zookeeper.connection.timeout.ms=10000
      replica.fetch.max.bytes=1048576
      num.replica.fetchers=1
      socket.request.max.bytes=104857600
      log.cleanup.interval.mins=10
      zookeeper.sync.time.ms=2000
      log.index.interval.bytes=4096
      broker.id=0
      controller.message.queue.size=10
      log.flush.interval.ms=3000
      replica.fetch.min.bytes=1
      replica.socket.timeout.ms=30000
      zookeeper.session.timeout.ms=15000
      auto.create.topics.enable=false
      log.index.size.max.bytes=10485760
      socket.receive.buffer.bytes=1048576
      log.retention.hours=168
      log.cleaner.enable=true
      auto.leader.rebalance.enable=false
      log.dirs=/home/kafkaData/kafka
      num.recovery.threads.per.data.dir=1

      zookeeper.connect=192.168.137.131:2181

      local.zookeeper.enable=true
      local.zookeeper.dataDir=/home/kafkaData/zookeeper
      local.zookeeper.clientPort=2181
      local.zookeeper.maxClientCnxns=0

      autopurge.purgeInterval=24
      autopurge.snapRetainCount=10
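
      One mismatch worth noting between this config and the client code above: the broker
      listener is 192.168.137.131:6667, while the client's bootstrap.servers points at 2181,
      which this config shows to be the ZooKeeper client port. The new consumer's
      bootstrap.servers must list broker host:port pairs, not ZooKeeper. A minimal corrected
      sketch assuming this broker address (this alone does not bridge the 0.10.2-client /
      0.8.2-broker protocol gap mentioned in the first comment):

      Properties properties = new Properties();
      // point at the broker listener (6667 here), not at ZooKeeper (2181)
      properties.put("bootstrap.servers", "192.168.137.131:6667");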

        • I tried that, but it still doesn't work. What I can confirm so far is that kafka.javaapi.producer.Producer can send data.
          Then when consuming, I use the high-level consumer; the topic has data in it, but nothing gets consumed. The code is at: http://orchome.com/846
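          A common cause of "the topic has data but the high-level consumer reads nothing"
          (not confirmed to be the issue here) is that the old consumer's auto.offset.reset
          defaults to "largest", so a group with no committed offsets starts at the log end
          and only sees messages produced after it connects. A minimal properties sketch; the
          group name is a placeholder:

          Properties props = new Properties();
          props.put("zookeeper.connect", "192.168.137.131:2181");
          props.put("group.id", "fresh-test-group");  // placeholder: a group with no committed offsets
          props.put("auto.offset.reset", "smallest"); // the old consumer's spelling of "earliest"
          props.put("consumer.timeout.ms", "10000");  // throws ConsumerTimeoutException instead of blocking forever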