
Compaction Strategies

Solving the Small Files Problem


Overview

Compaction combines small files into larger, optimized files to improve query performance and reduce storage costs. It’s essential for data platforms with streaming ingestion or frequent updates.


The Small Files Problem

Problem Analysis

Impact Metrics:

  • Query planning: 10-100x slower when a table is spread across thousands of small files
  • I/O efficiency: many small random reads instead of a few large sequential scans
  • Metadata pressure: NameNode/metastore memory limits
  • Network overhead: one request per file means more round trips and higher latency

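To make the planning and I/O overhead concrete, compare the same hypothetical 1 TB table stored as 1 MB files versus 512 MB files (back-of-the-envelope arithmetic only):

# Back-of-the-envelope: the same 1 TB of data as 1 MB files vs. 512 MB files
table_size_mb = 1024 * 1024  # 1 TB expressed in MB

for file_size_mb in (1, 512):
    n_files = table_size_mb // file_size_mb
    # roughly one metadata entry and at least one read request per file
    print(f"{file_size_mb:>4} MB files -> {n_files:>9,} files to list, plan, and open")
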
Optimal File Sizes

File Format   Optimal Size   Min Size   Max Size
Parquet       256MB-1GB      128MB      2GB
ORC           256MB-1GB      128MB      2GB
Avro          128MB-512MB    64MB       1GB
Delta Lake    256MB-1GB      128MB      2GB
Iceberg       256MB-1GB      128MB      2GB
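
When sizing a compaction job against these targets, the number of output files follows directly from total data size divided by target file size; a small helper (purely illustrative):

import math

def target_output_files(total_size_bytes: int, target_file_size_mb: int = 512) -> int:
    """Number of output files to aim for, given total data size and a target file size."""
    target_bytes = target_file_size_mb * 1024 * 1024
    return max(1, math.ceil(total_size_bytes / target_bytes))

# e.g. a 100 GB table with a 512 MB target -> 200 output files
print(target_output_files(100 * 1024**3, target_file_size_mb=512))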

Compaction Patterns

Full Compaction

When to Use:

  • End-of-day batch processing
  • Low-write tables
  • Nightly maintenance windows

Pros/Cons:

  • ✅ Simple, deterministic
  • ✅ Optimal file size
  • ❌ High compute cost
  • ❌ Not suitable for high-frequency writes
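
A minimal full-compaction sketch for a plain Parquet path during a maintenance window (paths and the output file count are illustrative; writing to a separate path avoids overwriting the input while it is still being read):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FullCompaction").getOrCreate()

def full_compact(input_path: str, output_path: str, num_output_files: int):
    """Rewrite an entire Parquet path into a fixed number of evenly sized files."""
    df = spark.read.parquet(input_path)

    # repartition (not coalesce) shuffles for an even split across output files
    df.repartition(num_output_files) \
        .write \
        .mode("overwrite") \
        .parquet(output_path)

# Example: rewrite yesterday's raw drop into ~200 files, then point readers at the new path
full_compact(
    input_path="s3://bucket/tables/events_raw/",
    output_path="s3://bucket/tables/events_compact/",
    num_output_files=200
)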

Incremental Compaction

When to Use:

  • Streaming ingestion
  • High-write tables
  • Real-time optimization

Pros/Cons:

  • ✅ Continuous optimization
  • ✅ Lower compute cost per run
  • ❌ More complex logic
  • ❌ Requires monitoring
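
A minimal incremental sketch, assuming a date/hour-partitioned Parquet layout: only partitions that have accumulated too many files get rewritten (paths, partition names, and thresholds are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IncrementalCompaction").getOrCreate()

def compact_partition_if_fragmented(base_path: str, partition: str,
                                    max_files: int = 20, num_output_files: int = 4) -> bool:
    """Rewrite a single partition only when it has accumulated too many small files."""
    df = spark.read.parquet(f"{base_path}/{partition}")

    if len(df.inputFiles()) <= max_files:
        return False  # partition is healthy, skip it

    # Rewrite just this partition into a handful of larger files
    df.repartition(num_output_files) \
        .write \
        .mode("overwrite") \
        .parquet(f"{base_path}_compact/{partition}")
    return True

# Example: run hourly over the most recently written partitions only
for partition in ["date=2024-01-15/hour=09", "date=2024-01-15/hour=10"]:
    compact_partition_if_fragmented("s3://bucket/streaming/events", partition)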

Compaction Implementation

Spark Compaction Job

# Spark compaction job for Delta Lake
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaCompaction") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def compact_delta_table(
    table_path: str,
    target_file_size_mb: int = 256
):
    """Compact a Delta Lake table and clean up unreferenced files."""
    # Target file size for OPTIMIZE, in bytes
    spark.conf.set(
        "spark.databricks.delta.optimize.maxFileSize",
        str(target_file_size_mb * 1024 * 1024)
    )

    delta_table = DeltaTable.forPath(spark, table_path)

    # Compact and Z-Order by ingestion time
    delta_table.optimize() \
        .executeZOrderBy("ingestion_timestamp")

    # Clean up files no longer referenced by the table (retain 7 days = 168 hours)
    delta_table.vacuum(168)

    print(f"Compaction complete for {table_path}")

# Example usage
compact_delta_table(
    table_path="s3://bucket/tables/customers/",
    target_file_size_mb=256
)
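
For scheduled maintenance jobs, the same two steps can also be expressed as Delta SQL (using the same illustrative path as above):

# SQL equivalents of optimize()/vacuum() above (Delta Lake 2.0+ / Databricks)
spark.sql("OPTIMIZE delta.`s3://bucket/tables/customers/` ZORDER BY (ingestion_timestamp)")
spark.sql("VACUUM delta.`s3://bucket/tables/customers/` RETAIN 168 HOURS")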

Iceberg Compaction

# Iceberg compaction with Spark
from datetime import datetime, timedelta

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergCompaction") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.type", "hadoop") \
    .config("spark.sql.catalog.my_catalog.warehouse", "s3://bucket/warehouse") \
    .getOrCreate()

def compact_iceberg_table(
    table: str,
    target_file_size_mb: int = 256
):
    """Compact an Iceberg table (resolved in my_catalog) and expire old snapshots."""
    target_bytes = target_file_size_mb * 1024 * 1024

    # Rewrite (bin-pack) small data files into files close to the target size
    spark.sql(f"""
        CALL my_catalog.system.rewrite_data_files(
            table => '{table}',
            options => map('target-file-size-bytes', '{target_bytes}')
        )
    """)

    # Expire snapshots older than 7 days so superseded files can be cleaned up
    cutoff = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
    spark.sql(f"""
        CALL my_catalog.system.expire_snapshots(
            table => '{table}',
            older_than => TIMESTAMP '{cutoff}'
        )
    """)

    print(f"Compaction complete for {table}")

# Example usage
compact_iceberg_table(
    table="analytics.customers",
    target_file_size_mb=256
)
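
rewrite_data_files bin-packs by default; when queries consistently filter on one column, Iceberg's sort strategy can cluster rows while compacting (the sort column here is an assumption):

# Sort-based rewrite: compacts files and clusters rows by a frequently filtered column
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'analytics.customers',
        strategy => 'sort',
        sort_order => 'ingestion_timestamp'
    )
""")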

Streaming Compaction

# Streaming compaction with Structured Streaming
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StreamingCompaction") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

def continuous_compaction(
    source_path: str,
    target_path: str,
    trigger_interval: str = "5 minutes",
    max_files_per_micro_batch: int = 100
):
    """Continuously rewrite small raw files into larger, partitioned files."""
    # Read the raw small files; maxFilesPerTrigger is a source option that
    # bounds how many files each micro-batch picks up
    streaming_df = spark.readStream \
        .format("parquet") \
        .option("maxFilesPerTrigger", max_files_per_micro_batch) \
        .load(source_path)

    # Rewrite into larger files, partitioned by date and hour
    # (assumes the source data has `date` and `hour` columns)
    query = streaming_df.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .partitionBy("date", "hour") \
        .option("checkpointLocation", f"{target_path}/_checkpoint") \
        .trigger(processingTime=trigger_interval) \
        .start(target_path)

    return query

# Example usage
query = continuous_compaction(
    source_path="s3://bucket/streaming/raw/",
    target_path="s3://bucket/streaming/compact/",
    trigger_interval="5 minutes"
)
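
If the streaming sink is a Delta table rather than plain Parquet, much of this write-time compaction can instead be delegated to Delta's optimized-writes and auto-compaction table properties (available on Databricks and recent Delta Lake releases; the table name below is illustrative):

# Delta-specific alternative: let the writer produce larger files and compact
# small ones automatically after each write (table name is illustrative)
spark.sql("""
    ALTER TABLE streaming_events SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")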

Compaction Strategies

Strategy Selection

Strategy by Table Type

Table Type               Compaction Strategy   Frequency   Target Size
Fact table (batch)       Full compaction       Daily       512MB-1GB
Fact table (streaming)   Incremental           Hourly      256MB-512MB
Dimension table          Full compaction       Weekly      128MB-256MB
Staging table            Incremental           Per run     256MB-512MB
Raw data                 Full compaction       Daily       512MB-1GB
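
One way to operationalize this matrix is a small policy map that the scheduler or orchestrator reads when planning compaction runs (values mirror the table above; the names are illustrative):

# Compaction policy per table type, mirroring the matrix above
COMPACTION_POLICIES = {
    "fact_batch":     {"strategy": "full",        "frequency": "daily",   "target_mb": 1024},
    "fact_streaming": {"strategy": "incremental", "frequency": "hourly",  "target_mb": 512},
    "dimension":      {"strategy": "full",        "frequency": "weekly",  "target_mb": 256},
    "staging":        {"strategy": "incremental", "frequency": "per_run", "target_mb": 512},
    "raw":            {"strategy": "full",        "frequency": "daily",   "target_mb": 1024},
}

def policy_for(table_type: str) -> dict:
    """Look up the compaction policy for a table type (defaults to a daily full compaction)."""
    return COMPACTION_POLICIES.get(table_type, COMPACTION_POLICIES["raw"])

print(policy_for("fact_streaming"))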

Compaction Tools

Tool Comparison

Tool                 Format       Automation        Cost       Best For
Delta OPTIMIZE       Delta Lake   Built-in          Included   Databricks, Delta Lake
Iceberg Rewrite      Iceberg      Manual/Scripted   Compute    Iceberg tables
Hudi Compaction      Hudi         Built-in          Included   Streaming tables
Spark coalesce       Any          Manual            Compute    Simple jobs
EMRFS S3-Optimized   S3+EMR       Built-in          Included   EMR workflows

Compaction Monitoring

Metrics

# Compaction monitoring
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CompactionMonitor") \
    .getOrCreate()

def get_file_size(path: str) -> int:
    """Return a file's size in bytes via the Hadoop FileSystem API."""
    jvm = spark.sparkContext._jvm
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_path = jvm.org.apache.hadoop.fs.Path(path)
    fs = hadoop_path.getFileSystem(hadoop_conf)
    return fs.getFileStatus(hadoop_path).getLen()

def get_file_stats(table_path: str, file_format: str = "parquet"):
    """Get file statistics for a table"""
    # List the data files backing the table
    files = spark.read.format(file_format) \
        .load(table_path) \
        .inputFiles()

    # Collect per-file sizes
    file_sizes = []
    for file in files:
        size = get_file_size(file)
        file_sizes.append({
            'path': file.split("/")[-1],
            'size_bytes': size,
            'size_mb': size / (1024 * 1024)
        })

    # Calculate statistics
    total_files = len(file_sizes)
    total_size_mb = sum(f['size_mb'] for f in file_sizes)
    avg_size_mb = total_size_mb / total_files if total_files > 0 else 0
    max_size_mb = max(f['size_mb'] for f in file_sizes) if file_sizes else 0
    min_size_mb = min(f['size_mb'] for f in file_sizes) if file_sizes else 0

    # Small files (< 128MB)
    small_files = [f for f in file_sizes if f['size_mb'] < 128]
    small_file_count = len(small_files)
    small_file_percentage = small_file_count / total_files * 100 if total_files > 0 else 0

    stats = {
        'table_path': table_path,
        'total_files': total_files,
        'total_size_mb': total_size_mb,
        'avg_size_mb': avg_size_mb,
        'max_size_mb': max_size_mb,
        'min_size_mb': min_size_mb,
        'small_file_count': small_file_count,
        'small_file_percentage': small_file_percentage,
        'needs_compaction': small_file_percentage > 20  # > 20% small files
    }
    return stats

# Example usage
stats = get_file_stats("s3://bucket/tables/customers/")
print(f"File statistics: {stats}")

if stats['needs_compaction']:
    print(f"WARNING: {stats['small_file_percentage']:.1f}% small files detected")
    print("Consider running compaction")

Compaction Best Practices

DO

# 1. Compact during off-peak hours
# Nightly for batch tables
# 2. Use incremental for streaming
# Continuous optimization
# 3. Monitor file size distribution
# Detect fragmentation early
# 4. Combine with Z-Ordering
# Multi-dimensional optimization
# 5. Set appropriate target sizes
# 256MB-1GB for most cases

DON’T

# 1. Don't compact too frequently
# Wastes compute, minimal benefit
# 2. Don't ignore streaming tables
# Need continuous compaction
# 3. Don't forget to vacuum
# Clean up old files
# 4. Don't compact during peak hours
# Impacts query performance
# 5. Don't use wrong target size
# Balance between size and parallelism
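
A small guard that encodes several of these rules (off-peak window, fragmentation threshold) before triggering a run; get_file_stats and compact_delta_table are the helpers defined earlier, and the off-peak hours are illustrative:

from datetime import datetime

def should_compact(table_path: str, off_peak_hours=range(1, 5),
                   small_file_threshold: float = 20.0) -> bool:
    """Compact only inside the off-peak window and only when fragmentation warrants it."""
    if datetime.now().hour not in off_peak_hours:
        return False  # don't compact during peak hours
    stats = get_file_stats(table_path)  # helper from the monitoring section
    return stats['small_file_percentage'] > small_file_threshold

if should_compact("s3://bucket/tables/customers/"):
    compact_delta_table("s3://bucket/tables/customers/")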

Key Takeaways

  1. Small files problem: 10-100x query slowdown, metadata pressure
  2. Optimal size: 256MB-1GB for most formats
  3. Full compaction: Best for batch, nightly windows
  4. Incremental compaction: Best for streaming, continuous
  5. Delta Lake: OPTIMIZE command (optionally with Z-Ordering) for compaction
  6. Iceberg: Rewrite data files with size target
  7. Monitoring: Track file size distribution, small file percentage
  8. Use When: Streaming ingestion, frequent updates, query performance issues

Back to Module 7