java代码消费不了Kafka消息

顺其自然 发表于: 2018-04-02   最后更新时间: 2018-04-02  
  •   0 订阅,666 游览

按照orcHome网上的教程写了消费者程序,但是不能消费消息,但是用Kafka命令能消费消息,以下是我的相关配置。
kafka版本是kafka_2.11-1.0.1

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
</dependency>
Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.165:9092,192.168.201.168:9092");//服务器地址
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");//消息者组
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//是否自动提交偏移量
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//指定的时间内发送心跳给群组的时间
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");//送心跳的频率一般设置成session.timeout.ms值的3分之一。
            props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");//消息者从服务器获取记录的最小字节数。
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");//等到有足够的数据时才返回给消费者,看和fetch.min.bytes参数哪个先满足。
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//从头条开始处理消费
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
public void run() {
        String topic=Global.getConfig("get_topic");
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("tostring="+record.toString()+"|key="+record.key()+"|value="+record.value());
            }
            consumer.commitSync();
        }
    }

我用Debug跟踪了一下,发现卡在AbstractCoordinator类中ensureCoordinatorReady方法这块了,老是进行循环,打印log.debug("Coordinator discovery failed, refreshing metadata")。

protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
        long remainingMs = timeoutMs;

        while (coordinatorUnknown()) {
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future, remainingMs);

            if (future.failed()) {
                if (future.isRetriable()) {
                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                    if (remainingMs <= 0)
                        break;

*log.debug("Coordinator discovery failed, refreshing metadata");*
                    client.awaitMetadataUpdate(remainingMs);
                } else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                markCoordinatorUnknown();
                time.sleep(retryBackoffMs);
            }

            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
            if (remainingMs <= 0)
                break;
        }

        return !coordinatorUnknown();
    }






发表于: 3月前   最后更新时间: 3月前   游览量:666
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 睡觉前对这个问题不死心,打开电脑又查了查,终于解决了问题。主要是因为我服务器之前装过Kafka所以产生了脏数据。
    以下是解决过程:
    1、进入zookeeper 运行zkCli.sh 。
    2、运行ls /brokers/topics 查看主题
    3、然后运行 rmr /brokers/topics/__consumer_offsets 删除__consumer_offsets_主题
    4、然后重启kafka集群就好了。
    确保topic中有消息。
    有没有报错?
    •  控制台打印这个,也没啥错误啊,消息是我用命令发送的。/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.202.165:9092 --topic  test
      2018-04-02 22:03:08,170 INFO  [main] org.springframework.boot.StartupInfoLogger[logStarted 57]: Started Application in 15.684 seconds (JVM running for 16.758)
      2018-04-02 22:03:08,666 INFO  [pool-2-thread-1] org.apache.kafka.clients.Metadata[update 265]: Cluster ID: 380dL-1DRV2PUuB1NLMSbw
        • 调试了好几次小时了,没有解决真是郁闷。。但是我用以下这个jar包就能消费消息,感觉还是代码哪块没写好。
          <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.8.0</artifactId>
             <version>0.8.0-beta1</version>
            </dependency>
            • 在老项目里用这个代码就能消息到消息。

              String topic = Global.getConfig("topic_task_history");//接收机器返回任务状态的topic
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                  .createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
                while (iterator.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> msg = iterator.next();
                 String message;
                 try {
                  message = new String(msg.message(),"UTF-8");
                  System.out.println("message="+message);
                 } catch (UnsupportedEncodingException e) {
                  logger.error("监控历史消息|异常",e);
                 }
                }
                • 改了参数,直接运行试试。

                  import org.apache.kafka.clients.consumer.ConsumerRecord;
                  import org.apache.kafka.clients.consumer.ConsumerRecords;
                  import org.apache.kafka.clients.consumer.KafkaConsumer;

                  import java.util.Arrays;
                  import java.util.Properties;

                  public class ConsumerTest {
                      public static void main(String[] args) {
                          Properties props = new Properties();
                          props.put("bootstrap.servers", "172.30.34.4:9092");
                          props.put("group.id", "myself");
                          props.put("enable.auto.commit", "true");
                          props.put("auto.commit.interval.ms", "1000");
                          props.put("session.timeout.ms", "30000");
                          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                          consumer.subscribe(Arrays.asList("TRADE-NOTIFY"));
                          while (true) {
                              ConsumerRecords<String, String> records = consumer.poll(100);
                              for (ConsumerRecord<String, String> record : records)
                                  System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
                          }
                      }
                  }
                    • 我也挺郁闷的。。刚重新装了Kafka了也不行。这个日志信息是啥意思呢?log.debug("Coordinator discovery failed, refreshing metadata")
                      • 评论…
                        • in this conversation
                          提问