Job aborted due to stage failure: Task 0.0 had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

旅行。 发表于: 2017-03-31   最后更新时间: 2017-03-31 20:37:23   9,417 游览

你好,我的程序在yarn-cluster模式下在集群上运行时,报:

17/03/31 19:39:06 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test4, partition = 0, offset = 90, NoTimestampType = -1, checksum = 2321628322, serialized key size = 1, serialized value size = 92, key = 1, value = {"table_name":"itg_student","partition_field":"prov_id,day_id","field_value":"811,20170307"}))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test4, partition = 0, offset = 90, NoTimestampType = -1, checksum = 2321628322, serialized key size = 1, serialized value size = 92, key = 1, value = {"table_name":"itg_student","partition_field":"prov_id,day_id","field_value":"811,20170307"}))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1.org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1(RDD.scala:942)
        at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$apply$34.apply(RDD.scala:944)
        at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$apply$34.apply(RDD.scala:944)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at com.framework.batch.HiveToHiveDynamicSchema$$anonfun$main$3.apply(HiveToHiveDynamicSchema.scala:178)
        at com.framework.batch.HiveToHiveDynamicSchema$$anonfun$main$3.apply(HiveToHiveDynamicSchema.scala:158)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

代码是

kafkaConf = scala.collection.Map(
      "metadata.broker.list" -> broker,
      "bootstrap.servers" -> broker,
      "zookeeper.connect" -> zkkafka,
      "group.id" -> groupid,
      "zookeeper.connection.timeout.ms" -> zkTimeout,
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "sasl.mechanism" -> "GSSAPI",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    consumerStrategy = ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaConf)

    linesDStream = KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferBrokers, consumerStrategy)


    val r1 = linesDStream.transform { rdd =>
        /**/
        rdd
      }

    r1.foreachRDD(rdd2=>
    rdd2.toLocalIterator.foreach{
    /**/
    })

我的kafka_client_jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="test.keytab"
principal="test@HADOOP.test.test.CN";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   storeKey=true
   keyTab="test.keytab"
   principal="test@HADOOP.test.test.CN";
};

我的spark-submit命令是

spark-submit  
--master yarn-cluster 
--driver-java-options="-Djava.security.auth.login.config=kafka_client_jaas.conf"  
--files kafka_client_jaas.conf,test.keytab 
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" 
--jars
--class

我的同样的代码在测试环境中完全正常,但是换到另一个集群中就会报序列化的异常,我将2个集群的配置文件互相对照了下,也没有发现有什么异常,麻烦各位帮忙看下,万分感谢。

ps:我的kafka是0.10.0.0 spark是1.6.1

发表于 2017-03-31
添加评论

可以试试val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

你的答案

查看kafka相关的其他问题或提一个您自己的问题