Spark Dynamic Allocation
Auto-Scaling for Spark Clusters
Overview
Spark Dynamic Allocation automatically scales the number of executors up and down based on workload demand, reducing cost during idle periods and providing enough capacity during peak load.
Dynamic Allocation Architecture
How It Works
Key Components:
- Initial Executors: Starting number of executors
- Min Executors: Minimum executors to keep running
- Max Executors: Maximum executors to scale to
- Executor Idle Timeout: Time before idle executors are released
- Scheduler Backlog Timeout: Time tasks may sit in the backlog before more executors are requested
Dynamic Allocation Configuration
Basic Configuration
# Spark dynamic allocation configuration
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .config("spark.dynamicAllocation.initialExecutors", "4") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()
# Configuration parameters:
# - enabled: Enable/disable dynamic allocation
# - minExecutors: Minimum number of executors (kept alive)
# - maxExecutors: Maximum number of executors (scale limit)
# - initialExecutors: Starting number of executors
# - executorIdleTimeout: Time before an idle executor is removed
# - schedulerBacklogTimeout: Time with a task backlog before more executors are requested
# - shuffle.service.enabled: External shuffle service; required for dynamic allocation
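A quick, optional sanity check is to read the effective values back from the running session; a minimal sketch, assuming the session built above:

# Confirm the settings were picked up by the running session
print(spark.conf.get("spark.dynamicAllocation.enabled"))       # expected: true
print(spark.conf.get("spark.dynamicAllocation.minExecutors"))  # expected: 2
print(spark.conf.get("spark.dynamicAllocation.maxExecutors"))  # expected: 20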
Production Configuration
# Production dynamic allocation configuration
spark = SparkSession.builder \
    .appName("DynamicAllocationProduction") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "5") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.dynamicAllocation.initialExecutors", "10") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "120s") \
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.shuffle.service.port", "7337") \
    .getOrCreate()
# Additional parameters:
# - cachedExecutorIdleTimeout: Idle timeout for executors holding cached data (longer)
# - shuffle.service.enabled: Required for dynamic allocation (external shuffle service)
# - shuffle.service.port: Port for the shuffle service
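On Spark 3.0 and later, clusters without an external shuffle service (for example, Spark on Kubernetes) can enable shuffle tracking instead, so executors holding shuffle data are retained until that data is no longer needed. A minimal sketch, assuming Spark 3.x; the timeout value is illustrative:

# Alternative for Spark 3.0+ without an external shuffle service
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DynamicAllocationShuffleTracking") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.timeout", "300s") \
    .getOrCreate()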
Dynamic Allocation Strategies
Strategy by Workload
Configuration by Workload
| Workload | Min | Max | Initial | Idle Timeout | Strategy |
|---|---|---|---|---|---|
| Batch ETL | 2 | 20 | 4 | 60s | Conservative |
| Streaming | 5 | 100 | 10 | 300s | Aggressive |
| Ad-hoc queries | 2 | 10 | 2 | 30s | Default |
| ML training | 4 | 50 | 8 | 120s | Balanced |
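One way to apply the table is to keep the presets in code and select one when the session is built. A rough sketch, assuming the values above; `WORKLOAD_PRESETS` and `build_session` are hypothetical helpers, not Spark APIs:

from pyspark.sql import SparkSession

# Presets taken from the table above (hypothetical names)
WORKLOAD_PRESETS = {
    "batch_etl":   {"min": "2", "max": "20",  "initial": "4",  "idle_timeout": "60s"},
    "streaming":   {"min": "5", "max": "100", "initial": "10", "idle_timeout": "300s"},
    "adhoc":       {"min": "2", "max": "10",  "initial": "2",  "idle_timeout": "30s"},
    "ml_training": {"min": "4", "max": "50",  "initial": "8",  "idle_timeout": "120s"},
}

def build_session(app_name: str, workload: str) -> SparkSession:
    """Build a SparkSession with dynamic allocation settings for the given workload."""
    preset = WORKLOAD_PRESETS[workload]
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", preset["min"])
        .config("spark.dynamicAllocation.maxExecutors", preset["max"])
        .config("spark.dynamicAllocation.initialExecutors", preset["initial"])
        .config("spark.dynamicAllocation.executorIdleTimeout", preset["idle_timeout"])
        .config("spark.shuffle.service.enabled", "true")
        .getOrCreate()
    )

# Example: a streaming job that may need to scale out aggressively
spark = build_session("ClickstreamIngest", "streaming")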
Dynamic Allocation Monitoring
Metrics
# Monitor dynamic allocation
from pyspark.sql import SparkSession
import json
import requests

def get_allocation_metrics(spark: SparkSession) -> dict:
    """Get dynamic allocation metrics via the Spark monitoring REST API."""
    sc = spark.sparkContext

    # Query the executor list from the Spark UI's REST API
    # (requires the UI to be enabled and reachable from the driver)
    url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"
    executors = requests.get(url, timeout=10).json()

    # Per-executor info (skip the driver entry)
    executor_info = {}
    for e in executors:
        if e["id"] == "driver":
            continue
        executor_info[e["id"]] = {
            'id': e["id"],
            'total_cores': e["totalCores"],
            'used_cores': e["activeTasks"],  # running tasks approximate busy cores
            'memory_used': e["memoryUsed"],
            'max_memory': e["maxMemory"],
            'active_tasks': e["activeTasks"],
            'failed_tasks': e["failedTasks"],
            'is_active': e["isActive"],
        }

    # Cluster-level metrics: an executor counts as active here if it is running tasks
    total_executors = len(executor_info)
    active_executors = sum(1 for e in executor_info.values() if e['active_tasks'] > 0)

    metrics = {
        'total_executors': total_executors,
        'active_executors': active_executors,
        'idle_executors': total_executors - active_executors,
        'total_cores': sum(e['total_cores'] for e in executor_info.values()),
        'used_cores': sum(e['used_cores'] for e in executor_info.values()),
        'core_utilization': 0.0,
        'executor_info': executor_info,
    }

    if metrics['total_cores'] > 0:
        metrics['core_utilization'] = metrics['used_cores'] / metrics['total_cores']

    return metrics
# Example usage
metrics = get_allocation_metrics(spark)
print(json.dumps(metrics, indent=2))
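To watch scaling behavior while a job runs, the helper above can be polled on an interval and a warning printed when utilization stays low. A rough sketch, assuming the get_allocation_metrics function above; the 30-second interval and 20% threshold are illustrative, not Spark defaults:

import time

def watch_allocation(spark, interval_s: int = 30, min_utilization: float = 0.2) -> None:
    """Periodically print allocation metrics and flag an under-utilized cluster."""
    while True:
        m = get_allocation_metrics(spark)
        print(f"executors={m['total_executors']} "
              f"idle={m['idle_executors']} "
              f"core_utilization={m['core_utilization']:.0%}")
        # Illustrative alert: sustained low utilization suggests minExecutors is too
        # high or executorIdleTimeout is too long for this workload
        if m['total_executors'] > 0 and m['core_utilization'] < min_utilization:
            print("WARNING: most cores are idle; consider lowering minExecutors "
                  "or shortening executorIdleTimeout")
        time.sleep(interval_s)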
Dynamic Allocation Best Practices
DO
- Enable the shuffle service: required for dynamic allocation
- Set appropriate timeouts: a 60-120s idle timeout suits most workloads
- Use an external shuffle service: a standalone shuffle service improves reliability
- Monitor executor allocation: track scaling behavior over time
- Set reasonable limits: min 2-5 and max 20-100 cover most workloads
DON’T
- Don't set min too high: wastes resources during idle periods
- Don't set max too low: limits peak performance
- Don't disable the shuffle service: dynamic allocation won't work without it
- Don't ignore executor startup overhead: spinning up executors takes time
- Don't use it for very short jobs: the overhead exceeds the benefit
Key Takeaways
- Auto-scaling: Automatically adjusts executor count
- Cost savings: Reduces idle resources by 30-50%
- Configuration: Min, max, initial executors
- Workload-based: Different configs for batch vs. streaming
- Shuffle service: Required for dynamic allocation
- Monitoring: Track executor allocation, utilization
- Timeouts: 60-120s idle timeout for most workloads
- Use When: Variable workloads, batch jobs, cost optimization
Back to Module 7