9

我有一个 spark 2.0 应用程序,它使用 spark 流(使用 spark-streaming-kafka-0-10_2.11)从 kafka 读取消息。

结构化流看起来真的很酷,所以我想尝试迁移代码,但我不知道如何使用它。

在常规流中,我使用 kafkaUtils 来创建 Dstrean,在我传递的参数中,它是值反序列化器。

在结构化流中,文档说我应该使用 DataFrame 函数进行反序列化,但我无法准确理解这意味着什么。

我查看了诸如this example之类的示例,但是我在Kafka中的Avro对象非常复杂,不能像示例中的String那样简单地转换..

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

我得到“数据类型不匹配:无法将 BinaryType 转换为 StructType(StructField(....”

如何反序列化该值?

4

4 回答 4

4

如上所述,从 Spark 2.1.0 开始,批量阅读器支持 avro,但 SparkSession.readStream() 不支持。以下是我如何根据其他响应让它在 Scala 中工作。为简洁起见,我简化了架构。

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"},
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]) {

        val KafkaBroker =  args(0);
        val InTopic = args(1);
        val OutTopic = args(2);

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })
于 2017-05-10T08:55:13.093 回答
3

我还不太熟悉 Spark 的序列化如何与新的/实验性的结构化流结合使用,但是下面的方法确实有效——尽管我不确定它是否是最好的方法(恕我直言,这种方法看起来有点尴尬'n感觉)。

我将尝试在自定义数据类型(此处:Foo案例类)而不是专门的 Avro 示例中回答您的问题,但我希望它无论如何都会对您有所帮助。这个想法是使用 Kryo 序列化来序列化/反序列化您的自定义类型,请参阅Spark 文档中的调优:数据序列化。

注意:Spark 通过内置(隐式)编码器支持开箱即用的案例类序列化,您可以通过import spark.implicits._. 但是为了这个例子,让我们忽略这个功能。

想象一下,您已将以下Foo案例类定义为您的自定义类型(TL;DR 提示:为防止遇到奇怪的 Spark 序列化投诉/错误,您应该将代码放入单独的Foo.scala文件中):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

现在您有以下结构化流代码来从 Kafka 读取数据,其中输入主题包含消息值为二进制编码的 Kafka 消息,您的目标是基于这些消息值String创建实例(即类似于您的方式) Food 将二进制数据反序列化为 Avro 类的实例):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

现在我们将值反序列化为自定义类型的实例,Foo我们首先需要为其定义一个隐式Encoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))

回到您的 Avro 问题,您需要做的是:

  1. Encoder为您的需求创建适当的。
  2. 替换Foo(new String(row.getAs[Array[Byte]]("value"))为将二进制编码的 Avro 数据反序列化为 Avro POJO 的代码,即从消息值 ( ) 中取出二进制编码的 Avro 数据row.getAs[Array[Byte]]("value")并返回 AvroGenericRecordSpecificCustomAvroObject您在其他地方定义的任何内容的代码。

如果其他人知道更简洁/更好/...的方式来回答 Tal 的问题,我会全神贯注。:-)

也可以看看:

于 2016-11-21T10:23:41.340 回答
2

所以实际上我公司的某个人为我解决了这个问题所以我会在这里为未来的读者发布它..

基本上我错过了 miguno 建议的是解码部分:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<YourObject>] = {
val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
iter.map(message => {
  val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
  val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
  val field2 = record.get("field1Name").asInstanceOf[GenericData.String]
        ...
  //create an object with the fields extracted from genericRecord
  })
}

现在您可以读取来自 kafka 的消息并像这样解码它们:

val ds = spark
  .readStream
  .format(config.getString(ConfigUtil.inputFormat))
  .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
  .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
  .load()
  .as[KafkaMessage]

val decodedDs  = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

*KafkaMessage只是一个案例类,其中包含从 Kafka 读取时获得的通用对象(key,value,topic,partition,offset,timestamp)

AvroTo<YourObject>Decoder是一些类,它会在给定模式注册表 url 的情况下解码你的对象。

例如使用 ConfluentKafkaAvroDeserializer和模式注册表。

val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) //isKey = true

// Avro encoded values
valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) //isKey = false

从这些中调用.deserialize(topicName, bytes).asInstanceOf[GenericRecord]以获取 avro 对象。

希望这可以帮助某人

于 2016-12-14T15:53:26.660 回答
2

使用以下步骤:

  • 定义 Kafka 消息。
  • 定义一个消费者实用程序,它返回一个 YourAvroObject 的数据集。
  • 定义你的逻辑代码。

卡夫卡消息:

case class KafkaMessage(key: String, value: Array[Byte],
                                    topic: String, partition: String, offset: Long, timestamp: Timestamp)

卡夫卡消费者:

import java.util.Collections

import com.typesafe.config.{Config, ConfigFactory}
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

import scala.reflect.runtime.universe._


object KafkaAvroConsumer {

  private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer")
  val valueDeserializer = new KafkaAvroDeserializer()
  valueDeserializer.configure(Collections.singletonMap("schema.registry.url",
    conf.getString("schema.registry.url")), false)

  def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = {
    val schema = new Schema.Parser().parse(schemaStr)
    Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value))
  }

  def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag]
  (schemaStr: String)
  (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = {

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of KafkaMessage from kafka
    val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option(subscribeType, topics)
      .option("startingOffsets", "earliest")
      .load()
      .as[KafkaMessage]
      .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform it Avro object.

    ds
  }

}

更新

实用程序:

import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificData

import scala.reflect.runtime.universe._

object Utils {


  def convert[T <: GenericRecord: TypeTag](targetSchema: Schema)(record: AnyRef): T = {
      SpecificData.get.deepCopy(targetSchema, record).asInstanceOf[T]
  }


}
于 2017-06-08T09:08:08.553 回答