Spark Streaming reports a configuration error when consuming from SASL-authenticated Kafka

随风而去   posted: 2019-05-30   last updated: 2019-05-30 23:28:28   4,153 views

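I am using Spark 2.2.2 to consume data from a SASL-authenticated Kafka cluster and get a configuration error. (The original code consumes data normally from a Kafka cluster without authentication.) What could be causing this?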

I added the following to spark-submit:

--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/kafka/conf/kafka_jaas.conf

and added the following to the code:

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
System.setProperty("java.security.auth.login.config","/home/kafka/conf/kafka_client_jaas.conf");
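Two details of this setup may be worth checking. The spark-submit option points the executors at kafka_jaas.conf, while the code points at kafka_client_jaas.conf, and System.setProperty only affects the JVM it runs in. In the trace that follows, the consumer is constructed from AtmpTrans.main under DriverWrapper, so the driver JVM is the one that has to find a valid file (spark.driver.extraJavaOptions is the driver-side counterpart of the executor option). A minimal sketch, assuming a hypothetical helper class SaslClientSettings that is not part of Spark or Kafka, which builds the same two properties and refuses to point the JVM at a JAAS file that is missing or that conflicts with one already configured:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spark or Kafka.
class SaslClientSettings {
    static final String JAAS_PROPERTY = "java.security.auth.login.config";

    /** The two SASL/PLAIN client properties from the question. */
    static Properties saslPlainProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }

    /**
     * Points the current JVM at the given JAAS file.
     * Returns null on success, otherwise a description of the problem.
     */
    static String useJaasFile(String path) {
        Path file = Paths.get(path);
        if (!Files.isRegularFile(file) || !Files.isReadable(file)) {
            return "JAAS file not readable on this host: " + file.toAbsolutePath();
        }
        String existing = System.getProperty(JAAS_PROPERTY);
        if (existing != null && !existing.equals(path)) {
            // e.g. a -D option on the command line names a different file
            return "JVM already configured with a different JAAS file: " + existing;
        }
        System.setProperty(JAAS_PROPERTY, path);
        return null;
    }

    public static void main(String[] args) {
        // Path taken from the question; it must exist on the host running this JVM.
        String problem = useJaasFile("/home/kafka/conf/kafka_client_jaas.conf");
        System.out.println(problem == null ? "JAAS file set" : problem);
        System.out.println(saslPlainProps());
    }
}
```

Calling useJaasFile before the streaming context is started makes a wrong or missing path show up as a clear message on the driver instead of inside the KafkaConsumer constructor. It does not check the file's syntax.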

The error reported is:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at com.datatrans.atmptrans.atmp.AtmpTrans.main(AtmpTrans.java:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: 配置错误: 
    行 5: 应为 [option key]
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.SecurityException: java.io.IOException: 配置错误: 
    行 5: 应为 [option key]

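Did you ever find the cause of this problem? Could you post it, so that people who hit it later can troubleshoot more easily?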

The problem was a misconfiguration in jaas.conf.
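The message in the trace, 行 5: 应为 [option key] ("Line 5: expected [option key]"), comes from the JDK's JAAS file parser, so the file can be checked outside Spark. One layout that, as far as I can tell, produces exactly this kind of message is a login-module entry that is not terminated with ";" before the closing "};"; whether that was the case here is not stated in the thread. A minimal sketch, assuming a hypothetical stand-alone class JaasFileCheck (not part of Kafka or Spark) and placeholder credentials, that parses a file with the same JDK parser and reports the first error:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical stand-alone checker for illustration; not part of Kafka or Spark.
//
// A well-formed SASL/PLAIN client file looks like this (placeholder credentials):
//
//   KafkaClient {
//     org.apache.kafka.common.security.plain.PlainLoginModule required
//     username="alice"
//     password="alice-secret";
//   };
//
// Note the ";" after the last option and again after the closing brace.
class JaasFileCheck {

    /**
     * Parses a JAAS file with the JDK's built-in "JavaLoginConfig" parser.
     * Returns null if the file parses and contains the given section,
     * otherwise a description of the problem.
     */
    static String parseError(Path file, String section) {
        try {
            Configuration config =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(file.toUri()));
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
            if (entries == null || entries.length == 0) {
                return "no \"" + section + "\" section in " + file;
            }
            return null;
        } catch (Exception e) {
            // Parse failures arrive wrapped; report the underlying cause when there is one.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return String.valueOf(cause.getMessage());
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java JaasFileCheck <path-to-jaas.conf>");
            return;
        }
        // "KafkaClient" is the section name Kafka clients look up by default.
        String problem = parseError(Paths.get(args[0]), "KafkaClient");
        System.out.println(problem == null ? "OK" : problem);
    }
}
```

Running it against the file named in java.security.auth.login.config, on the host where the driver runs, should show the same line number as the Spark UI and makes it easy to test a corrected copy.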

半兽人 -> 半夏🍁 1 year ago

It would be great if you could add a bit more detail :)

Has authentication already been set up on your Kafka cluster?
https://www.orchome.com/500

随风而去 -> 半兽人 5 years ago

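Yes. This is the company's Kafka cluster, and Spark jobs are already running against it. The first time my colleague submitted a job, he got much the same error, but it went away after he repackaged and resubmitted the job. That did not work for me; I keep getting this error.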

随风而去 -> 半兽人 5 years ago

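The company's Kafka team provides us with the authenticated consumer group, the topic names, and the client authentication file. We are only consumers, so I have no way to see how the Kafka cluster itself is configured.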

半兽人 -> 随风而去 5 years ago

If the code is identical, have you considered a JDK version issue?
行 5: 应为 [option key] ("Line 5: expected [option key]"). What is on that line?

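随风而去 -> 半兽人 5 years ago

The JDK version is 1.8. The Spark UI just shows 行 5: 应为 [option key] ("Line 5: expected [option key]"). I don't know what exactly it refers to either.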
