kafka使用SASL验证

半兽人 发表于: 2016-06-30   最后更新时间: 2018-09-04 23:19:18  
{{totalSubscript}} 订阅, 43,367 游览

此文章已经弃用,请访问新地址

kafka使用SASL验证

7.3 使用SASL验证

1、Kafka brokers的SASL配置

  1. 在broker中选择1个或多个支持的机制启用,kafka目前支持的机制有 GSSAPI 和 PLAIN 。

  2. 添加一个JAAS文件来配置选择的 GSSAPI(Kerberos)或 PLANIN。

  3. JAAS配置文件位置作为JVM参数传递给每个broker代理。例如:

     - Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  4. 在server.properties配置一个SASL端口,增加至少1个SASL_PLAINTEXT或SASL_SSL到listeners。用逗号分隔:

     listeners=SASL_PLAINTEXT://host.name:port
    

    如果使用SASL_SSL,那SSL也必须配置,如果你仅配置SASL端口(或者,如果你需要broker互相使用SASL进行身份验证),那么,确保你设置相同的SASL协议为broker间通讯:

     security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
    
  5. 在server.properties中启用1个或多个SASL机制:

     sasl.enabled.mechanisms=GSSAPI (,PLAIN)
    
  6. 如果使用SASL做broker之间通信,在server.properties中配置SASL机制:

     sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
    
  7. 按照GSSAPI(Kerberos)或PLAIN 的步骤来启用SASL配置。在broker中启用多种机制,按照下面的步骤进行。

    重要提示:

    1. KafkaServer是在JAAS文件中KafkaServer/Broker的节名。本节提供SASL配置选项,包括broker间通讯,SASL客户端连接。
    2. Client是用来认证SASL与zookeeper连接。它还允许broker设置 SASL ACL 到zookeeper 节点,锁定这些节点,只有broker可以修改它。它必须在所有的broker中具有相同的principal名称。如果你要使用Client以外的节名,设置系统属性zookeeper.sasl.client(例如,-Dzookeeper.sasl.client=ZkClient)。
    3. ZooKeeper 使用 "zookeeper" 作为默认的服务器名称。如果你想改变它,设置zookeeper.sasl.client.username (例如, -Dzookeeper.sasl.client.username=zk).

2、Kafka Client的SASL配置

SASL认证仅支持新的Java生产者和消费者,不支持旧的API。在客户端上配置SASL认证:

  1. 选择一个SASL机制。
  2. 为选择的机制(GSSAPI(Kerberos)或PLAIN)添加一个JAAS配置文件。KafkaClient是客户端使用JAAS文件中的节名。
  3. JAAS配置文件的位置作为每个客户端的JVM的参数,例如:
     -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    
  4. producer.propertiesconsumer.properties 配置以下属性:
     security.protocol=SASL_PLAINTEXT (or SASL_SSL)
     sasl.mechanism=GSSAPI (or PLAIN)
    
  5. 为所选的机制(GSSAPI(Kerberos)或PLAIN)配置SASL。

3、使用 SASL/Kerberos 认证

1. 预备知识
  1. Kerberos
    如果你已在使用Kerberos(例如,通过使用Active Directory),则无需安装新的服务器。否则,你将需要安装一个,Linux有Kerberos安装和配置的简短说明(UbuntuRadhat)封装。请注意,如果您使用的是Oracle的Java,你需要下载java版本的JCE策略文件,将它们复制到 $JAVA_HOME/jre/lib/security中.

  2. 创建Kerberos Principal
    如果你使用的是公司的Kerberos或Active Directory服务器,请向Kerberos管理员询问群集中每个broker的principal以及将使用Kerberos验证(通过客户端和工具)访问Kafka的每个操作系统用户。

    sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
    sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
    
  3. 确保所有host都可以使用hostname(主机名)到达 - Kerberos要求所有的host都可以用他们的FQDN (Fully Qualified Domain Name 完全合格域名/全称域名,是指主机名加上全路径)解析。

