1

我正在使用 Kafka 客户端 0.8 在 Spark 2、CDH 5.9 中运行流式传输作业。简单的目标是将信息保存在 Impala 中,逐条记录。

我无法摆脱这个错误,因为我不知道它来自哪里:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2

Direct Kafka Stream 是由

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream  = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)

并由以下人员处理:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()

directKafkaStream.foreachRDD { rdd =>
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]

  val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")

    deviceEnriched.show(false)
    spark.sql("use my_database")
      deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}

streamingContext.start()
streamingContext.awaitTermination()
4

1 回答 1

4

简短的回答:消息是使用与您在 Spark 中使用的 JAR 不兼容的 JARcommons-lang3版本序列化的。

长答案:如果你刚刚用谷歌搜索了那个错误信息,然后搜索了 Apache Commons 源代码,你会发现......

  • 这篇文章深入探讨了 Java“类不兼容”序列化问题,一般来说
  • FastDateFormat声明serialVersionUID = 1L直到V3.1但切换到serialVersionUID = 2LV3.2源代码(因为当时二进制结构已经改变)

顺便说一句,我刚刚检查并发现 CDH 5.9commons-lang3V3.1(用于 Hive、Impala、Sentry、Hive-in-Oozie、Sqoop-in-Oozie)和V3.3.2(用于 Spark-in-Oozie)和V3中提供.4(对于 Sqoop),而 Spark 本身根本不需要它。去搞清楚。
而且由于 CDH 还没有随 Spark 2 一起提供,我猜你要么下载了“beta”包,要么下载了 Apache 版本——我检查了,Apache 版本 (V2.0.2) 随commons-lang3 V3.3.2一起提供

我的 2 美分:只需强制--jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar输入 Spark 2 命令行,看看这是否足以解决您的问题。

编辑  额外 2 美分,确保您的“自定义”JAR 优先于 YARN 类路径中已经存在的任何 JAR,使用--conf spark.yarn.user.classpath.first=true

于 2016-12-15T17:18:11.090 回答