kafka认证和acl

7.4 认证和acl

kafka附带一个可插拔的ACL(Access Control List 访问控制列表),它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties中包含以下内容。

allow.everyone.if.no.acl.found=true

你也可以在server.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。

super.users=User:Bob;User:Alice

默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:

principal.builder.class=CustomizedPrincipalBuilderClass

可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L

举个例子,添加规则,将user@MYDOMAIN.COM转换为用户,同时保持默认规则,示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:

选项 描述 默认 类型选择
--add 添加一个acl Action
--remove 移除一个acl Action
--list 列出acl Action
--authorizer authorizer的完全限定类名 kafka.security.auth.SimpleAclAuthorizer Configuration
--authorizer-properties key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 Configuration
--cluster 指定集群作为资源。 Resource
--topic [topic-name] 指定topic作为资源。 Resource
--group [group-name] 指定 consumer-group 作为资源。 Resource
-allow-principal 添加到允许访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--deny-principal 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--allow-host --allow-principal中的principal的IP地址允许访问。 如果--allow-principal指定的默认值是*,则意味着指定“所有主机” Host
--deny-host 允许或拒绝的操作。
有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
ALL Operation
--operation --deny-principal中的principals的IP地址拒绝访问。 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" Host
--producer 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 Convenience
--consumer 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 Convenience
--force 假设所有操作都是yes,规避提示 Convenience

例子

  • 添加acl
    假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    需要注意的是--allow-hostdeny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。

  • 删除acl
    删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • acl列表
    我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • 添加或删除作为生产者或消费者的principal
    acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
    

    注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。






发表于: 2年前   最后更新时间: 2月前   游览量:16559
上一条: kafka SASL验证
下一条: kafka在正在运行的集群中整合安全功能

评论…


  • 大神,在javaj客户端 配置了props.put("security.protocol","SASL_PLAINTEXT");
      props.put("sasl.mechanism","PLAIN");,报Caused by: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule的错
    大神,那个控制某个ip某个用户,这个用户假如在java端应该怎么配置
    大神,我想问一下这里的用户可以和数据库表里的用户关联起来吗,如果不能的话,这里的用户有什么意义呢,靠什么区分呢
    可不可以设置客户端Java KafkaConsumer的权限?
    • User:* has Deny permission for operations: Read from hosts: 10.29.28.207
       User:* has Deny permission for operations: Write from hosts: 10.29.28.207
       User:* has Allow permission for operations: Read from hosts: 10.29.180.131
       User:* has Allow permission for operations: Write from hosts: 10.29.180.131 
      这个是远程的
        • String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092");
            Properties props = new Properties();
            props.put("bootstrap.servers", connectionString);
            props.put("group.id", groupid);
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000"));
            props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000"));
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            String type="earliest";
            if(type.equals("earliest")){
             props.put("auto.offset.reset", "earliest");
            }
          这个是java的,发现还是没有效果
            31机器:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 10.29.180.132 --operation Read --topic Test-topic
            bin/kafka-console-producer.sh --broker-list zk2:9092 --topic Test-test
            >test2ettest2
            32机器:
            root@dspdevweb2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server zk3:9092 --topic Test-test --from-beginning
            testtest
            333
            55555
            test2ettest2

            ERROR Error when sending message to topic Test-topic with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
            org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Test-topic]是因为没有配置authorizer.class.name吗

            我启动之后显示认证失败,这是什么问题?
            [2018-04-16 15:53:06,016] ERROR An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.client.ZooKeeperSaslClient)
            [2018-04-16 15:53:06,016] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
            [2018-04-16 15:53:06,016] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
            [2018-04-16 15:53:06,016] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
            [2018-04-16 15:53:06,017] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
            org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
            你好,单独设置这个好像没什么用,是否要跟kerberos之类的安全认证一起使用才可以。就算设置了acl,所有的用户还是都可以读写kafka
            楼主,你好。因为我的consumer是动态增加的,而且要以不同的用户名连接broker进行消费。目前的user是通过配置文件在kafka启动的时候配置的SASL, 用户名也是启动时指定的,如果我的kafka已经运行了,能不能不重启kafka而动态增加用户。
            群主我想请教一个问题,望您指导下感谢。
            kafka可否能像hdfs一样使用proxyuser,比如kafka的superuser我指定usera,然后我通过usera  代理创建userb的   producer,完成消息的产生,acl还是userb的?感谢!
            • 群主您好:
              那这种需求是不是需要二次开发了,比如借助redis判断权限,因为我想给kafka平台话,多应用接入,实现kerberos下的权限管理,hadoop在kerberos下是可以代理用户的,kafka我查了部分资料没发现支持(也许我查询的资料过少吧),类似于通过keytab创建代理用户执行操作,问您下这个是不是目前版本还不支持?再次感谢!
              • 评论…
                • in this conversation