Join Strategies
Optimizing Join Performance
Overview
Join strategy selection dramatically impacts query performance. The different join strategies (broadcast hash, shuffle hash, sort-merge) have different performance characteristics depending on data size, key distribution, and cluster resources.
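To make the size trade-off concrete, here is a rough, back-of-envelope sketch in plain Python of how many bytes each strategy sends over the network. The cost model is an assumption, not Spark's own. It ignores compression, data locality, and the share of shuffled data that stays on the same node. The function names are hypothetical.

```python
# Back-of-envelope model of bytes moved over the network per join strategy.
# Assumptions: no compression, no locality, every shuffled byte crosses the
# network once. Function names are hypothetical, not Spark APIs.

MB = 1024 * 1024
GB = 1024 * MB


def broadcast_bytes(small_bytes: int, num_executors: int) -> int:
    """Small side is collected to the driver, then copied to every executor."""
    collect_to_driver = small_bytes
    send_to_executors = small_bytes * num_executors
    return collect_to_driver + send_to_executors


def shuffle_bytes(small_bytes: int, large_bytes: int) -> int:
    """Both sides are repartitioned by join key (shuffle hash or sort-merge)."""
    return small_bytes + large_bytes


# 10MB dimension table vs 100GB fact table on 50 executors
print(broadcast_bytes(10 * MB, 50) // MB)      # 510
print(shuffle_bytes(10 * MB, 100 * GB) // MB)  # 102410

# A 5GB "small" table: broadcasting now moves more data than shuffling
print(broadcast_bytes(5 * GB, 50) // GB)       # 255
print(shuffle_bytes(5 * GB, 100 * GB) // GB)   # 105
```

Under this model, broadcasting a 10MB table is far cheaper than shuffling 100GB. Broadcasting a 5GB table to 50 executors, however, moves more data than shuffling both sides. In practice memory is often the tighter limit, because every executor must hold the broadcast table.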
Join Strategies
Strategy Comparison
Strategy Selection:
| Strategy | Table Sizes | Shuffle | Best For |
|---|---|---|---|
| Broadcast | Small x Large | No | Small dimension tables |
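| Shuffle Hash | Medium x Large | Yes | One side too big to broadcast, but small enough per partition to hash in memory |
| Sort-Merge | Large x Large | Yes | Large tables, sorted/bucketed data; Spark's default for large equi-joins |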
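The table can be read as a decision procedure. The sketch below is a simplified Python version under stated assumptions:

- It is not Spark's planner. It ignores hints, join types, and adaptive query execution.
- The per-partition check and the 3x size ratio are illustrative heuristics, only loosely modeled on Spark's behavior.
- `choose_strategy` is a hypothetical name.

```python
# Simplified join-strategy chooser mirroring the comparison table.
# NOT Spark's real planner: ignores hints, join type, and AQE. The
# per-partition check and 3x ratio are illustrative assumptions.

MB = 1024 * 1024
BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold


def choose_strategy(left_bytes: int, right_bytes: int,
                    prefer_sort_merge: bool = True,
                    shuffle_partitions: int = 200) -> str:
    small, large = sorted((left_bytes, right_bytes))

    # Small enough to copy to every executor: no shuffle of the large side
    if small <= BROADCAST_THRESHOLD:
        return "broadcast"

    # Shuffle hash: only if allowed, each partition's hash table should fit
    # in memory, and one side is much smaller than the other
    fits_per_partition = small / shuffle_partitions <= BROADCAST_THRESHOLD
    much_smaller = small * 3 <= large
    if not prefer_sort_merge and fits_per_partition and much_smaller:
        return "shuffle_hash"

    # Fallback for large x large: sort-merge (can spill to disk)
    return "sort_merge"


print(choose_strategy(5 * MB, 50_000 * MB))                             # broadcast
print(choose_strategy(500 * MB, 50_000 * MB))                           # sort_merge
print(choose_strategy(500 * MB, 50_000 * MB, prefer_sort_merge=False))  # shuffle_hash
```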
Broadcast Join
How It Works
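The mechanics can be simulated in plain Python without Spark. The function and sample data below are hypothetical:

- The large table is a list of partitions.
- The small table is built into a single hash map, which stands in for the broadcast copy each executor receives.
- Every partition probes that map locally, so the large side never moves.

```python
# Pure-Python simulation of a broadcast hash join (no Spark required).
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    # 1. Build: hash the small side once (Spark does this before broadcasting)
    hash_table = defaultdict(list)
    for row in small_rows:
        hash_table[row[key]].append(row)

    # 2. Probe: each partition of the large side looks up its rows in its
    #    local copy of the hash table; the large side is never shuffled.
    output = []
    for partition in large_partitions:
        for row in partition:
            for match in hash_table.get(row[key], []):
                output.append({**row, **match})
    return output


orders = [  # large table, already split into 2 partitions
    [{"order_id": 1, "customer_id": 10, "amount": 25.0},
     {"order_id": 2, "customer_id": 20, "amount": 40.0}],
    [{"order_id": 3, "customer_id": 10, "amount": 15.0},
     {"order_id": 4, "customer_id": 99, "amount": 5.0}],  # no matching customer
]
customers = [
    {"customer_id": 10, "customer_name": "Ada"},
    {"customer_id": 20, "customer_name": "Grace"},
]

rows = broadcast_hash_join(orders, customers, "customer_id")
print(sorted(r["order_id"] for r in rows))  # [1, 2, 3]
```

Because the hash table is built once and shipped to every executor, the small side must fit comfortably in both driver and executor memory.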
Broadcast Join Implementation
-- Broadcast join hint (Spark SQL)
SELECT /*+ BROADCAST(c) */ o.order_id, o.order_date, o.amount, c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
-- Spark configuration
SET spark.sql.autoBroadcastJoinThreshold = 10485760; -- 10MB
-- Broadcast small dimension table (without a hint, Spark decides by estimated size)
-- Customers <= 10MB → broadcast join
-- Customers > 10MB → shuffle-based join (sort-merge by default)
Broadcast Join Configuration
# Broadcast join configuration with Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("BroadcastJoins") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
    .config("spark.sql.broadcastTimeout", "300") \
    .getOrCreate()
# Configuration parameters:
# - autoBroadcastJoinThreshold: Max table size for broadcast (bytes)
#   - Default: 10MB
#   - Recommended: 10-50MB (enough for most dimension tables)
# - broadcastTimeout: Timeout for broadcasting (seconds)
#   - Default: 300s
#   - Increase for large dimension tables
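# Use broadcast hint
from pyspark.sql.functions import broadcast

orders = spark.table("orders")
customers = spark.table("customers")

# Join on the shared key column; broadcast() marks customers to be sent to every executor
result = orders.join(broadcast(customers), on="customer_id")
Shuffle Hash Join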
How It Works
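A plain-Python sketch of the two phases follows. The function names, partition count, and data are illustrative.

1. Both sides are hash-partitioned on the join key (the shuffle), so equal keys land in the same partition.
2. Each partition builds a hash table from its slice of the smaller side and probes it with the other side.

```python
# Pure-Python simulation of a shuffle hash join (no Spark required).
from collections import defaultdict


def shuffle(rows, key, num_partitions):
    # Exchange step: route each row to partition hash(key) % num_partitions.
    # Both sides use the same function, so equal keys meet in one partition.
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def shuffle_hash_join(build_rows, probe_rows, key, num_partitions=4):
    build_parts = shuffle(build_rows, key, num_partitions)
    probe_parts = shuffle(probe_rows, key, num_partitions)
    output = []
    for build_part, probe_part in zip(build_parts, probe_parts):
        # Local hash table from this partition's slice of the build side;
        # it must fit in a single task's memory.
        table = defaultdict(list)
        for row in build_part:
            table[row[key]].append(row)
        for row in probe_part:
            for match in table.get(row[key], []):
                output.append({**row, **match})
    return output


customers = [{"customer_id": c, "segment": "gold" if c % 2 else "basic"}
             for c in range(10)]
orders = [{"order_id": i, "customer_id": i % 12} for i in range(24)]

rows = shuffle_hash_join(customers, orders, "customer_id")
print(len(rows))  # 20 (customer ids 10 and 11 have no match)
```

Each partition's hash table must fit in one task's memory. A heavily skewed key can therefore overload a single partition, which is the main risk of this strategy.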
Shuffle Hash Join Configuration
# Shuffle hash join configuration
spark = SparkSession.builder \
    .appName("ShuffleJoins") \
    .config("spark.sql.join.preferSortMergeJoin", "false") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
# Configuration parameters:
# - join.preferSortMergeJoin: Prefer sort-merge over shuffle hash join
#   - Default: true
#   - Set to false to let the planner pick shuffle hash join when one side
#     is small enough to build a hash table per partition
# - shuffle.partitions: Number of shuffle partitions
#   - Default: 200
#   - Increase for large datasets (rule of thumb: 2x number of cores)
Sort-Merge Join
How It Works
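The per-partition work can be sketched in plain Python. The sketch assumes the shuffle by join key has already happened and shows only the sort and merge phases. The function name and data are hypothetical.

```python
# Pure-Python simulation of the sort and merge phases of a sort-merge join.


def sort_merge_join(left_rows, right_rows, key):
    # Sort phase: order both sides by the join key
    left = sorted(left_rows, key=lambda r: r[key])
    right = sorted(right_rows, key=lambda r: r[key])

    # Merge phase: advance two cursors, emitting pairs on equal keys
    i = j = 0
    output = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row carrying the same key (handles duplicates).
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                for r in right[j:j_end]:
                    output.append({**left[i], **r})
                i += 1
            j = j_end
    return output


orders = [{"order_id": n, "customer_id": c} for n, c in [(1, 3), (2, 1), (3, 3), (4, 7)]]
customers = [{"customer_id": c, "name": n} for c, n in [(1, "Ada"), (3, "Grace"), (5, "Linus")]]

rows = sort_merge_join(orders, customers, "customer_id")
print([(r["order_id"], r["name"]) for r in rows])  # [(2, 'Ada'), (1, 'Grace'), (3, 'Grace')]
```

The merge holds only the current run of equal keys from one side at a time, not a full hash table. That is why sort-merge copes with inputs larger than memory: Spark can spill both the sort and that buffered run to disk.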
Sort-Merge Join Configuration
# Sort-merge join configuration
spark = SparkSession.builder \
    .appName("SortMergeJoins") \
    .config("spark.sql.join.preferSortMergeJoin", "true") \
    .getOrCreate()
# Configuration parameters:
# - join.preferSortMergeJoin: Prefer sort-merge join
#   - Default: true (sort-merge is Spark's default for large equi-joins)
#   - Setting it explicitly restores the default if it was disabled
Join Optimization
Join Ordering
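A small plain-Python experiment shows why filtering early helps. It uses hypothetical data and a simple dictionary-based hash join. Both orders of operations return the same rows, but filtering first shrinks the hash table and the intermediate join result. Catalyst usually performs this pushdown automatically; the sketch only illustrates the effect.

```python
# Filter-before-join vs join-then-filter on hypothetical data.

orders = [{"order_id": i, "customer_id": i % 100} for i in range(10_000)]
customers = [{"customer_id": c, "country": "US" if c < 10 else "DE"}
             for c in range(100)]


def hash_join(left, right, key):
    """Inner equi-join: build a hash table on `right`, probe with `left`."""
    table = {}
    for row in right:
        table.setdefault(row[key], []).append(row)
    return [{**left_row, **match}
            for left_row in left
            for match in table.get(left_row[key], [])]


# Join first: every order matches, producing 10,000 intermediate rows
joined = hash_join(orders, customers, "customer_id")
join_then_filter = [r for r in joined if r["country"] == "US"]

# Filter first: only the 10 US customers are hashed and joined
us_customers = [c for c in customers if c["country"] == "US"]
filter_then_join = hash_join(orders, us_customers, "customer_id")

print(len(joined), len(join_then_filter), len(filter_then_join))  # 10000 1000 1000
```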
-- Join order optimization
-- Bad (as written): implicit comma join, filter applied after joining
SELECT *
FROM orders o, customers c
WHERE o.customer_id = c.customer_id
  AND c.country = 'US';
-- Good: filter first, then join
SELECT *
FROM (SELECT * FROM customers WHERE country = 'US') c
JOIN orders o ON c.customer_id = o.customer_id;
-- Spark's Catalyst optimizer usually pushes the filter below the join
-- in both cases, but filtering explicitly makes the intent clear
Join Hints
-- Join hints for strategy selection
-- Broadcast join hint (reference the alias when the table is aliased)
SELECT /*+ BROADCAST(d) */ *
FROM fact_table f
JOIN dim_table d ON f.id = d.id;
-- Shuffle hash join hint
SELECT /*+ SHUFFLE_HASH(f, d) */ *
FROM fact_table f
JOIN dim_table d ON f.id = d.id;
-- Sort-merge join hint (MERGE; aliases: SHUFFLE_MERGE, MERGEJOIN)
SELECT /*+ MERGE(f, d) */ *
FROM fact_table f
JOIN dim_table d ON f.id = d.id;
Join Best Practices
DO
-- 1. Filter before joining
-- Reduce data size early
-- 2. Broadcast small tables
-- Dimension tables < 50MB
-- 3. Use appropriate join strategy
-- Based on table sizes
-- 4. Denormalize when appropriate
-- Avoid joins for hot queries
-- 5. Use join keys with same data types
-- Avoid type conversion
DON’T
-- 1. Don't ignore skewed join keys
-- A few hot keys overload single tasks (data skew)
-- 2. Don't use CROSS JOIN unnecessarily
-- Cartesian products are expensive
-- 3. Don't broadcast large tables
-- Causes driver/executor OOM errors
-- 4. Don't ignore join ordering
-- Affects the query plan
-- 5. Don't forget statistics
-- The optimizer needs them to estimate table sizes (ANALYZE TABLE <table> COMPUTE STATISTICS)
Key Takeaways
- Broadcast join: Small x Large, no shuffle, fastest
- Shuffle hash: Medium x Large, no sort; per-partition hash tables must fit in memory
- Sort-merge: Large x Large, Spark's default; robust for very large or sorted data
- Threshold: 10-50MB for broadcast
- Configuration: Set broadcast threshold appropriately
- Hints: Use hints when optimizer fails
- Filter early: Reduce data before joining
- Use When: All joins, performance optimization