Compaction Strategies
Solving the Small Files Problem
Overview
Compaction combines small files into larger, optimized files to improve query performance and reduce storage costs. It’s essential for data platforms with streaming ingestion or frequent updates.
The Small Files Problem
Problem Analysis
Impact Metrics:
- Query planning: 10-100x slower when a table contains thousands of small files
- I/O efficiency: many small random reads instead of large sequential scans
- Metadata pressure: file listings strain NameNode/metastore memory limits
- Network overhead: more requests per query, more cumulative latency (see the sketch below)
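These costs scale with file count rather than data volume. The back-of-envelope sketch below illustrates the effect for a 1 TB table; the 50 ms per-file latency is an assumed figure for illustration, not a measurement.

```python
# Illustrative arithmetic only: fixed per-file request overhead for a full scan.
def scan_overhead_seconds(total_gb: float, file_size_mb: float,
                          per_file_latency_ms: float = 50.0) -> float:
    """Estimate cumulative per-file overhead (list/open/request) for a table scan."""
    num_files = (total_gb * 1024) / file_size_mb
    return num_files * per_file_latency_ms / 1000

# 1 TB table stored as 8 MB files vs. 512 MB files
print(scan_overhead_seconds(1024, 8))    # ~6554 seconds of per-file overhead
print(scan_overhead_seconds(1024, 512))  # ~102 seconds of per-file overhead
```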
Optimal File Sizes
| File Format | Optimal Size | Min Size | Max Size |
|---|---|---|---|
| Parquet | 256MB-1GB | 128MB | 2GB |
| ORC | 256MB-1GB | 128MB | 2GB |
| Avro | 128MB-512MB | 64MB | 1GB |
| Delta Lake | 256MB-1GB | 128MB | 2GB |
| Iceberg | 256MB-1GB | 128MB | 2GB |
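Hitting these targets usually comes down to choosing how many output files a rewrite should produce. A minimal helper sketch (the function name is illustrative; `total_table_bytes` is the table's current on-disk size):

```python
import math

def target_output_files(total_table_bytes: int, target_file_mb: int = 512) -> int:
    """Choose a repartition/output-file count so each file lands near the target size."""
    target_bytes = target_file_mb * 1024 * 1024
    return max(1, math.ceil(total_table_bytes / target_bytes))

# Example: a 100 GB table compacted toward 512 MB files -> 200 output files
print(target_output_files(100 * 1024**3, target_file_mb=512))
```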
Compaction Patterns
Full Compaction
When to Use:
- End-of-day batch processing
- Low-write tables
- Nightly maintenance windows
Pros/Cons:
- ✅ Simple, deterministic
- ✅ Optimal file size
- ❌ High compute cost
- ❌ Not suitable for high-frequency writes
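As a concrete illustration, full compaction of a plain Parquet table can be a single read-repartition-write pass into a staging location that is swapped in afterwards. This is a minimal sketch, not a production job; the paths are hypothetical, and writing to a staging path avoids reading and overwriting the same directory in one job.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FullCompaction").getOrCreate()

def full_compaction(source_path: str, staging_path: str, num_output_files: int):
    """Rewrite an entire Parquet table into num_output_files larger files."""
    df = spark.read.parquet(source_path)

    # The repartition count controls how many (and roughly how large) files are written
    df.repartition(num_output_files) \
        .write \
        .mode("overwrite") \
        .parquet(staging_path)
    # After validating the output, swap staging_path in for source_path
    # (e.g., update the table location in the metastore).

# Example: rewrite a fragmented table into ~200 files of roughly 512 MB each
full_compaction(
    source_path="s3://bucket/tables/events/",
    staging_path="s3://bucket/tables/events_compacted/",
    num_output_files=200,
)
```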
Incremental Compaction
When to Use:
- Streaming ingestion
- High-write tables
- Real-time optimization
Pros/Cons:
- ✅ Continuous optimization
- ✅ Lower compute cost per run
- ❌ More complex logic
- ❌ Requires monitoring
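For Delta Lake tables, an incremental pass can target only a fragmented partition by combining `replaceWhere` with `dataChange = false`, so the rewrite is layout-only and downstream streaming readers do not reprocess it. A minimal sketch, assuming a table partitioned by `date`; the path and partition value are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IncrementalCompaction").getOrCreate()

def compact_partition(table_path: str, partition_date: str, num_files: int):
    """Rewrite one partition of a Delta table into num_files larger files."""
    (spark.read.format("delta")
        .load(table_path)
        .where(f"date = '{partition_date}'")
        .repartition(num_files)
        .write
        .format("delta")
        .mode("overwrite")
        .option("dataChange", "false")                       # layout-only rewrite
        .option("replaceWhere", f"date = '{partition_date}'")  # replace just this partition
        .save(table_path))

# Example: compact yesterday's partition into 8 files
compact_partition("s3://bucket/tables/events/", "2024-01-01", num_files=8)
```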
Compaction Implementation
Spark Compaction Job
```python
# Spark compaction job for Delta Lake
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaCompaction") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def compact_delta_table(
    table_path: str,
    target_file_size_mb: int = 256
):
    """Compact a Delta Lake table and clean up old files."""

    # Cap the file size produced by OPTIMIZE (value is in bytes)
    spark.conf.set(
        "spark.databricks.delta.optimize.maxFileSize",
        str(target_file_size_mb * 1024 * 1024)
    )

    delta_table = DeltaTable.forPath(spark, table_path)

    # Compact and Z-Order by ingestion time
    delta_table.optimize().executeZOrderBy("ingestion_timestamp")

    # Remove files no longer referenced by the table (retain 7 days = 168 hours)
    delta_table.vacuum(168)

    print(f"Compaction complete for {table_path}")

# Example usage
compact_delta_table(
    table_path="s3://bucket/tables/customers/",
    target_file_size_mb=256
)
```

Iceberg Compaction
```python
# Iceberg compaction with Spark
from datetime import datetime, timedelta, timezone

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergCompaction") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.type", "hadoop") \
    .config("spark.sql.catalog.my_catalog.warehouse", "s3://bucket/warehouse") \
    .getOrCreate()

def compact_iceberg_table(
    table: str,
    target_file_size_mb: int = 256
):
    """Compact an Iceberg table and expire old snapshots."""

    # Rewrite small data files toward the target size
    spark.sql(f"""
        CALL my_catalog.system.rewrite_data_files(
            table => '{table}',
            options => map('target-file-size-bytes', '{target_file_size_mb * 1024 * 1024}')
        )
    """)

    # Expire snapshots older than 7 days so their data files can be removed
    cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
    spark.sql(f"""
        CALL my_catalog.system.expire_snapshots(
            table => '{table}',
            older_than => TIMESTAMP '{cutoff}'
        )
    """)

    print(f"Compaction complete for {table}")

# Example usage
compact_iceberg_table(
    table="my_catalog.analytics.customers",
    target_file_size_mb=256
)
```

Streaming Compaction
```python
# Streaming compaction with Structured Streaming
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StreamingCompaction") \
    .getOrCreate()

def continuous_compaction(
    source_path: str,
    target_path: str,
    trigger_interval: str = "5 minutes",
    max_files_per_micro_batch: int = 100
):
    """Continuously rewrite small source files into larger, partitioned output files."""

    # Allow the file source to infer its schema (define it explicitly in production)
    spark.conf.set("spark.sql.streaming.schemaInference", "true")

    # Read streaming data; maxFilesPerTrigger is a source option that bounds
    # how many input files each micro-batch picks up
    streaming_df = spark.readStream \
        .format("parquet") \
        .option("maxFilesPerTrigger", max_files_per_micro_batch) \
        .load(source_path)

    # Write each micro-batch as larger, partitioned files
    query = streaming_df.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .partitionBy("date", "hour") \
        .option("checkpointLocation", f"{target_path}/_checkpoint") \
        .trigger(processingTime=trigger_interval) \
        .start(target_path)

    return query

# Example usage
query = continuous_compaction(
    source_path="s3://bucket/streaming/raw/",
    target_path="s3://bucket/streaming/compact/",
    trigger_interval="5 minutes"
)
```

Compaction Strategies
Strategy Selection
Strategy by Table Type
| Table Type | Compaction Strategy | Frequency | Target Size |
|---|---|---|---|
| Fact table (batch) | Full compaction | Daily | 512MB-1GB |
| Fact table (streaming) | Incremental | Hourly | 256MB-512MB |
| Dimension table | Full compaction | Weekly | 128MB-256MB |
| Staging table | Incremental | Per run | 256MB-512MB |
| Raw data | Full compaction | Daily | 512MB-1GB |
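A maintenance scheduler can encode this table directly as a lookup and apply the matching policy per table. A small sketch (the dataclass and dictionary keys are illustrative, not a standard API):

```python
from dataclasses import dataclass

@dataclass
class CompactionPolicy:
    strategy: str        # "full" or "incremental"
    frequency: str       # schedule, human-readable here
    target_size_mb: int  # upper end of the target file size range

# Policies mirroring the strategy-by-table-type table above
POLICIES = {
    "fact_batch":     CompactionPolicy("full",        "daily",   1024),
    "fact_streaming": CompactionPolicy("incremental", "hourly",  512),
    "dimension":      CompactionPolicy("full",        "weekly",  256),
    "staging":        CompactionPolicy("incremental", "per_run", 512),
    "raw":            CompactionPolicy("full",        "daily",   1024),
}

def policy_for(table_type: str) -> CompactionPolicy:
    """Look up the compaction policy for a table type."""
    return POLICIES[table_type]

print(policy_for("fact_streaming"))
```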
Compaction Tools
Tool Comparison
| Tool | Format | Automation | Cost | Best For |
|---|---|---|---|---|
| Delta OPTIMIZE | Delta Lake | Built-in | Included | Databricks, Delta Lake |
| Iceberg Rewrite | Iceberg | Manual/Scripted | Compute | Iceberg tables |
| Hudi Compaction | Hudi | Built-in | Included | Streaming tables |
| Spark coalesce | Any | Manual | Compute | Simple jobs |
| EMRFS S3-Optimized | S3+EMR | Built-in | Included | EMR workflows |
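For Hudi, the built-in compaction listed above is driven by writer configuration: a MERGE_ON_READ table can compact inline every N delta commits. A minimal sketch with hypothetical table, key, and path names; the `hoodie.*` keys shown are standard Hudi write configs, and the job also needs the Hudi Spark bundle on the classpath.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HudiInlineCompaction").getOrCreate()
df = spark.read.parquet("s3://bucket/staging/events/")  # batch of new records

# MERGE_ON_READ table with inline compaction every 5 delta commits
hudi_options = {
    "hoodie.table.name": "events_mor",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.precombine.field": "event_ts",
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": "5",
}

(df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("append")
   .save("s3://bucket/tables/events_mor/"))
```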
Compaction Monitoring
Metrics
```python
# Compaction monitoring
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CompactionMonitor") \
    .getOrCreate()

def get_file_size(file_path: str) -> int:
    """Look up a file's size in bytes via the Hadoop FileSystem API."""
    jvm_path = spark._jvm.org.apache.hadoop.fs.Path(file_path)
    fs = jvm_path.getFileSystem(spark._jsc.hadoopConfiguration())
    return fs.getFileStatus(jvm_path).getLen()

def get_file_stats(table_path: str, format: str = "parquet"):
    """Get file statistics for a table."""

    # List the data files backing the table
    files = spark.read.format(format).load(table_path).inputFiles()

    # Collect per-file sizes
    file_sizes = []
    for file in files:
        size = get_file_size(file)
        file_sizes.append({
            'path': file.split("/")[-1],
            'size_bytes': size,
            'size_mb': size / (1024 * 1024)
        })

    # Aggregate statistics
    total_files = len(file_sizes)
    total_size_mb = sum(f['size_mb'] for f in file_sizes)
    avg_size_mb = total_size_mb / total_files if total_files > 0 else 0
    max_size_mb = max(f['size_mb'] for f in file_sizes) if file_sizes else 0
    min_size_mb = min(f['size_mb'] for f in file_sizes) if file_sizes else 0

    # Small files (< 128MB)
    small_files = [f for f in file_sizes if f['size_mb'] < 128]
    small_file_count = len(small_files)
    small_file_percentage = small_file_count / total_files * 100 if total_files > 0 else 0

    return {
        'table_path': table_path,
        'total_files': total_files,
        'total_size_mb': total_size_mb,
        'avg_size_mb': avg_size_mb,
        'max_size_mb': max_size_mb,
        'min_size_mb': min_size_mb,
        'small_file_count': small_file_count,
        'small_file_percentage': small_file_percentage,
        'needs_compaction': small_file_percentage > 20  # > 20% small files
    }

# Example usage
stats = get_file_stats("s3://bucket/tables/customers/")
print(f"File statistics: {stats}")

if stats['needs_compaction']:
    print(f"WARNING: {stats['small_file_percentage']:.1f}% small files detected")
    print("Consider running compaction")
```

Compaction Best Practices
DO

1. Compact during off-peak hours: nightly windows for batch tables
2. Use incremental compaction for streaming: continuous optimization
3. Monitor file size distribution: detect fragmentation early
4. Combine compaction with Z-Ordering: multi-dimensional optimization
5. Set appropriate target sizes: 256MB-1GB for most cases

DON’T
1. Don't compact too frequently: wastes compute for minimal benefit
2. Don't ignore streaming tables: they need continuous compaction
3. Don't forget to vacuum: clean up old, unreferenced files
4. Don't compact during peak hours: it impacts query performance
5. Don't use the wrong target size: balance file size against parallelism

Key Takeaways
- Small files problem: 10-100x query slowdown, metadata pressure
- Optimal size: 256MB-1GB for most formats
- Full compaction: Best for batch, nightly windows
- Incremental compaction: Best for streaming, continuous
- Delta Lake: built-in OPTIMIZE command (optionally with ZORDER BY) for compaction
- Iceberg: Rewrite data files with size target
- Monitoring: Track file size distribution, small file percentage
- Use When: Streaming ingestion, frequent updates, query performance issues
Back to Module 7