Kafka KRaft in Practice: SASL/SCRAM

半兽人, published 2024-11-05, last updated 2024-11-14 10:49:07

Kafka server

Create and format the storage directory, adding the SCRAM account in the same step (this is the key part):

bin/kafka-storage.sh format -t "AAAAAAAAAAAAAAAAAAAAAA" -c config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
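The -t flag takes a base64-encoded cluster UUID. Rather than typing one by hand, Kafka can generate it for you; a sketch, assuming you run from the Kafka installation directory:

```shell
# Generate a random cluster UUID and reuse it when formatting the storage
# directory; --add-scram seeds the admin credentials before first startup,
# which is essential in KRaft mode (there is no ZooKeeper to bootstrap from).
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
```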

Authentication configuration

more config/kraft/server.properties

listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://10.0.19.208:9092

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

## SASL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.mechanism.controller.protocol=SCRAM-SHA-256
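For background on what the SCRAM-SHA-256 mechanism configured above actually stores: the broker keeps only a salted, iterated hash of each password, never the plaintext. A minimal sketch of the RFC 5802 salted-password derivation using only the JDK; the salt here is illustrative (a real broker generates a random one per credential), and 4096 is Kafka's default iteration count:

```java
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ScramSaltedPassword {

    // RFC 5802: SaltedPassword := Hi(password, salt, i), where Hi is
    // PBKDF2 with HMAC-SHA-256 for the SCRAM-SHA-256 mechanism.
    public static byte[] saltedPassword(String password, byte[] salt, int iterations) throws Exception {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
    }

    public static void main(String[] args) throws Exception {
        // Illustrative salt and the article's admin password
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        byte[] key = saltedPassword("admin-secret", salt, 4096);
        System.out.println(Base64.getEncoder().encodeToString(key));
    }
}
```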

more /etc/kafka/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

Start Kafka

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/kraft/server.properties

Kafka client

Example 1: the command line

Use the producer and consumer command-line tools that ship with Kafka to verify that authentication works.

more /etc/kafka/kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

consumer.properties and producer.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

Producer and consumer

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
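Once the cluster is up, additional SCRAM users can be created at runtime with kafka-configs.sh (per KIP-554, this goes through the brokers rather than ZooKeeper). A sketch, assuming the client JAAS file from above is exported in KAFKA_OPTS and that config/consumer.properties carries the security.protocol and sasl.mechanism lines shown earlier; the user alice and her password are illustrative:

```shell
# Authenticate as admin, then create a SCRAM-SHA-256 credential for "alice"
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --command-config config/consumer.properties \
  --alter --add-config 'SCRAM-SHA-256=[iterations=4096,password=alice-secret]' \
  --entity-type users --entity-name alice
```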

Example 2: Java producer and consumer

Below are usage examples for the Java client:

Producer

package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Add the SASL/SCRAM configuration
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");  // use SCRAM-SHA-256
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            try {
                producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))).get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }
}

Consumer

package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("group.id", "myself");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Add the SASL/SCRAM configuration
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll(long) is deprecated; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
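The KIP-554 Admin API referenced below can manage SCRAM credentials from Java as well. A sketch (not from the original article), assuming kafka-clients is on the classpath and the admin credentials from this article; the user alice and her password are illustrative:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

import java.util.Collections;
import java.util.Properties;

public class ScramAdminTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.19.208:9092");
        // Authenticate as the admin user created during formatting
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" password=\"admin-secret\";");

        try (Admin admin = Admin.create(props)) {
            // Create (or update) a SCRAM-SHA-256 credential for the user "alice"
            ScramCredentialInfo info = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096);
            admin.alterUserScramCredentials(Collections.singletonList(
                    new UserScramCredentialUpsertion("alice", info, "alice-secret"))).all().get();

            // List the SCRAM credentials now known to the cluster
            System.out.println(admin.describeUserScramCredentials().all().get());
        }
    }
}
```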

The base documentation for this example:

Using SASL/SCRAM authentication with Kafka

REF

https://docs.confluent.io/platform/current/security/authentication/sasl/scram/overview.html
https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API


OneTian1211, 4 days ago

Hi, doesn't your setup load an ACL (authorizer) module configuration?
In mine, as soon as I add authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer to the config file, I get this error:

[2025-07-01 16:16:57,612] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
[2025-07-01 16:16:57,614] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=9, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.13:9093 (id: -4 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] initialized channel manager. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,636] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)
[2025-07-01 16:16:57,637] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] maybeSendControllerRegistration: cannot register yet because the metadata.version is still 3.0-IV1, which does not support KIP-919 controller registration. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,637] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)
[2025-07-01 16:16:57,652] INFO [controller-1-to-controller-registration-channel-manager]: Starting (kafka.server.NodeToControllerRequestThread)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,665] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,672] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=1, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=2GZE2nfgQvaYonlRrNSrrw)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.11:38142-0', clientAddress=/10.206.68.11, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@6c15c06c]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,674] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,683] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=2, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=Rl3w3w2JRnuwNZ78qQXWrQ)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.12:5298-0', clientAddress=/10.206.68.12, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1ed5187]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,686] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=10, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.11:9093 (id: -2 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)

Does anyone know what's going on?

半兽人 -> OneTian1211, 4 days ago

That isn't covered in this example; I'd suggest finding an example for the corresponding authentication setup and following that.
