# State Management in Streaming Systems

Handling State for Fault-Tolerant Stream Processing

## Overview
State management is the cornerstone of reliable stream processing. Stateful operations like aggregations, joins, and windowing require maintaining state across multiple events. This guide covers state management patterns, strategies, and best practices for production streaming systems.
## Why State Management Matters
- Without state: stateless operations (map, filter) process each event independently.
- With state: stateful operations (aggregations, joins, windowing) must remember information across events, as sketched below.
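To make the contrast concrete, here is a minimal Flink sketch, assuming a hypothetical `Event` type with `getUserId()` and `getAmount()` accessors: the filter keeps no state, while the windowed count keeps a per-key counter that has to survive failures.

```java
// Stateless: each event is handled on its own; nothing needs to be restored after a failure
DataStream<Event> largeEvents = events.filter(e -> e.getAmount() > 100);

// Stateful: the windowed count keeps a per-key counter across events,
// so that counter must be checkpointed and restored on failure
DataStream<Tuple2<String, Long>> counts = events
    .map(e -> Tuple2.of(e.getUserId(), 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(t -> t.f0)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .sum(1);
```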
## State Types

### Keyed State
Keyed State: State partitioned by key (user_id, session_id, etc.)
Example: Per-user aggregation (a running sum in Flink, an event count in Spark)

```java
// Flink: key the stream by user and keep a running sum of "amount" per key
dataStream
    .keyBy(Event::getUserId)
    .sum("amount");
```

```python
# Spark Structured Streaming: count events per user
events_df.groupBy("user_id").agg(count("*").alias("event_count"))
```

### Operator State
Operator State: Local state for each parallel instance
Use Case: Buffer state, connection state
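As one illustration, here is a sketch of a buffering sink that uses Flink's `CheckpointedFunction` to hold its in-flight buffer in operator state; `Event` is an assumed user-defined type, and the flush logic is elided.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that buffers elements; the buffer is stored as operator state
// at checkpoint time and restored after a failure.
public class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {

    private transient ListState<Event> checkpointedBuffer; // operator state handle
    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void invoke(Event value, Context context) {
        buffer.add(value);
        // ... flush the buffer to the external system once it reaches a threshold ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Copy the in-flight buffer into operator state at checkpoint time
        checkpointedBuffer.update(new ArrayList<>(buffer));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Event> descriptor =
            new ListStateDescriptor<>("buffered-events", Event.class);
        checkpointedBuffer = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // Restore buffered elements after a failure
            for (Event e : checkpointedBuffer.get()) {
                buffer.add(e);
            }
        }
    }
}
```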
### Broadcast State
Broadcast State: Same state across all parallel instances
Use Case: Dynamic rules, configuration updates
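A sketch of broadcast state in Flink for the dynamic-rules use case; `Rule`, `Event`, and `Alert` (and the `matches()` / `getId()` methods) are hypothetical user-defined types.

```java
// Broadcast a stream of rules to every parallel instance of the event stream
MapStateDescriptor<String, Rule> ruleDescriptor =
    new MapStateDescriptor<>("rules", String.class, Rule.class);

BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

DataStream<Alert> alerts = eventStream
    .connect(ruleBroadcast)
    .process(new BroadcastProcessFunction<Event, Rule, Alert>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) throws Exception {
            // Read-only view of the broadcast rules on the event side
            for (Map.Entry<String, Rule> entry :
                    ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {
                if (entry.getValue().matches(event)) { // matches() is a hypothetical rule check
                    out.collect(new Alert(event));
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
            // Rule updates are applied to the broadcast state on every parallel instance
            ctx.getBroadcastState(ruleDescriptor).put(rule.getId(), rule);
        }
    });
```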
## State Backend Comparison

### State Backend Options
| Backend | Max State | Latency | Use Case | Cost |
|---|---|---|---|---|
| Memory | < 100GB | Lowest | Testing, small state | Low |
| RocksDB | 10TB+ (spills to local disk) | Low-Medium | Production, large state | Medium |
| HashMap (heap) | Limited by JVM heap | Lowest | Small state needing fast access | Medium |
### Memory State Backend

```java
// Flink: memory state backend
Configuration config = new Configuration();
config.setString("state.backend", "memory");

// Use: Testing, small state (< 100GB)
// Pros: Fast, no serialization overhead
// Cons: State lost on failure, limited size
```

```python
# Spark: In-memory state (default)
# Use: Small aggregations, short windows
# Pros: Fast, simple
# Cons: Lost on failure, limited by heap
```

When to Use:
- Development and testing
- Small state (< 10GB)
- Short window sizes (< 1 hour)
- State can be recomputed from source
RocksDB State Backend
// Flink: RocksDB state backend (recommended for production)Configuration config = new Configuration();config.setString("state.backend", "rocksdb");config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");config.setLong("state.checkpoints.num-retained", "3");
// ConfigurationRocksDBStateBackend backend = new RocksDBStateBackend("s3://flink/checkpoints/");backend.setNumberOfTransferThreads(4);backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
// Use: Production, large state (10TB+)// Pros: Scalable, fault-tolerant, spills to disk// Cons: Higher latency, serialization overheadWhen to Use:
- Production workloads
- Large state (> 100GB)
- Long windows (hours to days)
- Need fault tolerance
Cost Optimization (see the configuration sketch after this list):
- Use local SSD for RocksDB data directory
- Configure incremental checkpoints (reduce S3 writes)
- Tune block cache and write buffer sizes
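A sketch of how these tunings might be expressed in Flink configuration. The local directory and the sizes are illustrative assumptions, and option names can differ between Flink versions, so check them against the version you run.

```java
// Assumed Flink option keys; verify against your Flink version
Configuration config = new Configuration();
config.setString("state.backend", "rocksdb");
config.setString("state.backend.incremental", "true");                        // upload only changed files per checkpoint
config.setString("state.backend.rocksdb.localdir", "/mnt/local-ssd/rocksdb"); // hypothetical local SSD mount for working state
config.setString("state.backend.rocksdb.block.cache-size", "256mb");          // read (block) cache
config.setString("state.backend.rocksdb.writebuffer.size", "128mb");          // write buffer (memtable) size
```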
## State Size Management

### State TTL (Time-To-Live)
Flink State TTL:
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Configure TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))                                             // 24-hour TTL
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)              // Update on write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // Don't return expired
    .cleanupInRocksdbCompactFilter(1000)                                    // Cleanup during RocksDB compaction
    .build();

// Apply to state descriptor
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
    "eventCount",
    Long.class);
descriptor.enableTimeToLive(ttlConfig);
```

Spark Watermark TTL:
```python
# Spark uses watermarks for state cleanup
events_df = events_df.withWatermark("timestamp", "24 hours")

# State is retained for: watermark + window size
# Old state is automatically dropped
```

### State Partitioning
Problem: Hot keys cause state imbalance
Solution 1: Salting
```python
# Add a random salt to the key to spread a hot key across partitions
from pyspark.sql.functions import col, concat, count, lit, rand, split

events_df = events_df.withColumn(
    "salted_key",
    concat(col("user_id"), lit("_"), (rand() * 100).cast("int"))
)

# Aggregate on the salted key
result = events_df.groupBy("salted_key").agg(count("*").alias("partial_count"))

# Remove the salt
result = result.withColumn(
    "user_id",
    split(col("salted_key"), "_")[0]
)

# Note: rows are now partial counts per (user, salt); combine them per user
# downstream (e.g., in a follow-up aggregation or at query time)
```

Solution 2: Custom Partitioner (Flink)
```java
// Custom partitioner to redistribute hot keys
dataStream.partitionCustom(new CustomPartitioner(), key);
```
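The snippet above assumes a user-defined `CustomPartitioner`. A minimal sketch of what it might look like, with a hypothetical hard-coded hot key; Flink's `Partitioner` interface only requires the `partition` method.

```java
import org.apache.flink.api.common.functions.Partitioner;

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical partitioner: spread a known hot key randomly across partitions,
// hash everything else as usual.
public class CustomPartitioner implements Partitioner<String> {

    private static final String HOT_KEY = "hot_user_id"; // assumed hot key

    @Override
    public int partition(String key, int numPartitions) {
        if (HOT_KEY.equals(key)) {
            // Distribute the hot key across all partitions
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return Math.abs(key.hashCode() % numPartitions);
    }
}
```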
## Checkpointing and Savepoints

### Checkpointing
Checkpointing: Periodic snapshots of state and offsets
Flink Checkpointing:
```java
// Enable checkpointing
env.enableCheckpointing(
    60000,                          // Checkpoint interval (1 minute)
    CheckpointingMode.EXACTLY_ONCE  // Exactly-once semantics
);

// Configure checkpointing
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30000);   // 30 seconds
config.setCheckpointTimeout(600000);           // 10 minutes
config.setMaxConcurrentCheckpoints(1);         // Only 1 at a time
config.enableUnalignedCheckpoints(true);       // Faster checkpoints under backpressure
config.setAlignedCheckpointTimeout(Duration.ofSeconds(60)); // Fall back to unaligned after 60s

// Retain checkpoints
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION  // Keep on cancellation
);
```

Spark Checkpointing:
```python
# Checkpointing is automatic with Structured Streaming
query = events_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()

# Checkpoint contains:
# - Offset log: which offsets were processed
# - Commit log: which batches were committed
# - State snapshots: the state store
```

### Savepoints
Savepoints: Manual checkpoints for job upgrades
```bash
# Flink: trigger a savepoint via the REST API
curl -X POST http://flink-jobmanager:8081/jobs/my-job/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://flink/savepoints/my-savepoint/", "cancel-job": false}'

# Restore a job from the savepoint (CLI)
flink run -s s3://flink/savepoints/my-savepoint/ my-job.jar
```

Use cases:

1. Job upgrades (code changes)
2. Parallelism changes (scaling)
3. A/B testing (fork the job)
4. Disaster recovery (manual restore)

Spark: No savepoints equivalent (use checkpointing)
## State Recovery

### Failure Scenarios
Recovery Strategy:
- Load state: From checkpoint/savepoint
- Reset offsets: To checkpoint offsets
- Resume processing: Continue from last committed
Flink Recovery:
```java
// High availability for the JobManager
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.path.root", "/flink");
config.setString("high-availability.cluster-id", "my-cluster");
config.setString("high-availability.storageDir", "s3://flink/ha/");

// On failure, Flink automatically:
// 1. Restores state from the last checkpoint
// 2. Resets offsets to the checkpoint
// 3. Resumes processing
```

Spark Recovery:
```python
# Automatic recovery from the checkpoint
# On failure, Spark automatically:
# 1. Restores state from the checkpoint
# 2. Resets offsets to the checkpoint
# 3. Resumes processing

# No configuration needed (automatic)
```

## State Performance Tuning
### State Size Optimization
Problem: Large state causes slow checkpoints and high memory usage
Solutions:
- Reduce State Size

```python
# Bad: Store full event objects in state
state.update(event)

# Good: Store only aggregates
count = state.value() or 0
state.update(count + 1)
```

- Use Appropriate Data Types
```java
// Bad: Use String for numeric state
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>(
    "count",
    String.class);

// Good: Use Long for numeric state
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
    "count",
    Long.class);
```

- Compress State

```java
// Enable compression for keyed-state snapshots (checkpoints and savepoints)
env.getConfig().setUseSnapshotCompression(true);
```

### Checkpoint Tuning
Flink Checkpoint Tuning:
```java
// Unaligned checkpoints (faster under backpressure, more I/O overhead)
env.getCheckpointConfig().enableUnalignedCheckpoints(true);

// Incremental checkpoints (only store changes)
config.setString("state.backend.incremental", "true");

// Local recovery (faster restart): keep a secondary local copy of state
config.setString("state.backend.local-recovery", "true");
config.setString("taskmanager.state.local.root-dirs", "/tmp/flink/local-recovery/");

// Checkpoints and savepoints still go to durable storage
config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");
config.setString("state.savepoints.dir", "s3://flink/savepoints/");
```

Spark Checkpoint Tuning:
```python
# Default checkpoint location (per-query locations override this)
spark.conf.set("spark.sql.streaming.checkpointLocation", "s3://bucket/checkpoints/")

# To reduce checkpoint frequency, use a longer trigger interval
# (fewer checkpoints, but more data to replay on recovery)

# Use faster storage where recovery time matters:
# EFS: faster, more expensive
# S3: slower, cheaper, more durable
```

## State Monitoring
### Critical Metrics
```yaml
state_metrics:
  - name: "State size"
    metric: flink.job.metrics.state_size
    alert: "If > 10GB"

  - name: "Checkpoint duration"
    metric: flink.job.metrics.checkpoint_duration
    alert: "If > 1 minute"

  - name: "Checkpoint failure rate"
    metric: flink.job.metrics.checkpoint_failures
    alert: "If > 0"

  - name: "State access latency"
    metric: flink.job.metrics.state_access_latency
    alert: "If > 100ms"

  - name: "State backend recovery time"
    metric: flink.job.metrics.recovery_time
    alert: "If > 5 minutes"
```

### Monitoring State Size
Flink:
```bash
# Get checkpoint statistics (including state size) via the REST API
curl http://flink-jobmanager:8081/jobs/my-job/checkpoints

# Example response (truncated):
# {
#   "id": "12345",
#   "status": "COMPLETED",
#   "duration": 45000,
#   "state_size": 5368709120    # ~5 GB
# }
```

Spark:
```python
# Get state size from the Spark UI
# Streaming Tab → Statistics → State Size

# Or programmatically, from the query's last progress report
query = spark.streams.get("query-id")
progress = query.lastProgress
# progress["stateOperators"] includes numRowsTotal, memoryUsedBytes, etc.
```

## State Anti-Patterns
### Anti-Pattern 1: Unbounded State Growth
```java
// Bad: State grows without bound
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {
    private ListState<Event> userEvents;

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Add to state (never removed)
        userEvents.add(event);
    }
}

// Good: Use TTL or windowing
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> eventCount;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))
            .build();

        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "eventCount",
            Long.class
        );
        descriptor.enableTimeToLive(ttlConfig);
        eventCount = getRuntimeContext().getState(descriptor);
    }
}
```

### Anti-Pattern 2: Hot Keys
```python
# Bad: Single hot key causes state imbalance
events_df.groupBy("hot_user_id").agg(count("*"))

# Good: Redistribute with salting
events_df = events_df.withColumn(
    "salted_key",
    concat(col("hot_user_id"), lit("_"), (rand() * 100).cast("int"))
)
result = events_df.groupBy("salted_key").agg(count("*"))
```

### Anti-Pattern 3: No Checkpointing
```java
// Bad: Checkpointing never enabled (it is off by default)
// A job failure loses all state and offsets

// Good: Enable checkpointing
env.enableCheckpointing(60000); // 1 minute
```

## Cost Optimization
### State Storage Cost
Cost Trade-offs:
| Option | Cost | Performance | Use Case |
|---|---|---|---|
| Local SSD | $0.10/GB/month | Fastest | Active state (RocksDB) |
| HDFS | $0.03/GB/month | Fast | Checkpoints (on-prem) |
| S3 | $0.023/GB/month | Medium | Checkpoints (cloud) |
Optimization Strategies:
- Use RocksDB: Spills to local SSD (cheaper than memory)
- Incremental Checkpoints: Only store changes (reduce S3 writes)
- TTL Configuration: Auto-cleanup old state (reduce storage)
- Local Recovery: Faster recovery (reduce downtime cost)
## Best Practices

### DO
```java
// 1. Use RocksDB for production
config.setString("state.backend", "rocksdb");

// 2. Enable checkpointing
env.enableCheckpointing(60000);

// 3. Configure TTL for all state
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).build();
descriptor.enableTimeToLive(ttlConfig);

// 4. Monitor state size
//    Alert if > 10GB

// 5. Use unaligned checkpoints when backpressure slows aligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
```

### DON’T
```java
// 1. Don't ignore state size
//    State grows unbounded → OOM

// 2. Don't use the memory state backend in production
//    State is lost on failure

// 3. Don't forget checkpointing
//    Job failure = lose all state

// 4. Don't use state for simple operations
//    Use stateless operations when possible

// 5. Don't ignore hot keys
//    They cause state imbalance and slow processing
```

## Key Takeaways
- State is critical: Required for aggregations, joins, windowing
- State types: Keyed, operator, broadcast
- State backends: Memory (testing), RocksDB (production)
- Checkpointing: Enables fault tolerance and exactly-once
- Savepoints: Manual checkpoints for upgrades
- TTL: Auto-cleanup old state to prevent unbounded growth
- Monitoring: Track state size, checkpoint duration, failures
- Cost: RocksDB + local SSD = best performance/cost tradeoff
Back to Module 2