kafka认证和acl

半兽人 发表于: 2016-05-06   最后更新时间: 2018-09-04 23:14:45  
{{totalSubscript}} 订阅, 41,733 游览

7.4 认证和acl

kafka附带一个可插拔的ACL(Access Control List 访问控制列表),它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties中包含以下内容。

allow.everyone.if.no.acl.found=true

你也可以在server.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。

super.users=User:Bob;User:Alice

默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:

principal.builder.class=CustomizedPrincipalBuilderClass

可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L

举个例子,添加规则,将user@MYDOMAIN.COM转换为用户,同时保持默认规则,示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:

选项 描述 默认 类型选择
--add 添加一个acl Action
--remove 移除一个acl Action
--list 列出acl Action
--authorizer authorizer的完全限定类名 kafka.security.auth.SimpleAclAuthorizer Configuration
--authorizer-properties key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 Configuration
--cluster 指定集群作为资源。 Resource
--topic [topic-name] 指定topic作为资源。 Resource
--group [group-name] 指定 consumer-group 作为资源。 Resource
-allow-principal 添加到允许访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--deny-principal 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--allow-host --allow-principal中的principal的IP地址允许访问。 如果--allow-principal指定的默认值是*,则意味着指定“所有主机” Host
--deny-host 允许或拒绝的操作。
有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
ALL Operation
--operation --deny-principal中的principals的IP地址拒绝访问。 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" Host
--producer 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 Convenience
--consumer 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 Convenience
--force 假设所有操作都是yes,规避提示 Convenience

例子

  • 添加acl
    假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    需要注意的是--allow-hostdeny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。

  • 删除acl
    删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • acl列表
    我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • 添加或删除作为生产者或消费者的principal
    acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
    

    注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。

更新于 2018-09-04
在线,39分钟前登录

yh 4月前

您好!我报了以下错,是因为zookeeper少配置了什么吗?WARN SASL

configuration failed. Willconfiguration failed. Will continue connectionconfiguration failed. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
javax.security.authconfiguration failedconfiguration failed. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
javax.security.auth.login.LoginException: No JAAS configuration sectionguration failed. Willconfiguration failed. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
javax.security.auth.login.LoginException: Noconfiguration failed. Willconfiguration failed. Will continue connection to Zookeeper server withoutconfiguration failed. Will continue connection to Zookeeper serverconfiguration failed. Will continue connection to Zookeeperconfiguration failed. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found was found in specified JAAS configuration file: '/etc/profiles.d/ssl/kafka/kafka_server_jaas.conf'.
        at org.apache.zookeeper.client.ZooKeeperSaslClient.<init>(ZooKeeperSaslClient.java:189)
        at    at org.ClientCnxn$SendThread.startConnect.startConnect(ClientCnxn.java:1161)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1211)
yh -> yh 4月前

不加acl配置时可以正常起,加了authorizer.class.name=kafka.security.authorizer.AclAuthorizer后报该错

半兽人 -> yh 4月前

错误很明显吧:

No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/profiles.d/ssl/kafka/kafka_server_jaas.conf'.

yh -> 半兽人 4月前

实际上我和这位的错误一样:https://www.orchome.com/6808 。加了Client之后报了同样的错误。我是根据这里来配置的:https://www.orchome.com/1966 。不知道这里是不是少了zookeeper相关的配置。

木木&很呆 1年前

kafka 自带zookeeper集群如何添加plain 认证

config/zookeeper.properties中。

config/zookeeper.properties 中添加了如下配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

定义了 /opt/kafka/config/zk_server_jaas.conf

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-123"
        user_kafka="kafka-1234"
        user_producer="1234567";
};

zookeeper-server-start.sh 启动脚本改成了:

exec $base_dir/kafka-run-class$base_dirs.sh $EXTRA_ARGS  -Djava.security.auth.loginexec $base_dir/kafka-run-classexec $base_dir/kafka-run-classexec $base_dir/kafka-run-classexec $base_dir/kafka-run-classexec $base_dir/kafka-run-classexec $base_dir/kafka-run-class.sh $EXTRA_ARGS  -Djava.security.auth.login.config=/opt/kafka/config/zk_server_jaas.conf   org.apache.zookeeper.server.quorum.QuorumPeerMainexec $base_dir/kafka-run-classexec $base_dir/kafka-runexec $base_dir/kafka-run-class.sh $EXTRA_ARGS  -Djava.security.auth.login.config=/opt/kafka/config/zk_server_jaas.conf   org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

