storm-kafka: running KafkaSpout across clusters against Kerberos-authenticated Kafka

汤乐奇 posted on 2017-12-15 · Last updated 2022-05-28 16:57:29 · 7,107 views

Due to deployment constraints, Storm's ZooKeeper has to run locally, while the ZooKeeper for the Kerberos-authenticated Kafka runs in the production environment. The production Kafka cluster uses kafka-client 0.9.0.0.

Consuming the Kafka data on its own works fine, i.e. by adding three JVM system properties:

-Djava.security.krb5.conf=/usr/local/krb5.conf 
-Djava.security.auth.login.config=/usr/local/xxx.jaas.conf 
-Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
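These `-D` flags can equivalently be set programmatically before the topology starts. A minimal sketch (the class name is hypothetical; the paths are the ones from this post, so adjust them to your environment):

```java
// Hypothetical helper: applies the same three settings as the -D flags above.
public class KerberosEnv {
    public static void configure() {
        System.setProperty("java.security.krb5.conf", "/usr/local/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/usr/local/xxx.jaas.conf");
        System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");
    }

    public static void main(String[] args) {
        configure();
        System.out.println(System.getProperty("java.security.krb5.conf"));
    }
}
```

Note these must be set before any Kerberos-aware code runs, since the JAAS and krb5 configuration is read at login time.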

But when KafkaSpout runs in the Storm environment with these system properties added, the local Storm ZooKeeper fails:

14078 [main] ERROR o.a.s.d.nimbus - Error on initialization of server service-handler
java.lang.RuntimeException: org.apache.storm.shade.org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /storm
    at org.apache.storm.util$wrap_in_runtime.invoke(util.clj:54) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.zookeeper$create_node.invoke(zookeeper.clj:100) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.zookeeper$mkdirs.invoke(zookeeper.clj:130) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.cluster_state.zookeeper_state_factory$_mkState.invoke(zookeeper_state_factory.clj:32) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.cluster_state.zookeeper_state_factory.mkState(Unknown Source) ~[kafka-storm-2.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_79]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_79]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_79]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_79]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[kafka-storm-2.0.jar:?]
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.cluster$mk_distributed_cluster_state.doInvoke(cluster.clj:46) ~[kafka-storm-2.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:559) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.cluster$mk_storm_cluster_state.doInvoke(cluster.clj:250) ~[kafka-storm-2.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:486) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.daemon.nimbus$nimbus_data.invoke(nimbus.clj:192) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.daemon.nimbus$fn__11005$exec_fn__1364__auto____11006.invoke(nimbus.clj:2416) ~[kafka-storm-2.0.jar:?]
    at clojure.lang.AFn.applyToHelper(AFn.java:156) ~[kafka-storm-2.0.jar:?]
    at clojure.lang.AFn.applyTo(AFn.java:144) ~[kafka-storm-2.0.jar:?]
    at clojure.core$apply.invoke(core.clj:630) ~[kafka-storm-2.0.jar:?]
    at org.apache.storm.daemon.nimbus$fn__11005$service_handler__11038.doInvoke(nimbus.clj:2413) [kafka-storm-2.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:421) [kafka-storm-2.0.jar:?]
    at org.apache.storm.testing$mk_local_storm_cluster.doInvoke(testing.clj:156) [kafka-storm-2.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:421) [kafka-storm-2.0.jar:?]
    at org.apache.storm.LocalCluster$_init.invoke(LocalCluster.clj:31) [kafka-storm-2.0.jar:?]
    at org.apache.storm.LocalCluster.<init>(Unknown Source) [kafka-storm-2.0.jar:?]
    at example.KafkaStormKafkaTopology.main(KafkaStormKafkaTopology.java:115) [kafka-storm-2.0.jar:?]

Also, against the kafka-client 0.9.0.0 environment, omitting

-Djava.security.auth.login.config=/usr/local/xxx.jaas.conf

produces the following error:

17931 [Thread-26-kafka-spout-executor[4 4]] ERROR o.a.s.d.executor - 
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:488) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault.createConsumer(KafkaConsumerFactoryDefault.java:26) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:452) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:445) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:639) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [kafka-storm-0.9-1.0.jar:?]
        at clojure.lang.AFn.run(AFn.java:22) [kafka-storm-0.9-1.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:73) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:80) ~[kafka-storm-0.9-1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-storm-0.9-1.0.jar:?]
        ... 8 more

So the only option was to use a newer kafka-client version and add the sasl.jaas.config property to the KafkaSpout props, as shown below:

props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=false "
                    + "renewTicket=true "
                    + "storeKey=true "
                    + "debug=true "
                    + "useKeyTab=true "
                    + "keyTab=\"/usr/local/src/kafka/origin/conf/user.keytab\" "
                    + "principal=\"" + principal + "\";");

Here the principal was set to xxxxx (the tenant name) and also tried as xxxx@HADOOP.com.
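Because the keyTab and principal values must themselves be quoted inside the JAAS string, the inner quotes need escaping in Java, which is easy to get wrong. A small helper that builds the value (the helper name is hypothetical, not part of the original post):

```java
// Hypothetical helper: builds the sasl.jaas.config value for GSSAPI/Kerberos
// with a keytab. Note the escaped inner quotes around keyTab and principal,
// and the mandatory trailing semicolon.
public class JaasConfigBuilder {
    public static String krb5KeytabConfig(String keytabPath, String principal) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false "
                + "renewTicket=true "
                + "storeKey=true "
                + "useKeyTab=true "
                + "keyTab=\"" + keytabPath + "\" "
                + "principal=\"" + principal + "\";";
    }
}
```

Usage would then be, for example, `props.put("sasl.jaas.config", JaasConfigBuilder.krb5KeytabConfig("/usr/local/src/kafka/origin/conf/user.keytab", principal));`.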

Then, when launching Storm, set:

-Djava.security.krb5.conf=/usr/local/krb5.conf

At startup, authentication succeeds and the topic is subscribed:

17482 [Thread-18-identity-executor[2 2]] INFO  o.a.s.d.executor - Preparing bolt identity:(2)
17482 [Thread-26-kafka-spout-executor[4 4]] INFO  o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = earliest
    bootstrap.servers = [ip:port]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 11
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = kafka
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /usr/local/src/kafka/origin/conf/user.keytab refreshKrb5Config is false principal is xxxxx tryFirstPass is false useFirstPass is false storePass is false clearPass is false

principal is xxxxx@HADOOP.COM
Will use keytab
Commit Succeeded 

17942 [Thread-26-kafka-spout-executor[4 4]] INFO  o.a.s.k.s.NamedSubscription - Kafka consumer subscribed topics [xxxxtopic]

Then comes the problem... at runtime the following error appears:

19302 [Thread-26-kafka-spout-executor[4 4]] DEBUG o.a.k.c.n.Selector - Connection with 10.78.152.91/10.78.152.91 disconnected
javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTH_FAILED state.
 at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:296) ~[kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslToken(SaslClientAuthenticator.java:213) ~[kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:181) ~[kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:71) ~[kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:303) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-storm-2.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-storm-2.0.jar:?]
        at org.apache.storm.kafka.spout.NamedSubscription.subscribe(NamedSubscription.java:54) [kafka-storm-2.0.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:454) [kafka-storm-2.0.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:445) [kafka-storm-2.0.jar:?]
        at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:639) [kafka-storm-2.0.jar:?]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [kafka-storm-2.0.jar:?]
        at clojure.lang.AFn.run(AFn.java:22) [kafka-storm-2.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: javax.security.sasl.SaslException: GSS initiate failed
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) ~[?:1.7.0_79]
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:278) ~[kafka-storm-2.0.jar:?]
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:276) ~[kafka-storm-2.0.jar:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.7.0_79]
        at javax.security.auth.Subject.doAs(Subject.java:415) ~[?:1.7.0_79]
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:276) ~[kafka-storm-2.0.jar:?]
        ... 21 more

It looks like the hostname is missing from the principal. Does the principal need to be set to

xxxx/hostname@HADOOP.COM

But after adding the hostname I get a wrong-password error. Could anyone help take a look?
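For context on the UNKNOWN_SERVER error above: it means the KDC has no entry for the service principal the client requested, and the client builds that principal as kafka/&lt;hostname&gt;@REALM from the hostname Java resolves for the broker socket. A quick way to see which hostname that is (a sketch under that assumption, not from the original post):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch: reverse-resolve a broker address the way the SASL client does,
// to see which hostname ends up in the kafka/<hostname>@REALM principal.
public class BrokerHostCheck {
    public static String resolvedHostname(String brokerAddress) throws UnknownHostException {
        InetAddress addr = InetAddress.getByName(brokerAddress);
        // getHostName() does a reverse lookup; with no PTR record it
        // just returns the literal IP string
        return addr.getHostName();
    }
}
```

If bootstrap.servers is configured by IP (as in the ConsumerConfig dump above) and reverse DNS is missing, this returns the bare IP, and the ticket request fails because kafka/&lt;ip&gt;@REALM is not in the KDC. Using the broker's FQDN in bootstrap.servers, with consistent forward and reverse DNS, is the usual fix for UNKNOWN_SERVER.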


occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and socketChannel.socket().getInetAddress().getHostName() must match the hostname in principal/hostname@realm Kafka Client will go to AUTH_FAILED state.

https://www.orchome.com/500
Compare your configuration against this.
