Apache Spark Guide
Distributed Batch Processing at Scale
Overview
Apache Spark is a unified analytics engine for large-scale data processing. It combines batch processing, streaming, machine learning, and graph processing in a single framework. Spark is the workhorse of many modern data platforms, handling ETL, data transformation, and machine learning at terabyte-to-petabyte scale.
Core Concepts
Spark Architecture
Key Components
| Component | Description | Role |
|---|---|---|
| Driver | Control process | Coordinates execution, holds SparkContext |
| Executor | Worker process | Executes tasks, stores data in memory/disk |
| Task | Unit of work | Processes one partition of data |
| Cluster Manager | Resource allocator | YARN, K8s, Mesos, Standalone |
| SparkContext | Entry point | Connection to cluster |
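As a concrete, hedged illustration of these roles, the sketch below starts a local session in which the driver process holds the SparkContext and local threads stand in for executors; the master URL and app name are assumptions for the example, not required settings.

```python
from pyspark.sql import SparkSession

# The driver process builds the SparkSession and holds the SparkContext
spark = (SparkSession.builder
         .master("local[4]")              # 4 local cores stand in for executors
         .appName("architecture-demo")    # hypothetical app name
         .getOrCreate())

sc = spark.sparkContext                   # entry point / connection to the cluster
print(sc.defaultParallelism)              # one task per partition, run on executors
```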
DataFrame API (Primary Interface)
DataFrame vs. RDD
Use the DataFrame/Dataset API for all new code; treat the RDD API as legacy and reserve it for low-level cases the higher-level APIs cannot express.
DataFrame Operations
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Create SparkSession
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy)
df_filtered = df.filter(col("amount") > 100)
df_grouped = df_filtered.groupBy("category").agg(
    expr("sum(amount) as total")
)

# Action (triggers execution)
df_grouped.write.parquet("s3://bucket/output/")
```

Transformation vs. Action:
- Transformations: Lazy, build execution plan (filter, groupBy, join)
- Actions: Eager, trigger execution (write, count, collect, show)
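A quick way to see laziness in action, reusing the `df` from the example above: transformations only build a plan, and nothing runs on the cluster until an action is invoked. A minimal sketch:

```python
from pyspark.sql.functions import col

# Transformations return immediately; no Spark job runs yet
pending = df.filter(col("amount") > 100).groupBy("category").count()

# Inspect the planned execution without triggering it
pending.explain()

# Only an action (show/count/write) launches tasks and reads the data
pending.show(5)
```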
Performance Optimization
1. Parallelism Tuning
```python
# Configure shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Rule of thumb:
# - Default: 200
# - For large data: 2-3x the number of cores
# - For small data: reduce to avoid overhead

# Repartition vs. Coalesce
df.repartition(100)  # Increases partitions (full shuffle)
df.coalesce(10)      # Decreases partitions (no full shuffle)
```

Guidelines:
- Target: 128MB-256MB per partition
- Too few partitions: Underutilized cluster
- Too many partitions: Task scheduling overhead
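As a rough, hedged illustration of the 128MB-256MB target, the shuffle partition count can be derived from the input size; the dataset size and byte counts below are assumptions made for the example only.

```python
# Hypothetical input: 500 GB of data feeding a shuffle
input_bytes = 500 * 1024**3
target_partition_bytes = 200 * 1024**2    # aim mid-range, ~200 MB per partition

num_partitions = max(1, input_bytes // target_partition_bytes)
print(num_partitions)                      # ~2560 partitions

spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))
```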
2. Memory Management
```python
# Memory fraction configuration
spark.conf.set("spark.memory.fraction", "0.6")         # 60% of heap for execution/storage
spark.conf.set("spark.memory.storageFraction", "0.5")  # 50% of that reserved for storage

# Serialization (Kryo is faster than Java serialization)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Garbage collection
spark.conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
# Note: serializer and executor JVM options are static settings; set them via
# spark-submit or SparkSession.builder.config before the session starts.
```

3. Broadcast Joins
```python
# Small table broadcast to all executors
from pyspark.sql.functions import broadcast

df_large.join(
    broadcast(df_small),  # < 10MB recommended
    "key"
)

# Configuration
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
```

Benefit: Avoids shuffle for small tables, often 10-100x faster.
4. Caching
```python
# Cache in memory
df_cached = df.cache()
df_cached.count()  # Action to materialize the cache

# Persist with an explicit storage level
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Unpersist when done
df_cached.unpersist()
```

Use cases:
- Reusing DataFrame in multiple actions
- ML iterations
- Interactive analysis
Cost: Memory pressure, cache invalidation on data changes.
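A minimal sketch of the reuse case, assuming a hypothetical source path and `status`, `date`, and `user_id` columns; without `cache()`, each action would re-read and re-filter the source.

```python
from pyspark.sql.functions import col

events = spark.read.parquet("s3://bucket/data/")            # hypothetical source
active = events.filter(col("status") == "active").cache()   # mark for caching (lazy)

active.groupBy("date").count().show()                        # first action materializes the cache
active.groupBy("user_id").count().orderBy(col("count").desc()).show(10)  # second action reuses it

active.unpersist()                                            # release memory when done
```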
Common Patterns
Pattern 1: Medallion ETL
```python
from pyspark.sql.functions import col, current_timestamp

def process_bronze_to_silver(spark, table_name):
    # Read bronze
    bronze = spark.read.format("delta").load(f"s3://bucket/bronze/{table_name}")

    # Transform
    silver = bronze.filter(col("valid") == True) \
        .dropDuplicates(["id"]) \
        .withColumn("processed_at", current_timestamp())

    # Write silver
    silver.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .save(f"s3://bucket/silver/{table_name}")

# Process multiple tables
tables = ["users", "events", "transactions"]
for table in tables:
    process_bronze_to_silver(spark, table)
```

Pattern 2: Slowly Changing Dimensions (SCD Type 2)
```python
from delta.tables import DeltaTable

# Target (existing)
target = DeltaTable.forPath(spark, "s3://bucket/silver/customers")

# Source (updates)
source = spark.read.parquet("s3://bucket/bronze/customers_updates")

# Merge (SCD Type 2): close out the current row on match, insert brand-new keys.
# Note: inserting the *new* version of an updated customer needs a second pass
# (or a source unioned with itself), omitted here for brevity.
target.alias("t").merge(
    source.alias("s"),
    "t.customer_id = s.customer_id AND t.current = true"
).whenMatchedUpdate(
    condition="s.updated_at > t.updated_at",
    set={
        "current": "false",
        "end_date": "s.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "customer_id": "s.customer_id",
        "name": "s.name",
        "start_date": "s.updated_at",
        "end_date": "null",
        "current": "true"
    }
).execute()
```

Pattern 3: Window Functions
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank

# Define window
window_spec = Window.partitionBy("user_id") \
    .orderBy(col("event_timestamp").desc())

# Rank events per user
df_with_rank = df.withColumn(
    "rank",
    row_number().over(window_spec)
)

# Filter to the latest 10 events per user
df_latest = df_with_rank.filter(col("rank") <= 10)
```

Spark SQL
SQL as Primary Interface
```python
# Create a view from the DataFrame
df.createOrReplaceTempView("events")

# Run SQL
result = spark.sql("""
    SELECT
        user_id,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM events
    WHERE event_date >= '2025-01-01'
    GROUP BY user_id
    HAVING SUM(amount) > 1000
    ORDER BY total_amount DESC
""")
```

When to use SQL:
- Standard transformations
- Team members with SQL background
- BI-friendly transformations
When to use DataFrame API:
- Complex business logic
- Type-safe operations (Dataset API in Scala)
- Programmatic transformations
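The two interfaces compose freely: a `spark.sql` result is an ordinary DataFrame, so a SQL step can feed a DataFrame step and vice versa. A small sketch reusing the `result` DataFrame and `events` view from above; the tier thresholds are arbitrary assumptions.

```python
from pyspark.sql.functions import col, expr

# Continue from the SQL result with DataFrame transformations
high_value = result.filter(col("total_amount") > 5000) \
    .withColumn("tier", expr("CASE WHEN total_amount > 20000 THEN 'gold' ELSE 'silver' END"))

# Or go the other way: register a DataFrame and join it back in SQL
high_value.createOrReplaceTempView("high_value_users")
enriched = spark.sql("""
    SELECT e.*, h.tier
    FROM events e
    JOIN high_value_users h ON e.user_id = h.user_id
""")
```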
Streaming (Structured Streaming)
Streaming Concepts
Structured Streaming treats a live source as an unbounded table processed in incremental micro-batches. Progress is tracked in a checkpoint location, and results are emitted according to an output mode (see below).
Streaming Example
```python
from pyspark.sql.functions import col, current_timestamp

# Read stream
streaming_df = spark.readStream \
    .format("delta") \
    .load("s3://bucket/bronze/events")

# Transform
transformed = streaming_df.filter(col("valid") == True) \
    .withColumn("processed_at", current_timestamp())

# Write stream
query = transformed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .partitionBy("date") \
    .option("checkpointLocation", "s3://bucket/checkpoint/events") \
    .start("s3://bucket/silver/events")

# Wait for termination
query.awaitTermination()
```

Output modes:
- Append: Only new rows (default)
- Complete: All rows (stateful aggregations)
- Update: Changed rows (incremental aggregations)
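A hedged sketch of a stateful aggregation where Append would not work because the aggregates keep changing; it assumes the stream has `event_timestamp` and `user_id` columns and writes to the console sink for illustration.

```python
from pyspark.sql.functions import col, count, window

# Count events per user in 10-minute windows; data more than 1 hour late is dropped
windowed = streaming_df \
    .withWatermark("event_timestamp", "1 hour") \
    .groupBy(window(col("event_timestamp"), "10 minutes"), col("user_id")) \
    .agg(count("*").alias("event_count"))

# Update mode emits only the rows whose aggregate changed in each micro-batch
agg_query = windowed.writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", "s3://bucket/checkpoint/agg") \
    .start()
```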
Configuration and Tuning
Executor Sizing
```python
# Example: cluster with 10 nodes, each with 16 cores and 64GB RAM
# Note: executor cores/memory/instances are launch-time settings; pass them via
# spark-submit or SparkSession.builder.config (see the sketch after the guidelines).

# Option 1: Many small executors
# 10 nodes × 5 executors, each with 4 cores / 8GB = 200 cores, 400GB total
spark.conf.set("spark.executor.cores", "4")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.instances", "50")

# Option 2: Few large executors
# 10 nodes × 2 executors, each with 8 cores / 24GB = 160 cores, 480GB total
spark.conf.set("spark.executor.cores", "8")
spark.conf.set("spark.executor.memory", "24g")
spark.conf.set("spark.executor.instances", "20")
```

Guidelines:
- Up to ~5 cores per executor: good balance for I/O throughput; leave a core per node for the OS and daemons
- 30-40GB per executor: Avoids GC overhead
- More small executors: Better for fault tolerance
- Fewer large executors: Better for memory-intensive workloads
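Because executor cores, memory, and instance counts are fixed when the application launches, they are normally passed at session-creation time. A minimal sketch of Option 1 using `SparkSession.builder`; the app name and overhead value are assumptions.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("etl-job")                              # hypothetical app name
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "8g")
         .config("spark.executor.instances", "50")
         .config("spark.executor.memoryOverhead", "1g")   # off-heap headroom per executor
         .getOrCreate())
```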
Dynamic Allocation
```python
# Enable dynamic allocation
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "10")
```

Benefit: Automatically scales the number of executors up and down with the workload.
Shuffle Optimization
```python
# Compress shuffle output and spills (both default to true)
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# Shuffle file consolidation (legacy hash shuffle only; removed in Spark 2.0)
spark.conf.set("spark.shuffle.consolidateFiles", "true")

# External shuffle service (needed by dynamic allocation unless shuffle tracking is enabled)
spark.conf.set("spark.shuffle.service.enabled", "true")
```

Cost Optimization
Use Spot Instances
```python
# On EMR, use spot instances
# Configure the instance fleet with ~70% spot, ~30% on-demand

# Spot instance pricing (approximate):
# - On-demand: $1.00
# - Spot: $0.20-0.40 (60-80% discount)

# Trade-off: possible interruptions
# Mitigation: checkpointing, speculative execution
```

Right-Size Clusters
```python
# Calculate required resources:
# - Data size: 10TB
# - Desired processing time: 2 hours
# - Per-node throughput: 500GB/hour

# Nodes needed = 10TB / (500GB/hour) / 2 hours = 10 nodes
# Add a 20% buffer = 12 nodes
```

File Sizing
```python
# Optimal file size: 128MB-1GB Parquet
# Configure writes

df.write \
    .option("maxRecordsPerFile", 1000000) \
    .parquet("s3://bucket/output")

# Repartition before write
df.repartition(100) \
    .write.parquet("s3://bucket/output")
```

Senior Level Gotchas
Gotcha 1: Skewed Joins
Problem: One key holds far more rows than the others, so a single task runs for a long time while the rest of the stage finishes quickly.
Solution: Salting or skew hint.
```python
# Detect skew: look for straggler tasks in the Spark UI; df.explain() shows the join strategy
df.explain()

# Solution 1: add a salt to spread the hot key across partitions
# (the other join side must be exploded across salt values 0-9 and salted too)
from pyspark.sql.functions import rand, floor
df_salted = df.withColumn("salt", floor(rand() * 10))

# Solution 2: skew join hint (Databricks) or adaptive skew handling
# via spark.sql.adaptive.skewJoin.enabled (Spark 3.0+)
df1.hint("skew").join(df2, "key")
```

Gotcha 2: Large Shuffle
Problem: Shuffle moves billions of rows → slow, memory pressure.
Solutions:
- Filter before shuffle
- Broadcast join for small tables
- Repartition by key before join
- Use approximate algorithms (approx_count_distinct)
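A small sketch of the first mitigations under assumed column names (`event_date`, `country_code`) and a hypothetical small `dim_country` table.

```python
from pyspark.sql.functions import broadcast, approx_count_distinct, col

# Filter early: prune rows and columns before the shuffle-heavy steps
recent = df.filter(col("event_date") >= "2025-01-01") \
           .select("user_id", "country_code", "amount")

# Broadcast the small dimension table instead of shuffling both join sides
joined = recent.join(broadcast(dim_country), "country_code")

# Approximate distinct counts avoid shuffling every distinct value
joined.groupBy("country_code") \
      .agg(approx_count_distinct("user_id").alias("approx_users")) \
      .show()
```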
Gotcha 3: Executor OOM
Problem: OutOfMemoryError on executors.
Solutions:
- Increase executor memory
- Reduce memory fraction
- Increase parallelism (less data per task)
- Raise spark.executor.memoryOverhead for off-heap and Python worker memory (see the configuration sketch below)
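A hedged sketch of those knobs applied at launch time; the specific values are illustrative assumptions, not recommendations.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("oom-tuning-demo")                      # hypothetical app name
         .config("spark.executor.memory", "16g")           # more heap per executor
         .config("spark.executor.memoryOverhead", "4g")    # off-heap / Python worker memory
         .config("spark.memory.fraction", "0.5")           # leave more room for user objects
         .config("spark.sql.shuffle.partitions", "800")    # smaller partitions per task
         .getOrCreate())
```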
Gotcha 4: Small Files
Problem: Streaming creates thousands of small files → slow queries.
Solution: Configure file size and compaction.
```python
# Structured Streaming: lengthen the trigger interval so each micro-batch
# writes fewer, larger files
df.writeStream \
    .option("checkpointLocation", ...) \
    .trigger(processingTime="10 minutes") \
    .start(...)

# Note: maxFilesPerTrigger is a read-side option (limits input files per batch).
# For existing small files, run periodic compaction (e.g. Delta Lake OPTIMIZE,
# or a repartition-and-rewrite job for plain Parquet).
```

Best Practices
DO
- Use DataFrame/Dataset API (not RDD)
- Filter early (reduce data movement)
- Use broadcast joins for small tables
- Cache reused DataFrames
- Use spot instances for batch workloads
- Monitor Spark UI for bottlenecks
DON’T
- Use RDD for new code (legacy)
- Collect large DataFrames to driver (OOM)
- Ignore skew (one slow task kills job)
- Use default configurations (tune for workload)
- Forget to unpersist cached DataFrames
Key Takeaways
- DataFrame API is primary interface: RDD is legacy
- Tune parallelism: Target 128MB-256MB per partition
- Use broadcast joins: 10-100x faster for small tables
- Cache strategically: Reused DataFrames only
- Spot instances: 60-80% savings for batch workloads
- Monitor Spark UI: Identify bottlenecks and skew
- File size matters: Target 128MB-1GB Parquet files