2. 配置Kafka Broker
  1. 添加一个JAAS文件,类似下面的每个kafka broker的配置目录。这个例子我们姑且命名为kafka_server_jaas.conf(注意,每个broker都应该有自己的密钥表)。

    KafkaServer {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_server.keytab"
         principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
     };
    
     // Zookeeper client authentication
     Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka_server.keytab"
        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
     };
    

    在JAAS文件的KafkaServer部分告诉broker使用的principal和该principal存储的keytab(密钥表)的位置。它允许broker使用keytab登录。更多细节参见zookeeper的SASL配置。

  2. 通过JAAS和krb5文件位置(可选的)作为JVM参数到每个broker。(更多细节):

     -Djava.security.krb5.conf=/etc/kafka/krb5.conf
     -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 确保在JAAS文件的keytabs配置文件可被启动的Broker的操作系统员读取。

  4. 在server.properties中配置SASL端口和SASL机制,例如:

     listeners=SASL_PLAINTEXT://host.name:port
     security.inter.broker.protocol=SASL_PLAINTEXT
     sasl.mechanism.inter.broker.protocol=GSSAPI
     sasl.enabled.mechanisms=GSSAPI
    

    我们还必须在server.properties配置服务器名称,应与匹配broker的principal名,在上面的例子中,principal是"kafka/kafka1.hostname.com@EXAMPLE.com", 所以:

     sasl.kerberos.service.name=kafka
    

3、配置kafka客户端

要在客户端上配置SASL认证:

  1. 客户端(生产者,消费者,connect,等等)用自己的principal认证集群(通常用相同名称作为运行客户端的用户)。因此,获取或根据需要创建这些principal。然后,为每个principal创建一个JAAS文件,KafkaClient描述了生产者和消费者客户端如何连接到broker。下面是一个客户端使用keytab的配置例子(建议长时间运行的进程)。

    KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_client.keytab"
         principal="kafka-client-1@EXAMPLE.COM";
     };
    

    对于命令行工具,比如kafka-console-consumerkafka-console-producerkinit连同 “useTicketCache=true”使用,如:

     KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useTicketCache=true;
     };
    
  2. 通过JAAS和krb5文件位置(可选)作为JVM参数(每个客户端的JVM):

     -Djava.security.krb5.conf=/etc/kafka/krb5.conf
     -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    
  3. 确保在kafka_client_jaas.conf的keytab配置能被启动kafka客户端的系统操作员读取。

  4. 在producer.properties或consumer.properites配置以下属性:

     security.protocol=SASL_PLAINTEXT (or SASL_SSL)
     sasl.mechanism=GSSAPI
     sasl.kerberos.service.name=kafka
    

4、使用 SASL/PLAIN 认证

SASL/PLAIN是简单的username/password认证机制,通常使用TLS用于加密来实现安全认证。kafka支持一个SASL/PLAIN的默认实现,可作为生产者的扩展使用。

username用作ACL等配置已认证的Principal。

1、配置Kafka Broker
  1. 添加JAAS文件,这个例子的配置文件是每个broker的配置目录,命名为 kafka_server_jaas.conf。

    KafkaServer {
         org.apache.kafka.common.security.plain.PlainLoginModule required
         username="admin"
         password="admin-secret"
         user_admin="admin-secret"
         user_alice="alice-secret";
     };
    

    这个配置定义了2个用户(adminalice)。在KafkaServer部分,usernamepassword是broker用于初始化连接到其他的broker,在这个例子中,admin用户为broker间的通讯,user_userName定义了所有连接到broker和broker验证的所有的客户端连接包括其他broker的用户密码。

  2. JAAS配置文件地址作为JVM参数(为每个broker):

    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 在server.properites 配置SASL端口和SASL机制(描述),例如:

     listeners=SASL_SSL://host.name:port
     security.inter.broker.protocol=SASL_SSL
     sasl.mechanism.inter.broker.protocol=PLAIN
     sasl.enabled.mechanisms=PLAIN
    
