Backpressure in Streaming Systems
Handling Slow Consumers and Overloaded Pipelines
Overview
Backpressure occurs when a streaming system cannot process data as fast as it’s being produced. Without proper handling, backpressure causes unbounded memory growth, crashes, and data loss. This guide covers backpressure detection, mitigation strategies, and prevention techniques.
What is Backpressure?
Backpressure: Mechanism to signal producers to slow down when consumers cannot keep up.
Without Backpressure:
- Buffers fill up
- Memory grows unbounded
- OOM crashes
- Data loss
With Backpressure:
- Producer slows down
- Consumer catches up
- System remains stable
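The contrast above can be seen in a toy producer/consumer built on a bounded queue. This is a minimal illustrative sketch using only the Python standard library; the buffer size and rates are arbitrary and not tied to any streaming framework.

```python
import queue
import threading
import time

# Bounded buffer: when it is full, put() blocks -- that blocking IS the
# backpressure signal, forcing the fast producer down to the consumer's pace.
# With an unbounded buffer (maxsize=0), memory would instead grow until the process dies.
buffer = queue.Queue(maxsize=100)

def producer():
    for i in range(1_000):
        buffer.put(i)          # blocks while the buffer is full

def consumer():
    while True:
        item = buffer.get()
        time.sleep(0.01)       # slow consumer: ~100 items/sec
        buffer.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()
buffer.join()                  # wait until the consumer drains the buffer
```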
Backpressure Scenarios
Scenario 1: Slow Sink
Problem: Sink cannot write fast enough (database throttling, slow network)
Example:
```python
# Sink is slow (100 writes/sec); producer is fast (10K events/sec)

# Bad: buffer grows unbounded because every micro-batch is pushed at the slow sink
events_df.writeStream \
    .foreachBatch(write_to_database) \
    .start()

# Good: control the consumption cadence with an explicit trigger
events_df.writeStream \
    .foreachBatch(write_to_database) \
    .trigger(processingTime='10 seconds') \
    .start()
```

Scenario 2: Slow Transformation
Problem: Complex operation (ML inference, API calls) slows down pipeline
Example:
```python
# Bad: blocking API call inside the stream
import requests

def enrich_event(event):
    # Synchronous HTTP call per event (blocking, slow)
    response = requests.get(f"https://api.example.com/{event['user_id']}")
    event['user_data'] = response.json()
    return event
```

```python
# Good: async API calls with a concurrency limit (simple rate limiting)
import asyncio
import aiohttp

semaphore = asyncio.Semaphore(50)  # at most 50 in-flight requests

async def enrich_event_async(session, event):
    async with semaphore:
        async with session.get(f"https://api.example.com/{event['user_id']}") as resp:
            event['user_data'] = await resp.json()
    return event

async def enrich_all(events):
    # One shared session per batch instead of one per event
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(enrich_event_async(session, e) for e in events))
```

```python
# Or: use a batch API endpoint (one call for many events)
def enrich_events_batch(events):
    user_ids = [e['user_id'] for e in events]
    response = requests.post("https://api.example.com/batch", json=user_ids)
    user_data = response.json()

    for event, data in zip(events, user_data):
        event['user_data'] = data

    return events
```

Scenario 3: Hot Partitions
Problem: Skewed key distribution causes single partition to be hot
Example:
```python
from pyspark.sql.functions import col, concat, lit, rand, count

# Bad: a single hot key funnels all records for that key into one task
events_df.groupBy("hot_user_id").agg(count("*"))

# Good: redistribute the hot key with salting
events_df = events_df.withColumn(
    "salted_key",
    concat(col("hot_user_id"), lit("_"), (rand() * 100).cast("int").cast("string"))
)
result = events_df.groupBy("salted_key").agg(count("*"))
```
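Salting changes the grouping key, so the per-salt partial counts still have to be rolled up to get totals per original key. A minimal sketch of that second stage, continuing the example above; note that chaining two aggregations on a streaming DataFrame is only supported in recent Spark versions (with watermarks), so the roll-up is often run against the sink table instead, and the split assumes user ids contain no underscore.

```python
from pyspark.sql.functions import col, count, split, sum as spark_sum

# Stage 1: aggregate by salted key (spreads the hot key across many tasks)
partials = events_df.groupBy("salted_key").agg(count("*").alias("partial_count"))

# Stage 2: strip the salt suffix and combine the partials per original key
totals = (
    partials
    .withColumn("hot_user_id", split(col("salted_key"), "_").getItem(0))
    .groupBy("hot_user_id")
    .agg(spark_sum("partial_count").alias("total_count"))
)
```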
Backpressure Detection

Flink Monitoring
Flink Web UI:
- Navigate to running job
- Click on “Backpressure” tab
- Trigger backpressure detection
- View results (OK, LOW, HIGH)
Flink REST API:
```bash
# Trigger backpressure sampling and read the result
# (the first request starts sampling; repeat it after a few seconds for the measured values)
curl http://flink-jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure

# Example response (abridged; exact field names vary by Flink version):
# {
#   "status": "ok",
#   "backpressureLevel": "high",        # ok, low, high
#   "subtasks": [
#     { "subtask": 0, "backpressureLevel": "high", "ratio": 0.95 }   # 95% backpressured
#   ]
# }
```

Flink Metrics:
```yaml
backpressure_metrics:
  - name: "Mailbox size"
    metric: task.mailbox.size
    alert: "If > 1000"

  - name: "Backpressure ratio"
    metric: task.backpressure.ratio
    alert: "If > 0.5"

  - name: "Buffer pool usage"
    metric: task.buffer_pool.usage
    alert: "If > 0.8"
```

Spark Monitoring
```python
# Spark Structured Streaming: monitor query progress
query = spark.streams.get("query-id")

# Most recent micro-batch progress (a dict in PySpark)
progress = query.lastProgress

# Key metrics
print(f"Input rate: {progress['inputRowsPerSecond']:.2f} rows/sec")
print(f"Processing rate: {progress['processedRowsPerSecond']:.2f} rows/sec")
print(f"Batch duration: {progress['durationMs']['triggerExecution'] / 1000:.2f} seconds")

# Backpressure indicator:
# if the input rate is consistently much higher than the processing rate,
# the query is falling behind (backpressure)
```

Spark UI:
- Navigate to “Streaming” tab
- View “Input Rate” vs “Processing Rate”
- If input > processing, backpressure exists
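The same input-vs-processing comparison can be automated with a small polling check against lastProgress. A minimal sketch, assuming the query handle from the monitoring snippet above; the 2x threshold and 60-second poll interval are arbitrary examples.

```python
import time

def check_backpressure(query, ratio_threshold=2.0):
    """Warn when the source rate exceeds the processing rate by the given factor."""
    progress = query.lastProgress
    if not progress:
        return
    input_rate = progress.get('inputRowsPerSecond', 0.0)
    processing_rate = progress.get('processedRowsPerSecond', 0.0)
    if processing_rate > 0 and input_rate / processing_rate > ratio_threshold:
        print(f"WARNING: backpressure suspected "
              f"(input {input_rate:.0f} rows/s vs processed {processing_rate:.0f} rows/s)")

while True:
    check_backpressure(query)
    time.sleep(60)  # poll once a minute
```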
Backpressure Mitigation
Strategy 1: Scale Out
Flink:
```java
// Increase job-wide parallelism
env.setParallelism(200);  // increase from 100 to 200

// Or: operator-specific parallelism
dataStream.keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .sum("amount")
    .setParallelism(200);  // only this operator
```

Spark:
```python
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "400")  # increase from the default 200

# Or: repartition before an expensive operation
events_df = events_df.repartition(400)
result = events_df.groupBy("user_id").agg(count("*"))
```

Trade-off: More resources = higher cost
Strategy 2: Optimize Operations
Optimization Techniques:
- Reduce State Size
```java
// Bad: store the full event list as state
ListState<Event> events = getRuntimeContext().getListState(descriptor);

// Good: store only the aggregate
ValueState<Long> count = getRuntimeContext().getValueState(descriptor);
```

- Use Windowing
```python
# Bad: unbounded aggregation over the entire stream
events_df.groupBy("user_id").agg(count("*"))

# Good: windowed aggregation with a watermark to bound state
events_df \
    .withWatermark("timestamp", "1 hour") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()
```

- Filter Early
```python
# Bad: aggregate everything, then filter
events_df \
    .groupBy("user_id") \
    .agg(count("*").alias("count")) \
    .filter(col("count") > 100)

# Good: filter first to reduce the volume flowing into the aggregation
events_df \
    .filter(col("event_type") == "important") \
    .groupBy("user_id") \
    .count()
```

Strategy 3: Buffer Management
Flink Buffer Configuration:
```java
// Network buffer memory (key names vary slightly across Flink versions)
config.setString("taskmanager.memory.network.min", "256m");
config.setString("taskmanager.memory.network.max", "1g");
config.setInteger("taskmanager.network.numberOfBuffers", 2048);
config.setInteger("taskmanager.network.bufferSize", 32768); // 32 KB segments

// Memory fractions
config.setFloat("taskmanager.network.memory.fraction", 0.2f);
config.setFloat("taskmanager.memory.fraction", 0.7f);
```

Spark Buffer Configuration:
```python
# Rate-based backpressure (applies to the legacy DStream API, spark.streaming.*;
# Structured Streaming instead uses source options such as maxOffsetsPerTrigger)
spark.conf.set("spark.streaming.backpressure.enabled", "true")
spark.conf.set("spark.streaming.backpressure.initialRate", "1000")
spark.conf.set("spark.streaming.backpressure.pid.proportional", "1.0")
spark.conf.set("spark.streaming.backpressure.pid.integral", "0.1")
spark.conf.set("spark.streaming.backpressure.pid.derived", "0.0")
```

Strategy 4: Rate Limiting
Kafka Consumer Rate Limiting:
```python
# Limit the fetch rate at the source
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", "10000") \
    .load()  # at most 10K records per trigger

# Process on a fixed cadence
query = events_df.writeStream \
    .trigger(processingTime='10 seconds') \
    .format("delta") \
    .start()  # one micro-batch every 10 seconds
```

Kinesis Rate Limiting:
```python
# Configure the KCL consumer to limit the read rate
# (shown as a Python dict for illustration; maxRecords and
#  idleTimeBetweenReadsInMillis are the corresponding KCL properties)
config = {
    'streamName': 'events-stream',
    'applicationName': 'event-processor',
    'maxRecords': 100,                    # at most 100 records per poll
    'idleTimeBetweenReadsInMillis': 1000  # wait 1 second between polls
}
```

Backpressure Prevention
Design for Throughput
Capacity Planning:
- Measure Peak Throughput
```python
# Historical data: peak = 10K msg/s
# Design for 2x peak = 20K msg/s
```

- Design for Peak
```java
// Flink: set parallelism for peak throughput with 2x headroom
int parallelism = (int) (peakThroughput / perTaskCapacity * 2);
env.setParallelism(parallelism);
```

- Auto-scaling
```yaml
# Kubernetes: Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-job
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-job
  minReplicas: 4
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```

Use Appropriate Operations
Operation Complexity:
| Operation | State | Backpressure Risk |
|---|---|---|
| Map, Filter | None | Low |
| Window (tumbling) | Light | Low |
| Window (session) | Heavy | High |
| Join (stream-static) | Medium | Medium |
| Join (stream-stream) | Heavy | High |
Prevention: Use lighter operations when possible (see the example below)
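For instance, when the enrichment side fits in a slowly changing table, a stream-static join avoids the join state a stream-stream join has to retain. A minimal sketch; events_stream, users_stream, and the dim_users table are hypothetical names.

```python
# Heavier: stream-stream join -- rows from both sides are kept in state
# (bounded only if watermarks and time constraints are defined)
enriched = events_stream.join(users_stream, on="user_id")

# Lighter: stream-static join -- the static side is a plain lookup, no join state to retain
users_static = spark.read.table("dim_users")   # slowly changing dimension table
enriched = events_stream.join(users_static, on="user_id", how="left")
```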
Monitor and Alert
```yaml
backpressure_alerts:
  - name: "High backpressure ratio"
    metric: flink.backpressure.ratio
    threshold: 0.8   # 80%
    action: "Scale out or optimize"

  - name: "Input rate >> processing rate"
    metric: spark.input_rate / spark.processing_rate
    threshold: 2.0   # 2x difference
    action: "Add resources or rate limit"

  - name: "Buffer usage > 80%"
    metric: task.buffer_pool.usage
    threshold: 0.8
    action: "Increase buffer size or optimize"
```

Backpressure in Different Systems
Kafka
Kafka Consumer Lag:
```python
# Measure consumer lag
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

tp = TopicPartition('events', 0)
consumer.assign([tp])  # assignment is required before seeking/positioning

# Last committed offset for this consumer group
current_offset = consumer.committed(tp)

# Latest offset in the partition
consumer.seek_to_end(tp)
latest_offset = consumer.position(tp)

# Lag = how far the group is behind the head of the partition
lag = latest_offset - (current_offset or 0)

print(f"Consumer lag: {lag} messages")
# Alert if lag > 100000
```

Flink
Flink Backpressure Handling:
- Automatic: Flink handles backpressure natively
- Monitoring: Use Flink UI to detect
- Mitigation: Scale out, optimize operations
Spark Structured Streaming
Spark Backpressure Handling:
- Implicit: micro-batching bounds in-flight data per trigger; cap intake with maxOffsetsPerTrigger
- Monitoring: Check batch duration against the trigger interval (see the sketch after this list)
- Mitigation: Increase trigger interval, optimize operations
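That batch-duration check can be automated in a few lines. A minimal sketch, assuming the 10-second processingTime trigger used earlier and the query handle from the monitoring section; a batch that consistently takes longer than its trigger interval means the query is falling behind.

```python
TRIGGER_INTERVAL_MS = 10_000  # must match the processingTime trigger

progress = query.lastProgress
if progress:
    batch_ms = progress['durationMs']['triggerExecution']
    if batch_ms > TRIGGER_INTERVAL_MS:
        print(f"WARNING: batch took {batch_ms} ms, longer than the "
              f"{TRIGGER_INTERVAL_MS} ms trigger interval -- the query is falling behind")
```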
Cost Implications
Backpressure and Cost
Cost Example:
| Scenario | Compute Cost | Duration | Total Cost |
|---|---|---|---|
| No backpressure | $10/hour | 10 hours | $100 |
| With backpressure | $10/hour | 30 hours | $300 |
| Fixed backpressure | $20/hour | 10 hours | $200 |
Trade-off: Scale out (more cost) vs. Optimize (engineering time)
Best Practices
DO
```java
// 1. Monitor for backpressure
//    Use the Flink UI or Spark streaming metrics

// 2. Scale out when needed
env.setParallelism(200);

// 3. Optimize slow operations
//    Filter early, reduce state, use windowing

// 4. Use rate limiting for uncontrolled sources
.option("maxOffsetsPerTrigger", "10000")

// 5. Design for peak throughput
//    Plan for 2x peak throughput as headroom
```

DON'T
```java
// 1. Don't ignore backpressure monitoring
//    Leads to crashes and data loss

// 2. Don't scale out indefinitely
//    Fix the root cause (slow operations) instead

// 3. Don't use heavy stateful operations unnecessarily
//    Prefer stateless operations

// 4. Don't buffer indefinitely
//    Set appropriate buffer sizes

// 5. Don't process data you don't need
//    Filter early to reduce load
```

Key Takeaways
- Backpressure: Occurs when consumer cannot keep up with producer
- Detection: Monitor mailbox usage (Flink), batch duration (Spark), consumer lag (Kafka)
- Mitigation: Scale out, optimize operations, rate limiting, buffer management
- Prevention: Design for peak throughput, use appropriate operations, monitor continuously
- Cost: Backpressure increases cost 2-3x (longer processing time)
- Flink: Automatic backpressure handling, monitor via UI
- Spark: Micro-batches handle backpressure, monitor batch duration
- Kafka: Monitor consumer lag, alert on high lag