Cluster Rightsizing
Data-Driven Cluster Sizing
Overview
Cluster rightsizing ensures clusters are appropriately sized for their workloads, avoiding both over-provisioning (wasted cost) and under-provisioning (poor performance). Sizing decisions are driven by historical metrics and workload patterns rather than guesswork.
Rightsizing Analysis
Metrics to Monitor
Target Metrics:
- CPU: Average 50-70%, Peak < 90%
- Memory: Average 60-80%, Peak < 95%
- Storage: No I/O bottlenecks
- Network: No saturation (< 80%)
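These targets can be encoded as a simple automated check. The sketch below is a minimal example: the threshold values mirror the list above, and the utilization figures passed in are assumed to come from whatever monitoring system you already run (CloudWatch, Ganglia, Prometheus, etc.).

```python
# Minimal sketch: flag metrics that fall outside the target ranges above.
# The utilization inputs are assumed to come from your monitoring system.
TARGETS = {
    'cpu':    {'avg': (50, 70), 'peak_max': 90},
    'memory': {'avg': (60, 80), 'peak_max': 95},
}

def check_utilization(metric: str, avg_pct: float, peak_pct: float) -> list:
    """Return human-readable findings for one metric against its target range."""
    target = TARGETS[metric]
    low, high = target['avg']
    findings = []
    if avg_pct < low:
        findings.append(f"{metric}: average {avg_pct:.0f}% below target {low}-{high}% (likely over-provisioned)")
    elif avg_pct > high:
        findings.append(f"{metric}: average {avg_pct:.0f}% above target {low}-{high}% (likely under-provisioned)")
    if peak_pct > target['peak_max']:
        findings.append(f"{metric}: peak {peak_pct:.0f}% exceeds {target['peak_max']}% ceiling")
    return findings

# Example: low average CPU with acceptable peaks suggests the cluster can shrink.
print(check_utilization('cpu', avg_pct=35, peak_pct=72))
```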
Rightsizing Strategies
Spark Cluster Rightsizing
```python
# Spark cluster rightsizing analysis
import json

from pyspark.sql import SparkSession


def analyze_cluster_metrics(spark: SparkSession) -> dict:
    """Analyze cluster metrics for rightsizing."""
    sc = spark.sparkContext

    # Current executor configuration
    executor_memory = spark.conf.get("spark.executor.memory", "4g")
    executor_cores = int(spark.conf.get("spark.executor.cores", "2"))
    num_executors = int(spark.conf.get("spark.dynamicAllocation.maxExecutors", "10"))

    # Status tracker is available for deeper inspection (active jobs/stages)
    status = sc.statusTracker()

    # Total resources across all executors
    total_memory_gb = int(executor_memory.rstrip("g")) * num_executors
    total_cores = executor_cores * num_executors

    analysis = {
        'current_config': {
            'executor_memory': executor_memory,
            'executor_cores': executor_cores,
            'num_executors': num_executors,
            'total_memory_gb': total_memory_gb,
            'total_cores': total_cores
        },
        'recommendations': []
    }

    # CPU utilization (placeholder helpers wired to your monitoring system)
    avg_cpu = get_avg_cpu_usage()
    peak_cpu = get_peak_cpu_usage()

    if avg_cpu < 50:
        recommended_executors = max(1, int(num_executors * 0.6))
        analysis['recommendations'].append({
            'type': 'reduce_executors',
            'current': num_executors,
            'recommended': recommended_executors,
            'reason': f'Low CPU utilization ({avg_cpu:.1f}%)',
            'savings_pct': (1 - recommended_executors / num_executors) * 100
        })

    # Memory utilization (fractions between 0 and 1)
    avg_memory = get_avg_memory_usage()
    peak_memory = get_peak_memory_usage()

    if peak_memory < 0.6:
        recommended_memory = int(int(executor_memory.rstrip("g")) * 0.7)
        analysis['recommendations'].append({
            'type': 'reduce_memory',
            'current': executor_memory,
            'recommended': f"{recommended_memory}g",
            'reason': f'Low memory usage ({peak_memory:.1%} peak)',
            'savings_pct': 30
        })

    return analysis


# Example usage
analysis = analyze_cluster_metrics(spark)
print(json.dumps(analysis, indent=2))
```
EMR Rightsizing
```python
# EMR cluster rightsizing
from datetime import datetime, timedelta

import boto3


def analyze_emr_cluster(cluster_id: str, region: str = "us-east-1") -> dict:
    """Analyze EMR cluster for rightsizing."""
    client = boto3.client('emr', region_name=region)
    cloudwatch = boto3.client('cloudwatch', region_name=region)

    # Instance groups come from list_instance_groups (describe_cluster does not return them)
    instance_groups = client.list_instance_groups(ClusterId=cluster_id)['InstanceGroups']

    analysis = {
        'cluster_id': cluster_id,
        'instance_groups': [],
        'recommendations': []
    }

    for group in instance_groups:
        group_id = group['Id']
        group_type = group['InstanceGroupType']
        instance_type = group['InstanceType']
        instance_count = group['RunningInstanceCount']

        # CPU utilization over the last 7 days.
        # Note: adjust the namespace/metric/dimension to match how your cluster
        # reports CPU (EMR's built-in metrics are keyed by JobFlowId; per-instance
        # CPU lives in the AWS/EC2 namespace or comes from a monitoring agent).
        cpu_stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster_id}],
            StartTime=datetime.now() - timedelta(days=7),
            EndTime=datetime.now(),
            Period=3600,
            Statistics=['Average', 'Maximum']
        )

        datapoints = cpu_stats.get('Datapoints', [])
        avg_cpu = sum(dp['Average'] for dp in datapoints) / len(datapoints) if datapoints else 0.0
        max_cpu = max((dp['Maximum'] for dp in datapoints), default=0.0)

        group_info = {
            'id': group_id,
            'type': group_type,
            'instance_type': instance_type,
            'count': instance_count,
            'avg_cpu': avg_cpu,
            'max_cpu': max_cpu
        }

        # Check if over-provisioned
        if avg_cpu < 50 and group_type == 'CORE':
            recommended_type = get_smaller_instance_type(instance_type)
            analysis['recommendations'].append({
                'group_id': group_id,
                'type': 'downsize_instance',
                'current': instance_type,
                'recommended': recommended_type,
                'reason': f'Low CPU utilization ({avg_cpu:.1f}%)',
                'savings_pct': 30
            })

        analysis['instance_groups'].append(group_info)

    return analysis


def get_smaller_instance_type(instance_type: str) -> str:
    """Get the next smaller instance type in the same family."""
    # Instance sizes, largest to smallest (simplified)
    size_order = [
        'metal', '8xlarge', '4xlarge', '2xlarge',
        'xlarge', 'large', 'medium', 'small'
    ]

    family, size = instance_type.split('.')

    current_index = size_order.index(size)
    if current_index < len(size_order) - 1:
        smaller_size = size_order[current_index + 1]
        return f"{family}.{smaller_size}"

    return instance_type
```
Rightsizing Patterns
Vertical Scaling
Change the instance size so each node gets more (or less) CPU and memory while the node count stays fixed. Useful when individual tasks are memory- or CPU-bound.
Horizontal Scaling
Add or remove nodes while keeping the instance type fixed. Useful for highly parallel workloads whose volume varies.
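As a concrete illustration, the sketch below contrasts the two approaches for a Spark cluster. The configuration keys are standard Spark properties; the specific values are illustrative assumptions, not recommendations.

```python
# Illustrative only: reaching more capacity by scaling vertically vs. horizontally.
baseline = {
    "spark.executor.memory": "8g",
    "spark.executor.cores": "4",
    "spark.dynamicAllocation.maxExecutors": "10",
}

# Vertical scaling: bigger executors (and bigger instances), same executor count.
vertical = {**baseline,
            "spark.executor.memory": "16g",
            "spark.executor.cores": "8"}

# Horizontal scaling: same executor size, more executors (more nodes).
horizontal = {**baseline,
              "spark.dynamicAllocation.maxExecutors": "20"}
```

In practice the two are combined: pick an instance size that comfortably fits the heaviest single task, then scale the node count for overall volume.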
Rightsizing by Workload
Batch Workloads
```python
# Batch workload rightsizing
def rightsize_batch_cluster(
    job_duration_minutes: int,
    data_size_tb: float,
    num_tasks: int
) -> dict:
    """Rightsize cluster for a batch workload."""
    # Estimate required cores
    # Rule of thumb: 1 core per 1 TB of data for simple ETL
    required_cores = max(4, int(data_size_tb))

    # Estimate total executor memory (informational)
    # Rule of thumb: 4 GB per core for most jobs
    required_memory_gb = required_cores * 4

    # Estimate executors at 4 cores each
    required_executors = max(2, int(required_cores / 4))

    # Spot instance percentage: 80% spot, 20% on-demand
    spot_percentage = 80

    return {
        'executor_cores': 4,
        'executor_memory': '4g',
        'num_executors': required_executors,
        'spot_instance_percentage': spot_percentage,
        'estimated_cost': estimate_batch_cost(
            required_executors, job_duration_minutes, spot_percentage
        )
    }


def estimate_batch_cost(
    num_executors: int,
    duration_minutes: int,
    spot_percentage: float
) -> dict:
    """Estimate batch job cost."""
    # Hourly pricing per executor (simplified)
    on_demand_per_hour = 0.10  # USD
    spot_per_hour = 0.03       # USD

    duration_hours = duration_minutes / 60

    spot_executors = int(num_executors * spot_percentage / 100)
    on_demand_executors = num_executors - spot_executors

    spot_cost = spot_executors * duration_hours * spot_per_hour
    on_demand_cost = on_demand_executors * duration_hours * on_demand_per_hour

    total_cost = spot_cost + on_demand_cost
    all_on_demand_cost = num_executors * duration_hours * on_demand_per_hour

    return {
        'spot_cost': spot_cost,
        'on_demand_cost': on_demand_cost,
        'total_cost': total_cost,
        # Fraction saved relative to running everything on-demand
        'savings_vs_all_on_demand': 1 - total_cost / all_on_demand_cost
    }
```
Streaming Workloads
```python
# Streaming workload rightsizing
def rightsize_streaming_cluster(
    throughput_mb_per_sec: float,
    latency_requirement_ms: int,
    peak_multiplier: float = 2.0
) -> dict:
    """Rightsize cluster for a streaming workload."""
    # Base capacity for current load
    # Rule of thumb: ~100 MB/sec per executor
    base_executors = max(1, int(throughput_mb_per_sec / 100))

    # Peak capacity with headroom for spikes
    peak_executors = int(base_executors * peak_multiplier)

    return {
        'min_executors': base_executors,
        'max_executors': peak_executors,
        'initial_executors': base_executors,
        'dynamic_allocation': {
            'enabled': True,
            'min_executors': base_executors,
            'max_executors': peak_executors,
            'initial_executors': base_executors
        },
        'recommended_instance': 'm5.xlarge',  # general purpose; use r5 for memory-heavy state
        'executor_memory': '8g',
        'executor_cores': 4
    }
```
Rightsizing Tools
Tool Comparison
| Tool | Platform | Automation | Cost | Best For |
|---|---|---|---|---|
| AWS Compute Optimizer | AWS | Automatic | Free | AWS services |
| Google Recommender | GCP | Automatic | Free | GCP services |
| Azure Advisor | Azure | Automatic | Free | Azure services |
| Datadog | Multi-cloud | Manual | Paid | Comprehensive monitoring |
| Prometheus/Grafana | Open source | Manual | Free | Custom solutions |
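For the custom (Prometheus/Grafana) route, the utilization numbers for a rightsizing review can be pulled straight from the Prometheus HTTP API. The sketch below is illustrative: it assumes node_exporter-style CPU metrics and a Prometheus server at a placeholder URL, so adapt the query to the metrics your cluster actually exports.

```python
# Minimal sketch: pull 7-day average CPU utilization from Prometheus for a rightsizing review.
# Assumes node_exporter metrics and a Prometheus server at the placeholder URL below.
import requests

PROMETHEUS_URL = "http://prometheus.example.com:9090"  # placeholder

def avg_cpu_utilization_pct(days: int = 7) -> float:
    """Average CPU utilization (%) across all nodes over the given window."""
    query = f'100 * (1 - avg(rate(node_cpu_seconds_total{{mode="idle"}}[{days}d])))'
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else 0.0

print(f"7-day average CPU: {avg_cpu_utilization_pct():.1f}%")
```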
Rightsizing Automation
Automated Rightsizing
```python
# Automated cluster rightsizing
from datetime import datetime, timedelta

import boto3


def auto_rightsize_cluster(
    cluster_id: str,
    cpu_threshold_low: float = 50,
    cpu_threshold_high: float = 80,
    min_size: int = 2,
    max_size: int = 100
):
    """Automatically rightsize an EMR cluster based on recent CPU metrics."""
    client = boto3.client('emr')

    # Current size of the first (core) instance group
    core_group = client.list_instance_groups(ClusterId=cluster_id)['InstanceGroups'][0]
    current_size = core_group['RunningInstanceCount']

    # Average CPU over the last 24 hours (placeholder helper wired to your metrics source)
    avg_cpu = get_cluster_cpu_avg(cluster_id, hours=24)

    if avg_cpu < cpu_threshold_low and current_size > min_size:
        # Downsize by ~20%
        new_size = max(min_size, int(current_size * 0.8))

        # Allow instances to be removed
        client.set_termination_protection(
            JobFlowIds=[cluster_id],
            TerminationProtected=False
        )

        client.modify_instance_groups(
            ClusterId=cluster_id,
            InstanceGroups=[{
                'InstanceGroupId': core_group['Id'],
                'InstanceCount': new_size
            }]
        )

        return {'action': 'downsized', 'from': current_size, 'to': new_size}

    elif avg_cpu > cpu_threshold_high and current_size < max_size:
        # Upsize by ~20% (at least one node)
        new_size = min(max_size, max(current_size + 1, int(current_size * 1.2)))

        client.modify_instance_groups(
            ClusterId=cluster_id,
            InstanceGroups=[{
                'InstanceGroupId': core_group['Id'],
                'InstanceCount': new_size
            }]
        )

        return {'action': 'upsized', 'from': current_size, 'to': new_size}

    return {'action': 'no_change', 'size': current_size}
```
Rightsizing Best Practices
DO
- Monitor metrics continuously: CPU, memory, storage, network.
- Use auto-scaling when possible: dynamic allocation handles variable workloads.
- Rightsize regularly: review monthly or quarterly.
- Consider spot instances: typically 60-80% cost savings.
- Test before production: validate performance after changes.
DON'T
- Don't oversize "just in case": it wastes significant cost.
- Don't ignore workload patterns: batch and streaming need different approaches.
- Don't forget peak loads: ensure capacity for spikes.
- Don't resize too frequently: frequent changes cause instability.
- Don't ignore memory: OOM errors are worse than slow queries.
Key Takeaways
- Target metrics: CPU 50-70%, Memory 60-80%
- Vertical scaling: Change instance size
- Horizontal scaling: Add/remove nodes
- Batch workloads: Size for job duration
- Streaming workloads: Size for peak with buffer
- Automation: Use auto-scaling when possible
- Spot instances: 60-80% cost savings
- Use When: All clusters, regular optimization
Back to Module 7