由于部署环境的限制,storm的zookeeper需要部署在本地,而带kerberos认证的kafka的zookeeper环境部署在生产环境上。生产环境上的kafka集群采用的是kafka-client-0.9.0.0 版本。
现在kafka数据单独进行消费是ok的,即添加三个环境变量参数
-Djava.security.krb5.conf=/usr/local/krb5.conf
-Djava.security.auth.login.config=/usr/local/xxx.jaas.conf
-Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
但是在storm环境运行kafkaspout 添加环境变量参数以后会出现storm本地zookeeper环境的错误:
14078 [main] ERROR o.a.s.d.nimbus - Error on initialization of server service-handler
java.lang.RuntimeException: org.apache.storm.shade.org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /storm
at org.apache.storm.util$wrap_in_runtime.invoke(util.clj:54) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.zookeeper$create_node.invoke(zookeeper.clj:100) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.zookeeper$mkdirs.invoke(zookeeper.clj:130) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.cluster_state.zookeeper_state_factory$_mkState.invoke(zookeeper_state_factory.clj:32) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.cluster_state.zookeeper_state_factory.mkState(Unknown Source) ~[kafka-storm-2.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[kafka-storm-2.0.jar:?]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.cluster$mk_distributed_cluster_state.doInvoke(cluster.clj:46) ~[kafka-storm-2.0.jar:?]
at clojure.lang.RestFn.invoke(RestFn.java:559) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.cluster$mk_storm_cluster_state.doInvoke(cluster.clj:250) ~[kafka-storm-2.0.jar:?]
at clojure.lang.RestFn.invoke(RestFn.java:486) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.daemon.nimbus$nimbus_data.invoke(nimbus.clj:192) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.daemon.nimbus$fn__11005$exec_fn__1364__auto____11006.invoke(nimbus.clj:2416) ~[kafka-storm-2.0.jar:?]
at clojure.lang.AFn.applyToHelper(AFn.java:156) ~[kafka-storm-2.0.jar:?]
at clojure.lang.AFn.applyTo(AFn.java:144) ~[kafka-storm-2.0.jar:?]
at clojure.core$apply.invoke(core.clj:630) ~[kafka-storm-2.0.jar:?]
at org.apache.storm.daemon.nimbus$fn__11005$service_handler__11038.doInvoke(nimbus.clj:2413) [kafka-storm-2.0.jar:?]
at clojure.lang.RestFn.invoke(RestFn.java:421) [kafka-storm-2.0.jar:?]
at org.apache.storm.testing$mk_local_storm_cluster.doInvoke(testing.clj:156) [kafka-storm-2.0.jar:?]
at clojure.lang.RestFn.invoke(RestFn.java:421) [kafka-storm-2.0.jar:?]
at org.apache.storm.LocalCluster$_init.invoke(LocalCluster.clj:31) [kafka-storm-2.0.jar:?]
at org.apache.storm.LocalCluster.<init>(Unknown Source) [kafka-storm-2.0.jar:?]
at example.KafkaStormKafkaTopology.main(KafkaStormKafkaTopology.java:115) [kafka-storm-2.0.jar:?]
并且针对kafka-client-0.9.0.0 环境不添加
-Djava.security.auth.login.config=/usr/local/xxx.jaas.conf
会出现以下错误:
17931 [Thread-26-kafka-spout-executor[4 4]] ERROR o.a.s.d.executor -
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:488) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault.createConsumer(KafkaConsumerFactoryDefault.java:26) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:452) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:445) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:639) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [kafka-storm-0.9-1.0.jar:?]
at clojure.lang.AFn.run(AFn.java:22) [kafka-storm-0.9-1.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:73) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:80) ~[kafka-storm-0.9-1.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-storm-0.9-1.0.jar:?]
... 8 more
所以只能选用kafka-client高版本的包。然后在kafkaspout中的props添加sasl.jaas.config参数,入下所示:
props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false "
+ "renewTicket=true "
+ "storeKey=true "
+ "debug=true "
+ "useKeyTab=true "
+ "keyTab="/usr/local/src/kafka/origin/conf/user.keytab" "
+ "principal="" + principal + "";");
这边princpal设置为 xxxxx(租户名)与xxxx@HADOOP.com
然后在运行storm的时候设置
-Djava.security.krb5.conf=/usr/local/krb5.conf
刚开始运行的时候提示认证通过,并且成功订阅topic
17482 [Thread-18-identity-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt identity:(2)
17482 [Thread-26-kafka-spout-executor[4 4]] INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [ip:port]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 11
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /usr/local/src/kafka/origin/conf/user.keytab refreshKrb5Config is false principal is xxxxx tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is xxxxx@HADOOP.COM
Will use keytab
Commit Succeeded
17942 [Thread-26-kafka-spout-executor[4 4]] INFO o.a.s.k.s.NamedSubscription - Kafka consumer subscribed topics [xxxxtopic]
然后问题来了。。。运行的时候报错如下:
19302 [Thread-26-kafka-spout-executor[4 4]] DEBUG o.a.k.c.n.Selector - Connection with 10.78.152.91/10.78.152.91 disconnected
javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTH_FAILED state.
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:296) ~[kafka-storm-2.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslToken(SaslClientAuthenticator.java:213) ~[kafka-storm-2.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:181) ~[kafka-storm-2.0.jar:?]
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:71) ~[kafka-storm-2.0.jar:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350) [kafka-storm-2.0.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:303) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-storm-2.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-storm-2.0.jar:?]
at org.apache.storm.kafka.spout.NamedSubscription.subscribe(NamedSubscription.java:54) [kafka-storm-2.0.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:454) [kafka-storm-2.0.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:445) [kafka-storm-2.0.jar:?]
at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:639) [kafka-storm-2.0.jar:?]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [kafka-storm-2.0.jar:?]
at clojure.lang.AFn.run(AFn.java:22) [kafka-storm-2.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: javax.security.sasl.SaslException: GSS initiate failed
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) ~[?:1.7.0_79]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:278) ~[kafka-storm-2.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:276) ~[kafka-storm-2.0.jar:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.7.0_79]
at javax.security.auth.Subject.doAs(Subject.java:415) ~[?:1.7.0_79]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:276) ~[kafka-storm-2.0.jar:?]
... 21 more
感觉好像是hostname没填写,是不是需要将principal设置为
xxxx/hostname@HADOOP.COM
但是填了hostname产生密码不对的报错,求大神帮忙看一下