1、我用虚拟机搭建了Kafka集群,3个broker分配了3个IP,分别是192.168.3.250、251、252。其中251这台是leader,其他两台是follower。
2、版本:apache-zookeeper-3.7.0-bin.tar.gz、kafka_2.12-2.8.0.tgz。
3、模拟系统宕机:当我把其中一台follower-broker的zookeeper和kafka都shutdown时,生产数据就报错了。
4、生产者参数acks=1。
当正常生产和消费时,程序打印如下:
produce | offset:4, topic:data01 ,耗时:1173
consumer1 | offset = 4, value = {"topic":"coindata","time":"2022-03-29 10:40:24.112"}
produce | offset:5, topic:data01 ,耗时:117
consumer1 | offset = 5, value = {"topic":"coindata","time":"2022-03-29 10:40:27.257"}
produce | offset:6, topic:data01 ,耗时:117
consumer1 | offset = 6, value = {"topic":"coindata","time":"2022-03-29 10:40:29.259"}
produce | offset:7, topic:data01 ,耗时:116
consumer1 | offset = 7, value = {"topic":"coindata","time":"2022-03-29 10:40:31.271"}
produce | offset:8, topic:data01 ,耗时:116
consumer1 | offset = 8, value = {"topic":"coindata","time":"2022-03-29 10:40:33.271"}
当follower宕机时,程序打印如下:
produce | offset:-1, topic:data01 ,耗时:122344
Expiring 1 record(s) for data01-5:120014 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120010
Expiring 1 record(s) for data01-5:120010 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120012
Expiring 1 record(s) for data01-5:120012 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120013
Expiring 1 record(s) for data01-5:120013 ms has passed since batch creation
produce | offset:-1, topic:data01 ,耗时:120012
生产者代码如下:
// 生产者配置参数
KafkaProduce(String kafka_url, String zookeeper_url) {
properties = new Properties();
//kafka_url= "192.168.3.250:9092,192.168.3.251:9092,192.168.3.252:9092"
properties.put("bootstrap.servers", kafka_url);
properties.put("acks", "1");
properties.put("retries", 0);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bak.partitioner.class", "kafka.producer.DefaultPartitioner");
properties.put("bak.key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bak.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bak.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type","snappy");
properties.put("batch.size","200");
properties.put("linger.ms","100");
}
// 生产数据
public void sendMessage(String topic, String key, String value) throws InterruptedException {
KafkaProducer<String, String> kp = new KafkaProducer<String, String>(properties);
while(true) {
// 消息封装
for (int i = 10000001; i < 10000502; i++) {
String time = sdf_y_m_d_h_m_s.format(new Date());
value = "{\"topic\":\"coindata\",\"time\":" +"\""+ time +"\"}";
long timetick_start = new Date().getTime();
ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, value);
kp.send(pr, new Callback() {
// 回调函数
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long timetick_end = new Date().getTime();
System.out.println("produce | offset:" + metadata.offset() + ", topic:" + metadata.topic() +" ,耗时:" + (timetick_end - timetick_start));
if (null != exception) {
System.out.println(exception.getMessage());
}
}
});
}
}
//关闭produce
//kp.close();
}
5、可能我对Kafka的基础理论还不是很了解,我不太明白为何一台宕机后就无法生产数据了?
你失败的原因可能是
__consumer_offsets
是个单个副本,这个topic存储的是kafka消费者的offset。另外,关键有问题的topic的描述你没有贴,你用kafka自动的命令进行消费和生产也ok的时候,在去进行代码测试。
## 查询集群描述 bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 ## 查询集群描述(新) bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
命令来自:kafka命令大全
感谢,我已经明白了。
你的答案