0 声望

这家伙太懒,什么都没留下

个人动态
  • 回复 kafka三个zookeeper一个broker中元数据怎么存 中 :

    ok,感谢

    4年前
  • 半兽人 回复 kafka三个zookeeper一个broker中元数据怎么存 中 :

    zookeeper只有一个leader,其他follower一直会同步leader的数据。

    4年前
  • 半兽人 回复 kafka消费者Java客户端 中 :

    没看到你的报错呀。

    5年前
  • kafka消费者Java客户端 发表评论:

    代码:

    public static final String brokerList = "172.16.15.89:9092";
        public static final String topic = "topic-demo2";
        public static final String groupId = "group.demo";
        public static final AtomicBoolean isRunning = new AtomicBoolean(true);
        public static Properties ininConfig(){
            Properties properties=new Properties();
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CompnayDeserializer.class.getName());
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
            return properties;
        }
        public static void main(String[] args) {
            Properties properties=ininConfig();
            KafkaConsumer<string,string> consumer=new KafkaConsumer<string, string="">(properties);
            consumer.subscribe(Arrays.asList(topic));
            try {
                while (isRunning.get()) {
                    ConsumerRecords<string, string=""> records =  consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<string, string=""> record : records) {
                        System.out.println("topic = " + record.topic()
                                + ", partition = " + record.partition()
                                + ", offset = " + record.offset());
                        System.out.println("消息:key = " + record.key()
                                + ", value = " + record.value());
                    }
                }
            }  catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    

    日志:

    [2019-07-04 18:01:11,573] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 14 (__consumer_offsets-12) (reason: Adding new member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:01:11,573] INFO [GroupCoordinator 0]: Stabilized group group.demo generation 15 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:01:11,579] INFO [GroupCoordinator 0]: Assignment received from leader for group group.demo for generation 15 (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8 in group group.demo has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 15 (__consumer_offsets-12) (reason: removing member consumer-1-296a3a10-4b03-4d23-b698-616371f285c8 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:01:29,556] INFO [GroupCoordinator 0]: Group group.demo with generation 16 is now empty (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:04:09,237] INFO [GroupCoordinator 0]: Preparing to rebalance group group.demo in state PreparingRebalance with old generation 16 (__consumer_offsets-12) (reason: Adding new member consumer-1-6b5e9036-a820-4775-b891-4ca11e8a4ed3) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:04:09,238] INFO [GroupCoordinator 0]: Stabilized group group.demo generation 17 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
    [2019-07-04 18:04:09,243] INFO [GroupCoordinator 0]: Assignment received from leader for group group.demo for generation 17 (kafka.coordinator.group.GroupCoordinator)
    

    麻烦大佬给看看 为啥报错?消费者客户端读取不到消息.

    5年前
  • 回复 Kafka Producer配置 中 :

    单位不一样吗

    5年前