在公司搭建单机版本的zk和kafka,zookeeper以及kakfa开启kerberos认证后,集群可以正常搭建,kafka发送数据及消费数据进行kerberos认证,但是在利用kakfa 服务端api在连接zookeeper后,可以删除主题和新建节点,不需要进行kerberos认证,很奇怪这是什么原因,麻烦有知道的给指点一下吧。我把我的集群中相关配置贴出来:
zookeeper相关配置
zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/ceshi/zookeeper/data
dataLogDir=/home/ceshi/zookeeper/log
clientPort=22181
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
sessionRequireClientSASLAuth=true
requireClientAuthScheme=sasl
zk_server_jaas.conf
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/ceshi/zookeeper.keytab"
principal="zookeeper/bigdata1@TEST.COM"
userTicketCache=false;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/ceshi/zk-client.keytab"
principal="zk-client@TEST.COM"
userTicketCache=false;
};
java.env
export SERVER_JVMFLAGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf"
export CLIENT_JVMFLAGS="${CLIENT_JVMFLAGS} -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf -Dzookeeper.server.principal=zookeeper/bigdata1@TEST.COM"
#export JVMFLAGS="-Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf"
kafka相关配置:
server.properties
listeners=SASL_PLAINTEXT://bigdata1:29092
advertised.listeners=SASL_PLAINTEXT://bigdata1:29092
port=29092
host.name=bigdata1
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
sasl.kerberos.service.name=kafka
kafka_server_jaas.conf
KafkaServer{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/ceshi/kafka.keytab"
principal="kafka/bigdata1@TEST.COM";
};
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/ceshi/kafka.keytab"
principal="kafka/bigdata1@TEST.COM"
userTicketCache=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="/home/ceshi/zk-client.keytab"
principal="zk-client@TEST.COM";
};
kafka-server-start.sh
最后末尾添加:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/kafka/kafka_2.11-1.1.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
kakfa api程序如下:
pom.xml如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
package com.ieslab.cn.kaftest;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
public class TestConsumer {
public static void main(String[] args) throws Exception {
ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils.
isZkSecurityEnabled());
AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ;
zkUtils.close();
}
}
在这个程序中可以,进行新建和删除主题,引用的的kakfa服务端的pom,很奇怪,大家又遇同样情况的吗?
我没有测试过admin接口,但是如果你的消费者和生产的代码需要认证的话,那服务器的认证是有效的。
你去看看kafka的日志,是否有认证相关的日志输出。
都有的,我的意思是说为什么利用java api连接zk还是可以新建主题和删除主题呢?我前提是zk已经加了认证。
不走zk,调用是走kafka的呀,kafka验证过了,流程如下:
那我这里不是直接调用的zk吗?zk直接能获取kakfa里面的topics信息,不用进行认证就可以直接删除主题。
ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils. isZkSecurityEnabled()); AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ; zkUtils.close();
你看下引用的包,都是kafka包下的呀。
那既然引用的kakfa包下的,那我上面的代码不更应该需要直接走kerberos认证才能删除代码吗?为何我不用认证kerberos,就可以直接删除主题呢?我想问的是这个。
你整个java客户端都没有认证?那你执行
看看,如果这个也能查到,我就怀疑你的认证是否开启成功了。
参考:使用Java管理kafka集群
利用kakfa_clients的java api连接认证的kerberos kafka集群是无法进行主题列表和创建主题的,但是我在pom.xml直接引入如下包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.1</version> </dependency>
利用服务端的kakfa连接zookeeper是可以不用认证的的,利用zk可以直接创建主题或者删除主题,及时下面的代码。
ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils. isZkSecurityEnabled()); AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ; zkUtils.close();
你的答案