Hi, when my program runs on the cluster in yarn-cluster mode, it throws:
17/03/31 19:39:06 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test4, partition = 0, offset = 90, NoTimestampType = -1, checksum = 2321628322, serialized key size = 1, serialized value size = 92, key = 1, value = {"table_name":"itg_student","partition_field":"prov_id,day_id","field_value":"811,20170307"}))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test4, partition = 0, offset = 90, NoTimestampType = -1, checksum = 2321628322, serialized key size = 1, serialized value size = 92, key = 1, value = {"table_name":"itg_student","partition_field":"prov_id,day_id","field_value":"811,20170307"}))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1.org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1(RDD.scala:942)
at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$apply$34.apply(RDD.scala:944)
at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$apply$34.apply(RDD.scala:944)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.framework.batch.HiveToHiveDynamicSchema$$anonfun$main$3.apply(HiveToHiveDynamicSchema.scala:178)
at com.framework.batch.HiveToHiveDynamicSchema$$anonfun$main$3.apply(HiveToHiveDynamicSchema.scala:158)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The code is:
val kafkaConf = scala.collection.Map[String, Object](
  "metadata.broker.list" -> broker,
  "bootstrap.servers" -> broker,
  "zookeeper.connect" -> zkkafka,
  "group.id" -> groupid,
  "zookeeper.connection.timeout.ms" -> zkTimeout,
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "sasl.mechanism" -> "GSSAPI",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer]
)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaConf)
val linesDStream = KafkaUtils.createDirectStream(
  ssc, LocationStrategies.PreferBrokers, consumerStrategy)
val r1 = linesDStream.transform { rdd =>
  /**/
  rdd
}
// toLocalIterator collects each partition to the driver, so every element
// (here a ConsumerRecord) has to be serializable.
r1.foreachRDD(rdd2 =>
  rdd2.toLocalIterator.foreach {
    /**/
  })
My kafka_client_jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="test.keytab"
  principal="test@HADOOP.test.test.CN";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="test.keytab"
  principal="test@HADOOP.test.test.CN";
};
My spark-submit command is:
spark-submit \
  --master yarn-cluster \
  --driver-java-options="-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files kafka_client_jaas.conf,test.keytab \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --jars \
  --class
The exact same code works perfectly in my test environment, but on another cluster it throws this serialization exception. I compared the configuration files of the two clusters side by side and found nothing unusual. Could anyone please take a look? Many thanks.
PS: my Kafka is 0.10.0.0 and Spark is 1.6.1.
You could try: val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
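The reason this can help: ConsumerRecord does not implement java.io.Serializable, so the default JavaSerializer fails as soon as toLocalIterator ships records from the executors back to the driver, while Kryo does not require Serializable. A minimal sketch of that suggestion (registering the class is optional but keeps the serialized form compact):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf

// Sketch: switch to Kryo and register ConsumerRecord so task results
// containing it can cross the executor -> driver boundary.
val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))

Alternatively (also just a sketch against your posted code), you can sidestep the problem entirely by reducing each record to plain strings on the executors before collecting; String is serializable, so nothing non-serializable ever reaches the driver:

// Sketch: extract key/value first, then iterate on the driver.
r1.foreachRDD { rdd2 =>
  rdd2.map(record => (record.key(), record.value()))
    .toLocalIterator
    .foreach { case (k, v) =>
      // process k and v here
    }
}

This could also explain the difference between your two clusters: if the test cluster's spark-defaults.conf already sets spark.serializer to Kryo, the same code would pass there. Worth comparing that setting specifically.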