
Cluster Rightsizing

Data-Driven Cluster Sizing


Overview

Cluster rightsizing ensures clusters are sized appropriately for their workloads, avoiding both over-provisioning (wasted cost) and under-provisioning (poor performance). Sizing decisions are driven by analysis of historical metrics and workload patterns.


Rightsizing Analysis

Metrics to Monitor

Target Metrics:

  • CPU: Average 50-70%, Peak < 90%
  • Memory: Average 60-80%, Peak < 95%
  • Storage: No I/O bottlenecks
  • Network: No saturation (< 80%)
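
A quick way to apply these targets is to compare observed utilization against the bands above and flag anything outside them. The sketch below is a minimal illustration; the percentage inputs are assumed to come from your own monitoring system, and the thresholds simply mirror the targets listed here.

# Minimal sketch: flag metrics that fall outside the target bands above.
# The input percentages are assumed to come from your monitoring system.
def check_utilization(avg_cpu: float, peak_cpu: float,
                      avg_mem: float, peak_mem: float,
                      network: float) -> list:
    findings = []
    if not 50 <= avg_cpu <= 70:
        findings.append(f"CPU average {avg_cpu:.0f}% outside 50-70% target")
    if peak_cpu >= 90:
        findings.append(f"CPU peak {peak_cpu:.0f}% at or above 90%")
    if not 60 <= avg_mem <= 80:
        findings.append(f"Memory average {avg_mem:.0f}% outside 60-80% target")
    if peak_mem >= 95:
        findings.append(f"Memory peak {peak_mem:.0f}% at or above 95%")
    if network >= 80:
        findings.append(f"Network utilization {network:.0f}% indicates saturation")
    return findings

# Example: under-utilized CPU and memory suggest downsizing
print(check_utilization(35, 60, 45, 70, 20))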

Rightsizing Strategies

Spark Cluster Rightsizing

# Spark cluster rightsizing analysis
from pyspark.sql import SparkSession
import json


def analyze_cluster_metrics(spark: SparkSession) -> dict:
    """Analyze cluster metrics for rightsizing."""
    sc = spark.sparkContext

    # Current executor configuration (uses the dynamic-allocation ceiling
    # as the executor count)
    executor_memory = spark.conf.get("spark.executor.memory", "4g")
    executor_cores = int(spark.conf.get("spark.executor.cores", "2"))
    num_executors = int(spark.conf.get("spark.dynamicAllocation.maxExecutors", "10"))

    # Status tracker is available for live job/stage information if needed
    status = sc.statusTracker()

    # Total resources (assumes executor memory is expressed like "4g")
    total_memory_gb = int(executor_memory.rstrip("g")) * num_executors
    total_cores = executor_cores * num_executors

    analysis = {
        'current_config': {
            'executor_memory': executor_memory,
            'executor_cores': executor_cores,
            'num_executors': num_executors,
            'total_memory_gb': total_memory_gb,
            'total_cores': total_cores
        },
        'recommendations': []
    }

    # CPU utilization in percent; get_avg_cpu_usage / get_peak_cpu_usage are
    # placeholders for queries against your monitoring system
    # (Ganglia, CloudWatch, Prometheus, ...)
    avg_cpu = get_avg_cpu_usage()
    peak_cpu = get_peak_cpu_usage()

    if avg_cpu < 50:
        recommended_executors = max(1, int(num_executors * 0.6))
        analysis['recommendations'].append({
            'type': 'reduce_executors',
            'current': num_executors,
            'recommended': recommended_executors,
            'reason': f'Low CPU utilization ({avg_cpu:.1f}%)',
            'savings_pct': (1 - recommended_executors / num_executors) * 100
        })

    # Memory utilization as a fraction of allocated memory (0-1),
    # again from your monitoring system
    avg_memory = get_avg_memory_usage()
    peak_memory = get_peak_memory_usage()

    if peak_memory < 0.6:
        recommended_memory = int(int(executor_memory.rstrip("g")) * 0.7)
        analysis['recommendations'].append({
            'type': 'reduce_memory',
            'current': executor_memory,
            'recommended': f"{recommended_memory}g",
            'reason': f'Low memory usage ({peak_memory:.1%} peak)',
            'savings_pct': 30
        })

    return analysis


# Example usage
analysis = analyze_cluster_metrics(spark)
print(json.dumps(analysis, indent=2))

EMR Rightsizing

# EMR cluster rightsizing
from datetime import datetime, timedelta

import boto3


def analyze_emr_cluster(cluster_id: str, region: str = "us-east-1") -> dict:
    """Analyze EMR cluster for rightsizing."""
    emr = boto3.client('emr', region_name=region)
    cloudwatch = boto3.client('cloudwatch', region_name=region)

    # Instance groups come from ListInstanceGroups (DescribeCluster does not
    # return them)
    instance_groups = emr.list_instance_groups(ClusterId=cluster_id)['InstanceGroups']

    analysis = {
        'cluster_id': cluster_id,
        'instance_groups': [],
        'recommendations': []
    }

    for group in instance_groups:
        group_id = group['Id']
        group_type = group['InstanceGroupType']
        instance_type = group['InstanceType']
        instance_count = group['RunningInstanceCount']

        # CPU utilization over the last 7 days.
        # NOTE: the metric name and dimensions are illustrative -- EMR
        # cluster metrics are keyed by JobFlowId, and per-instance CPU is
        # published in the AWS/EC2 namespace; adjust to what your cluster emits.
        cpu_stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster_id}],
            StartTime=datetime.now() - timedelta(days=7),
            EndTime=datetime.now(),
            Period=3600,
            Statistics=['Average', 'Maximum']
        )
        datapoints = cpu_stats['Datapoints']
        avg_cpu = sum(dp['Average'] for dp in datapoints) / len(datapoints) if datapoints else 0.0
        max_cpu = max((dp['Maximum'] for dp in datapoints), default=0.0)

        group_info = {
            'id': group_id,
            'type': group_type,
            'instance_type': instance_type,
            'count': instance_count,
            'avg_cpu': avg_cpu,
            'max_cpu': max_cpu
        }

        # Check if over-provisioned
        if avg_cpu < 50 and group_type == 'CORE':
            recommended_type = get_smaller_instance_type(instance_type)
            analysis['recommendations'].append({
                'group_id': group_id,
                'type': 'downsize_instance',
                'current': instance_type,
                'recommended': recommended_type,
                'reason': f'Low CPU utilization ({avg_cpu:.1f}%)',
                'savings_pct': 30
            })

        analysis['instance_groups'].append(group_info)

    return analysis


def get_smaller_instance_type(instance_type: str) -> str:
    """Get the next smaller instance type (simplified size ladder)."""
    size_order = [
        'metal', '8xlarge', '4xlarge', '2xlarge', 'xlarge',
        'large', 'medium', 'small'
    ]
    family, size = instance_type.split('.')
    if size not in size_order:
        return instance_type
    current_index = size_order.index(size)
    if current_index < len(size_order) - 1:
        return f"{family}.{size_order[current_index + 1]}"
    return instance_type
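
A minimal usage sketch (the cluster ID below is hypothetical):

# Example usage (hypothetical cluster ID)
report = analyze_emr_cluster("j-EXAMPLE123456", region="us-east-1")
for rec in report['recommendations']:
    print(f"{rec['group_id']}: {rec['current']} -> {rec['recommended']} ({rec['reason']})")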

Rightsizing Patterns

Vertical Scaling

Change the instance size: run the same number of nodes on larger or smaller machines. Useful when individual executors are memory- or CPU-bound.

Horizontal Scaling

Add or remove nodes while keeping the instance type the same. Useful when the workload parallelizes well and load varies over time; it pairs naturally with auto-scaling. A minimal configuration sketch contrasting the two approaches follows.
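
A minimal sketch of the two patterns expressed as Spark configuration (the starting point of 10 executors x 4 cores x 8g is hypothetical; scale to your own workload):

# Illustrative only -- hypothetical baseline: 10 executors x 4 cores x 8g each

# Vertical scaling: keep 10 executors, give each one more cores and memory
vertical = {
    "spark.executor.instances": "10",
    "spark.executor.cores": "8",      # 4 -> 8 cores per executor
    "spark.executor.memory": "16g",   # 8g -> 16g per executor
}

# Horizontal scaling: keep the executor size, run more of them
horizontal = {
    "spark.executor.instances": "20",  # 10 -> 20 executors
    "spark.executor.cores": "4",
    "spark.executor.memory": "8g",
}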


Rightsizing by Workload

Batch Workloads

# Batch workload rightsizing
def rightsize_batch_cluster(
    job_duration_minutes: int,
    data_size_tb: float,
    num_tasks: int
) -> dict:
    """Rightsize cluster for a batch workload.

    num_tasks is accepted for future refinement but unused by these
    simplified rules of thumb.
    """
    # Rule of thumb: 1 core per 1 TB of data for simple ETL
    required_cores = max(4, int(data_size_tb))

    # Rule of thumb: 4 GB of memory per core for most jobs
    required_memory_gb = required_cores * 4

    # 4 cores per executor
    executor_cores = 4
    required_executors = max(2, int(required_cores / executor_cores))

    # Spot instance percentage: 80% spot, 20% on-demand
    spot_percentage = 80

    return {
        'executor_cores': executor_cores,
        'executor_memory': f"{executor_cores * 4}g",  # 4 GB per core -> 16g per executor
        'num_executors': required_executors,
        'spot_instance_percentage': spot_percentage,
        'estimated_cost': estimate_batch_cost(
            required_executors,
            job_duration_minutes,
            spot_percentage
        )
    }


def estimate_batch_cost(
    num_executors: int,
    duration_minutes: int,
    spot_percentage: float
) -> dict:
    """Estimate batch job cost."""
    # Simplified pricing (USD per executor-hour)
    on_demand_per_hour = 0.10
    spot_per_hour = 0.03

    duration_hours = duration_minutes / 60
    spot_executors = int(num_executors * spot_percentage / 100)
    on_demand_executors = num_executors - spot_executors

    spot_cost = spot_executors * duration_hours * spot_per_hour
    on_demand_cost = on_demand_executors * duration_hours * on_demand_per_hour
    total_cost = spot_cost + on_demand_cost
    all_on_demand_cost = num_executors * duration_hours * on_demand_per_hour

    return {
        'spot_cost': spot_cost,
        'on_demand_cost': on_demand_cost,
        'total_cost': total_cost,
        # Fraction saved compared with running everything on-demand
        'savings_vs_all_on_demand': 1 - total_cost / all_on_demand_cost
    }
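
For example, a 60-minute job over 8 TB of data would be sized roughly as follows; the figures are only as good as the rules of thumb above.

# Example: 8 TB job expected to run ~60 minutes
plan = rightsize_batch_cluster(job_duration_minutes=60, data_size_tb=8.0, num_tasks=2000)
print(plan['num_executors'], plan['executor_memory'], plan['estimated_cost']['total_cost'])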

Streaming Workloads

# Streaming workload rightsizing
def rightsize_streaming_cluster(
    throughput_mb_per_sec: float,
    latency_requirement_ms: int,
    peak_multiplier: float = 2.0
) -> dict:
    """Rightsize cluster for a streaming workload."""
    # Base capacity for the current load
    # Rule of thumb: ~100 MB/sec per executor
    base_executors = max(1, int(throughput_mb_per_sec / 100))

    # Peak capacity with headroom (latency_requirement_ms can be used to
    # tighten the multiplier for stricter SLAs; unused in this simple rule)
    peak_executors = max(base_executors, int(base_executors * peak_multiplier))

    return {
        'min_executors': base_executors,
        'max_executors': peak_executors,
        'initial_executors': base_executors,
        'dynamic_allocation': {
            'enabled': True,
            'min_executors': base_executors,
            'max_executors': peak_executors,
            'initial_executors': base_executors
        },
        'recommended_instance': 'm5.xlarge',  # General purpose: 4 vCPU, 16 GiB
        'executor_memory': '8g',
        'executor_cores': 4
    }
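
These numbers map directly onto Spark's standard dynamic-allocation properties; a sketch of that translation (the workload figures are hypothetical):

# Example: 500 MB/sec sustained throughput, 2x peak headroom
sizing = rightsize_streaming_cluster(throughput_mb_per_sec=500, latency_requirement_ms=1000)

spark_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": str(sizing['min_executors']),
    "spark.dynamicAllocation.maxExecutors": str(sizing['max_executors']),
    "spark.dynamicAllocation.initialExecutors": str(sizing['initial_executors']),
    "spark.executor.memory": sizing['executor_memory'],
    "spark.executor.cores": str(sizing['executor_cores']),
}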

Rightsizing Tools

Tool Comparison

Tool                    Platform      Automation   Cost   Best For
AWS Compute Optimizer   AWS           Automatic    Free   AWS services
Google Recommender      GCP           Automatic    Free   GCP services
Azure Advisor           Azure         Automatic    Free   Azure services
Datadog                 Multi-cloud   Manual       Paid   Comprehensive monitoring
Prometheus/Grafana      Open source   Manual       Free   Custom solutions

Rightsizing Automation

Automated Rightsizing

# Automated cluster rightsizing
import boto3
from datetime import datetime, timedelta


def auto_rightsize_cluster(
    cluster_id: str,
    cpu_threshold_low: float = 50,
    cpu_threshold_high: float = 80,
    min_size: int = 2,
    max_size: int = 100
):
    """Automatically rightsize a cluster based on metrics."""
    client = boto3.client('emr')

    # Resize the CORE instance group; ListInstanceGroups returns the groups
    # (DescribeCluster does not)
    groups = client.list_instance_groups(ClusterId=cluster_id)['InstanceGroups']
    core_group = next(g for g in groups if g['InstanceGroupType'] == 'CORE')
    current_size = core_group['RunningInstanceCount']

    # Average CPU over the last 24 hours
    # (get_cluster_cpu_avg is a placeholder -- see the CloudWatch sketch below)
    avg_cpu = get_cluster_cpu_avg(cluster_id, hours=24)

    if avg_cpu < cpu_threshold_low and current_size > min_size:
        # Downsize by ~20%
        new_size = max(min_size, int(current_size * 0.8))
        # Lift termination protection so the shrink is not blocked
        # (may not be required in every configuration)
        client.set_termination_protection(
            JobFlowIds=[cluster_id],
            TerminationProtected=False
        )
        client.modify_instance_groups(
            ClusterId=cluster_id,
            InstanceGroups=[{
                'InstanceGroupId': core_group['Id'],
                'InstanceCount': new_size
            }]
        )
        return {'action': 'downsized', 'from': current_size, 'to': new_size}

    elif avg_cpu > cpu_threshold_high and current_size < max_size:
        # Upsize by ~20% (at least one node, so small clusters still grow)
        new_size = min(max_size, max(current_size + 1, int(current_size * 1.2)))
        client.modify_instance_groups(
            ClusterId=cluster_id,
            InstanceGroups=[{
                'InstanceGroupId': core_group['Id'],
                'InstanceCount': new_size
            }]
        )
        return {'action': 'upsized', 'from': current_size, 'to': new_size}

    return {'action': 'no_change', 'size': current_size}
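
The metrics helper above is left as a placeholder. One possible implementation, assuming the per-instance CPUUtilization metric that EC2 publishes to CloudWatch by default, is to average it across the cluster's instances:

# One possible implementation of get_cluster_cpu_avg: average the AWS/EC2
# CPUUtilization metric across the cluster's EC2 instances.
def get_cluster_cpu_avg(cluster_id: str, hours: int = 24) -> float:
    emr = boto3.client('emr')
    cloudwatch = boto3.client('cloudwatch')
    instances = emr.list_instances(ClusterId=cluster_id)['Instances']
    start = datetime.now() - timedelta(hours=hours)
    values = []
    for instance in instances:
        stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/EC2',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'InstanceId', 'Value': instance['Ec2InstanceId']}],
            StartTime=start,
            EndTime=datetime.now(),
            Period=3600,
            Statistics=['Average']
        )
        values.extend(dp['Average'] for dp in stats['Datapoints'])
    return sum(values) / len(values) if values else 0.0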

Rightsizing Best Practices

DO

  1. Monitor metrics continuously: CPU, memory, storage, and network.
  2. Use auto-scaling when possible: dynamic allocation suits variable workloads.
  3. Rightsize regularly: review monthly or quarterly.
  4. Consider spot instances: typically 60-80% cost savings.
  5. Test before production: validate performance after every change.

DON’T

  1. Don't oversize "just in case": it wastes significant cost.
  2. Don't ignore workload patterns: batch and streaming need different approaches.
  3. Don't forget peak loads: ensure capacity for spikes.
  4. Don't resize too frequently: it causes instability.
  5. Don't ignore memory: OOM errors are worse than slow queries.

Key Takeaways

  1. Target metrics: CPU 50-70%, Memory 60-80%
  2. Vertical scaling: Change instance size
  3. Horizontal scaling: Add/remove nodes
  4. Batch workloads: Size for job duration
  5. Streaming workloads: Size for peak with buffer
  6. Automation: Use auto-scaling when possible
  7. Spot instances: 60-80% cost savings
  8. When to use: all clusters, as part of regular optimization reviews
