Kafka服务端
创建,格式化,同时添加账户(关键):
bin/kafka-storage.sh format -t "AAAAAAAAAAAAAAAAAAAAAA" -c config/kraft/server.properties \
--add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
认证配置
more config/kraft/server.properties
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://10.0.19.208:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
## SASL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.mechanism.controller.protocol=SCRAM-SHA-256
more /etc/kafka/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin";
};
启动Kafka
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/kraft/server.properties
Kafka客户端
示例1:命令行
通过kafka自带的生产者和消费者命令行,来验证认证是否正常。
more /etc/kafka/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
consumer.properties 和 producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
生产者和消费者
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
示例2:JAVA生产者和消费者
下面是Java客户端的使用示例:
生产者
package com.system.kafka.clients.demo.producer.origin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal producer example: authenticates with SASL_PLAINTEXT / SCRAM-SHA-256
 * and synchronously sends 100 string records to the "test" topic.
 */
public class ProducerTest {
    /**
     * Builds the producer configuration, sends records whose key and value are
     * both the loop index (0..99), and waits for each send to be acknowledged.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256"); // use SCRAM-SHA-256
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        boolean interrupted = false;
        // try-with-resources guarantees the producer is closed even when send()
        // throws an unchecked exception (e.g. a serialization or authentication error).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                try {
                    // get() blocks until the broker acknowledges the record
                    producer.send(new ProducerRecord<String, String>("test", payload, payload)).get();
                } catch (InterruptedException e) {
                    // Stop sending; the flag is restored in the finally block, after
                    // the producer has closed, so that close() is not itself
                    // disturbed by a pending interrupt.
                    e.printStackTrace();
                    interrupted = true;
                    break;
                } catch (ExecutionException e) {
                    // The send failed on the broker/client side; report it and continue with the next record
                    e.printStackTrace();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
消费者
package com.system.kafka.clients.demo.producer.origin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal consumer example: authenticates with SASL_PLAINTEXT / SCRAM-SHA-256,
 * subscribes to the "test" topic and prints every record it receives.
 */
public class ConsumerTest {
    /**
     * Builds the consumer configuration, subscribes to "test" and polls forever,
     * printing offset, key and value of each record to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("group.id", "myself");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        // try-with-resources closes the consumer if the poll loop exits via an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); 100 ms avoids
                // the near-busy loop that a 1 ms timeout produced.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
本例说明文档来自
REF
https://docs.confluent.io/platform/current/security/authentication/sasl/scram/overview.html
https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API