我正在尝试有效地加入两个 DataFrame,其中一个较大,第二个较小。
有没有办法避免所有这些洗牌?我无法设置autoBroadCastJoinThreshold
,因为它仅支持整数-并且我尝试广播的表略大于整数字节数。
有没有办法强制广播忽略这个变量?
我正在尝试有效地加入两个 DataFrame,其中一个较大,第二个较小。
有没有办法避免所有这些洗牌?我无法设置autoBroadCastJoinThreshold
,因为它仅支持整数-并且我尝试广播的表略大于整数字节数。
有没有办法强制广播忽略这个变量?
在 SparkSQL 中,您可以通过调用queryExecution.executedPlan
. 与核心 Spark 一样,如果其中一个表比另一个小得多,您可能需要广播哈希连接。broadcast
您可以通过在DataFrame
加入之前调用方法向 Spark SQL 提示应该广播给定的 DF 以进行加入
例子:
largedataframe.join(broadcast(smalldataframe), "key")
在 DWH 术语中,largedataframe 可能类似于事实
smalldataframe 可能类似于维度
正如我最喜欢的书(HPS)所描述的那样。看下面有更好的理解..
注:以上broadcast
来自import org.apache.spark.sql.functions.broadcast
不是来自SparkContext
Spark 还自动使用spark.sql.conf.autoBroadcastJoinThreshold
来确定是否应该广播表。
def
explain(): Unit
Prints the physical plan to the console for debugging purposes.
有没有办法强制广播忽略这个变量?
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
笔记 :
另一个类似的开箱即用注释 wrt Hive (not spark) :类似的事情可以使用蜂巢提示来实现,
MAPJOIN
如下所示......
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb
延伸阅读:请参考我关于 BHJ, SHJ, SMJ的文章
您可以通过使用提示要广播的数据帧left.join(broadcast(right), ...)
设置spark.sql.autoBroadcastJoinThreshold = -1
将完全禁用广播。请参阅
Spark SQL、DataFrames 和数据集指南中的其他配置选项。
我发现此代码适用于 Spark 2.11 版本 2.0.0 中的广播加入。
import org.apache.spark.sql.functions.broadcast
val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF
// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))
import context.implicits._
employeesDF.join(broadcast(tmpDepartments),
$"depId" === $"id", // join by employees.depID == departments.id
"inner").show()
Using join hints will take precedence over the configuration autoBroadCastJoinThreshold
, so using a hint will always ignore that threshold.
In addition, when using a join hint the Adaptive Query Execution (since Spark 3.x) will also not change the strategy given in the hint.
In Spark SQL you can apply join hints as shown below:
SELECT /*+ BROADCAST */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ BROADCASTJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ MAPJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id
Note, that the key words BROADCAST, BROADCASTJOIN and MAPJOIN are all aliases as written in the code in hints.scala.