
Spark Dynamic Allocation

Auto-Scaling for Spark Clusters


Overview

Spark Dynamic Allocation automatically scales the number of executors based on workload demand, reducing cost during idle periods and ensuring resources during peak loads.


Dynamic Allocation Architecture

How It Works

Spark watches the scheduler's task backlog: when pending tasks have waited longer than the backlog timeout, it requests additional executors, roughly doubling the request each round while the backlog persists, up to the configured maximum. Executors that sit idle longer than the idle timeout are released back to the cluster.

Key Components:

  • Initial Executors: starting number of executors
  • Min Executors: minimum executors to keep running
  • Max Executors: maximum executors to scale to
  • Executor Idle Timeout: time before idle executors are released
  • Scheduler Backlog Timeout: time tasks may wait in the backlog before more executors are requested
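
To make the ramp-up concrete, here is a toy simulation of the exponential request policy described above (plain Python, no Spark dependency; Spark's real bookkeeping lives in its allocation manager and is more involved):

# Toy simulation of the exponential executor ramp-up
# (illustrative only, not Spark's actual implementation)

def executors_after_rounds(rounds_with_backlog: int,
                           current: int,
                           max_executors: int) -> int:
    """Total executors after N scheduling rounds with a sustained backlog.

    Round 1 adds 1 executor, round 2 adds 2, round 3 adds 4, ...
    (doubling each round), capped at max_executors.
    """
    total = current
    to_add = 1
    for _ in range(rounds_with_backlog):
        total = min(total + to_add, max_executors)
        to_add *= 2
    return total

# Starting from 2 executors with maxExecutors=20:
for rounds in range(1, 6):
    print(rounds, executors_after_rounds(rounds, current=2, max_executors=20))
# -> 3, 5, 9, 17, 20: the cap is reached within a few rounds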

Dynamic Allocation Configuration

Basic Configuration

# Spark dynamic allocation configuration
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .config("spark.dynamicAllocation.initialExecutors", "4") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .getOrCreate()

# Configuration parameters:
# - enabled: turns dynamic allocation on/off
# - minExecutors: lower bound of executors kept alive
# - maxExecutors: upper scaling limit
# - initialExecutors: starting number of executors
# - executorIdleTimeout: how long an idle executor survives before removal
# - schedulerBacklogTimeout: how long tasks may queue before more executors are requested
# Note: on a real cluster this also needs the external shuffle service
# or shuffle tracking enabled (see Production Configuration below)
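
After the session starts, you can confirm the settings took effect by reading them back from the runtime config:

# Read back the effective settings from the running session
for key in (
    "spark.dynamicAllocation.enabled",
    "spark.dynamicAllocation.minExecutors",
    "spark.dynamicAllocation.maxExecutors",
):
    print(key, "=", spark.conf.get(key))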

Production Configuration

# Production dynamic allocation configuration
spark = SparkSession.builder \
    .appName("DynamicAllocationProduction") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "5") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.dynamicAllocation.initialExecutors", "10") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "120s") \
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.shuffle.service.port", "7337") \
    .getOrCreate()

# Additional parameters:
# - cachedExecutorIdleTimeout: longer timeout for executors holding cached data
# - shuffle.service.enabled: external shuffle service, required so shuffle files
#   outlive removed executors (on Spark 3.0+, shuffle tracking is an alternative)
# - shuffle.service.port: port the shuffle service listens on (7337 is the default)
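
Setting spark.shuffle.service.enabled in the session config assumes the shuffle service is already running on each node (for example, as a YARN auxiliary service). On clusters without one, such as many Kubernetes deployments, Spark 3.0+ can instead track shuffle files on executors itself; a minimal sketch of that variant:

# Dynamic allocation without an external shuffle service (Spark 3.0+):
# shuffle tracking keeps executors alive while they still hold shuffle data
spark = SparkSession.builder \
    .appName("DynamicAllocationShuffleTracking") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .getOrCreate()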

Dynamic Allocation Strategies

Strategy by Workload

Workload          Min   Max   Initial   Idle Timeout   Strategy
Batch ETL         2     20    4         60s            Conservative
Streaming         5     100   10        300s           Aggressive
Ad-hoc queries    2     10    2         30s            Default
ML training       4     50    8         120s           Balanced
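
The table can be encoded directly; below is a small, hypothetical helper (workload_conf is not part of Spark) that maps a workload type to the corresponding settings:

# Hypothetical helper mapping a workload type to the table above
from pyspark.sql import SparkSession

WORKLOAD_PROFILES = {
    "batch_etl":   {"min": 2, "max": 20,  "initial": 4,  "idle": "60s"},
    "streaming":   {"min": 5, "max": 100, "initial": 10, "idle": "300s"},
    "adhoc":       {"min": 2, "max": 10,  "initial": 2,  "idle": "30s"},
    "ml_training": {"min": 4, "max": 50,  "initial": 8,  "idle": "120s"},
}

def workload_conf(workload: str) -> dict:
    """Return spark.dynamicAllocation.* settings for a workload type."""
    p = WORKLOAD_PROFILES[workload]
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(p["min"]),
        "spark.dynamicAllocation.maxExecutors": str(p["max"]),
        "spark.dynamicAllocation.initialExecutors": str(p["initial"]),
        "spark.dynamicAllocation.executorIdleTimeout": p["idle"],
    }

# Usage: apply the profile when building the session
builder = SparkSession.builder.appName("AdhocQueries")
for key, value in workload_conf("adhoc").items():
    builder = builder.config(key, value)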

Dynamic Allocation Monitoring

Metrics

# Monitor dynamic allocation via the driver's monitoring REST API
# (PySpark's StatusTracker does not expose per-executor details,
# so we query the Spark UI's /api/v1 endpoint instead)
import json
import urllib.request

from pyspark.sql import SparkSession

def get_allocation_metrics(spark: SparkSession) -> dict:
    """Get dynamic allocation metrics for the current application."""
    sc = spark.sparkContext
    url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"
    with urllib.request.urlopen(url) as response:
        executors = json.load(response)

    # Per-executor details, excluding the driver
    executor_info = {
        e["id"]: {
            "id": e["id"],
            "total_cores": e["totalCores"],
            "active_tasks": e["activeTasks"],
            "failed_tasks": e["failedTasks"],
            "memory_used": e["memoryUsed"],
            "max_memory": e["maxMemory"],
            "is_active": e["isActive"],
        }
        for e in executors
        if e["id"] != "driver"
    }

    # Cluster-level metrics; by default one running task occupies one core
    total_executors = len(executor_info)
    busy_executors = sum(1 for e in executor_info.values() if e["active_tasks"] > 0)
    metrics = {
        "total_executors": total_executors,
        "busy_executors": busy_executors,
        "idle_executors": total_executors - busy_executors,
        "total_cores": sum(e["total_cores"] for e in executor_info.values()),
        "used_cores": sum(e["active_tasks"] for e in executor_info.values()),
        "core_utilization": 0.0,
        "executor_info": executor_info,
    }
    if metrics["total_cores"] > 0:
        metrics["core_utilization"] = metrics["used_cores"] / metrics["total_cores"]
    return metrics

# Example usage
metrics = get_allocation_metrics(spark)
print(json.dumps(metrics, indent=2))
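
To observe scaling behavior over time, sample these metrics on a loop while a job runs:

# Sample allocation metrics periodically to watch scale-up and scale-down
import time

for _ in range(10):
    m = get_allocation_metrics(spark)
    print(f"executors={m['total_executors']} "
          f"idle={m['idle_executors']} "
          f"utilization={m['core_utilization']:.0%}")
    time.sleep(30)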

Dynamic Allocation Best Practices

DO

  1. Enable the external shuffle service: required for dynamic allocation on most cluster managers
  2. Set appropriate timeouts: a 60-120s idle timeout suits most workloads
  3. Run the shuffle service as a standalone external service for reliability
  4. Monitor executor allocation: track scaling behavior over time
  5. Set reasonable limits: min 2-5 and max 20-100 cover most workloads (see the sanity check below)
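
As a guard for item 5, a small, hypothetical sanity check (not a Spark API) can validate the limits before the session is built:

# Hypothetical sanity check for allocation limits (not part of Spark)
def validate_limits(min_execs: int, initial: int, max_execs: int) -> None:
    """Fail fast if min <= initial <= max does not hold."""
    if not (0 <= min_execs <= initial <= max_execs):
        raise ValueError(
            f"expected min <= initial <= max, got "
            f"min={min_execs}, initial={initial}, max={max_execs}"
        )

validate_limits(min_execs=5, initial=10, max_execs=100)  # OK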

DON’T

  1. Don't set min too high: it wastes resources during idle periods
  2. Don't set max too low: it caps peak performance
  3. Don't disable the shuffle service (or shuffle tracking): dynamic allocation can't safely release executors without one of them
  4. Don't ignore executor startup overhead: spinning up executors takes time
  5. Don't use it for very short jobs: the scaling overhead exceeds the benefit

Key Takeaways

  1. Auto-scaling: automatically adjusts executor count to workload demand
  2. Cost savings: releases idle executors, often substantially reducing spend on bursty workloads
  3. Configuration: min, max, and initial executor counts plus timeouts
  4. Workload-based: different configs for batch vs. streaming
  5. Shuffle service: required for dynamic allocation (or shuffle tracking on Spark 3.0+)
  6. Monitoring: track executor allocation and utilization
  7. Timeouts: a 60-120s idle timeout suits most workloads
  8. Use when: workloads are variable, batch-oriented, or cost-sensitive

Back to Module 7