Kafka SASL认证

原创
半兽人 发表于: 2017-05-03   最后更新时间: 2022-01-02 18:29:53  
{{totalSubscript}} 订阅, 36,004 游览

7.3 使用SASL认证

1. JAAS配置

Kafka使用Java认证授权服务(JAAS)进行SASL配置。

  1. 为kafka broker配置JAAS

    KafkaServer是每个KafkaServer/Broker使用的JAAS文件中的名称。本节提供broker的SASL配置选项,包括进行broker之间通信所需的SASL客户端连接。如果将多个listeners配置为SASL,则名称可以在listeners名称前以小写字母开头,后跟一个句点,例如sasl_ssl.KafkaServer

    客户端部分用于验证与zookeeper的SASL连接。它还允许broker在zookeeper节点上设置SASL ACL。并锁定这些节点,以便只有broker可以修改它。所有的broker必须principal名称相同。如果要使用客户端以外的名称,设置zookeeper.sasl.client(例如,-Dzookeeper.sasl.clientconfig=ZkClient)。

    默认情况下,Zookeeper使用 “zookeeper” 作为服务名称。如果你需要修改,设置zookeeper.sasl.client.user(例如,-Dzookeeper.sasl.client.username=zk

    Broker还可以使用sasl.jaas.config配置JAAS。属性名称必须以包括SASL机制的listener前缀为前缀,例如:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config只能指定一个登录模块。 如果listener上配置了多种机制,则listener必须使用和机制前缀为每种机制提供的配置。 例如:

    listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
       username="admin" \
       password="admin-secret";
    listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="admin" \
       password="admin-secret" \
       user_admin="admin-secret" \
       user_alice="alice-secret";
    

    如果在不同级别定义了JAAS配置,则使用的优先级顺序为:

    • broker配置属性listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
    • 静态JAAS配置{listenerName}.KafkaServer
    • 静态JAAS配置KafkaServer

    请注意,只能使用静态JAAS配置来配置ZooKeeper JAAS配置。

  2. 为Kafka client配置JAAS

    客户端可以使用sasl.jaas.config或使用类似broker的静态JAAS配置文件配置JAAS。

    1. 客户端的JAAS配置

      客户端可以将JAAS配置指定给生产者或消费者,无需创建物理配置文件。通过为每个客户端指定不同的属性,此模式还使同一JVM中的不同生产者和消费者可以使用不同的凭据。如果同时指定了静态JAAS配置系统属性java.security.auth.login.config和客户端属性sasl.jaas.config,则将会使用客户端的属性。

      请参见GSSAPI(Kerberos)PLAINSCRAMOAUTHBEARER

    2. 静态文件配置JAAS

      使用静态JAAS配置文件来配置客户端上的SASL认证。

      1. 添加一个名为KafkaClient的客户端登录的JAAS配置文件。 在KafkaClient中为所选机制配置登录模块,如设置GSSAPI(Kerberos)PLAINSCRAM的示例中所述。 例如,GSSAPI凭据可以配置为:
        KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_client.keytab"
         principal="kafka-client-1@EXAMPLE.COM";
        };
        
      2. 将JAAS配置文件位置作为JVM参数传递给每个客户端JVM。 例如:
        -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
        

2. SASL配置

SASL可与PLAINTEXTSSL一起或分别用作安全协议传输层(SASL_PLAINTEXTSASL_SSL,如果使用SASL_SSL,则必须配置SSL)。

  1. SASL机制

    Kafka支持以下的SASL机制:

    • GSSAPI (Kerberos)
    • PLAIN
    • SCRAM-SHA-256
    • SCRAM-SHA-512
    • OAUTHBEARER
  2. 为Kafka broker配置SASL

    1. server.properties配置一个SASL端口,SASL_PLAINTEXT或SASL_SSL添加到listeners中(至少一个),用逗号分隔:

      listeners=SASL_PLAINTEXT://host.name:port
      

      如果你只配置一个SASL端口(或者如果你需要broker使用SASL互相验证),那么需要确保broker之间设置相同的SASL协议:

      security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
      
    2. 选择一个或多个支持的机制,并通过以下的步骤为机器配置SASL。在broker之间启用多个机制:

  3. 为Kafka client配置SASL

    SASL仅支持新的java生产者和消费者,不支持老的API。

    要在客户端上配置SASL验证,选择在broker中启用的客户端身份验证的SASL机制,并按照以下步骤配置所选机制的SASL。

开始SASL认证

  1. SASL之Kerberos认证
  2. SASL之PLAIN认证
  3. SASL之SCRAM认证
  4. 在broker中启用多个SASL机制
  5. 在运行的集群中修改SASL机制
更新于 2022-01-02

Hello Money 3年前

我加了SASL以后 现在用shell 无法访问了,

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group my-first-app  --from-beginning

这是脚本 kafka-console-consumer.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi

if [ "x$KAFKA_OPTS" = "x" ]; then
    echo '加载了加密配置../config/kafka_client_jaas.conf'
    export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/program/kafka_2.13-2.6.0/config/kafka_client_jaas.conf
fi

echo $KAFKA_OPTS
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
[2021-05-20 15:51:31,234] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:31,568] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:32,056] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:32,287] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:32,575] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:33,064] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:33,368] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:33,582] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-20 15:51:34,070] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
3年前

请教个问题,如果想用kafka提供的quota功能(根据user限流),是不是只能使用SASL之PLAIN认证和SASL之SCRAM认证呢?

路人 4年前

