java代码消费不了Kafka消息

顺其自然 发表于: 2018-04-02   最后更新时间: 2021-09-18 14:30:38   7,462 游览

按照orcHome网上的教程写了消费者程序,但是不能消费消息,但是用Kafka命令能消费消息,以下是我的相关配置。
kafka版本是kafka_2.11-1.0.1

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.165:9092,192.168.201.168:9092");//服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");//消息者组
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//是否自动提交偏移量
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//指定的时间内发送心跳给群组的时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");//送心跳的频率一般设置成session.timeout.ms值的3分之一。
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");//消息者从服务器获取记录的最小字节数。
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");//等到有足够的数据时才返回给消费者,看和fetch.min.bytes参数哪个先满足。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//从头条开始处理消费
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
public void run() {
    String topic=Global.getConfig("get_topic");
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("tostring="+record.toString()+"|key="+record.key()+"|value="+record.value());
        }
        consumer.commitSync();
    }
}

我用Debug跟踪了一下,发现卡在AbstractCoordinator类中ensureCoordinatorReady方法这块了,老是进行循环,打印log.debug("Coordinator discovery failed, refreshing metadata")

protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;

    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator();
        client.poll(future, remainingMs);

        if (future.failed()) {
            if (future.isRetriable()) {
                remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                if (remainingMs <= 0)
                    break;

                *log.debug("Coordinator discovery failed, refreshing metadata");*
                client.awaitMetadataUpdate(remainingMs);
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            markCoordinatorUnknown();
            time.sleep(retryBackoffMs);
        }

        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;
    }

    return !coordinatorUnknown();
}
发表于 2018-04-02
添加评论

睡觉前对这个问题不死心,打开电脑又查了查,终于解决了问题。主要是因为我服务器之前装过Kafka所以产生了脏数据。

以下是解决过程:
1、进入zookeeper 运行 zkCli.sh
2、运行ls /brokers/topics查看主题
3、然后运行 rmr /brokers/topics/consumer_offsets删除consumer_offsets_主题
4、然后重启kafka集群就好了。

-> 顺其自然 6年前

我也是这个问题,最后同样的方式解决了。才看到这条,相见恨晚

我也是这个问题,最后同样的方式解决了。感谢楼主

有没有报错?

顺其自然 -> 半兽人 6年前

控制台打印这个,也没啥错误啊,消息是我用命令发送的。/usr/local/kafka/bin/kafka-console-

producer.sh --broker-list 192.168.202.165:9092 --topic  test
2018-04-02 22:03:08,170 INFO  [main] org.springframework.boot.StartupInfoLogger[logStarted 57]: Started Application in 15.684 seconds (JVM running for 16.758)
2018-04-02 22:03:08,666 INFO  [pool-2-thread-1] org.apache.kafka.clients.Metadata[update 265]: Cluster ID: 380dL-1DRV2PUuB1NLMSbw
顺其自然 -> 半兽人 6年前

调试了好几次小时了,没有解决真是郁闷。。但是我用以下这个jar包就能消费消息,感觉还是代码哪块没写好。

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_2.8.0</artifactId>
 <version>0.8.0-beta1</version>
</dependency>
顺其自然 -> 半兽人 6年前

在老项目里用这个代码就能消息到消息。

String topic = Global.getConfig("topic_task_history");//接收机器返回任务状态的topic
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
    .createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
  ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
  while (iterator.hasNext()) {
   MessageAndMetadata<byte[], byte[]> msg = iterator.next();
   String message;
   try {
    message = new String(msg.message(),"UTF-8");
    System.out.println("message="+message);
   } catch (UnsupportedEncodingException e) {
    logger.error("监控历史消息|异常",e);
   }
  }
半兽人 -> 顺其自然 6年前

改了参数,直接运行试试。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.30.34.4:9092");
        props.put("group.id", "myself");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("TRADE-NOTIFY"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
        }
    }
}
顺其自然 -> 半兽人 6年前

刚试了一下,用这个测试类还是不行。难道是我Kafka服务器配置有问题?

無名 -> 顺其自然 6年前

没错误,实在是不知道怎么帮你

顺其自然 -> 無名 6年前

我也挺郁闷的。。刚重新装了Kafka了也不行。这个日志信息是啥意思呢?log.debug("Coordinator discovery failed, refreshing metadata")

無名 -> 顺其自然 6年前

你telnet下你的服务器。9092端口的

顺其自然 -> 無名 6年前

都是通的。。

确保topic中有消息。

你的答案

查看kafka相关的其他问题或提一个您自己的问题