我有一个 spark 2.0 应用程序,它使用 spark 流(使用 spark-streaming-kafka-0-10_2.11)从 kafka 读取消息。
结构化流看起来真的很酷,所以我想尝试迁移代码,但我不知道如何使用它。
在常规流中,我使用 kafkaUtils 来创建 Dstrean,在我传递的参数中,它是值反序列化器。
在结构化流中,文档说我应该使用 DataFrame 函数进行反序列化,但我无法准确理解这意味着什么。
我查看了诸如this example之类的示例,但是我在Kafka中的Avro对象非常复杂,不能像示例中的String那样简单地转换..
到目前为止,我尝试了这种代码(我在这里看到了另一个问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
我得到“数据类型不匹配:无法将 BinaryType 转换为 StructType(StructField(....”
如何反序列化该值?