kafka认证和acl

7.4 认证和acl

kafka附带一个可插拔的ACL(Access Control List 访问控制列表),它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties中包含以下内容。

allow.everyone.if.no.acl.found=true

你也可以在server.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。

super.users=User:Bob;User:Alice

默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:

principal.builder.class=CustomizedPrincipalBuilderClass

可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L

举个例子,添加规则,将user@MYDOMAIN.COM转换为用户,同时保持默认规则,示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:

选项 描述 默认 类型选择
--add 添加一个acl Action
--remove 移除一个acl Action
--list 列出acl Action
--authorizer authorizer的完全限定类名 kafka.security.auth.SimpleAclAuthorizer Configuration
--authorizer-properties key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 Configuration
--cluster 指定集群作为资源。 Resource
--topic [topic-name] 指定topic作为资源。 Resource
--group [group-name] 指定 consumer-group 作为资源。 Resource
-allow-principal 添加到允许访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--deny-principal 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--allow-host --allow-principal中的principal的IP地址允许访问。 如果--allow-principal指定的默认值是*,则意味着指定“所有主机” Host
--deny-host 允许或拒绝的操作。
有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
ALL Operation
--operation --deny-principal中的principals的IP地址拒绝访问。 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" Host
--producer 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 Convenience
--consumer 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 Convenience
--force 假设所有操作都是yes,规避提示 Convenience

例子

  • 添加acl
    假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    需要注意的是--allow-hostdeny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。

  • 删除acl
    删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • acl列表
    我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • 添加或删除作为生产者或消费者的principal
    acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
    

    注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。







发表于: 1年前   最后更新时间: 15天前   游览量:14904
上一条: kafka SASL验证
下一条: kafka在正在运行的集群中整合安全功能

评论…


  • 可不可以设置客户端Java KafkaConsumer的权限?
    • User:* has Deny permission for operations: Read from hosts: 10.29.28.207
       User:* has Deny permission for operations: Write from hosts: 10.29.28.207
       User:* has Allow permission for operations: Read from hosts: 10.29.180.131
       User:* has Allow permission for operations: Write from hosts: 10.29.180.131 
      这个是远程的
        • String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092");
            Properties props = new Properties();
            props.put("bootstrap.servers", connectionString);
            props.put("group.id", groupid);
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000"));
            props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000"));
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            String type="earliest";
            if(type.equals("earliest")){
             props.put("auto.offset.reset", "earliest");
            }
          这个是java的,发现还是没有效果
            31机器:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 10.29.180.132 --operation Read --topic Test-topic
            bin/kafka-console-producer.sh --broker-list zk2:9092 --topic Test-test
            >test2ettest2
            32机器:
            root@dspdevweb2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server zk3:9092 --topic Test-test --from-beginning
            testtest
            333
            55555
            test2ettest2

            ERROR Error when sending message to topic Test-topic with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
            org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Test-topic]是因为没有配置authorizer.class.name吗

            我启动之后显示认证失败,这是什么问题?
            [2018-04-16 15:53:06,016] ERROR An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.client.ZooKeeperSaslClient)
            [2018-04-16 15:53:06,016] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
            [2018-04-16 15:53:06,016] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
            [2018-04-16 15:53:06,016] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
            [2018-04-16 15:53:06,017] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
            org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
            你好,单独设置这个好像没什么用,是否要跟kerberos之类的安全认证一起使用才可以。就算设置了acl,所有的用户还是都可以读写kafka
            楼主,你好。因为我的consumer是动态增加的,而且要以不同的用户名连接broker进行消费。目前的user是通过配置文件在kafka启动的时候配置的SASL, 用户名也是启动时指定的,如果我的kafka已经运行了,能不能不重启kafka而动态增加用户。
            群主我想请教一个问题,望您指导下感谢。
            kafka可否能像hdfs一样使用proxyuser,比如kafka的superuser我指定usera,然后我通过usera  代理创建userb的   producer,完成消息的产生,acl还是userb的?感谢!
            • 群主您好:
              那这种需求是不是需要二次开发了,比如借助redis判断权限,因为我想给kafka平台话,多应用接入,实现kerberos下的权限管理,hadoop在kerberos下是可以代理用户的,kafka我查了部分资料没发现支持(也许我查询的资料过少吧),类似于通过keytab创建代理用户执行操作,问您下这个是不是目前版本还不支持?再次感谢!
                大神,既然ACL与SSL是独立的,那么使用这个--allow-principal User:xxx 参数该如何配置Client用户名呢,通过 kafka_client_jaas.conf 配置吗?
                • 给你个常用的例子吧

                  kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

                  kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation Write --operation Read --allow-principal User:* --allow-host 192.168.0.11 --add --topic test1

                  这样,就可以在11节点上对 topic test1拥有读与写的权限
                    • 这个不是通过IP粒度的吗,如果要通过User:Bob 这样的粒度该如何操作呢,就像上面例子中那个命令:  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
                        • 大神,你给的这个例子可以达到不用设置sasl就ip粒度的控制吧。我按照评论里的方法,设置了

                          server.properteis中的authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer,然后执行了./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation All --allow-principal User:* --allow-host 172.16.64.132 --add --cluster

                          ,然后按非sasl方式连接kafka失败

                            请问kafka可以按用户隔离数据吗?比如不同的用户可以建相同的topic,一个用户只能看到自己创建的topic?
                            请问这里的用户指的是什么?我没有配置SSL和SASL,直接按照本页的步骤进行设置,用这条命令
                            /kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation All --allow-principal User:* --allow-host 192.168.70.101 --allow-host 192.168.70.102 --add --cluster 
                            给集群配了all权限,但是启动集群的时候还是会发生很多权限问题,比如没有update metadata的权限。但是如果设置了allow.everyone.if.no.acl.found=true,则权限控制好像整个是失效的,任意一台没有acl权限的host上的root用户都可以对集群收发消息。
                            • 我也遇到了你这种问题,设置了allow.everyone.if.no.acl.found=true 后感觉权限就失效了,但是不设置这个 启动的时候又会有一堆错误,请问您后来是怎么解决的
                                • 我添加了 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 这个配置,为什么感觉权限控制也失效的呢,从没有配置权限的机器上也能向 集群里 写入数据
                                  • 评论…
                                    • in this conversation