Problem reading Kafka data from Spark Streaming with Kerberos enabled

漂泊的美好 posted: 2017-06-29, last updated: 2017-06-29 20:24:48, 6,726 views

My Spark version is 2.0.0 and my Kafka version is 0.10.

Below is the Kafka parameter setup from my code:


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "sd-node39:9092,sd-node40:9092,sd-node41:9092,sd-node42:9092");
// The next two are old-consumer settings; the 0.10 KafkaConsumer does not know them and ignores them
kafkaParams.put("zookeeper.connect", "sd-node32:2181/kafka");
kafkaParams.put("metadata.broker.list", "sd-node39:9092,sd-node40:9092,sd-node41:9092,sd-node42:9092");
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", group);
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);
// Kerberos/SASL settings
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("sasl.kerberos.service.name", "kafka");
kafkaParams.put("sasl.mechanism", "GSSAPI");


Below is my spark-submit command:
spark-submit \
--master yarn --deploy-mode cluster --driver-memory 3g --executor-memory 1g \
--executor-cores 4 --num-executors 2 \
--files /etc/kafka/kafka_client_jaas.conf,/etc/kafka/kafka.keytab \
--class com.main.SparkStreamingComplete \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
SparkstreamingReadkafka.jar test
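
Note that with --deploy-mode cluster both the driver and the executors run inside YARN containers, and files distributed via --files are localized into each container's working directory under their base names. A sketch of the same submit pointing the JVMs at those localized copies instead of absolute paths (the ./ names are an assumption about the localized layout, not part of the original command):

spark-submit \
--master yarn --deploy-mode cluster --driver-memory 3g --executor-memory 1g \
--executor-cores 4 --num-executors 2 \
--files /etc/kafka/kafka_client_jaas.conf,/etc/kafka/kafka.keytab \
--class com.main.SparkStreamingComplete \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
SparkstreamingReadkafka.jar test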


The JAAS file kafka_client_jaas.conf that I use:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="/etc/kafka/kafka.keytab"
  principal="kafka@HADOOP.COM";
};
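
If the JAAS file in use is the copy shipped with --files, the keyTab path inside it has to reference the localized keytab as well; a sketch under that assumption (each option must be a plain key=value pair and the whole entry must end with a semicolon, which is exactly what the JAAS parser is checking when it reports "expected [option key]"):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="./kafka.keytab"
  principal="kafka@HADOOP.COM";
};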


It runs fine in local mode, but as soon as I switch to yarn-client or yarn-cluster mode, I get the following error:
17/06/29 20:08:07 INFO consumer.ConsumerConfig: ConsumerConfig values:
interceptor.classes = null
request.timeout.ms = 40000
check.crcs = true
ssl.truststore.password = null
retry.backoff.ms = 100
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 65536
ssl.key.password = null
ssl.cipher.suites = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.service.name = kafka
ssl.provider = null
session.timeout.ms = 30000
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
max.poll.records = 2147483647
bootstrap.servers = [sd-node39:9092, sd-node40:9092, sd-node41:9092, sd-node42:9092]
client.id =
fetch.max.wait.ms = 500
fetch.min.bytes = 1
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset = earliest
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
max.partition.fetch.bytes = 1048576
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
ssl.endpoint.identification.algorithm = null
ssl.keystore.location = null
ssl.truststore.location = null
exclude.internal.topics = true
ssl.keystore.password = null
metrics.sample.window.ms = 30000
security.protocol = SASL_PLAINTEXT
metadata.max.age.ms = 300000
auto.commit.interval.ms = 5000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
group.id = test
enable.auto.commit = true
metric.reporters = []
ssl.truststore.type = JKS
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
heartbeat.interval.ms = 3000

17/06/29 20:08:07 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
at com.main.SparkStreamingComplete.main(SparkStreamingComplete.java:152)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: Configuration Error:
Line 6: expected [option key]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: Configuration Error:
Line 6: expected [option key]
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 25 more
Caused by: java.io.IOException: Configuration Error:
Line 6: expected [option key]
at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:439)
at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)
... 40 more


I have been stuck on this problem for more than a week now. Any guidance would be greatly appreciated!

Answer (posted 2017-06-29):

After switching to cluster mode, double-check whether the launch command or the JVM options on the driver and executors actually load the authentication config.
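
The "Line 6: expected [option key]" message comes from the JDK's JAAS parser (com.sun.security.auth.login.ConfigFile), which means the file that the cluster-mode JVM actually resolved is either malformed or not the one that was edited. A quick diagnostic sketch, with hypothetical placement early in the driver code before the streaming context is started:

// Print which JAAS file this JVM resolved and whether it is readable.
String jaas = System.getProperty("java.security.auth.login.config");
System.out.println("JAAS config seen by this JVM: " + jaas
        + ((jaas != null && new java.io.File(jaas).exists()) ? " (exists)" : " (missing!)"));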
