
Spark Structured Streaming Deep Dive

Micro-Batch Stream Processing at Scale


Overview

Apache Spark Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It provides exactly-once semantics and event-time processing, and it integrates seamlessly with batch processing, which makes it well suited to lambda architectures and unified batch/streaming workloads.


Structured Streaming Architecture

Micro-Batch Processing Model

Key Difference from Flink: Spark uses micro-batches (100ms minimum latency) vs. Flink’s true streaming (10-50ms latency).
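
To see the micro-batch model concretely, the sketch below (illustrative only, using Spark's built-in rate source rather than a real topic) runs one small batch job per trigger interval; the trigger interval, not per-record arrival, bounds the end-to-end latency.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MicroBatchDemo").getOrCreate()

# The "rate" source generates rowsPerSecond rows with timestamp and value columns
rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

# Every 5 seconds one micro-batch (~500 rows) is planned, executed, and committed
query = rate_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()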


Core Concepts

Streaming DataFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create the Spark session
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define the schema for the JSON payload
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON value column into typed columns
events_df = kafka_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# This is a streaming DataFrame: it looks like a batch DataFrame,
# but it is executed incrementally as new data arrives
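
A quick aside (not part of the original example): a streaming DataFrame reports isStreaming = True, and eager actions such as events_df.show() or events_df.count() raise an AnalysisException, because results can only be produced through writeStream.start().

# Streaming vs. batch DataFrames
print(events_df.isStreaming)   # True for the Kafka-backed stream above

batch_df = spark.read.parquet("s3://bucket/dims/customers/")  # a batch read
print(batch_df.isStreaming)    # False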

Output Modes

Output Mode Selection:

Mode | Description | Use Case
Append | Only new rows appended to the result table are written | Queries without aggregation; windowed aggregations with a watermark (rows emitted once finalized)
Complete | The full result table is rewritten on every trigger | Aggregation queries with small result tables (e.g. dashboards)
Update | Only rows changed since the last trigger are written | Aggregations with watermarks, when the sink supports in-place updates
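
As a hedged illustration of how the mode is chosen per query (the counts aggregation below is hypothetical, mirroring the windowed examples later on this page), the mode is set on the writer:

from pyspark.sql.functions import window, col

# A windowed, watermarked aggregation (illustrative)
counts = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes"), col("event_type")) \
    .count()

# Append: emit only rows that will never change again
# (for aggregations this requires the watermark above)
q1 = counts.writeStream.outputMode("append").format("console").start()

# Update: emit only the windows whose counts changed in this micro-batch
q2 = counts.writeStream.outputMode("update").format("console").start()

# Complete: rewrite the whole result table on every trigger (keep it small)
q3 = counts.writeStream.outputMode("complete").format("console").start()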

Windowing Operations

Tumbling Windows

from pyspark.sql.functions import window, col

# Tumbling window (1 minute)
tumbling_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("event_type")
    ) \
    .count() \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("event_type"),
        col("count").alias("event_count")
    )

# Write to console (for testing)
query = tumbling_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='1 minute') \
    .start()

Sliding Windows

# Sliding window (1 minute size, 30 second slide)
sliding_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute", "30 seconds"),
        col("user_id")
    ) \
    .count()

query = sliding_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Session Windows

# Session window (5 minute inactivity gap)
from pyspark.sql.functions import session_window

session_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

query = session_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Watermarks and Late Data

Watermark Strategy

Watermark Configuration:

# Define a watermark: tolerate up to 10 minutes of lateness
watermarked_df = events_df \
    .withWatermark("timestamp", "10 minutes")

# How it works:
# - Watermark = max(event time seen so far) - 10 minutes
# - State for windows older than the watermark is dropped
# - Data arriving with an event time before the watermark is dropped
# - On-time and slightly late data is processed normally
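
As a small addition (assuming a started query over a watermarked aggregation, such as the tumbling-window query above), each progress update reports the current watermark under eventTime:

# After a few micro-batches, the progress JSON of a watermarked query
# exposes the current event-time watermark
progress = query.lastProgress
if progress and "eventTime" in progress:
    print(progress["eventTime"].get("watermark"))  # e.g. "2025-01-27T09:55:00.000Z"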

Late Data Example

# Scenario: an event arrives 15 minutes late
# - Event time: 10:00
# - Arrival (processing) time: 10:15, by which point events with
#   event times up to 10:15 have already been observed
# - Watermark: 10:05 (max observed event time 10:15 - 10 minutes)
# - Result: the 10:00 event is DROPPED (its event time is before the watermark)
#
# Solution: increase the watermark tolerance
# - A 20-minute watermark admits more late data
# - Trade-off: more state retention (higher memory usage)
from pyspark.sql.functions import window, col, count

aggregated_df = events_df \
    .withWatermark("timestamp", "20 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .agg(count("*").alias("event_count"))

State Management

State Store

State Backend Configuration:

# Configure the state store provider (HDFS-backed is the default)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

# How many delta files may accumulate before the state store takes a snapshot
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")

# Compression codec for state store files
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "lz4")

# Checkpoint location used by the query (passed to writeStream)
checkpoint_path = "s3://bucket/checkpoints/event-aggregation/"

State TTL

# State is cleaned up automatically based on the watermark;
# no separate TTL configuration is needed.
# Example: 5-minute tumbling windows with a 10-minute watermark
aggregated_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

# State for a window is retained until the watermark passes the window end,
# i.e. roughly window size (5 minutes) + watermark delay (10 minutes) after
# the window opens; older state is dropped automatically.

Join Strategies

Stream-Static Join

# Join streaming data with a static dimension table
# Common pattern: enrichment

# Static dimension table (loaded once at query start)
customers_df = spark.read.parquet("s3://bucket/dims/customers/")

# Stream-static join (enrichment)
enriched_df = events_df \
    .join(
        customers_df,
        on="customer_id",
        how="left"  # Keep all events, even without a matching customer
    ) \
    .select(
        col("event_id"),
        col("customer_id"),
        col("customer_name"),
        col("segment"),
        col("event_type"),
        col("timestamp")
    )

query = enriched_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/enriched-events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/enrichment/") \
    .trigger(processingTime='1 minute') \
    .start()

Stream-Stream Join

# Join two streaming sources
# Use case: real-time correlation

# Events stream
events_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "events") \
    .load()

# Enrichment stream
enrichment_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "enrichment") \
    .load()

# (Both streams are assumed to be parsed into typed columns -- user_id,
# timestamp, etc. -- as in the Streaming DataFrame section above.)

# Stream-stream join: both sides need watermarks, plus a time-range condition
from pyspark.sql.functions import expr

events_wm = events_stream.withWatermark("timestamp", "10 minutes").alias("events")
enrichment_wm = enrichment_stream.withWatermark("timestamp", "10 minutes").alias("enrichment")

joined_stream = events_wm.join(
    enrichment_wm,
    expr("""
        events.user_id = enrichment.user_id AND
        events.timestamp BETWEEN enrichment.timestamp - INTERVAL 5 MINUTES
                             AND enrichment.timestamp + INTERVAL 5 MINUTES
    """),
    "inner"
)

query = joined_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

Sink Options

Console Sink (Testing)

# Console sink: for development and testing
query = events_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='10 seconds') \
    .start()

Memory Sink (Testing)

# Memory sink: for testing in notebooks
query = events_df \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("events_table") \
    .start()

# Query the in-memory table
spark.sql("SELECT * FROM events_table").show()

File Sink (Parquet/JSON)

# File sink: write to Parquet/JSON/CSV (append mode only)
query = aggregated_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/events/") \
    .partitionBy("date", "hour") \
    .trigger(processingTime='5 minutes') \
    .start()

Kafka Sink

# Kafka sink: write back to Kafka
query = enriched_df \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("topic", "enriched-events") \
    .option("checkpointLocation", "s3://bucket/checkpoints/kafka-sink/") \
    .trigger(processingTime='1 minute') \
    .start()

Delta Lake Sink

# Delta Lake sink: ACID transactions, time travel
query = enriched_df \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "s3://bucket/delta/events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/delta/") \
    .trigger(processingTime='1 minute') \
    .start()

# Benefits:
# - Exactly-once semantics
# - ACID transactions
# - Time travel (query previous versions)
# - Schema enforcement
# - Upsert support (MERGE)
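
Since the list above mentions upsert support, here is a hedged sketch (not from the original; the event_id merge key, paths, and function name are illustrative) of streaming upserts into Delta via foreachBatch and MERGE:

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    """Merge one micro-batch into the target Delta table, keyed on event_id."""
    target = DeltaTable.forPath(spark, "s3://bucket/delta/events/")
    target.alias("t") \
        .merge(micro_batch_df.alias("s"), "t.event_id = s.event_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

query = enriched_df \
    .writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "s3://bucket/checkpoints/delta-upsert/") \
    .trigger(processingTime='1 minute') \
    .start()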

Performance Tuning

Trigger Configuration

# Processing-time trigger: start a micro-batch every interval
query = df.writeStream \
    .trigger(processingTime='1 minute') \
    .start()

# One-time trigger: process whatever is available once, then stop
query = df.writeStream \
    .trigger(once=True) \
    .start()

# Continuous trigger (experimental, low latency); the interval is the
# checkpoint interval, and only map/filter-style operations are supported
query = df.writeStream \
    .trigger(continuous='1 second') \
    .start()

Shuffle Partitions

# Configure shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# For streaming, use fewer partitions than a typical batch job:
# - More partitions  = more per-batch scheduling overhead
# - Fewer partitions = larger tasks
# Note: each shuffle partition of a stateful operator gets its own state store,
# and this setting cannot be changed once the query has a checkpoint.

# Adaptive query execution / automatic partition coalescing
# (AQE support for streaming queries depends on the Spark version)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

State Store Optimization

# State store configuration
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "zstd")
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true")

# Use RocksDB for large state (built in since Spark 3.2)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Monitoring and Observability

Streaming Query Progress

# List active streaming queries
spark.streams.active

# Get a specific query by id
query = spark.streams.get("query-id")
query.status          # Current status (waiting, trigger active, etc.)
query.lastProgress    # Progress of the most recent micro-batch
query.recentProgress  # Most recent progress updates (bounded history)

# Example lastProgress output (abridged):
# {
#   "id": "query-id",
#   "runId": "run-id",
#   "name": "events-aggregation",
#   "timestamp": "2025-01-27T10:00:00.000Z",
#   "batchId": 100,
#   "numInputRows": 1000000,
#   "inputRowsPerSecond": 10000.0,
#   "processedRowsPerSecond": 9500.0,
#   "durationMs": {
#     "triggerExecution": 1000,
#     "queryPlanning": 100,
#     "walCommit": 50
#   }
# }

Spark UI

Critical Metrics:

  • Input Rate: Rows entering the stream per second
  • Processing Rate: Rows processed per second
  • Batch Duration: Time to process each micro-batch
  • Lag: Difference between event time and processing time
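
These metrics are also available programmatically. A hedged sketch (assuming an active query object as in the previous section) that flags backpressure when the processing rate falls behind the input rate:

# Pull the key rates from the latest progress update and flag backpressure
progress = query.lastProgress
if progress:
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processing_rate = progress.get("processedRowsPerSecond") or 0.0
    batch_duration_ms = progress.get("durationMs", {}).get("triggerExecution")

    print(f"input={input_rate:.0f} rows/s, processed={processing_rate:.0f} rows/s, "
          f"batch={batch_duration_ms} ms")

    if processing_rate and input_rate > processing_rate:
        print("WARNING: input rate exceeds processing rate -- the query is falling behind")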

Fault Tolerance

Checkpointing

# Checkpointing enables recovery and exactly-once semantics
checkpoint_path = "s3://bucket/checkpoints/my-stream/"

query = events_df \
    .writeStream \
    .format("delta") \
    .option("path", "s3://bucket/output/") \
    .option("checkpointLocation", checkpoint_path) \
    .start()

# The checkpoint directory contains:
# - Offset log: which source offsets each micro-batch read
# - Commit log: which micro-batches have committed
# - State snapshots: state store data for stateful operators
# Recovery: a restarted query resumes from the last committed batch

Exactly-Once Semantics

Requirements for Exactly-Once:

  1. Idempotent Sink: Must handle duplicate writes
  2. Checkpointing: Track processed offsets
  3. Transaction Support: Delta Lake, Hudi, Iceberg
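
To make requirement 1 concrete, here is a hedged sketch of an idempotent foreachBatch sink using Delta Lake's txnAppId/txnVersion writer options (available in recent Delta releases); the application id and output path are illustrative:

def write_batch_idempotently(micro_batch_df, batch_id):
    """Write one micro-batch so that a replayed batch_id does not duplicate data."""
    micro_batch_df.write \
        .format("delta") \
        .option("txnAppId", "events-aggregation") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("s3://bucket/output/idempotent-events/")

query = events_df \
    .writeStream \
    .foreachBatch(write_batch_idempotently) \
    .option("checkpointLocation", "s3://bucket/checkpoints/idempotent/") \
    .start()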

Cost Optimization

Resource Management

# Dynamic allocation for streaming
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "4")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "20")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "4")

# Spot instances for streaming (use together with checkpointing)
spark.conf.set("spark.executor.instances", "10")
spark.conf.set("spark.task.maxFailures", "4")  # Tolerate spot interruptions

# Memory tuning (these are typically set at submit time, e.g. via spark-submit,
# since they cannot change after the JVMs have started)
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")
spark.conf.set("spark.driver.memory", "4g")

Data Skipping

from pyspark.sql.functions import col, hour

# Partition the output so that readers can prune by date and hour
query = aggregated_df \
    .withColumn("date", col("window_start").cast("date")) \
    .withColumn("hour", hour(col("window_start"))) \
    .writeStream \
    .format("delta") \
    .partitionBy("date", "hour") \
    .option("path", "s3://bucket/output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/output/") \
    .start()

# Z-ordering for Delta Lake (run as a separate maintenance job)
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://bucket/output/")
delta_table.optimize() \
    .executeZOrderBy("user_id", "event_type")

Spark vs. Flink

Feature | Spark Structured Streaming | Flink
Latency | 100 ms - 5 s (micro-batch) | 10-50 ms (true streaming)
State Management | Automatic with watermarks | Manual, with more control
API | DataFrame/Dataset (SQL-like) | DataStream API (more verbose)
Ecosystem | Spark ML, GraphX, SQL | Flink ML, Gelly, Table API
Maturity | Very mature | Mature
Use Case | Lambda architectures, unified batch/stream | Real-time, low-latency pipelines

When to Choose Spark:

  • Need unified batch + streaming (lambda architecture)
  • Team already knows Spark
  • Lower latency not critical
  • Need Spark ML/GraphX integration

When to Choose Flink:

  • Sub-second latency required
  • Complex stateful operations
  • Event time processing critical
  • Need more control over state

Best Practices

DO

# 1. Always specify a watermark for late data handling
df = df.withWatermark("timestamp", "10 minutes")

# 2. Use a Delta Lake sink for ACID transactions
df.writeStream.format("delta").start()

# 3. Monitor query progress
query.lastProgress

# 4. Set an appropriate trigger interval
#    - Development: ~10 seconds
#    - Production: 1-5 minutes

# 5. Use checkpointing for fault tolerance
#    .option("checkpointLocation", "s3://bucket/checkpoints/")

DON’T

# 1. Don't ignore watermark configuration
# Late data will be dropped without watermark
# 2. Don't use complete output mode for unbounded data
# Causes memory issues
# 3. Don't forget to handle backpressure
# If processing rate < input rate, increase resources
# 4. Don't use state operations without watermark
# State grows unbounded
# 5. Don't use complex aggregations without state TTL
# Causes OOM errors

Key Takeaways

  1. Micro-batch processing: 100ms minimum latency (vs. Flink’s 10-50ms)
  2. Unified API: Same DataFrame API for batch and streaming
  3. Exactly-once: Built-in with checkpointing and idempotent sinks
  4. Watermarks: Critical for late data handling and state cleanup
  5. State management: Automatic with watermarks, manual tuning for large state
  6. Delta Lake: Best sink for ACID transactions and time travel
  7. Monitoring: Use Spark UI and query.lastProgress for observability
  8. Integration: Seamless with Spark ML, GraphX, and batch processing
