7.3 使用SASL认证
1. JAAS配置
Kafka使用Java认证
和授权服务(JAAS)
进行SASL配置。
为kafka broker配置JAAS
KafkaServer
是每个KafkaServer/Broker
使用的JAAS文件中的名称。本节提供broker的SASL配置选项,包括进行broker之间通信所需的SASL客户端连接。如果将多个listeners配置为SASL,则名称可以在listeners名称前以小写字母开头,后跟一个句点,例如sasl_ssl.KafkaServer
。客户端部分用于验证与zookeeper的SASL连接。它还允许broker在zookeeper节点上设置
SASL ACL
。并锁定这些节点,以便只有broker可以修改它。所有的broker必须principal名称相同。如果要使用客户端以外的名称,设置zookeeper.sasl.client
(例如,-Dzookeeper.sasl.clientconfig=ZkClient
)。默认情况下,Zookeeper使用 “zookeeper” 作为服务名称。如果你需要修改,设置
zookeeper.sasl.client.user
(例如,-Dzookeeper.sasl.client.username=zk
)Broker还可以使用
sasl.jaas.config
配置JAAS。属性名称必须以包括SASL机制的listener前缀为前缀,例如:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
只能指定一个登录模块。 如果listener上配置了多种机制,则listener必须使用和机制前缀为每种机制提供的配置。 例如:listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="admin" \ password="admin-secret"; listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret" \ user_admin="admin-secret" \ user_alice="alice-secret";
如果在不同级别定义了JAAS配置,则使用的优先级顺序为:
- broker配置属性
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
- 静态JAAS配置
{listenerName}.KafkaServer
- 静态JAAS配置
KafkaServer
请注意,只能使用静态JAAS配置来配置
ZooKeeper JAAS
配置。- broker配置属性
为Kafka client配置JAAS
客户端可以使用
sasl.jaas.config
或使用类似broker的静态JAAS配置文件配置JAAS。客户端的JAAS配置
客户端可以将JAAS配置指定给生产者或消费者,无需创建物理配置文件。通过为每个客户端指定不同的属性,此模式还使同一JVM中的不同生产者和消费者可以使用不同的凭据。如果同时指定了静态JAAS配置系统属性
java.security.auth.login.config
和客户端属性sasl.jaas.config
,则将会使用客户端的属性。静态文件配置JAAS
使用静态JAAS配置文件来配置客户端上的SASL认证。
- 添加一个名为
KafkaClient
的客户端登录的JAAS配置文件。 在KafkaClient中为所选机制配置登录模块,如设置GSSAPI(Kerberos),PLAIN或SCRAM的示例中所述。 例如,GSSAPI凭据可以配置为:KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="kafka-client-1@EXAMPLE.COM"; };
- 将JAAS配置文件位置作为JVM参数传递给每个客户端JVM。 例如:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
- 添加一个名为
2. SASL配置
SASL可与PLAINTEXT
或SSL
一起或分别用作安全协议传输层(SASL_PLAINTEXT
或SASL_SSL
,如果使用SASL_SSL,则必须配置SSL)。
SASL机制
Kafka支持以下的SASL机制:
- GSSAPI (Kerberos)
- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512
- OAUTHBEARER
为Kafka broker配置SASL
在
server.properties
配置一个SASL端口,SASL_PLAINTEXT或SASL_SSL添加到listeners
中(至少一个),用逗号分隔:listeners=SASL_PLAINTEXT://host.name:port
如果你只配置一个SASL端口(或者如果你需要broker使用SASL互相验证),那么需要确保broker之间设置相同的SASL协议:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
选择一个或多个支持的机制,并通过以下的步骤为机器配置SASL。在broker之间启用多个机制:
为Kafka client配置SASL
SASL仅支持新的java生产者和消费者,不支持老的API。
要在客户端上配置SASL验证,选择在broker中启用的客户端身份验证的SASL机制,并按照以下步骤配置所选机制的SASL。
我加了SASL以后 现在用shell 无法访问了,
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group my-first-app --from-beginning
这是脚本 kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi if [ "x$KAFKA_OPTS" = "x" ]; then echo '加载了加密配置../config/kafka_client_jaas.conf' export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/program/kafka_2.13-2.6.0/config/kafka_client_jaas.conf fi echo $KAFKA_OPTS exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
[2021-05-20 15:51:31,234] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:31,568] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:32,056] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:32,287] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:32,575] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:33,064] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:33,368] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:33,582] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2021-05-20 15:51:34,070] WARN [Consumer clientId=consumer-my-first-app-1, groupId=my-first-app] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
请教个问题,如果想用kafka提供的quota功能(根据user限流),是不是只能使用SASL之PLAIN认证和SASL之SCRAM认证呢?
如果使用了SASL_PLAINTEXT方式,kafka_server_jaas.conf只配置了kafkaserver的密码没有client,在启动kafka的时候会提示kafka brokers与zookeeper的主机之间未使用sasl认证。 (WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file)。这个现象是正常的吗,SASL_PLAINTEXT方式是否不需要broker和zookeeper的sasl认证?
你好,请问一下,现在kafka集群开启了sasl认证后,发现同一消费组中的消费者没有退出动作,但是一直在向kafka集群发送认证请求,请问下SASL认证是有时间限制吗?过段时间需要认证一次?
请问这个kafka sasl配置完后,本地测试生产消费速度比原来慢很多,消费阻塞住了,这个是怎么回事?
数据加密传输,确实会慢点。
你好,这个生产者没事,程序阻塞消费一晚上数据没有出来,这是怎么回事?
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"Alice-123\";"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test2019")); while (true) { //这里是得到ConsumerRecords实例 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
你这个算是从来没成功过吧。。
在使用Kerberos认证环境下,使用kafka-console-consumer消费数据,当用zookeeper进行消费是出现如下错误。查看源码发现在源码中写死了只能获取Broker安全协议为PLAINTEXT的节点信息。大神,是不是使用zookeeper的方式,不能使用认证方式?
kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 0 at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:141) at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180) at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:180) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
可以的,只是都要加上认证
大神.,我配置了sasl之后,首先在服务器上不能创建消费增者,报disconnected (org.apache.kafka.clients.NetworkClient)
配置了,启动之后看不到消费的数据
问题解决了吗?我本地测试消费,直接阻塞住了
问题解决了吗?遇到相同的问题
kafka服务器同时启用kerberos和SSL不行吧?
可以的
为什么oracle java要替换成java版本的JCE策略文件,不替换会有什么影响?
https://www.orchome.com/436
因为地区保护,不替换不会成功的。
我可不可以配置两个端口,一个使用sasl安全认证,一个不使用安全认证
当然可以了
确认可行吗?
请问您一个问题,当kafka与zookeeper都启动了sasl/kerberos后,发现向一个不存在的topic生产消息时,kafka不会自动创建这个topic。已检查配置文件,保证开启自动创建topic功能,请问这是为什么?
都开启了吗?并且发送到不存在topic的客户端也认证通过了吧?可以看下集群是否有报错信息打印,可以贴一下。
首先已经确认在kafka的配置文件server.properties中添加 auto.create.topics.enable=true。
随后使用命令./kafka-console-producer.sh --broker-list 172.16.101.202:9092 --topic test9 --producer.config ../config/producer.properties
并输入内容,回车后,提示如下报错
[2019-11-15 14:14:08,832] WARN Error while fetching metadata with correlation id 0 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient) [2019-11-15 14:14:09,034] WARN Error while fetching metadata with correlation id 1 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient) [2019-11-15 14:14:09,137] WARN Error while fetching metadata with correlation id 2 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient) [2019-11-15 14:14:09,240] WARN Error while fetching metadata with correlation id 3 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient) [2019-11-15 14:14:09,343] WARN Error while fetching metadata with correlation id 4 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
zookeeper此时没有任何日志内容。
kafka此时的日志内容:
[2019-11-15 14:09:09,442] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler) [2019-11-15 14:09:09,442] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler) [2019-11-15 14:09:09,636] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler) [2019-11-15 14:09:09,636] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
kerberos此时的日志内容:
Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): AS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for krbtgt/EXAMPLE.COM@EXAMPLE.COM Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): TGS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for kafka/nf5466c18-app@EXAMPLE.COM
上述各服务的日志都没有报错,随后我将kafka的日志级别调整至DEBUG时,启动kafka时会循环刷一个报错
[2019-11-15 14:18:03,803] DEBUG Set SASL server state to HANDSHAKE_REQUEST (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-11-15 14:18:03,803] DEBUG Handle Kafka request METADATA (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-11-15 14:18:03,803] DEBUG Set SASL server state to FAILED (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-11-15 14:18:03,803] DEBUG Connection with /172.***.***.202 disconnected (org.apache.kafka.common.network.Selector) java.io.IOException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake. at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:338) at org.apache.kafka.common.network.Selector.poll(Selector.java:291) at kafka.network.Processor.poll(SocketServer.scala:476) at kafka.network.Processor.run(SocketServer.scala:416) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake.
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" bin/kafka-console-producer.sh --broker-list orchome:9093 --topic test --producer.config config/producer.properties
已经将包含krb5配置文件的绝对路径和client_jass文件的绝对路径的参数添加到kafka-run-class.sh,并修改了kafka-console-producer.sh脚本,确保成功调用该参数,而且使用生产命令对已存在的topic进行生产时,不会提示任何报错,而且也可以实时消费到。 只是对一个不存在的topic进行生产时,才会出现报错
主题默认是自创的,先看下这个:
https://issues.apache.org/jira/browse/KAFKA-5458