4

我在一个项目中使用 spark 2.2 struct streaming 将 kafka msg 读入 oracle 数据库。进入 kafka 的消息流约为每秒 4000-6000 条消息。


当使用 hdfs 文件系统作为接收器目标时,它工作正常。当使用 foreach jdbc writer 时,随着时间的推移会有很大的延迟。我认为滞后是由 foreach 循环引起的。

jdbc sink 类(独立类文件):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()        
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

水槽部分:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic) 
  .writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()

如果我改变最后一行

.writeStream.format("csv")...

进入 jdbc foreach 接收器,如下所示:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()

滞后出现。

我猜这个问题很可能是由 foreach 循环机制引起的——它不是在批处理模式下处理的,就像一个批处理中的几千行一样,作为一个 oracle DBA,我已经微调了 oracle 数据库端,主要是数据库正在等待空闲事件。已经尝试通过设置来避免过度提交connection.setAutoCommit(false),任何建议都将不胜感激。

4

3 回答 3

3

尽管我没有关于您的应用程序中花费时间最长的实际配置文件,但我认为这是因为 usingForeachWriter将在每次运行时有效地关闭并重新打开您的 JDBC 连接,因为这就是ForeachWriter工作原理。

我建议不要使用它,而是Sink为 JDBC 编写一个自定义项,您可以在其中控制连接的打开或关闭方式。

有一个开放的pull request 向 Spark 添加 JDBC 驱动程序,您可以查看它以了解实现的可能方法。

于 2017-11-06T07:29:21.727 回答
2

通过将结果注入另一个 Kafka 主题来解决问题,然后编写另一个从新主题读取的程序,将它们批量写入数据库。

我认为在下一个 spark 版本中,他们可能会提供 jdbc sink 并有一些参数设置批量大小。

主要代码如下:

写到另一个话题:

  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("topic", "fastdbtest")
  .option("checkpointLocation", "/user/root/chk")
  .start()

阅读主题并写入数据库,我正在使用 c3p0 连接池

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      //get a connection from connection pool
      val conn = ConnManager.getManager.getConnection
      val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          insertIntoDB(ps, record)
        }
        )
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: Exception =>{}
        // do some log
      } finally {
        ps.close()
        conn.close()
      }
    })
  }
})
于 2017-11-10T16:22:16.157 回答
1

您是否尝试过使用触发器?

我注意到,当我不使用触发器时,我的 Foreach Sink 会多次打开和关闭与数据库的连接。

writeStream.foreach(writer).start()

但是当我使用触发器时,Foreach 只打开和关闭一次连接,例如处理 200 个查询,当微批处理结束时,它关闭连接,直到收到新的微批处理。

writeStream.trigger(Trigger.ProcessingTime("3 seconds")).foreach(writer).start()

我的用例是从只有一个分区的 Kafka 主题中读取数据,所以我认为 Spark 使用的是一个分区。我不知道这个解决方案是否适用于多个 Spark 分区,但我在这里的结论是 Foreach 在 process 方法中一次(逐行)处理所有微批处理,并且不调用 open() 和 close()对于很多人认为的每一行。

于 2019-02-08T10:08:50.470 回答