confluent-kafka-sparkstreaming
1. The error:
Error deserializing key/value for partition historysignal-1 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.lang.ArrayIndexOutOfBoundsException: 21
2. My consumer configuration:
import org.apache.kafka.common.serialization.StringDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer

val kafkaParam = Map[String, Object](
  "bootstrap.servers" -> "10.20.201.99:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  // KafkaAvroDeserializer fetches the writer schema from the Schema Registry
  // using the schema id embedded in each message
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "xgtestlocal",
  "auto.offset.reset" -> "latest",
  // "max.poll.records" -> "10000",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "schema.registry.url" -> "https://10.20.201.99:8081"
)
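For reference, this map would typically feed Spark's Kafka 0-10 direct-stream API roughly as in the sketch below. Assumptions not in the original post: the topic name historysignal (taken from the error message), the app name, and the 5-second batch interval. Since KafkaAvroDeserializer returns GenericRecord, the value type parameter has to be Object:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val sparkConf = new SparkConf().setAppName("confluent-avro-consumer")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Value type is Object because KafkaAvroDeserializer yields GenericRecord
val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Object](Array("historysignal"), kafkaParam)
)

stream.foreachRDD(rdd => rdd.foreach(record => println(record.value())))

ssc.start()
ssc.awaitTermination()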
3. The full stack trace:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition historysignal-1 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.lang.ArrayIndexOutOfBoundsException: 21
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at ...
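An ArrayIndexOutOfBoundsException thrown from Symbol$Alternative/ResolvingDecoder generally means the decoder resolved against the wrong writer schema: either the bytes are not in the Confluent wire format (magic byte 0, then a 4-byte schema id, then the Avro payload), or the schema registered under id 1 differs from what the producer actually wrote. A quick sanity check (a sketch, using the registry URL from the config above):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Look up the schema the registry holds under id 1 (the id in the error message)
val registry = new CachedSchemaRegistryClient("https://10.20.201.99:8081", 100)
val writerSchema = registry.getById(1)
println(writerSchema.toString(true)) // pretty-print; compare against the producer's schema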
Try specifying the deserialization yourself.
I'm already using KafkaAvroDeserializer and it still fails. I'm not sure what you mean by specifying it myself?
If you write a custom deserializer, print out the error log. Your message format must be wrong, which is why deserialization fails.
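As a sketch of that idea (the class name LoggingAvroDeserializer is hypothetical, not part of the thread): wrap KafkaAvroDeserializer so a record that fails to decode is logged and skipped instead of aborting the consumer.

import java.util
import org.apache.kafka.common.serialization.Deserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer

// Hypothetical wrapper: delegates to KafkaAvroDeserializer, logging instead of throwing
class LoggingAvroDeserializer extends Deserializer[Object] {
  private val inner = new KafkaAvroDeserializer()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  override def deserialize(topic: String, data: Array[Byte]): Object =
    try inner.deserialize(topic, data)
    catch {
      case e: Exception =>
        // Bytes 1-4 of the Confluent wire format carry the schema id
        val schemaId =
          if (data != null && data.length >= 5) java.nio.ByteBuffer.wrap(data, 1, 4).getInt else -1
        System.err.println(s"Failed to deserialize record from $topic (schema id=$schemaId): $e")
        null // bad record is skipped; downstream sees a null value
    }

  override def close(): Unit = inner.close()
}

Registering it is just "value.deserializer" -> classOf[LoggingAvroDeserializer] in the config map above.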
On my side the data goes through Confluent into Kafka, serialized as Avro. If Confluent and Spark Streaming are started at the same time there is no error, but once a lot of data has accumulated in Kafka and I then start Spark Streaming, the error above appears.
That doesn't add up. Kafka sits in the middle as a decoupling layer between producer and consumer, so it shouldn't matter how many messages have accumulated. Look more closely at whether the data Spark Streaming is reading is actually well-formed.
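One way to do that inspection (a sketch reusing ssc and kafkaParam from the question): read the same topic with ByteArrayDeserializer and check whether each record actually starts with the Confluent wire format, i.e. magic byte 0 followed by a 4-byte schema id.

import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Same consumer config, but deliver raw bytes instead of decoding Avro
val rawParams = kafkaParam + ("value.deserializer" -> classOf[ByteArrayDeserializer])

val rawStream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Array("historysignal"), rawParams)
)

rawStream.foreachRDD { rdd =>
  rdd.foreach { rec =>
    val b = Option(rec.value()).getOrElse(Array.emptyByteArray)
    val magic = if (b.nonEmpty) b(0) else -1
    val schemaId = if (b.length >= 5) java.nio.ByteBuffer.wrap(b, 1, 4).getInt else -1
    // A well-formed Confluent record has magic == 0 and a schema id the registry knows
    println(s"partition=${rec.partition()} offset=${rec.offset()} magic=$magic schemaId=$schemaId len=${b.length}")
  }
}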