Spark - Types of Join
There are primarily three types of joins in Spark:
- Broadcast hash join
- Shuffle hash join
- Sort merge join
Broadcast Hash Join
A Broadcast Hash Join is a specific type of join operation in Spark and other distributed data processing frameworks. It's employed when one of the DataFrames involved in the join operation is compact enough to reside entirely within the memory of each worker node in a cluster. In this scenario, the smaller DataFrame is broadcast to all worker nodes and is loaded into the memory of individual executor JVMs on those worker nodes. This approach ensures that a copy of the smaller DataFrame is available locally on each executor for processing, eliminating the need to transfer data over the network to individual executors. As a result, the join is executed efficiently without the requirement for data shuffling or redistribution of the larger DataFrame.
Here's a breakdown of how a Broadcast Hash Join works:
-
Identify a Small DataFrame: One of the DataFrames involved in the join (usually the smaller one) is identified as a candidate for broadcasting. The smaller DataFrame is typically read and processed by the driver node.
-
Broadcasting: The smaller DataFrame is broadcast to all worker nodes in the cluster. This means that a copy of the entire smaller DataFrame is loaded into the memory of each worker node. On each worker node, the broadcasted smaller DataFrame is then loaded into the memory of individual executor JVMs.
-
Join Operation: The join operation is performed locally on each worker node. Each worker node combines the broadcasted smaller DataFrame with its portion of the larger DataFrame.
-
Final Result: The results of the join operation on each worker node are aggregated, resulting in the final joined DataFrame.
Shuffle Hash Join:
The shuffle hash join is the most basic type of join and is dervied from the joins in MapReduce. It's an optimized approach to perform joins in a distributed and parallelized manner, especially when the data is too large to fit into memory on a single machine.
Here's how a shuffle hash join works:
- Partitioning: Initially, the two DataFrames that you want to join are partitioned into several partitions based on a common column (the join key). The number of partitions can be configured in Spark. Each partition contains a subset of the data. For this example, let's assume we have two DataFrames: sales and customers, and we want to join them on a common column called "customer_id."
sales DataFrame: customers DataFrame:
+-------------+--------+ +-------------+--------+
| customer_id | amount | | customer_id | name |
+-------------+--------+ +-------------+--------+
| 1 | 100 | | 1 | Alice |
| 2 | 150 | | 2 | Bob |
| 3 | 200 | | 4 | Carol |
| 1 | 50 | | 5 | Dave |
+-------------+--------+ +-------------+--------+
- Hashing: A hash function (generally MurMur3 hash) is applied to the join key in each DataFrame, and the result of this hash function determines the destination partition for each row. Rows with the same hash value will end up in the same partition. This ensures that rows with the same join key end up in the same partition across DataFrames.
Number of Partitions = 2
Hashing for sales:
+-------------+--------+
| customer_id | amount |
+-------------+--------+
| 1 | 100 | (Partition 0)
| 2 | 150 | (Partition 0)
| 3 | 200 | (Partition 1)
| 1 | 50 | (Partition 0)
+-------------+--------+
Hashing for customers:
+-------------+--------+
| customer_id | name |
+-------------+--------+
| 1 | Alice | (Partition 0)
| 2 | Bob | (Partition 0)
| 4 | Carol | (Partition 0)
| 5 | Dave | (Partition 1)
+-------------+--------+
- Shuffling: The hash-partitioned data is then shuffled and exchanged between worker nodes in the cluster. Data with the same hash value, which represents the same join key, is sent to the same node. This shuffling operation can be expensive in terms of data transfer and network overhead.
Shuffled Data (based on customer_id):
Partition 0:
+-------------+--------+ +-------------+---------+
| customer_id | amount | | customer_id | name |
+-------------+--------+ +-------------+---------+
| 1 | 100 | | 1 | Alice |
| 1 | 50 | | 2 | Bob |
| 2 | 150 | | 4 | Carol |
+-------------+--------+ +-------------+---------+
Partition 1:
+-------------+--------+ +-------------+---------+
| customer_id | amount | | customer_id | name |
+-------------+--------+ +-------------+---------+
| 3 | 200 | | 5 | Dave |
+-------------+--------+ +-------------+---------+
- Local Join: On each worker node, local joins are performed between the shuffled partitions. This means that for each distinct hash value (join key), data from both datasets is joined locally. This is an efficient process because it avoids the need to move data across the network during the join operation.
Local Join within Partition 0:
+-------------+--------+--------+
| customer_id | amount | name |
+-------------+--------+--------+
| 1 | 100 | Alice |
| 1 | 50 | Alice |
| 2 | 150 | Bob |
| 4 | NULL | Carol | (No matching sales data)
+-------------+--------+--------+
Local Join within Partition 1:
+-------------+--------+--------+
| customer_id | amount | name |
+-------------+--------+--------+
| 3 | 200 | NULL | (No matching customer data)
| 5 | NULL | Dave |
+-------------+--------+--------+
-
Aggregation: After local joins, you may have multiple rows with the same join key on each worker node. These rows are aggregated into a single output row for the final result. The aggregation operation depends on the specific join type, such as INNER JOIN, LEFT JOIN, RIGHT JOIN, or FULL OUTER JOIN. The handling of NULL values also varies with the join type.
-
Final Result: The locally aggregated data from all worker nodes is then collected and merged to produce the final result of the join. This may involve further aggregation or filtering depending on the join type.
Final Aggregated DataFrame:
+-------------+--------+--------+
| customer_id | amount | name |
+-------------+--------+--------+
| 1 | 100 | Alice |
| 1 | 50 | Alice |
| 2 | 150 | Bob |
| 3 | 200 | NULL |
| 4 | NULL | Carol |
| 5 | NULL | Dave |
+-------------+--------+--------+
Shuffle hash join is efficient for large-scale data processing because it minimizes the amount of data transferred over the network and takes advantage of parallelism in a distributed computing environment. However, it can be resource-intensive due to the shuffling step, which involves substantial data movement and serialization/deserialization operations. Therefore, it's important to tune the configuration of the join operation and cluster resources to achieve optimal performance.
As in MapReduce, the shuffle hash join works best when data is not skewed and evenly distributed among the keys. However, it's worth noting that data skew, which occurs when a few keys have significantly more data than others, can impact performance. In cases of data skew, Spark does attempt to handle the skew by redistributing data to balance the workload, but excessive skew can still lead to performance issues. Therefore, it's important to monitor and address data skew in your Spark applications to maintain efficient and balanced processing.
Sort-Merge Join vs Shuffle Hash Join:
The choice between Sort-Merge Join and Shuffle Hash Join in Spark depends on the specific characteristics of your data and the join operation you need to perform. Both methods have their advantages and trade-offs. Here's a comparison of the two:
Sort-Merge Join:
-
Data Sorting: Sort-Merge Join requires data to be sorted on the join key in advance. If the data is already sorted, this method can be highly efficient, as it avoids the need for shuffling during the join.
-
Better for Sorted Data: It is particularly well-suited when your input DataFrames are already sorted on the join key, as it minimizes the need for data movement and shuffling.
-
Two Shuffling Steps: As you mentioned, Sort-Merge Join may involve two shuffling steps, one for sorting and one for the merge. This can add overhead, especially if the data is not already sorted.
Shuffle Hash Join:
-
Dynamic Hashing: Shuffle Hash Join dynamically calculates hash codes for partitioning, which can be more memory-efficient and requires less pre-processing compared to sorting.
-
One Shuffling Step: Shuffle Hash Join typically requires a single shuffling step for redistributing data based on hash codes. This can reduce the overhead compared to two shuffling steps in Sort-Merge Join.
-
Better for Unsorted Data: It can be a better choice when the data is not sorted because it avoids the initial sorting requirement of the Sort-Merge Join.
-
Memory-Efficient: Shuffle Hash Join may be more memory-efficient because it doesn't require sorting the entire dataset.
In summary, the choice between Sort-Merge Join and Shuffle Hash Join depends on your specific use case. Sort-Merge Join is advantageous when your data is already sorted, as it minimizes shuffling during the join. Shuffle Hash Join is more flexible and can be better for unsorted data. The presence of two shuffling steps in Sort-Merge Join may increase overhead, but it can still be efficient when sorting is a natural part of your data processing pipeline.
The decision also depends on factors such as data size, cluster resources, and performance considerations. You may need to test both methods on your specific data and workload to determine which one performs better for your use case.
© Waqar Ahmed.RSS