
State Management in Streaming Systems

Handling State for Fault-Tolerant Stream Processing


Overview

State management is the cornerstone of reliable stream processing. Stateful operations like aggregations, joins, and windowing require maintaining state across multiple events. This guide covers state management patterns, strategies, and best practices for production streaming systems.


Why State Management Matters

Without state: stateless operations (map, filter) handle each event independently and need no memory of previous events.
With state: stateful operations (aggregations, joins, windowing) must remember information across events, and that state has to survive failures and restarts.


State Types

Keyed State

Keyed State: State partitioned by key (user_id, session_id, etc.)

Example: Per-user aggregation (a running sum in Flink, an event count in Spark)

// Flink: running sum of amount per user
dataStream.keyBy(Event::getUserId)
    .sum("amount");

# Spark Structured Streaming: event count per user
from pyspark.sql.functions import count
events_df.groupBy("user_id").agg(count("*").alias("event_count"))

Operator State

Operator State: Local state for each parallel instance

Use Case: Buffer state, connection state
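Operator state is typically wired up through Flink's CheckpointedFunction interface. Below is a minimal sketch of a buffering sink that keeps a per-instance buffer and snapshots it on every checkpoint; the BufferingSink class is illustrative and reuses the Event type from the keyed-state example.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Each parallel sink instance keeps its own buffer and checkpoints it as operator (list) state
public class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
    private transient ListState<Event> checkpointedState;
    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void invoke(Event value, Context context) {
        buffer.add(value);
        // ... flush the buffer to the external system once a threshold is reached
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Event event : buffer) {
            checkpointedState.add(event); // persist the buffer on every checkpoint
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Event> descriptor =
            new ListStateDescriptor<>("buffered-events", Event.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Event event : checkpointedState.get()) {
                buffer.add(event); // rebuild the in-memory buffer after recovery
            }
        }
    }
}

Unlike keyed state, this list state is split and redistributed across parallel instances when the operator is rescaled.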

Broadcast State

Broadcast State: Same state across all parallel instances

Use Case: Dynamic rules, configuration updates
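A sketch of the broadcast state pattern for dynamic rules: a rules stream is broadcast so every parallel instance sees every update, and events are evaluated against the current rules. The Rule and Alert classes, their methods (getId, getRuleId, matches), and the rulesStream/eventsStream variables are hypothetical placeholders.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Rule> rulesDescriptor =
    new MapStateDescriptor<>("rules", String.class, Rule.class);

// Broadcast the rules stream to all parallel instances
BroadcastStream<Rule> rulesBroadcast = rulesStream.broadcast(rulesDescriptor);

eventsStream
    .connect(rulesBroadcast)
    .process(new BroadcastProcessFunction<Event, Rule, Alert>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) throws Exception {
            // The event side gets read-only access to the broadcast state
            Rule rule = ctx.getBroadcastState(rulesDescriptor).get(event.getRuleId());
            if (rule != null && rule.matches(event)) {
                out.collect(new Alert(event));
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
            // Rule updates are written into the broadcast state on every instance
            ctx.getBroadcastState(rulesDescriptor).put(rule.getId(), rule);
        }
    });

Broadcast state stays consistent across instances because it is written only in processBroadcastElement, which every instance executes for every rule update.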


State Backend Comparison

State Backend Options

| Backend | Max State | Latency | Use Case | Cost |
|---|---|---|---|---|
| HashMap (JVM heap; replaces the deprecated MemoryStateBackend) | Limited by heap (roughly tens of GB) | Lowest | Testing, small state | Low |
| RocksDB (embedded, on local disk) | 10 TB+ | Low-Medium | Production, large state | Medium |

Memory State Backend

// Flink: heap-based (in-memory) state backend
Configuration config = new Configuration();
config.setString("state.backend", "hashmap"); // "hashmap" since Flink 1.13; older releases used the MemoryStateBackend
// Use: testing and small state that fits comfortably in the JVM heap
// Pros: fastest access, no serialization on reads and writes
// Cons: limited by heap size; state survives failures only if checkpoints go to durable storage

# Spark: in-memory state store (the default HDFSBackedStateStoreProvider keeps state on the executor heap)
# Use: small aggregations, short windows
# Pros: fast, simple
# Cons: limited by executor memory; recovery depends on the checkpoint location

When to Use:

  • Development and testing
  • Small state (< 10GB)
  • Short window sizes (< 1 hour)
  • State can be recomputed from source

RocksDB State Backend

// Flink: RocksDB state backend (recommended for production)
Configuration config = new Configuration();
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");
config.setInteger("state.checkpoints.num-retained", 3);

// Programmatic configuration (legacy RocksDBStateBackend API)
RocksDBStateBackend backend = new RocksDBStateBackend("s3://flink/checkpoints/");
backend.setNumberOfTransferThreads(4);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

// Use: production, large state (up to tens of TB)
// Pros: scales beyond memory, spills to local disk, supports incremental checkpoints
// Cons: higher latency than heap state, serialization overhead on every access

When to Use:

  • Production workloads
  • Large state (> 100GB)
  • Long windows (hours to days)
  • Need fault tolerance

Cost Optimization:

  • Use local SSD for RocksDB data directory
  • Configure incremental checkpoints (reduce S3 writes)
  • Tune block cache and write buffer sizes (see the configuration sketch below)
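A configuration sketch for these knobs, using Flink's RocksDB option keys; the paths and sizes are illustrative starting points, and when RocksDB memory is governed by Flink's managed memory (the default) the cache and write buffer sizes are derived from that budget instead.

// Point RocksDB working files at local SSD/NVMe storage (illustrative path)
config.setString("state.backend.rocksdb.localdir", "/mnt/nvme/flink-rocksdb");
// Upload only changed SST files on each checkpoint
config.setString("state.backend.incremental", "true");
// Block cache and write buffer sizes (illustrative values)
config.setString("state.backend.rocksdb.block.cache-size", "256mb");
config.setString("state.backend.rocksdb.writebuffer.size", "64mb");
// Predefined option profile for SSD-backed storage
config.setString("state.backend.rocksdb.predefined-options", "FLASH_SSD_OPTIMIZED");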

State Size Management

State TTL (Time-To-Live)

Flink State TTL:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Configure TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))                                             // 24 hour TTL
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)              // Reset TTL on create and write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // Never return expired entries
    .cleanupInRocksdbCompactFilter(1000)                                    // Clean up during RocksDB compaction
    .build();

// Apply to a state descriptor
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
    "eventCount",
    Long.class
);
descriptor.enableTimeToLive(ttlConfig);

Spark Watermark TTL:

# Spark uses watermarks for state cleanup
events_df = events_df.withWatermark("timestamp", "24 hours")
# A window's state is kept until the watermark passes the end of that window
# Older state is then dropped automatically

State Partitioning

Problem: Hot keys cause state imbalance

Solution 1: Salting

# Two-phase aggregation: salt the key, pre-aggregate, then combine per user
from pyspark.sql.functions import col, concat, count, lit, rand, split
from pyspark.sql.functions import sum as sum_

events_df = events_df.withColumn(
    "salted_key",
    concat(col("user_id"), lit("_"), (rand() * 100).cast("int"))
)
# Phase 1: partial counts per salted key (spreads the hot key across partitions)
partial = events_df.groupBy("salted_key").agg(count("*").alias("partial_count"))
# Phase 2: strip the salt and combine the partial counts per user
result = (partial
    .withColumn("user_id", split(col("salted_key"), "_")[0])
    .groupBy("user_id")
    .agg(sum_("partial_count").alias("event_count")))

Solution 2: Custom Partitioner (Flink)

// Custom partitioner to spread hot keys across subtasks
dataStream.partitionCustom(new CustomPartitioner(), Event::getUserId);
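A sketch of such a partitioner; Flink's Partitioner interface receives the key and the number of downstream channels, and the "hot_user" literal stands in for a hypothetical known hot key.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.functions.Partitioner;

// Spread a known hot key across all channels; hash everything else normally
public class CustomPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        if ("hot_user".equals(key)) { // hypothetical hot key
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; // default: hash partitioning
    }
}

Note that partitionCustom only redistributes records; a downstream keyBy would shuffle by key again, so in practice this is combined with pre-aggregation, similar to the salting approach above.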

Checkpointing and Savepoints

Checkpointing

Checkpointing: Periodic snapshots of state and offsets

Flink Checkpointing:

// Enable checkpointing
env.enableCheckpointing(
    60000,                          // Checkpoint interval (1 minute)
    CheckpointingMode.EXACTLY_ONCE  // Exactly-once semantics
);

// Configure checkpointing
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30000);                 // At least 30 seconds between checkpoints
config.setCheckpointTimeout(600000);                         // Abort checkpoints that run longer than 10 minutes
config.setMaxConcurrentCheckpoints(1);                       // Only one checkpoint in flight at a time
config.enableUnalignedCheckpoints(true);                     // Faster checkpoints under backpressure
config.setAlignedCheckpointTimeout(Duration.ofSeconds(60));  // Fall back to unaligned after 60s of alignment

// Retain checkpoints when the job is cancelled
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

Spark Checkpointing:

# Checkpointing is automatic with Structured Streaming
query = events_df.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://bucket/checkpoints/") \
.start()
# Checkpoint contains:
# - Offset log: Which offsets processed
# - Commit log: Which batches committed
# - State snapshots: State store

Savepoints

Savepoints: Manual checkpoints for job upgrades

# Flink: trigger a savepoint via the REST API
curl -X POST http://flink-jobmanager:8081/jobs/<job-id>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://flink/savepoints/", "cancel-job": false}'

# Restore by resubmitting the job from the savepoint (CLI)
flink run -s s3://flink/savepoints/savepoint-<id>/ my-job.jar

# Use cases:
# 1. Job upgrades (code changes)
# 2. Parallelism changes (rescaling)
# 3. A/B testing (fork a job from the same state)
# 4. Disaster recovery (manual restore)

Spark: No savepoints equivalent (use checkpointing)


State Recovery

Failure Scenarios

Recovery Strategy:

  1. Load state: From checkpoint/savepoint
  2. Reset offsets: To checkpoint offsets
  3. Resume processing: Continue from last committed

Flink Recovery:

// High availability for Job Manager
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.path.root", "/flink");
config.setString("high-availability.cluster-id", "my-cluster");
config.setString("high-availability.storageDir", "s3://flink/ha/");
// On failure, Flink automatically:
// 1. Restores state from last checkpoint
// 2. Resets offsets to checkpoint
// 3. Resumes processing
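Recovery speed also depends on the job's restart strategy; a minimal sketch using a fixed-delay strategy (the attempt count and delay are illustrative):

// Restart up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                             // number of restart attempts
    Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));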

Spark Recovery:

# Automatic recovery from checkpoint
# On failure, Spark automatically:
# 1. Restores state from checkpoint
# 2. Resets offsets to checkpoint
# 3. Resumes processing
# No extra configuration needed beyond the checkpoint location

State Performance Tuning

State Size Optimization

Problem: Large state causes slow checkpoints and high memory usage

Solutions:

  1. Reduce State Size
# Bad: store full event objects in state
state.update(event)
# Good: store only the aggregate you actually need
count = state.value() or 0
state.update(count + 1)
  2. Use Appropriate Data Types
// Bad: String for numeric state (larger, slower to serialize)
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>(
    "count",
    String.class
);
// Good: Long for numeric state
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
    "count",
    Long.class
);
  3. Compress State
// Enable compression for checkpoint and savepoint snapshots
env.getConfig().setUseSnapshotCompression(true);

Checkpoint Tuning

Flink Checkpoint Tuning:

// Unaligned checkpoints (faster under backpressure, larger checkpoint files)
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// Incremental checkpoints (RocksDB only: upload only the changed SST files)
config.setString("state.backend.incremental", "true");
// Local recovery (keep a task-local copy of state for faster restarts)
config.setString("state.backend.local-recovery", "true");
// Checkpoints and savepoints still go to durable storage
config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");
config.setString("state.savepoints.dir", "s3://flink/savepoints/");

Spark Checkpoint Tuning:

# Structured Streaming checkpoints once per micro-batch, so a longer trigger
# interval means fewer checkpoint writes (at the cost of latency and recovery time)
query = events_df.writeStream \
    .trigger(processingTime="1 minute") \
    .format("delta") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()
# Use faster storage for the checkpoint location where possible
# HDFS/EFS: faster, more expensive; S3: slower, cheaper, more durable

State Monitoring

Critical Metrics

state_metrics:
  - name: "State size"
    metric: flink.job.metrics.state_size
    alert: "If > 10GB"
  - name: "Checkpoint duration"
    metric: flink.job.metrics.checkpoint_duration
    alert: "If > 1 minute"
  - name: "Checkpoint failure rate"
    metric: flink.job.metrics.checkpoint_failures
    alert: "If > 0"
  - name: "State access latency"
    metric: flink.job.metrics.state_access_latency
    alert: "If > 100ms"
  - name: "State backend recovery time"
    metric: flink.job.metrics.recovery_time
    alert: "If > 5 minutes"

Monitoring State Size

Flink:

# Get checkpoint statistics (including state size) via the REST API
curl http://flink-jobmanager:8081/jobs/<job-id>/checkpoints

# Abbreviated response:
{
  "id": 12345,
  "status": "COMPLETED",
  "duration": 45000,
  "state_size": 5368709120
}
# state_size is in bytes (here ~5 GB)

Spark:

# Get state size from the Spark UI:
# Structured Streaming tab → query statistics (state rows and memory used)
# Or programmatically, from the last progress report
query = spark.streams.get("query-id")
progress = query.lastProgress
# progress["stateOperators"] lists numRowsTotal, memoryUsedBytes, etc. per stateful operator

State Anti-Patterns

Anti-Pattern 1: Unbounded State Growth

// Bad: state grows without bound
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {
    private ListState<Event> userEvents;

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Events are added to state but never removed
        userEvents.add(event);
    }
}

// Good: keep only an aggregate, and give it a TTL
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> eventCount;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))
            .build();
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "eventCount",
            Long.class
        );
        descriptor.enableTimeToLive(ttlConfig);
        eventCount = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        long count = eventCount.value() == null ? 0L : eventCount.value();
        eventCount.update(count + 1);
    }
}

Anti-Pattern 2: Hot Keys

# Bad: a single hot key concentrates state and work on one partition
events_df.groupBy("hot_user_id").agg(count("*"))

# Good: redistribute with salting, then combine the partial counts (two-phase aggregation)
from pyspark.sql.functions import col, concat, lit, rand
events_df = events_df.withColumn(
    "salted_key",
    concat(col("hot_user_id"), lit("_"), (rand() * 100).cast("int"))
)
partial = events_df.groupBy("salted_key").agg(count("*").alias("partial_count"))
# ... then re-aggregate per hot_user_id, as shown in the State Partitioning section

Anti-Pattern 3: No Checkpointing

// Bad: checkpointing never enabled (the default) → a job failure loses all state
// env.enableCheckpointing(...) was never called

// Good: enable checkpointing
env.enableCheckpointing(60000); // every 1 minute

Cost Optimization

State Storage Cost

Cost Trade-offs:

| Option | Cost | Performance | Use Case |
|---|---|---|---|
| Local SSD | $0.10/GB/month | Fastest | Active state (RocksDB) |
| HDFS | $0.03/GB/month | Fast | Checkpoints (on-prem) |
| S3 | $0.023/GB/month | Medium | Checkpoints (cloud) |

Optimization Strategies:

  1. Use RocksDB: Spills to local SSD (cheaper than memory)
  2. Incremental Checkpoints: Only store changes (reduce S3 writes)
  3. TTL Configuration: Auto-cleanup old state (reduce storage)
  4. Local Recovery: Faster recovery (reduce downtime cost)

Best Practices

DO

// 1. Use RocksDB for production
config.setString("state.backend", "rocksdb");
// 2. Enable checkpointing
env.enableCheckpointing(60000);
// 3. Configure TTL for all state
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).build();
descriptor.enableTimeToLive(ttlConfig);
// 4. Monitor state size
// Alert if > 10GB
// 5. Use unaligned checkpoints when backpressure slows aligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints(true);

DON’T

// 1. Don't ignore state size
// State grows unbounded → OOM
// 2. Don't use memory state backend in production
// State lost on failure
// 3. Don't forget checkpointing
// Job failure = lose all state
// 4. Don't use state for simple operations
// Use stateless operations when possible
// 5. Don't ignore hot keys
// Causes state imbalance and slow processing

Key Takeaways

  1. State is critical: Required for aggregations, joins, windowing
  2. State types: Keyed, operator, broadcast
  3. State backends: Heap/HashMap (testing, small state), RocksDB (production, large state)
  4. Checkpointing: Enables fault tolerance and exactly-once
  5. Savepoints: Manual checkpoints for upgrades
  6. TTL: Auto-cleanup old state to prevent unbounded growth
  7. Monitoring: Track state size, checkpoint duration, failures
  8. Cost: RocksDB + local SSD = best performance/cost tradeoff