但是实际配置下来就是配置不生效,ZK 不需要密码仍然可以访问

陈健飞 2年前

Kafka 2.4版本之后kafka.security.auth.SimpleAclAuthorizer 改为:kafka.security.authorizer.AclAuthorizer

半兽人 -> 陈健飞 2年前

感谢提醒,稍后我来更新一下文章。

Simon 4年前

这个授权好像没有设置用户是否有权限创建topic?如果要设置用户是否有权限创建topic应该怎么设置呢?

半兽人 -> Simon 4年前

只有超级用户,才有权限创建。

您好,前一段在您的指导下解决了kafka认证与程序消费问题,这两天想做一个kafka压力测试, 使用的方法是:

./kafka-producer-perf-test.sh --num-records 50000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props bootstrap.servers=server1.hzguode.com:9092,server2.hzguode.com:9092,server3.hzguode.com:9092 --producer.config /hadoop/app/kafka/config/producer.properties

现在遇到一个问题:不加 --producer.config会出现之前的disconnect错误,加上之后出现:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

这个错误明明之前解决了,是不是测试命令不是这么加的?希望您能抽时间帮我看一下,谢谢了!

您好,一直在您这里学习,今天遇到了一个问题,

[zk: localhost:2181(CONNECTED) 0] rmr /brokers/topics/test5
Authentication is not valid : /brokers/topics/test5/partitions

test5 topic 删除的时候报这个错误,之前zookeeper和kafka做过SASL认证,能帮我看下是什么原因么?谢谢

那你zk也要认证登录呀

认证登录 是在命令上面修改么?能具体点么 有点懵....

1、可以采用连接方式:

zookeeper-client/bin/zkCli.sh -server `hostname -f`:2181

2、执行 zkCli.sh之前先执行(可添加到zkCli.sh最前面):

export JVMFLAGS="-Djava.security.auth.login.config=/etc/zookeeper/conf/zookeeper_jaas.conf"(个人采取方式,加入了/etc/profile环境变量)

3、先执行source /etc/zookeeper/conf/zookeeper-env.sh,再执行/usr/hdp/current/zookeeper-client/bin/zkCli.sh

好的 非常感谢 我去操作一下

一样的额,你要加认证的。

嗯嗯 谢谢已经解决了 但是有个新问题了, 就是我用

./kafka-producer-perf-test.sh --num-records 2000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props

这个命令测试时候200W数据没问题,上升到300W时候就报错了:

org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-5: 30942 ms has passed since last append org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-2: 30021 ms has passed since batch creation plus linger time

能不能指个大概方向,网上说的listener改ip,加大request时间都改了貌似不起作用。。。。头晕

过了30秒了,还没未发出。看这里:
https://www.orchome.com/511
搜request关键字。

马踏紫陌 6年前

大神,在javaj客户端配置了

props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");

Caused by: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule

的错

马踏紫陌 6年前

大神,那个控制某个ip某个用户,这个用户假如在java端应该怎么配置

马踏紫陌 6年前

大神,我想问一下这里的用户可以和数据库表里的用户关联起来吗,如果不能的话,这里的用户有什么意义呢,靠什么区分呢

马踏紫陌 6年前

可不可以设置客户端Java KafkaConsumer的权限?

半兽人 -> 马踏紫陌 6年前

可以呀,acl就是控制用户的读写权限的。

马踏紫陌 -> 半兽人 6年前

就是我的java端(10.29.28.207),一个xshell端(10.29.180.131),怎么在131端控制java那边consumer的权限和producer的权限、

User:* has Deny permission for operations: Read from hosts: 10.29.28.207
User:* has Deny permission for operations: Write from hosts: 10.29.28.207
User:* has Allow permission for operations: Read from hosts: 10.29.180.131
User:* has Allow permission for operations: Write from hosts: 10.29.180.131

这个是远程的

String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092");
  Properties props = new Properties();
  props.put("bootstrap.servers", connectionString);
  props.put("group.id", groupid);
  props.put("enable.auto.commit", "false");
  props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000"));
  props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000"));
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  String type="earliest";
  if(type.equals("earliest")){
   props.put("auto.offset.reset", "earliest");
  }

这个是java的,发现还是没有效果

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章