Kafka集群中一个节点挂了后,生产数据报错。

lene 发表于: 2022-03-29   最后更新时间: 2022-03-29 14:45:56   1,711 游览

1、我用虚拟机搭建了Kafka集群,3个broker分配了3个IP,分别是192.168.3.250、251、252。其中251这台是leader,其他两台是follower。
2、版本:apache-zookeeper-3.7.0-bin.tar.gz、kafka_2.12-2.8.0.tgz。
3、模拟系统宕机:当我把其中一台follower-broker的zookeeper和kafka都shutdown时,生产数据就报错了。
4、生产者参数acks=1。

当正常生产和消费时,程序打印如下: 
produce | offset4, topic:data01 ,耗时:1173
consumer1 | offset = 4, value = {"topic":"coindata","time":"2022-03-29 10:40:24.112"}
produce | offset5, topic:data01 ,耗时:117
consumer1 | offset = 5, value = {"topic":"coindata","time":"2022-03-29 10:40:27.257"}
produce | offset6, topic:data01 ,耗时:117
consumer1 | offset = 6, value = {"topic":"coindata","time":"2022-03-29 10:40:29.259"}
produce | offset7, topic:data01 ,耗时:116
consumer1 | offset = 7, value = {"topic":"coindata","time":"2022-03-29 10:40:31.271"}
produce | offset8, topic:data01 ,耗时:116
consumer1 | offset = 8, value = {"topic":"coindata","time":"2022-03-29 10:40:33.271"}
当follower宕机时,程序打印如下:
produce | offset:-1, topic:data01 ,耗时:122344
Expiring 1 record(s) for data01-5:120014 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120010
Expiring 1 record(s) for data01-5:120010 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120012
Expiring 1 record(s) for data01-5:120012 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120013
Expiring 1 record(s) for data01-5:120013 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120012

生产者代码如下:

// 生产者配置参数
KafkaProduce(String kafka_url, String zookeeper_url) {
     properties = new Properties();
     //kafka_url= "192.168.3.250:9092,192.168.3.251:9092,192.168.3.252:9092"
     properties.put("bootstrap.servers", kafka_url); 
     properties.put("acks", "1");
     properties.put("retries", 0);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     properties.put("bak.partitioner.class", "kafka.producer.DefaultPartitioner");
     properties.put("bak.key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     properties.put("bak.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     properties.put("bak.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     properties.put("compression.type","snappy");
     properties.put("batch.size","200");
     properties.put("linger.ms","100");
}

// 生产数据
public void sendMessage(String topic, String key, String value) throws InterruptedException {
    KafkaProducer<String, String> kp = new KafkaProducer<String, String>(properties);
    while(true) {
        // 消息封装
        for (int i = 10000001; i < 10000502; i++) {
            String time = sdf_y_m_d_h_m_s.format(new Date());
            value = "{\"topic\":\"coindata\",\"time\":" +"\""+ time +"\"}";
            long timetick_start = new Date().getTime();
            ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, value);
            kp.send(pr, new Callback() {
                // 回调函数
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    long timetick_end = new Date().getTime();
                    System.out.println("produce | offset:" + metadata.offset() + ", topic:" + metadata.topic() +" ,耗时:" + (timetick_end - timetick_start));

                    if (null != exception) {
                        System.out.println(exception.getMessage());
                    }
                }
            });
        }
    }
    //关闭produce
    //kp.close();
}

5、可能我对Kafka的基础理论还不是很了解,我不太明白为何一台宕机后就无法生产数据了?

发表于 2022-03-29

你失败的原因可能是__consumer_offsets是个单个副本,这个topic存储的是kafka消费者的offset。

另外,关键有问题的topic的描述你没有贴,你用kafka自动的命令进行消费和生产也ok的时候,在去进行代码测试。

## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181

## 查询集群描述(新)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe

命令来自:kafka命令大全

lene -> 半兽人 2年前

感谢,我已经明白了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题