
Apache Spark - Join Principles

A very common scenario for a data engineer is to perform a join on two data sets during data preparation or analysis. In Spark's physical planning stage, the JoinSelection class chooses the final join strategy according to the join hints, the size of the tables being joined, whether the join is an equi-join or a non-equi join, and whether the keys participating in the join are sortable. The Spark execution engine then performs the operation using the selected join strategy.


Currently Spark supports a total of five Join strategies:

  • Broadcast hash join (BHJ)

  • Shuffle hash join (SHJ)

  • Shuffle sort merge join (SMJ)

  • Shuffle-and-replicate nested loop join (Cartesian product join)

  • Broadcast nested loop join (BNLJ)

BHJ and SMJ are the two most commonly used strategies. Join selection first tries to choose among Broadcast hash join, Shuffle hash join, and Shuffle sort merge join. When the join is not an equi-join, or when no join condition is specified at all, Broadcast nested loop join or Shuffle-and-replicate nested loop join is selected instead.


Each join strategy has very different execution efficiency. In this blog, we will help you understand the high-level execution process and the applicability conditions of each join strategy.


1. Broadcast Hash Join

Broadcast Hash Join is implemented by broadcasting the data of the small table to all Spark executors: Spark uses the collect operator to pull the small table's data from the Executor side to the Driver side, calls sparkContext.broadcast on the Driver side to broadcast it to all Executors, and then each Executor uses the broadcast data to perform the join against its local partitions of the large table (in effect a map operation). This join strategy avoids the shuffle operation entirely, so in general Broadcast Hash Join executes faster than the other join strategies.
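
For illustration, here is a minimal PySpark sketch of requesting a broadcast hash join explicitly via the broadcast hint; the table paths and names are hypothetical:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("bhj-demo").getOrCreate()

orders = spark.read.parquet("/data/orders")        # hypothetical large table
countries = spark.read.parquet("/data/countries")  # hypothetical small table

# Broadcasting the small side lets each executor join locally, with no shuffle
# of the large table.
joined = orders.join(broadcast(countries), on="country_id", how="inner")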


A join can use this strategy only if it meets the following criteria: the small table must really be small, with its size below the threshold configured by spark.sql.autoBroadcastJoinThreshold (10MB by default; if memory is relatively plentiful, it may be appropriate to raise this threshold, and setting it to -1 disables broadcast joins entirely). The strategy can only be used for equi-joins, and the keys participating in the join are not required to be sortable.
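
As a sketch of the configuration just described (the 50MB figure is only an example value):

# Raise the automatic broadcast threshold from the 10MB default to 50MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Or disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)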


2. Shuffle Hash Join

When a table is relatively large and not suitable for broadcasting, you can consider Shuffle Hash Join instead. Shuffle Hash Join is also a strategy for joining a large table with a smaller one. Its idea is: partition both tables with the same partitioning algorithm and the same number of partitions (hash-partitioned on the keys participating in the join), which guarantees that rows with the same hash value land in the same partition; then, within the same Executor, the partitions of the two tables with the same hash value can be hash-joined locally. Before the join, a hash map is built from the small table's partition, which the large table's rows then probe. Shuffle hash join applies the idea of divide and conquer: a big problem is broken down into small problems and solved piece by piece.
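
Since Spark 3.0 you can also nudge the planner toward this strategy with a join hint; a minimal sketch with hypothetical DataFrame names:

# Ask the planner to consider a shuffle hash join (hint available in Spark 3.0+).
joined = big_df.hint("shuffle_hash").join(medium_df, on="user_id")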


To enable Shuffle Hash Join, the following conditions must be met:

Only equi-joins are supported, and the keys participating in the join are not required to be sortable. The spark.sql.join.preferSortMergeJoin parameter must be set to false; this parameter was introduced in Spark 2.0.0 and defaults to true, which means sort merge join is chosen by default.


The size of the small table (plan.stats.sizeInBytes) must be less than spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions (the latter defaults to 200), and three times the size of the small table must be less than or equal to the size of the large table (stats.sizeInBytes), that is, a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes.
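
A sketch of the session settings involved in these conditions; with the defaults, the first bound works out to roughly 10MB * 200 = 2GB:

# Let the planner consider shuffle hash join instead of preferring sort merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

# The small-table size bound is autoBroadcastJoinThreshold * shuffle partitions
# (10MB * 200 = about 2GB with the default settings).
spark.conf.set("spark.sql.shuffle.partitions", 200)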


3. Shuffle Sort Merge Join

The two join strategies discussed above both place conditions on table size. If the tables participating in the join are both very large, you have to consider Shuffle Sort Merge Join. Its idea is: shuffle the two tables by the join key, so that records with the same join key value are sent to the corresponding partition on each side; sort the data within each partition; then merge the two sorted sides. No matter how big the partitions are, neither side has to be loaded entirely into memory, because both sequences are sorted: traverse them from the beginning, output a result when the keys are equal, and otherwise advance the side with the smaller key. This greatly improves the stability of SQL joins over large amounts of data.
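
To make the merge step concrete, here is a minimal standalone sketch over two key-sorted lists of (key, value) pairs; this is illustrative only, not Spark's actual implementation:

def merge_join(left, right):
    """Merge two lists of (key, value) pairs that are already sorted by key."""
    out = []
    i, j = 0, 0
    while i < len(left) and j < len(right):
        if left[i][0] < right[j][0]:
            i += 1  # left key is smaller: advance the left side
        elif left[i][0] > right[j][0]:
            j += 1  # right key is smaller: advance the right side
        else:
            key = left[i][0]
            # Emit all right rows sharing this key (handles duplicate keys).
            j2 = j
            while j2 < len(right) and right[j2][0] == key:
                out.append((key, left[i][1], right[j2][1]))
                j2 += 1
            i += 1
            # Move the right cursor only once the left run for this key ends.
            if i >= len(left) or left[i][0] != key:
                j = j2
    return out

# merge_join([(1, "a"), (2, "b")], [(1, "x"), (1, "y")])
# -> [(1, "a", "x"), (1, "a", "y")]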


To use Shuffle Sort Merge Join, the join must meet the following criteria: only equi-joins are supported, and the keys participating in the join must be sortable.
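
Like the previous strategies, sort merge join can also be requested explicitly with a hint in Spark 3.0+; a minimal sketch with hypothetical DataFrame names:

# Ask the planner for a sort merge join on the equi-join key.
joined = left_df.hint("merge").join(right_df, on="id")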


4. Cartesian product join

If no join condition is specified for the two tables participating in a Spark join, a Cartesian product join is produced. The result pairs every row of one table with every row of the other, so its row count is the product of the row counts of the two tables.
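
A minimal sketch of an explicit Cartesian product in the DataFrame API (frame names hypothetical):

# Every row of df_a is paired with every row of df_b, so the result
# has df_a.count() * df_b.count() rows.
pairs = df_a.crossJoin(df_b)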


5. Broadcast nested loop join

The execution of Broadcast nested loop join can be regarded as the following calculation:


for record_1 in relation_1:
    for record_2 in relation_2:
        # the join condition is evaluated on (record_1, record_2)


As can be seen, Broadcast nested loop join will scan a table multiple times in some cases, which is very inefficient. As the name suggests, this strategy broadcasts the small table, when the relevant conditions allow, to reduce the number of table scans.


Broadcast nested loop join supports both equi- and non-equi joins, and supports all join types.
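
As a sketch, a non-equi join against a broadcastable small table is the typical case that plans as a broadcast nested loop join; the table and column names here are hypothetical:

from pyspark.sql.functions import broadcast, col

# A range (non-equi) condition rules out the hash and sort merge strategies,
# so Spark falls back to a nested loop; broadcasting the small side avoids
# scanning the large table repeatedly across the network.
joined = events.join(
    broadcast(ranges),
    (col("ts") >= col("start")) & (col("ts") < col("end")),
)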