2、kafka客户端配置

要在客户端上配置SASL验证:

  1. KafkaClient节介绍像生产者和消费者如何连接到broker,以下是对PLAIN机制的客户端配置示例:

    KafkaClient {
         org.apache.kafka.common.security.plain.PlainLoginModule required
         username="alice"
         password="alice-secret";
     };
    

    KafkaClient部分,username和password是客户端用来配置客户端连接的用户,在这个例子中,客户端使用alice用户连接到broker。

  2. JAAS配置文件地址作为JVM参数(每个客户端JVM)。

    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    
  3. 在producer.properties或consumer.properties配置以下属性:

     security.protocol=SASL_SSL
     sasl.mechanism=PLAIN
    
3、在production中使用SASL/PLAN
  • SASL/PLAIN 应仅用SSL作为传输层,以确保明文密码不会在线上无密传输。

  • 默认在JAAS配置文件里实现SASL/PLAIN(kafka指定用户名和密码),为了避免在磁盘上存储密码,你可以自己实现插件javax.security.auth.spi.LoginModule(从外部提供用户名和密码)。登录模板实现应该提供用户名作为公共凭证和密码作为subject的私有凭据。默认的实现org.apache.kafka.common.security.plain.PlainLoginModule可作为一个例子。

  • 在production系统中,外部认证服务器可以实现密码验证。Kafka broker可以与这些服务器集成(通过添加自己实现 javax.security.sasl.SaslServer)。在这个包路径org.apache.kafka.common.security.plain中,包括在kafka的默认的实现,可以作为一个例子来开始。

    • 新的Providers(供应商)必须安装并在JVM中注册。供应商可以通过添加安装提供者类到CLASSPATH或打包一个jar文件并将其添加到JAVA_HOME/lib/ext

    • Providers 可以静态的注册添加一个提供者到安全属性文件到 JAVA_HOME/lib/security/java.security.

      security.provider.n=providerClassName
      

      其中,providerClassName是新Provider的全名,n是优先排序(较低的数字表明更搞的优先权)

    • 另外,还可以在运行时动态调用注册供应商的security.addprovider在客户端应用程序的开始或在登录模块的静态初始化器。例如:

      Security.addProvider(new PlainSaslServerProvider());
      
    • 更多细节, 可查看 JCA

5、在broker中使用多种SASL机制

  1. 在指定的JAAS配置文件,KafkaServer部分中启用选择的多种机制,例如:

     KafkaServer {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_server.keytab"
         principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    
         org.apache.kafka.common.security.plain.PlainLoginModule required
         username="admin"
         password="admin-secret"
         user_admin="admin-secret"
         user_alice="alice-secret";
     };
    
  2. 在server.properties中启用SASL机制:

     sasl.enabled.mechanisms=GSSAPI,PLAIN
    
  3. 在server.properties指定SASL安全协议和机制(为broker间通讯):

     security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
     sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
    
  4. 按照上面指定的步骤启用GSSAPI(Kerberos)和PLAIN.

6、在运行中的集群修改SASL机制

SASL机制可使用以下顺序在运行中的集群进行修改:

  1. 启用新的SASL机制,在server.properties中配置sasl.enabled.mechanisms(为每个broker)。更新JAAS配置文件,包含上面描述的两种机制,逐步增加节点。

  2. 重新启动客户端,使用新的机制。

  3. 变更broker间通讯机制(如果需要),在server.properties设置sasl.mechanism.inter.broker.protocol新机制,并逐步替换集群。

  4. 要删除旧的机制(如果需要),移除在server.properties中旧的机制,和移除JAAS配置文件旧机制的条目。最后逐步替换集群。

更新于 2018-09-04

………… 3年前

大佬,如何通过api消费有权限组的topic 拜托了。我配置的权限机制是256的

