
Apache Spark Guide

Distributed Batch Processing at Scale


Overview

Apache Spark is a unified analytics engine for large-scale data processing. It combines batch processing, streaming, machine learning, and graph processing in a single framework. Spark is the workhorse of modern data platforms, handling ETL, data transformation, and machine learning at terabyte-to-petabyte scale.


Core Concepts

Spark Architecture

Key Components

Component       | Description        | Role
----------------|--------------------|-----------------------------------------------
Driver          | Control process    | Coordinates execution, holds the SparkContext
Executor        | Worker process     | Executes tasks, stores data in memory/disk
Task            | Unit of work       | Processes one partition of data
Cluster Manager | Resource allocator | YARN, Kubernetes, Mesos, Standalone
SparkContext    | Entry point        | Connection to the cluster

DataFrame API (Primary Interface)

DataFrame vs. RDD

Use the DataFrame/Dataset API for all new code; treat the RDD API as legacy. RDD code is lower-level and bypasses the Catalyst optimizer.
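
As a quick comparison, here is the same aggregation written both ways (the path and column names are hypothetical, and an existing SparkSession `spark` is assumed); the DataFrame version is optimized by Catalyst, the RDD version is not:

# RDD style (legacy): manual tuple manipulation, no query optimization
rdd_totals = (
    spark.sparkContext.textFile("s3://bucket/sales.csv")
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[0], float(fields[1])))  # (category, amount)
    .reduceByKey(lambda a, b: a + b)
)

# DataFrame style (preferred): declarative, optimized by Catalyst/Tungsten
df_totals = (
    spark.read.option("header", True).option("inferSchema", True)
    .csv("s3://bucket/sales.csv")
    .groupBy("category")
    .sum("amount")
)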

DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Create SparkSession
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy)
df_filtered = df.filter(col("amount") > 100)
df_grouped = df_filtered.groupBy("category").agg(
    expr("sum(amount) as total")
)

# Action (triggers execution)
df_grouped.write.parquet("s3://bucket/output/")

Transformation vs. Action:

  • Transformations: Lazy, build execution plan (filter, groupBy, join)
  • Actions: Eager, trigger execution (write, count, collect, show)
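
To see laziness in action (using the `df` from the snippet above), the transformations below return immediately without reading any data; only the final action launches a job:

# Transformations: build a logical plan, no data is read yet
high_value = df.filter(col("amount") > 100).select("category", "amount")

# Inspect the plan Spark has built so far
high_value.explain()

# Action: triggers the actual read, filter, and count across the cluster
print(high_value.count())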

Performance Optimization

1. Parallelism Tuning

# Configure shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Rule of thumb:
# - Default: 200
# - Large data: 2-3x the total number of executor cores
# - Small data: reduce to avoid per-task overhead

# Repartition vs. coalesce
df.repartition(100)  # Full shuffle; can increase or decrease partition count
df.coalesce(10)      # Narrow operation; only decreases partitions, avoids a full shuffle

Guidelines:

  • Target: 128MB-256MB per partition
  • Too few partitions: Underutilized cluster
  • Too many partitions: Task scheduling overhead
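
As a rough illustration of that target (the helper and numbers are illustrative, not a Spark API):

# Back-of-the-envelope partition count for ~200MB per partition
def estimate_partitions(input_size_gb, target_mb_per_partition=200):
    return max(1, int(input_size_gb * 1024 / target_mb_per_partition))

print(estimate_partitions(1024))  # 1TB input -> roughly 5,200 partitions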

2. Memory Management

# Memory fraction configuration
spark.conf.set("spark.memory.fraction", "0.6") # 60% for execution/storage
spark.conf.set("spark.memory.storageFraction", "0.5") # 50% of that for storage
# Serialization (Kryo is faster)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Garbage collection
spark.conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

3. Broadcast Joins

from pyspark.sql.functions import broadcast

# Small table broadcast to all executors
df_large.join(
    broadcast(df_small),  # < 10MB recommended
    "key"
)

# Tables below this threshold are broadcast automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Benefit: the large table is never shuffled; broadcast joins are often 10-100x faster than shuffle-based joins when one side is small.

4. Caching

# Cache in memory
df_cached = df.cache()
df_cached.count()  # Action to materialize the cache

# Persist with an explicit storage level
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Unpersist when done
df_cached.unpersist()

Use cases:

  • Reusing DataFrame in multiple actions
  • ML iterations
  • Interactive analysis

Cost: memory pressure on executors, and cached data is not refreshed automatically if the underlying source changes.


Common Patterns

Pattern 1: Medallion ETL

from pyspark.sql.functions import col, current_timestamp

def process_bronze_to_silver(spark, table_name):
    # Read bronze
    bronze = spark.read.format("delta").load(f"s3://bucket/bronze/{table_name}")

    # Transform
    silver = bronze.filter(col("valid") == True) \
        .dropDuplicates(["id"]) \
        .withColumn("processed_at", current_timestamp())

    # Write silver
    silver.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .save(f"s3://bucket/silver/{table_name}")

# Process multiple tables
tables = ["users", "events", "transactions"]
for table in tables:
    process_bronze_to_silver(spark, table)

Pattern 2: Slowly Changing Dimensions (SCD Type 2)

from delta.tables import DeltaTable

# Target (existing dimension table)
target = DeltaTable.forPath(spark, "s3://bucket/silver/customers")

# Source (incoming updates)
source = spark.read.parquet("s3://bucket/bronze/customers_updates")

# Merge (SCD Type 2): close out current rows and insert brand-new customers.
# Note: inserting the new version of an updated customer typically requires
# staging the updates (e.g., a union of new rows) before the merge.
target.alias("t").merge(
    source.alias("s"),
    "t.customer_id = s.customer_id AND t.current = true"
).whenMatchedUpdate(
    condition="s.updated_at > t.updated_at",
    set={
        "current": "false",
        "end_date": "s.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "customer_id": "s.customer_id",
        "name": "s.name",
        "start_date": "s.updated_at",
        "end_date": "null",
        "current": "true"
    }
).execute()

Pattern 3: Window Functions

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Define window: events per user, newest first
window_spec = Window.partitionBy("user_id") \
    .orderBy(col("event_timestamp").desc())

# Rank events per user
df_with_rank = df.withColumn(
    "rank",
    row_number().over(window_spec)
)

# Filter to latest 10 events per user
df_latest = df_with_rank.filter(col("rank") <= 10)

Spark SQL

SQL as Primary Interface

# Create view from DataFrame
df.createOrReplaceTempView("events")

# Run SQL
result = spark.sql("""
    SELECT
        user_id,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM events
    WHERE event_date >= '2025-01-01'
    GROUP BY user_id
    HAVING SUM(amount) > 1000
    ORDER BY total_amount DESC
""")

When to use SQL:

  • Standard transformations
  • Team members with SQL background
  • BI-friendly transformations

When to use DataFrame API:

  • Complex business logic
  • Type-safe operations (Dataset API in Scala)
  • Programmatic transformations
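
Both interfaces compile to the same execution plans, so the choice is mainly about readability and team preference. For reference, the SQL query above expressed with the DataFrame API:

from pyspark.sql.functions import col, count, sum as sum_

result_df = (
    df.filter(col("event_date") >= "2025-01-01")
    .groupBy("user_id")
    .agg(
        count("*").alias("event_count"),
        sum_("amount").alias("total_amount"),
    )
    .filter(col("total_amount") > 1000)   # HAVING clause
    .orderBy(col("total_amount").desc())
)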

Streaming (Structured Streaming)

Streaming Concepts

Structured Streaming treats a stream as an unbounded table: the same DataFrame API is used for batch and streaming, Spark processes newly arrived data incrementally (micro-batches by default), and progress is tracked in a checkpoint so a failed query can resume where it left off.

Streaming Example

from pyspark.sql.functions import col, current_timestamp

# Read stream
streaming_df = spark.readStream \
    .format("delta") \
    .load("s3://bucket/bronze/events")

# Transform
transformed = streaming_df.filter(col("valid") == True) \
    .withColumn("processed_at", current_timestamp())

# Write stream
query = transformed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .partitionBy("date") \
    .option("checkpointLocation", "s3://bucket/checkpoint/events") \
    .start("s3://bucket/silver/events")

# Block until the query terminates
query.awaitTermination()

Output modes:

  • Append: Only new rows (default)
  • Complete: All rows (stateful aggregations)
  • Update: Changed rows (incremental aggregations)
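
For the stateful case, a minimal sketch of a windowed count with a watermark (the column names are illustrative); `update` mode emits only the aggregates that changed in each micro-batch:

from pyspark.sql.functions import col, window

# Windowed count per 10-minute event-time window; the watermark bounds state size
windowed_counts = (
    streaming_df
    .withWatermark("event_timestamp", "15 minutes")
    .groupBy(window(col("event_timestamp"), "10 minutes"), col("event_type"))
    .count()
)

query = (
    windowed_counts.writeStream
    .outputMode("update")  # emit only rows updated in this micro-batch
    .format("console")
    .option("checkpointLocation", "s3://bucket/checkpoint/windowed_counts")
    .start()
)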

Configuration and Tuning

Executor Sizing

# Example: cluster with 10 nodes, each with 16 cores and 64GB RAM
# (executor sizing must be set at application launch, e.g. via spark-submit or
# SparkSession.builder.config; it cannot be changed on a running session)

# Option 1: many small executors
# 10 nodes × 4 executors × 4 cores, 8GB each = 160 cores, 320GB total
spark.conf.set("spark.executor.cores", "4")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.instances", "40")

# Option 2: fewer large executors
# 10 nodes × 2 executors × 8 cores, 24GB each = 160 cores, 480GB total
spark.conf.set("spark.executor.cores", "8")
spark.conf.set("spark.executor.memory", "24g")
spark.conf.set("spark.executor.instances", "20")

Guidelines:

  • ≤5 cores per executor: good balance for I/O throughput (leave 1 core per node for the OS and daemons)
  • 30-40GB per executor: keeps JVM heaps small enough to avoid long GC pauses
  • More small executors: Better for fault tolerance
  • Fewer large executors: Better for memory-intensive workloads

Dynamic Allocation

# Enable dynamic allocation
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "10")

Benefit: Automatically scales executors based on workload.

Shuffle Optimization

# Compress shuffle output and spills (both default to true)
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# External shuffle service (used with dynamic allocation)
spark.conf.set("spark.shuffle.service.enabled", "true")

# Note: spark.shuffle.consolidateFiles belonged to the old hash shuffle manager
# and has been removed from modern Spark releases.

Cost Optimization

Use Spot Instances

# On EMR, use spot instances
# Configure spot fleet with 70% spot, 30% on-demand
# Spot instance pricing (approximate):
# - On-demand: $1.00
# - Spot: $0.20-0.40 (60-80% discount)
# Trade-off: Possible interruptions
# Mitigation: Use checkpointing, speculative execution

Right-Size Clusters

# Calculate required resources:
# - Data size: 10TB
# - Processing time: 2 hours desired
# - Per-node throughput: 500GB/hour
# Nodes needed = 10TB / 500GB/hour / 2 hours = 10 nodes
# Add 20% buffer = 12 nodes
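
The same back-of-the-envelope math as a small helper (illustrative only; per-node throughput varies widely by workload and file format):

import math

# Mirrors the arithmetic above: node-hours of work spread across the deadline, plus a buffer
def nodes_needed(data_tb, node_gb_per_hour, target_hours, buffer=0.2):
    node_hours = data_tb * 1000 / node_gb_per_hour
    return math.ceil(node_hours / target_hours * (1 + buffer))

print(nodes_needed(10, 500, 2))  # -> 12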

File Sizing

# Optimal file size: 128MB-1GB per Parquet file

# Cap records per output file
df.write \
    .option("maxRecordsPerFile", 1000000) \
    .parquet("s3://bucket/output")

# Or repartition before writing to control the number of output files
df.repartition(100) \
    .write.parquet("s3://bucket/output")

Senior Level Gotchas

Gotcha 1: Skewed Joins

Problem: One key has many more values than others → one task takes forever.

Solution: salt the skewed key, or let Adaptive Query Execution (AQE) split skewed join partitions automatically (Spark 3.0+).

# Detect skew: a few tasks run far longer than the rest in the Spark UI;
# inspecting the plan also helps
df.explain()

# Solution 1: add a salt column to spread the hot key across partitions
from pyspark.sql.functions import rand, floor
df_salted = df.withColumn("salt", floor(rand() * 10))

# Solution 2: enable AQE skew-join handling (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
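
A fuller salted-join sketch extending Solution 1, assuming a large table `df_large` skewed on `key` joined to a smaller `df_dim` (the names and the salt factor of 10 are illustrative):

from pyspark.sql.functions import array, explode, floor, lit, rand

SALT_BUCKETS = 10

# Salt the skewed (large) side with a random bucket per row
large_salted = df_large.withColumn("salt", floor(rand() * SALT_BUCKETS).cast("int"))

# Replicate each row of the smaller side once per salt value
dim_salted = df_dim.withColumn(
    "salt", explode(array(*[lit(i) for i in range(SALT_BUCKETS)]))
)

# Join on the original key plus the salt, then drop the helper column
joined = large_salted.join(dim_salted, ["key", "salt"]).drop("salt")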

Gotcha 2: Large Shuffle

Problem: Shuffle moves billions of rows → slow, memory pressure.

Solutions:

  1. Filter before shuffle
  2. Broadcast join for small tables
  3. Repartition by key before join
  4. Use approximate algorithms (approx_count_distinct)
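
For example, an exact distinct count forces a wide shuffle of every distinct value, while the approximate version (point 4) is far cheaper; `user_id` is an illustrative column:

from pyspark.sql.functions import approx_count_distinct, countDistinct

# Exact: shuffles all distinct values
df.agg(countDistinct("user_id")).show()

# Approximate (HyperLogLog++): much cheaper; rsd controls relative error (default 0.05)
df.agg(approx_count_distinct("user_id", rsd=0.05)).show()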

Gotcha 3: Executor OOM

Problem: OutOfMemoryError on executors.

Solutions:

  1. Increase executor memory (and spark.executor.memoryOverhead for off-heap/Python usage)
  2. Lower spark.memory.fraction to leave more heap for user code
  3. Increase parallelism (more partitions means less data per task)
  4. Avoid collect() on large results and drastic coalesce() calls that concentrate data in few tasks
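
A minimal sketch of the first three knobs, assuming they are set at application launch (executor memory cannot be changed on a running application; the values are placeholders to tune per workload):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("oom-tuning-example")
    .config("spark.executor.memory", "16g")          # more heap per executor
    .config("spark.executor.memoryOverhead", "4g")   # off-heap / Python worker overhead
    .config("spark.memory.fraction", "0.5")          # leave more heap for user code
    .config("spark.sql.shuffle.partitions", "800")   # more partitions -> less data per task
    .getOrCreate()
)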

Gotcha 4: Small Files

Problem: Streaming creates thousands of small files → slow queries.

Solution: write larger micro-batches (trigger interval) and compact existing small files periodically.

# Structured Streaming: trigger less frequently so each micro-batch writes larger files
df.writeStream \
    .option("checkpointLocation", ...) \
    .trigger(processingTime="10 minutes") \
    .start(...)

# Note: maxFilesPerTrigger is a file-source read option (limits files per micro-batch);
# it does not control output file size -- compact existing small files separately.
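
For tables that have already accumulated small files, a periodic batch compaction job is the usual fix; a minimal sketch (paths and partition count are illustrative), or with Delta Lake the built-in OPTIMIZE command:

# Batch compaction: rewrite a partition with fewer, larger files
compacted = spark.read.parquet("s3://bucket/silver/events/date=2025-01-01")
compacted.repartition(16) \
    .write.mode("overwrite") \
    .parquet("s3://bucket/silver/events_compacted/date=2025-01-01")

# With Delta Lake, compaction is built in
spark.sql("OPTIMIZE delta.`s3://bucket/silver/events`")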

Best Practices

DO

  • Use DataFrame/Dataset API (not RDD)
  • Filter early (reduce data movement)
  • Use broadcast joins for small tables
  • Cache reused DataFrames
  • Use spot instances for batch workloads
  • Monitor Spark UI for bottlenecks

DON’T

  • Use RDD for new code (legacy)
  • Collect large DataFrames to driver (OOM)
  • Ignore skew (one slow task kills job)
  • Use default configurations (tune for workload)
  • Forget to unpersist cached DataFrames

Key Takeaways

  1. DataFrame API is primary interface: RDD is legacy
  2. Tune parallelism: Target 128MB-256MB per partition
  3. Use broadcast joins: 10-100x faster for small tables
  4. Cache strategically: Reused DataFrames only
  5. Spot instances: 60-80% savings for batch workloads
  6. Monitor Spark UI: Identify bottlenecks and skew
  7. File size matters: Target 128MB-1GB Parquet files
