此文章已经弃用,请访问新地址
7.3 使用SASL验证
1、Kafka brokers的SASL配置
在broker中选择1个或多个支持的机制启用,kafka目前支持的机制有 GSSAPI 和 PLAIN 。
添加一个JAAS文件来配置选择的 GSSAPI(Kerberos)或 PLANIN。
JAAS配置文件位置作为JVM参数传递给每个broker代理。例如:
- Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
在server.properties配置一个SASL端口,增加至少1个SASL_PLAINTEXT或SASL_SSL到
listeners
。用逗号分隔:listeners=SASL_PLAINTEXT://host.name:port
如果使用SASL_SSL,那SSL也必须配置,如果你仅配置SASL端口(或者,如果你需要broker互相使用SASL进行身份验证),那么,确保你设置相同的SASL协议为broker间通讯:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
在server.properties中启用1个或多个SASL机制:
sasl.enabled.mechanisms=GSSAPI (,PLAIN)
如果使用SASL做broker之间通信,在server.properties中配置SASL机制:
sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
按照GSSAPI(Kerberos)或PLAIN 的步骤来启用SASL配置。在broker中启用多种机制,按照下面的步骤进行。
重要提示:
KafkaServer
是在JAAS文件中KafkaServer/Broker
的节名。本节提供SASL配置选项,包括broker间通讯,SASL客户端连接。Client
是用来认证SASL与zookeeper连接。它还允许broker设置 SASL ACL 到zookeeper 节点,锁定这些节点,只有broker可以修改它。它必须在所有的broker中具有相同的principal名称。如果你要使用Client
以外的节名,设置系统属性zookeeper.sasl.client
(例如,-Dzookeeper.sasl.client=ZkClient)。ZooKeeper
使用 "zookeeper" 作为默认的服务器名称。如果你想改变它,设置zookeeper.sasl.client.username
(例如, -Dzookeeper.sasl.client.username=zk).
2、Kafka Client的SASL配置
SASL认证仅支持新的Java生产者和消费者,不支持旧的API。在客户端上配置SASL认证:
- 选择一个SASL机制。
- 为选择的机制(GSSAPI(Kerberos)或PLAIN)添加一个JAAS配置文件。KafkaClient是客户端使用JAAS文件中的节名。
- JAAS配置文件的位置作为每个客户端的JVM的参数,例如:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
- 在
producer.properties
或consumer.properties
配置以下属性:security.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism=GSSAPI (or PLAIN)
- 为所选的机制(GSSAPI(Kerberos)或PLAIN)配置SASL。
3、使用 SASL/Kerberos 认证
1. 预备知识
Kerberos
如果你已在使用Kerberos(例如,通过使用Active Directory),则无需安装新的服务器。否则,你将需要安装一个,Linux有Kerberos安装和配置的简短说明(Ubuntu,Radhat)封装。请注意,如果您使用的是Oracle的Java,你需要下载java版本的JCE策略文件
,将它们复制到$JAVA_HOME/jre/lib/security
中.创建Kerberos Principal
如果你使用的是公司的Kerberos或Active Directory服务器,请向Kerberos管理员询问群集中每个broker的principal以及将使用Kerberos验证(通过客户端和工具)访问Kafka的每个操作系统用户。sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}' sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
确保所有host都可以使用
hostname(主机名)
到达 - Kerberos要求所有的host都可以用他们的FQDN (Fully Qualified Domain Name 完全合格域名/全称域名,是指主机名加上全路径)
解析。
2. 配置Kafka Broker
添加一个JAAS文件,类似下面的每个kafka broker的配置目录。这个例子我们姑且命名为
kafka_server_jaas.conf
(注意,每个broker都应该有自己的密钥表)。KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; }; // Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
在JAAS文件的KafkaServer部分告诉broker使用的
principal
和该principal
存储的keytab(密钥表)
的位置。它允许broker使用keytab登录。更多细节参见zookeeper的SASL配置。通过JAAS和krb5文件位置(可选的)作为JVM参数到每个broker。(更多细节):
-Djava.security.krb5.conf=/etc/kafka/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
确保在JAAS文件的keytabs配置文件可被启动的Broker的操作系统员读取。
在server.properties中配置SASL端口和SASL机制,例如:
listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI
我们还必须在server.properties配置服务器名称,应与匹配broker的principal名,在上面的例子中,principal是"kafka/kafka1.hostname.com@EXAMPLE.com", 所以:
sasl.kerberos.service.name=kafka
3、配置kafka客户端
要在客户端上配置SASL认证:
客户端(生产者,消费者,connect,等等)用自己的principal认证集群(通常用相同名称作为运行客户端的用户)。因此,获取或根据需要创建这些principal。然后,为每个principal创建一个JAAS文件,
KafkaClient
描述了生产者和消费者客户端如何连接到broker。下面是一个客户端使用keytab的配置例子(建议长时间运行的进程)。KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="kafka-client-1@EXAMPLE.COM"; };
对于命令行工具,比如
kafka-console-consumer
或kafka-console-producer
,kinit
连同 “useTicketCache=true”使用,如:KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true; };
通过JAAS和krb5文件位置(可选)作为JVM参数(每个客户端的JVM):
-Djava.security.krb5.conf=/etc/kafka/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
确保在kafka_client_jaas.conf的keytab配置能被启动kafka客户端的系统操作员读取。
在producer.properties或consumer.properites配置以下属性:
security.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
4、使用 SASL/PLAIN 认证
SASL/PLAIN是简单的username/password认证机制,通常使用TLS用于加密来实现安全认证。kafka支持一个SASL/PLAIN的默认实现,可作为生产者的扩展使用。
username用作ACL等配置已认证的Principal。
1、配置Kafka Broker
添加JAAS文件,这个例子的配置文件是每个broker的配置目录,命名为 kafka_server_jaas.conf。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };
这个配置定义了2个用户(admin 和 alice)。在
KafkaServer
部分,username
和password
是broker用于初始化连接到其他的broker,在这个例子中,admin
用户为broker间的通讯,user_userName
定义了所有连接到broker和broker验证的所有的客户端连接包括其他broker的用户密码。JAAS配置文件地址作为JVM参数(为每个broker):
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
在server.properites 配置SASL端口和SASL机制(描述),例如:
listeners=SASL_SSL://host.name:port security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN
2、kafka客户端配置
要在客户端上配置SASL验证:
KafkaClient节介绍像生产者和消费者如何连接到broker,以下是对PLAIN机制的客户端配置示例:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret"; };
KafkaClient部分,username和password是客户端用来配置客户端连接的用户,在这个例子中,客户端使用
alice用户
连接到broker。JAAS配置文件地址作为JVM参数(每个客户端JVM)。
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
在producer.properties或consumer.properties配置以下属性:
security.protocol=SASL_SSL sasl.mechanism=PLAIN
3、在production中使用SASL/PLAN
SASL/PLAIN
应仅用SSL作为传输层,以确保明文密码不会在线上无密传输。默认在JAAS配置文件里实现SASL/PLAIN(kafka指定用户名和密码),为了避免在磁盘上存储密码,你可以自己实现插件
javax.security.auth.spi.LoginModule
(从外部提供用户名和密码)。登录模板实现应该提供用户名作为公共凭证和密码作为subject的私有凭据。默认的实现org.apache.kafka.common.security.plain.PlainLoginModule
可作为一个例子。在production系统中,外部认证服务器可以实现密码验证。Kafka broker可以与这些服务器集成(通过添加自己实现
javax.security.sasl.SaslServer
)。在这个包路径org.apache.kafka.common.security.plain
中,包括在kafka的默认的实现,可以作为一个例子来开始。新的Providers(供应商)必须安装并在JVM中注册。供应商可以通过添加安装提供者类到CLASSPATH或打包一个jar文件并将其添加到JAVA_HOME/lib/ext。
Providers 可以静态的注册添加一个提供者到安全属性文件到
JAVA_HOME/lib/security/java.security
.security.provider.n=providerClassName
其中,providerClassName是新Provider的全名,n是优先排序(较低的数字表明更搞的优先权)
另外,还可以在运行时动态调用注册供应商的
security.addprovider
在客户端应用程序的开始或在登录模块的静态初始化器。例如:Security.addProvider(new PlainSaslServerProvider());
更多细节, 可查看 JCA。
5、在broker中使用多种SASL机制
在指定的JAAS配置文件,KafkaServer部分中启用选择的多种机制,例如:
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };
在server.properties中启用SASL机制:
sasl.enabled.mechanisms=GSSAPI,PLAIN
在server.properties指定SASL安全协议和机制(为broker间通讯):
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
- 按照上面指定的步骤启用GSSAPI(Kerberos)和PLAIN.
6、在运行中的集群修改SASL机制
SASL机制可使用以下顺序在运行中的集群进行修改:
启用新的SASL机制,在server.properties中配置sasl.enabled.mechanisms(为每个broker)。更新JAAS配置文件,包含上面描述的两种机制,逐步增加节点。
重新启动客户端,使用新的机制。
变更broker间通讯机制(如果需要),在server.properties设置
sasl.mechanism.inter.broker.protocol
新机制,并逐步替换集群。要删除旧的机制(如果需要),移除在server.properties中旧的机制,和移除JAAS配置文件旧机制的条目。最后逐步替换集群。
大佬,如何通过api消费有权限组的topic 拜托了。我配置的权限机制是256的
你好,请问一下,现在kafka集群开启了sasl认证后,发现同一消费组中的消费者没有退出动作,但是一直在向kafka集群发送认证请求,请问下SASL认证是有时间限制吗?过段时间需要认证一次?
https://www.orchome.com/1949
https://www.orchome.com/1950
https://www.orchome.com/1951
总有一款适合你。
还有一点 为什么zk server节点需要配置jaas认证?kafka的认证流程是怎样的?
https://www.orchome.com/170
下面的问题能解答一下吗
文章里面都有介绍。zk和kafka集群之间也要认证。
我看了,说的不清楚
这里有个逻辑没有提到,如果zookeeper使用SASL模式的话认证文件要怎么配置? 我参考(https://blog.csdn.net/zhbr_f1/article/details/73549043)看到zk server也要配置一些账号和密码,那这些账号和密码是给谁用呢? 和kafka server中配置的账号和密码有什么关系?
kafka 认证文件中的KafkaServer节创建的哪个用户是给zookeeper server使用的? 如果zookeeper client要联系zk server 要用哪个账号?
群主,有没有方法能连两个带sasl的kafka 啊,不知道怎么设置那个system.setporperty互不影响,就是要从一个sasl 的kafka取数据,然后写入一个sasl的kafka
这.... 没试过,分2个应用启动吧
https://www.orchome.com/270 麻烦老大解答一下评论
你好,这边在启动kafka的时候报了如下一个错误,部署步骤是按照你这个文章一步步走的
[2017-11-28 20:46:18,929] ERROR An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2017-11-28 20:46:18,929] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
[2017-11-28 20:46:18,929] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
[2017-11-28 20:46:18,929] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-11-28 20:46:18,931] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kdc的日志如下
Nov 28 20:46:18 devhost33-2 krb5kdc25748: AS_REQ (5 etypes {17 16 23 1 3}) 10.100.1.138: ISSUE: authtime 1511873178, etypes {rep=17 tkt=17 ses=17}, hadoop/devhost33-2@BSFIT for krbtgt/BSFIT@BSFIT
Nov 28 20:46:18 devhost33-2 krb5kdc25748: TGS_REQ (6 etypes {18 17 16 23 1 3}) 10.100.1.138: LOOKING_UP_SERVER: authtime 0, hadoop/devhost33-2@BSFIT for zookeeper/10.100.1.138@BSFIT, Server not found in Kerberos database
你看的这篇文章是老的。
新的在这里:https://www.orchome.com/553
看下我的操作笔记
https://www.orchome.com/500
您好,我想请教一下: 我 server 认证启动起来了,但是 client去连接的时候
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/client.keytab"
principal="kafka/i-7zvl27mr@EXAMPLE.COM"
useTicketCache=true;
};
我使用kinit -k -t /etc/client.keytab kafka/i-7zvl27mr@EXAMPLE.COM 命令是正常的
会报错:Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No key to store
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
这可能是什么原因呢?
No key to store
你好,我现在在一个50台测试集群中使用kafka的kerberos认证。其中4台式broker,那么当我使用yarn-client执行我的程序时,带的参数是
--driver-java-options="-Djava.security.auth.login.config=/home/sss/sss/kafka_client_jaas.conf"
--files /home/sss/keytab/hxxtsx.keytab
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/sss/sss/kafka_client_jaas.conf"
我需要保证这50台机器上都要有/home/sss/sss/kafka_client_jaas.conf这个文件吗?
对
谢谢。