如果使用了SASL_PLAINTEXT方式,kafka_server_jaas.conf只配置了kafkaserver的密码没有client,在启动kafka的时候会提示kafka brokers与zookeeper的主机之间未使用sasl认证。 (WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file)。这个现象是正常的吗,SASL_PLAINTEXT方式是否不需要broker和zookeeper的sasl认证?

5年前

你好,请问一下,现在kafka集群开启了sasl认证后,发现同一消费组中的消费者没有退出动作,但是一直在向kafka集群发送认证请求,请问下SASL认证是有时间限制吗?过段时间需要认证一次?

李孟 5年前

请问这个kafka sasl配置完后,本地测试生产消费速度比原来慢很多,消费阻塞住了,这个是怎么回事?

無名 -> 李孟 5年前

数据加密传输,确实会慢点。

李孟 -> 無名 5年前

你好,这个生产者没事,程序阻塞消费一晚上数据没有出来,这是怎么回事?

李孟 -> 無名 5年前

props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"Alice-123\";");

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test2019"));
    while (true) {
        //这里是得到ConsumerRecords实例
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {

            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} 
無名 -> 李孟 5年前

你这个算是从来没成功过吧。。

Nemo 6年前

在使用Kerberos认证环境下,使用kafka-console-consumer消费数据,当用zookeeper进行消费是出现如下错误。查看源码发现在源码中写死了只能获取Broker安全协议为PLAINTEXT的节点信息。大神,是不是使用zookeeper的方式,不能使用认证方式?

kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 0
    at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:141)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:180)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
半兽人 -> Nemo 6年前

可以的,只是都要加上认证

马踏紫陌 6年前

大神.,我配置了sasl之后,首先在服务器上不能创建消费增者,报disconnected (org.apache.kafka.clients.NetworkClient)

错误,其次我用java端作为client端System.setProperty("java.security.auth.login.config","E:\CL\kafka_client_jaas.conf");
  props.put("security.protocol","SASL_PLAINTEXT");
  props.put("sasl.mechanism","PLAIN");
配置了,启动之后看不到消费的数据

李孟 -> 马踏紫陌 5年前

问题解决了吗?我本地测试消费,直接阻塞住了

舍不得 -> 李孟 4年前

问题解决了吗?遇到相同的问题

6年前

kafka服务器同时启用kerberos和SSL不行吧?

半兽人 -> 6年前

可以的

邱响 7年前

为什么oracle java要替换成java版本的JCE策略文件,不替换会有什么影响?

半兽人 -> 邱响 7年前

https://www.orchome.com/436
因为地区保护,不替换不会成功的。

7年前

我可不可以配置两个端口,一个使用sasl安全认证,一个不使用安全认证

半兽人 -> 7年前

当然可以了

-> 半兽人 7年前

确认可行吗?

张小生 -> 半兽人 5年前

请问您一个问题,当kafka与zookeeper都启动了sasl/kerberos后,发现向一个不存在的topic生产消息时,kafka不会自动创建这个topic。已检查配置文件,保证开启自动创建topic功能,请问这是为什么?

半兽人 -> 张小生 5年前

都开启了吗?并且发送到不存在topic的客户端也认证通过了吧?可以看下集群是否有报错信息打印,可以贴一下。

张小生 -> 半兽人 5年前

首先已经确认在kafka的配置文件server.properties中添加 auto.create.topics.enable=true。
随后使用命令./kafka-console-producer.sh --broker-list 172.16.101.202:9092 --topic test9 --producer.config ../config/producer.properties
并输入内容,回车后,提示如下报错

[2019-11-15 14:14:08,832] WARN Error while fetching metadata with correlation id 0 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2019-11-15 14:14:09,034] WARN Error while fetching metadata with correlation id 1 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2019-11-15 14:14:09,137] WARN Error while fetching metadata with correlation id 2 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2019-11-15 14:14:09,240] WARN Error while fetching metadata with correlation id 3 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2019-11-15 14:14:09,343] WARN Error while fetching metadata with correlation id 4 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)

zookeeper此时没有任何日志内容。
kafka此时的日志内容:

[2019-11-15 14:09:09,442] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
[2019-11-15 14:09:09,442] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
[2019-11-15 14:09:09,636] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
[2019-11-15 14:09:09,636] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)

kerberos此时的日志内容:

Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): AS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for krbtgt/EXAMPLE.COM@EXAMPLE.COM
Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): TGS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for kafka/nf5466c18-app@EXAMPLE.COM

上述各服务的日志都没有报错,随后我将kafka的日志级别调整至DEBUG时,启动kafka时会循环刷一个报错

[2019-11-15 14:18:03,803] DEBUG Set SASL server state to HANDSHAKE_REQUEST (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2019-11-15 14:18:03,803] DEBUG Handle Kafka request METADATA (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2019-11-15 14:18:03,803] DEBUG Set SASL server state to FAILED (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2019-11-15 14:18:03,803] DEBUG Connection with /172.***.***.202 disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake.
        at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:338)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
        at kafka.network.Processor.poll(SocketServer.scala:476)
        at kafka.network.Processor.run(SocketServer.scala:416)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake.
半兽人 -> 张小生 5年前
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list orchome:9093 --topic test --producer.config config/producer.properties
张小生 -> 半兽人 5年前

已经将包含krb5配置文件的绝对路径和client_jass文件的绝对路径的参数添加到kafka-run-class.sh,并修改了kafka-console-producer.sh脚本,确保成功调用该参数,而且使用生产命令对已存在的topic进行生产时,不会提示任何报错,而且也可以实时消费到。 只是对一个不存在的topic进行生产时,才会出现报错

半兽人 -> 张小生 5年前

主题默认是自创的,先看下这个:
https://issues.apache.org/jira/browse/KAFKA-5458

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章