Kafka server
Format the storage directory and add the SCRAM account in the same step (this is the key step):
bin/kafka-storage.sh format -t "AAAAAAAAAAAAAAAAAAAAAA" -c config/kraft/server.properties \
--add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
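To make it clearer what --add-scram puts into the metadata: for a SCRAM user the broker keeps a salt, an iteration count, a StoredKey and a ServerKey, not the plain password. The sketch below derives those values as RFC 5802 describes them, using only the JDK. The class name ScramCredentialSketch, the random 16-byte salt and the iteration count of 4096 are assumptions for illustration; this is not Kafka's own code, and SASLprep normalization of the password is skipped (it makes no difference for ASCII passwords such as "admin-secret").

```java
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper: illustrates the SCRAM-SHA-256 key derivation from RFC 5802.
class ScramCredentialSketch {

    // SaltedPassword = Hi(password, salt, iterations), which is PBKDF2 with HMAC-SHA-256
    // and a 256-bit output.
    static byte[] saltedPassword(String password, byte[] salt, int iterations) throws Exception {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec).getEncoded();
    }

    static byte[] hmac(byte[] key, String message) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }

    // StoredKey = SHA-256(HMAC(SaltedPassword, "Client Key"))
    static byte[] storedKey(byte[] saltedPassword) throws Exception {
        return MessageDigest.getInstance("SHA-256").digest(hmac(saltedPassword, "Client Key"));
    }

    // ServerKey = HMAC(SaltedPassword, "Server Key")
    static byte[] serverKey(byte[] saltedPassword) throws Exception {
        return hmac(saltedPassword, "Server Key");
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = new byte[16];          // assumed salt length
        new SecureRandom().nextBytes(salt);
        int iterations = 4096;               // assumed iteration count

        byte[] salted = saltedPassword("admin-secret", salt, iterations);
        Base64.Encoder b64 = Base64.getEncoder();
        System.out.println("salt=" + b64.encodeToString(salt));
        System.out.println("iterations=" + iterations);
        System.out.println("stored_key=" + b64.encodeToString(storedKey(salted)));
        System.out.println("server_key=" + b64.encodeToString(serverKey(salted)));
    }
}
```

Because the salt is random, every run prints different values; the same salt and iteration count always give the same keys.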
Authentication configuration
more config/kraft/server.properties
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://10.0.19.208:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
## SASL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.mechanism.controller.protocol=SCRAM-SHA-256
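These settings are easy to get out of sync, so a small check can be run before starting the broker. The sketch below is a hypothetical helper (ListenerConfigCheck is not part of Kafka) that only verifies two things: every listener name in listeners has an entry in listener.security.protocol.map, and sasl.mechanism.inter.broker.protocol is listed in sasl.enabled.mechanisms. It is not a replacement for the broker's own validation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: a minimal consistency check for the listener and SASL settings above.
class ListenerConfigCheck {

    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Splits a comma-separated value into trimmed, non-empty items.
    static List<String> items(String value) {
        List<String> result = new ArrayList<>();
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<>();

        // Listener names that have a security protocol mapping, e.g. "CONTROLLER:PLAINTEXT".
        Set<String> mapped = new HashSet<>();
        for (String entry : items(props.getProperty("listener.security.protocol.map", ""))) {
            mapped.add(entry.split(":", 2)[0].trim());
        }
        // Each listener looks like "NAME://host:port"; the name must be mapped.
        for (String listener : items(props.getProperty("listeners", ""))) {
            String name = listener.split("://", 2)[0];
            if (!mapped.contains(name)) {
                problems.add("listener " + name + " has no entry in listener.security.protocol.map");
            }
        }

        Set<String> enabled = new HashSet<>(items(props.getProperty("sasl.enabled.mechanisms", "")));
        String interBroker = props.getProperty("sasl.mechanism.inter.broker.protocol");
        if (interBroker != null && !enabled.contains(interBroker.trim())) {
            problems.add("sasl.mechanism.inter.broker.protocol " + interBroker.trim()
                    + " is not in sasl.enabled.mechanisms");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        String config = String.join("\n",
                "listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093",
                "listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,"
                        + "SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL",
                "sasl.enabled.mechanisms=SCRAM-SHA-256",
                "sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256");
        List<String> found = problems(parse(config));
        System.out.println(found.isEmpty() ? "no problems found" : String.join("\n", found));
    }
}
```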
more /etc/kafka/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin";
};
Start Kafka
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/kraft/server.properties
Kafka client
Example 1: command line
Use the console producer and consumer that ship with Kafka to verify that authentication works.
more /etc/kafka/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
consumer.properties and producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
Producer and consumer (clients connect to the SASL_PLAINTEXT listener on port 9092, not to the controller listener on 9093)
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
Example 2: Java producer and consumer
The following shows how to use the Java client:
Producer
package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL/SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        // try-with-resources closes the producer even if sending fails
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                try {
                    // get() blocks until the broker acknowledges the record,
                    // so authentication and send failures surface here
                    producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))).get();
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Consumer
package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("group.id", "myself");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL/SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
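Both clients build the same sasl.jaas.config string by hand with a hard-coded password. As a minimal sketch, the three SASL settings can be put into one helper that also escapes quotes and backslashes in the credentials. ScramClientConfig and the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical, and the escaping assumes that the JAAS parser honors backslash escapes inside double-quoted values.

```java
import java.util.Properties;

// Hypothetical helper: builds the SASL/SCRAM client settings used by the producer and consumer.
class ScramClientConfig {

    // Escapes backslashes and double quotes so the value can sit inside a quoted JAAS option.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + escape(username) + "\" "
                + "password=\"" + escape(password) + "\";";
    }

    static Properties saslProperties(String username, String password) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Assumed environment variable names; fall back to the account created in this example.
        String username = System.getenv().getOrDefault("KAFKA_USERNAME", "admin");
        String password = System.getenv().getOrDefault("KAFKA_PASSWORD", "admin-secret");

        Properties props = saslProperties(username, password);
        // Print only the non-secret settings.
        System.out.println("security.protocol=" + props.getProperty("security.protocol"));
        System.out.println("sasl.mechanism=" + props.getProperty("sasl.mechanism"));
    }
}
```

In ProducerTest or ConsumerTest the result could be merged with props.putAll(ScramClientConfig.saslProperties(username, password)) in place of the three props.put calls for SASL.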
This example is based on the following documentation:
REF
https://docs.confluent.io/platform/current/security/authentication/sasl/scram/overview.html
https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API
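Hello, does this setup not include the configuration for loading the ACL module?
In my case, as soon as I add authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer to the configuration file, I get these errors: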
[2025-07-01 16:16:57,612] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
[2025-07-01 16:16:57,614] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=9, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.13:9093 (id: -4 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] initialized channel manager. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,636] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)
[2025-07-01 16:16:57,637] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] maybeSendControllerRegistration: cannot register yet because the metadata.version is still 3.0-IV1, which does not support KIP-919 controller registration. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,637] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)
[2025-07-01 16:16:57,652] INFO [controller-1-to-controller-registration-channel-manager]: Starting (kafka.server.NodeToControllerRequestThread)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,665] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,672] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=1, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=2GZE2nfgQvaYonlRrNSrrw)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.11:38142-0', clientAddress=/10.206.68.11, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@6c15c06c]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,674] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,683] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=2, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=Rl3w3w2JRnuwNZ78qQXWrQ)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.12:5298-0', clientAddress=/10.206.68.12, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1ed5187]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,686] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=10, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.11:9093 (id: -2 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)
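Does anyone know what is going on?
This example does not cover that. I suggest finding an example for the corresponding authentication method and following it.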