2

用例:我在 spark 中有一个小表(约 1000 行)和一个巨大的 hive 表(200 亿条记录)。让我们将小表称为基础,将大表称为主表。现在,基表有一个“id”列,我需要从主表中获取所有记录,其中main.external_id等于base.id。external_id 和 id 列都只有唯一值。

问题显而易见的方法是将基表注册为spark中的临时表,并使用类似的东西

sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id")

然而,这意味着 spark 会从巨大的 hive 表中获取所有行,并带入内存,考虑到我们只需要大约 1000 行,我觉得这非常昂贵。我正在寻找一种方法来最小化这种网络数据传输。

我试过的

  1. 分区/分桶:这是我们想到的第一个选项,但两者都不可行,因为当列具有离散值(如城市/国家)时分区更好,而“id”列是唯一键列。对于分桶,问题是我们需要创建大量的桶,这意味着大量的文件会产生一些问题。

  2. 通过 Hiveserver2 进行 JDBC 查询:到目前为止,我们可以通过 JDBC 驱动程序对 Hive 引擎进行读取查询。我想知道是否有办法将基表从 spark 发送到 hive 引擎并在那里执行广播连接,这样网络 shuffle 只涉及较小的 table,我们不需要将较大的 table 带到 spark记忆。但是,我找不到任何可以帮助实现这一点的东西。

(显然我们可以先将基表写入 hive,然后再进行连接,但根据我从团队获得的信息,hive 写入的性能效率不高,过去几乎没有引起过问题)

有人对我上面提到的问题有任何解决方案吗?或者如果有另一种方法来达到这个结果?

PS:我使用的是 spark 2.3.2,并且对于 spark-sql、spark-hive 和 hive-jdbc jar 具有相同的版本。

4

1 回答 1

0

如果只需要主表值,可以使用“in”子句:

val ids = base_table.select("id").as(Encoders.INT).collect().mkString(",")
sparkSession.sql(s"select * from  main_table where external_id in ($ids)")
于 2021-08-06T13:54:49.933 回答