Spark Structured Streaming Deep Dive
Micro-Batch Stream Processing at Scale
Overview
Apache Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It provides exactly-once semantics, event-time processing, and integrates seamlessly with batch processing, making it ideal for lambda architectures and unified batch/streaming workloads.
Structured Streaming Architecture
Micro-Batch Processing Model
Key Difference from Flink: Spark uses micro-batches (100ms minimum latency) vs. Flink’s true streaming (10-50ms latency).
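To make the model concrete, here is a minimal, self-contained sketch using Spark's built-in `rate` source (names and intervals are illustrative): at each trigger, the rows that arrived since the previous trigger are processed as one small batch job.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MicroBatchDemo").getOrCreate()

# The built-in "rate" source generates rows continuously; handy for demos
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 100).load()

# Every 5 seconds, Spark collects the newly arrived rows into a
# micro-batch and runs a regular Spark job over it
query = rate_df.writeStream \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()
```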
Core Concepts
Streaming DataFrame
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create a Spark session
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define the schema for the JSON payload
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON value column
events_df = kafka_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# This is a streaming DataFrame: it looks like a batch DataFrame,
# but it processes data continuously.
```

Output Modes
Output Mode Selection:
| Mode | Description | Use Case |
|---|---|---|
| Append | Only new rows are appended to the result table | Non-aggregating queries; windowed aggregations with watermarks (rows emitted once finalized) |
| Complete | The entire result table is re-emitted every trigger | Aggregations whose result table stays small |
| Update | Only rows updated since the last trigger are emitted | Aggregations with watermarks |
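As a quick illustration, the same aggregated stream (a hypothetical `windowed_counts`, e.g. a windowed count like those built in the next section) can be emitted under different modes; complete mode is only viable while the result table stays small:

```python
# Update mode: emit only the window rows whose counts changed this trigger
update_query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Complete mode: re-emit the entire result table every trigger
# (supported only for aggregations; keep the result table small)
complete_query = windowed_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
```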
Windowing Operations
Tumbling Windows
```python
from pyspark.sql.functions import window, col

# Tumbling window (1 minute)
tumbling_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("event_type")
    ) \
    .count() \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("event_type"),
        col("count").alias("event_count")
    )

# Write to console (for testing)
query = tumbling_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='1 minute') \
    .start()
```

Sliding Windows
```python
# Sliding window (1 minute size, 30 second slide)
sliding_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute", "30 seconds"),
        col("user_id")
    ) \
    .count()

query = sliding_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
```

Session Windows
```python
# Session window (5 minute gap)
from pyspark.sql.functions import session_window

session_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

query = session_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
```

Watermarks and Late Data
Watermark Strategy
Watermark Configuration:
```python
from pyspark.sql.functions import col

# Define a watermark: allow 10 minutes of lateness
watermarked_df = events_df \
    .withWatermark("timestamp", "10 minutes")

# How it works:
# - Watermark = max(event_time seen so far) - 10 minutes
# - State for windows older than the watermark is dropped
# - Data arriving with an event time before the watermark is dropped
# - On-time and slightly late data is processed normally
```

Late Data Example
```python
# Scenario: an event arrives 15 minutes late
# - Event time: 10:00
# - Arrival (processing) time: 10:15
# - Max event time seen so far: 10:15
# - Watermark: 10:05 (10:15 - 10 minutes)
# - Result: the event is DROPPED (its event time is before the watermark)

# Solution: increase the watermark tolerance
# - A 20-minute watermark admits more late data
# - Trade-off: more state retention (higher memory use)

from pyspark.sql.functions import col, count, window

# 20-minute watermark: more tolerance for late data
aggregated_df = events_df \
    .withWatermark("timestamp", "20 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .agg(count("*").alias("event_count"))
```

State Management
State Store
State Backend Configuration:
```python
# Configure the state store provider
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
)

# Configure state snapshotting and compression
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
spark.conf.set("spark.sql.streaming.stateStore.compress", "true")

# Checkpoint location for the query
checkpoint_path = "s3://bucket/checkpoints/event-aggregation/"
```

State TTL
```python
# State is cleaned up automatically based on the watermark;
# no manual TTL configuration is needed.

# Example: 5-minute tumbling windows with a 10-minute watermark
aggregated_df = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

# State is retained for about 10 minutes (watermark) + 5 minutes (window size);
# older state is dropped automatically.
```

Join Strategies
Stream-Static Join
```python
# Join streaming data with a static dimension table
# Common pattern: enrichment

# Static dimension table (loaded once)
customers_df = spark.read.parquet("s3://bucket/dims/customers/")

# Stream-static join (enrichment)
# (Assumes the events carry a customer_id column)
enriched_df = events_df \
    .join(
        customers_df,
        on="customer_id",
        how="left"  # Keep all events
    ) \
    .select(
        col("event_id"),
        col("customer_id"),
        col("customer_name"),
        col("segment"),
        col("event_type"),
        col("timestamp")
    )

query = enriched_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/enriched-events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/enrichment/") \
    .trigger(processingTime='1 minute') \
    .start()
```

Stream-Stream Join
```python
# Join two streaming sources
# Use case: real-time correlation

# Events stream
events_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "events") \
    .load()

# Enrichment stream
enrichment_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "enrichment") \
    .load()

# (Assumes both streams have been parsed, e.g. with from_json, so that
# user_id and an event-time timestamp column exist on each side.)
from pyspark.sql.functions import expr

# Stream-stream inner join: both sides need watermarks, and the join
# condition must bound how far apart matching events can be in time
events_wm = events_stream.withWatermark("timestamp", "10 minutes").alias("events")
enrich_wm = enrichment_stream.withWatermark("timestamp", "10 minutes").alias("enrichment")

joined_stream = events_wm.join(
    enrich_wm,
    expr("""
        events.user_id = enrichment.user_id AND
        events.timestamp BETWEEN enrichment.timestamp - INTERVAL 5 MINUTES
                             AND enrichment.timestamp + INTERVAL 5 MINUTES
    """),
    "inner"
)

query = joined_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
```

Sink Options
Console Sink (Testing)
```python
# Console sink: for development and testing
query = events_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='10 seconds') \
    .start()
```

Memory Sink (Testing)
```python
# Memory sink: for testing in notebooks
query = events_df \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("events_table") \
    .start()

# Query the in-memory table
spark.sql("SELECT * FROM events_table").show()
```

File Sink (Parquet/JSON)
```python
# File sink: write to Parquet/JSON/CSV
query = aggregated_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output/events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/events/") \
    .partitionBy("date", "hour") \
    .trigger(processingTime='5 minutes') \
    .start()
```

Kafka Sink
```python
# Kafka sink: write back to Kafka
query = enriched_df \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("topic", "enriched-events") \
    .option("checkpointLocation", "s3://bucket/checkpoints/kafka-sink/") \
    .trigger(processingTime='1 minute') \
    .start()
```

Delta Lake Sink (Recommended)
```python
# Delta Lake sink: ACID transactions, time travel
query = enriched_df \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "s3://bucket/delta/events/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/delta/") \
    .trigger(processingTime='1 minute') \
    .start()

# Benefits:
# - Exactly-once semantics
# - ACID transactions
# - Time travel (query previous versions)
# - Schema enforcement
# - Upsert support (MERGE) - see the sketch below
```
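The upsert (MERGE) support listed above is typically used from a streaming query via `foreachBatch`: each micro-batch is merged into the Delta table instead of blindly appended. A minimal sketch, assuming `event_id` uniquely identifies a record (the table path and function names are illustrative):

```python
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # Merge the micro-batch into the target table: update existing
    # event_ids, insert new ones
    target = DeltaTable.forPath(spark, "s3://bucket/delta/events/")
    target.alias("t") \
        .merge(micro_batch_df.alias("s"), "t.event_id = s.event_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

query = enriched_df.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "s3://bucket/checkpoints/delta-upsert/") \
    .start()
```

Performance Tuning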
Trigger Configuration
```python
# Processing-time trigger: a micro-batch every interval
query = df.writeStream \
    .trigger(processingTime='1 minute') \
    .start()

# One-time trigger: process the available data once, then stop
query = df.writeStream \
    .trigger(once=True) \
    .start()

# Continuous trigger (experimental, low latency)
# Supports only simple stateless operations (map, filter, ...);
# '1 second' here is the checkpoint interval
query = df.writeStream \
    .trigger(continuous='1 second') \
    .start()
```

Shuffle Partitions
```python
# Configure shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# For streaming, use fewer partitions than batch:
# - More partitions = more scheduling overhead per micro-batch
# - Fewer partitions = larger tasks

# Adaptive execution with partition coalescing
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

State Store Optimization
```python
# State store tuning
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "zstd")
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true")

# Use RocksDB for large state
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
```

Monitoring and Observability
Streaming Query Progress
```python
# List active streaming queries
spark.streams.active

# Look up a query and inspect its progress
query = spark.streams.get("query-id")
query.status          # Current status
query.lastProgress    # Progress of the last micro-batch
query.recentProgress  # Recent micro-batch progress reports

# Example lastProgress output:
# {
#   "id": "query-id",
#   "runId": "run-id",
#   "name": "events-aggregation",
#   "timestamp": "2025-01-27T10:00:00.000Z",
#   "batchId": 100,
#   "numInputRows": 1000000,
#   "inputRowsPerSecond": 10000.0,
#   "processedRowsPerSecond": 9500.0,
#   "durationMs": {
#     "triggerExecution": 1000,
#     "queryPlanning": 100,
#     "walCommit": 50
#   }
# }
```

Spark UI
Critical Metrics (a polling sketch follows the list):
- Input Rate: Rows entering the stream per second
- Processing Rate: Rows processed per second
- Batch Duration: Time to process each micro-batch
- Lag: Difference between event time and processing time
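A minimal monitoring loop that derives the first three metrics from `lastProgress`, assuming `query` is a running StreamingQuery (key names follow the progress report shown above):

```python
import time

while query.isActive:
    progress = query.lastProgress  # None until the first batch completes
    if progress:
        input_rate = progress.get("inputRowsPerSecond", 0.0)
        proc_rate = progress.get("processedRowsPerSecond", 0.0)
        batch_ms = progress["durationMs"]["triggerExecution"]
        # A sustained input rate above the processing rate means lag is growing
        print(f"in={input_rate:.0f}/s out={proc_rate:.0f}/s batch={batch_ms}ms")
    time.sleep(30)
```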
Fault Tolerance
Checkpointing
```python
# Checkpointing enables recovery and exactly-once semantics
checkpoint_path = "s3://bucket/checkpoints/my-stream/"

query = events_df \
    .writeStream \
    .format("delta") \
    .option("path", "s3://bucket/output/") \
    .option("checkpointLocation", checkpoint_path) \
    .start()

# The checkpoint contains:
# - Offset log: which source offsets have been processed
# - Commit log: which micro-batches have committed
# - State snapshots: state store snapshots

# Recovery: the query resumes from the last checkpoint
```

Exactly-Once Semantics
Requirements for Exactly-Once:
- Idempotent Sink: Must handle duplicate writes
- Checkpointing: Track processed offsets
- Transaction Support: Delta Lake, Hudi, Iceberg (one way to combine these is sketched below)
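Here is one way the three requirements combine in practice, sketched with Delta Lake's idempotent-write options inside `foreachBatch` (the `txnAppId`/`txnVersion` options are supported in recent Delta Lake releases; the app id and paths are illustrative):

```python
def idempotent_write(micro_batch_df, batch_id):
    # txnAppId + txnVersion let Delta detect and skip a batch that was
    # already committed, so retried micro-batches do not duplicate data
    micro_batch_df.write \
        .format("delta") \
        .option("txnAppId", "events-pipeline") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("s3://bucket/delta/events/")

query = events_df.writeStream \
    .foreachBatch(idempotent_write) \
    .option("checkpointLocation", "s3://bucket/checkpoints/exactly-once/") \
    .start()
```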
Cost Optimization
Resource Management
```python
# Dynamic allocation for streaming
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "4")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "20")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "4")

# Spot instances for streaming (use with checkpointing)
spark.conf.set("spark.executor.instances", "10")
spark.conf.set("spark.task.maxFailures", "4")  # Tolerate spot interruptions

# Memory tuning
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")
spark.conf.set("spark.driver.memory", "4g")
```

Data Skipping
```python
from pyspark.sql.functions import col, hour

# Partition the output so readers can prune by date and hour
query = aggregated_df \
    .withColumn("date", col("window_start").cast("date")) \
    .withColumn("hour", hour(col("window_start"))) \
    .writeStream \
    .format("delta") \
    .partitionBy("date", "hour") \
    .option("path", "s3://bucket/output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/output/") \
    .start()

# Z-Ordering for Delta Lake (run as a batch maintenance job)
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://bucket/output/")
delta_table.optimize() \
    .executeZOrderBy("user_id", "event_type")
```

Comparison with Flink
| Feature | Spark Structured Streaming | Flink |
|---|---|---|
| Latency | 100ms - 5s (micro-batch) | 10-50ms (true streaming) |
| State Management | Automatic with watermarks | Manual, more control |
| API | DataFrame/Dataset (SQL-like) | DataStream API (more verbose) |
| Ecosystem | Spark ML, GraphX, SQL | Flink ML, Gelly, Table API |
| Maturity | Very mature | Mature |
| Use Case | Lambda architectures, unified batch/stream | Real-time, low-latency |
When to Choose Spark:
- Need unified batch + streaming (lambda architecture)
- Team already knows Spark
- Lower latency not critical
- Need Spark ML/GraphX integration
When to Choose Flink:
- Sub-second latency required
- Complex stateful operations
- Event time processing critical
- Need more control over state
Best Practices
DO
```python
# 1. Always specify a watermark for late data handling
df = df.withWatermark("timestamp", "10 minutes")

# 2. Use a Delta Lake sink for ACID guarantees
df.writeStream.format("delta").start()

# 3. Monitor query progress
query.lastProgress

# 4. Set an appropriate trigger interval
#    - Development: 10 seconds
#    - Production: 1-5 minutes

# 5. Use checkpointing for fault tolerance
df.writeStream.option("checkpointLocation", "s3://bucket/checkpoints/")
```

DON’T
```python
# 1. Don't skip watermark configuration
#    Without one there is no bounded-lateness guarantee, and stateful
#    operators cannot clean up

# 2. Don't use complete output mode for unbounded results
#    The full result table is kept in memory and re-emitted each trigger

# 3. Don't ignore backpressure
#    If the processing rate stays below the input rate, cap per-batch
#    intake and scale resources (see the sketch below)

# 4. Don't use stateful operations without a watermark
#    State grows without bound

# 5. Don't run complex aggregations without state cleanup
#    Unbounded state eventually causes OOM errors
```
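For the backpressure point (#3), the Kafka source's `maxOffsetsPerTrigger` option caps how much each micro-batch reads, so a backlog drains in bounded batches rather than one giant one; the limit below is illustrative:

```python
throttled_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", "100000") \
    .load()  # At most 100k offsets consumed per micro-batch
```

Key Takeaways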
- Micro-batch processing: 100ms minimum latency (vs. Flink’s 10-50ms)
- Unified API: Same DataFrame API for batch and streaming
- Exactly-once: Built-in with checkpointing and idempotent sinks
- Watermarks: Critical for late data handling and state cleanup
- State management: Automatic with watermarks, manual tuning for large state
- Delta Lake: Best sink for ACID transactions and time travel
- Monitoring: Use Spark UI and query.lastProgress for observability
- Integration: Seamless with Spark ML, GraphX, and batch processing
Back to Module 2