43

我正在尝试有效地加入两个 DataFrame,其中一个较大,第二个较小。

有没有办法避免所有这些洗牌?我无法设置autoBroadCastJoinThreshold,因为它仅支持整数-并且我尝试广播的表略大于整数字节数。

有没有办法强制广播忽略这个变量?

4

6 回答 6

88

Broadcast Hash Joins(类似于Mapreduce 中的 map-side join或 map-side combine):

在 SparkSQL 中,您可以通过调用queryExecution.executedPlan. 与核心 Spark 一样,如果其中一个表比另一个小得多,您可能需要广播哈希连接。broadcast您可以通过在DataFrame加入之前调用方法向 Spark SQL 提示应该广播给定的 DF 以进行加入

例子: largedataframe.join(broadcast(smalldataframe), "key")

在 DWH 术语中,largedataframe 可能类似于事实
smalldataframe 可能类似于维度

正如我最喜欢的书(HPS)所描述的那样。看下面有更好的理解.. 在此处输入图像描述

注:以上broadcast来自import org.apache.spark.sql.functions.broadcast不是来自SparkContext

Spark 还自动使用spark.sql.conf.autoBroadcastJoinThreshold来确定是否应该广播表。

提示:请参阅 DataFrame.explain() 方法

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.

有没有办法强制广播忽略这个变量?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


笔记 :

另一个类似的开箱即用注释 wrt Hive (not spark) :类似的事情可以使用蜂巢提示来实现,MAPJOIN如下所示......

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

延伸阅读:请参考我关于 BHJ, SHJ, SMJ的文章

于 2016-09-09T05:35:22.463 回答
22

您可以通过使用提示要广播的数据帧left.join(broadcast(right), ...)

于 2016-01-21T11:56:33.183 回答
7

设置spark.sql.autoBroadcastJoinThreshold = -1将完全禁用广播。请参阅 Spark SQL、DataFrames 和数据集指南中的其他配置选项

于 2016-11-04T06:55:36.557 回答
3

这是火花的电流限制,请参阅SPARK-6235。2GB 限制也适用于广播变量。

您确定没有其他好方法可以做到这一点,例如不同的分区?

否则,您可以通过手动创建多个每个 <2GB 的广播变量来破解它。

于 2015-09-07T21:55:44.577 回答
1

我发现此代码适用于 Spark 2.11 版本 2.0.0 中的广播加入。

import org.apache.spark.sql.functions.broadcast  

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))

import context.implicits._

employeesDF.join(broadcast(tmpDepartments), 
   $"depId" === $"id",  // join by employees.depID == departments.id 
   "inner").show()

这是上面代码的参考Henning Kropp Blog, Broadcast Join with Spark

于 2017-08-11T14:10:56.840 回答
0

Using join hints will take precedence over the configuration autoBroadCastJoinThreshold, so using a hint will always ignore that threshold.

In addition, when using a join hint the Adaptive Query Execution (since Spark 3.x) will also not change the strategy given in the hint.

In Spark SQL you can apply join hints as shown below:

SELECT /*+ BROADCAST */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ BROADCASTJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ MAPJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

Note, that the key words BROADCAST, BROADCASTJOIN and MAPJOIN are all aliases as written in the code in hints.scala.

于 2021-04-23T08:20:02.577 回答