5年前

你好,请问一下,现在kafka集群开启了sasl认证后,发现同一消费组中的消费者没有退出动作,但是一直在向kafka集群发送认证请求,请问下SASL认证是有时间限制吗?过段时间需要认证一次?

開開新新 6年前

还有一点 为什么zk server节点需要配置jaas认证?kafka的认证流程是怎样的?

開開新新 -> 半兽人 6年前

下面的问题能解答一下吗

半兽人 -> 開開新新 6年前

文章里面都有介绍。zk和kafka集群之间也要认证。

開開新新 -> 半兽人 6年前

我看了,说的不清楚

開開新新 6年前

这里有个逻辑没有提到,如果zookeeper使用SASL模式的话认证文件要怎么配置? 我参考(https://blog.csdn.net/zhbr_f1/article/details/73549043)看到zk server也要配置一些账号和密码,那这些账号和密码是给谁用呢? 和kafka server中配置的账号和密码有什么关系?
kafka 认证文件中的KafkaServer节创建的哪个用户是给zookeeper server使用的? 如果zookeeper client要联系zk server 要用哪个账号?

6年前

群主,有没有方法能连两个带sasl的kafka 啊,不知道怎么设置那个system.setporperty互不影响,就是要从一个sasl 的kafka取数据,然后写入一个sasl的kafka

半兽人 -> 6年前

这.... 没试过,分2个应用启动吧

開開新新 -> 半兽人 6年前

https://www.orchome.com/270 麻烦老大解答一下评论

你好,这边在启动kafka的时候报了如下一个错误,部署步骤是按照你这个文章一步步走的
[2017-11-28 20:46:18,929] ERROR An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2017-11-28 20:46:18,929] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
[2017-11-28 20:46:18,929] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
[2017-11-28 20:46:18,929] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-11-28 20:46:18,931] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kdc的日志如下

Nov 28 20:46:18 devhost33-2 krb5kdc25748: AS_REQ (5 etypes {17 16 23 1 3}) 10.100.1.138: ISSUE: authtime 1511873178, etypes {rep=17 tkt=17 ses=17}, hadoop/devhost33-2@BSFIT for krbtgt/BSFIT@BSFIT

Nov 28 20:46:18 devhost33-2 krb5kdc25748: TGS_REQ (6 etypes {18 17 16 23 1 3}) 10.100.1.138: LOOKING_UP_SERVER: authtime 0,  hadoop/devhost33-2@BSFIT for zookeeper/10.100.1.138@BSFIT, Server not found in Kerberos database

你看的这篇文章是老的。

新的在这里:https://www.orchome.com/553



看下我的操作笔记

https://www.orchome.com/500

PK 7年前

您好,我想请教一下: 我 server 认证启动起来了,但是 client去连接的时候

KafkaClient {

        com.sun.security.auth.module.Krb5LoginModule required

        useKeyTab=true

        storeKey=true

        keyTab="/etc/client.keytab"

        principal="kafka/i-7zvl27mr@EXAMPLE.COM"

        useTicketCache=true;

};

我使用kinit -k -t /etc/client.keytab kafka/i-7zvl27mr@EXAMPLE.COM 命令是正常的

会报错:Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No key to store

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)

这可能是什么原因呢?

半兽人 -> PK 7年前

No key to store

旅行。 7年前

你好,我现在在一个50台测试集群中使用kafka的kerberos认证。其中4台式broker,那么当我使用yarn-client执行我的程序时,带的参数是

--driver-java-options="-Djava.security.auth.login.config=/home/sss/sss/kafka_client_jaas.conf"  

--files /home/sss/keytab/hxxtsx.keytab

 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/sss/sss/kafka_client_jaas.conf"



我需要保证这50台机器上都要有/home/sss/sss/kafka_client_jaas.conf这个文件吗?

查看history更多相关的文章或提一个关于history的问题,也可以与我们一起分享文章