
Backpressure in Streaming Systems

Handling Slow Consumers and Overloaded Pipelines


Overview

Backpressure occurs when a streaming system cannot process data as fast as it’s being produced. Without proper handling, backpressure causes unbounded memory growth, crashes, and data loss. This guide covers backpressure detection, mitigation strategies, and prevention techniques.


What is Backpressure?

Backpressure: Mechanism to signal producers to slow down when consumers cannot keep up.

Without Backpressure:

  • Buffers fill up
  • Memory grows unbounded
  • OOM crashes
  • Data loss

With Backpressure:

  • Producer slows down
  • Consumer catches up
  • System remains stable
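
The dynamic is easy to see outside any particular framework. Below is a minimal, self-contained Python sketch (the rates and buffer size are made up for illustration): a bounded queue sits between a fast producer and a slow consumer, and the blocking put() is the backpressure signal that throttles the producer to the consumer's pace.

import queue
import threading
import time

# Bounded buffer: put() blocks when full, which is the backpressure signal
buffer = queue.Queue(maxsize=100)

def producer():
    for i in range(500):
        buffer.put(i)       # blocks when the buffer is full -> producer slows down
        time.sleep(0.001)   # fast producer: ~1000 items/sec

def consumer():
    while True:
        item = buffer.get()
        time.sleep(0.01)    # slow consumer: ~100 items/sec
        buffer.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()                  # finishes at roughly the consumer's rate, not its own
buffer.join()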

Backpressure Scenarios

Scenario 1: Slow Sink

Problem: Sink cannot write fast enough (database throttling, slow network)

Example:

# Sink is slow (100 writes/sec), producer is fast (10K events/sec)

# Bad: every micro-batch piles up behind the slow sink, buffers grow unbounded
events_df.writeStream \
    .foreachBatch(write_to_database) \
    .start()

# Good: control the consumption rate with a longer trigger interval
events_df.writeStream \
    .foreachBatch(write_to_database) \
    .trigger(processingTime='10 seconds') \
    .start()

Scenario 2: Slow Transformation

Problem: Complex operation (ML inference, API calls) slows down pipeline

Example:

# Bad: blocking API call inside the stream
import requests

def enrich_event(event):
    # HTTP call (blocking, slow)
    response = requests.get(f"https://api.example.com/{event['user_id']}")
    event['user_data'] = response.json()
    return event

# Good: async API call (a rate-limited version is sketched after this block)
import asyncio
import aiohttp

async def enrich_event_async(event):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{event['user_id']}") as resp:
            event['user_data'] = await resp.json()
            return event

# Or: use batch API calls
def enrich_events_batch(events):
    # One batched API call is far cheaper than one call per event
    user_ids = [e['user_id'] for e in events]
    response = requests.post("https://api.example.com/batch", json=user_ids)
    user_data = response.json()
    for event, data in zip(events, user_data):
        event['user_data'] = data
    return events
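
To make the async variant actually rate limited, it needs a cap on concurrent requests. A minimal sketch using asyncio.Semaphore is below; the limit of 20 in-flight calls is an arbitrary example, and the endpoint is the same placeholder URL as above.

import asyncio
import aiohttp

MAX_CONCURRENT_CALLS = 20  # example limit; tune to what the downstream API tolerates

async def enrich_all(events):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)
    async with aiohttp.ClientSession() as session:

        async def enrich_one(event):
            # The semaphore caps in-flight requests, acting as a simple rate limiter
            async with semaphore:
                url = f"https://api.example.com/{event['user_id']}"
                async with session.get(url) as resp:
                    event['user_data'] = await resp.json()
                    return event

        return await asyncio.gather(*(enrich_one(e) for e in events))

# Usage: enriched = asyncio.run(enrich_all(batch_of_events))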

Scenario 3: Hot Partitions

Problem: Skewed key distribution causes single partition to be hot

Example:

from pyspark.sql.functions import col, concat, lit, rand, count, split
from pyspark.sql.functions import sum as spark_sum

# Bad: a single hot key funnels all records through one task
events_df.groupBy("hot_user_id").agg(count("*").alias("cnt"))

# Good: redistribute with salting, then re-aggregate to recover per-key counts
salted_df = events_df.withColumn(
    "salted_key",
    concat(col("hot_user_id"), lit("_"), (rand() * 100).cast("int"))
)
partial = salted_df.groupBy("salted_key").agg(count("*").alias("partial_count"))
result = partial \
    .withColumn("hot_user_id", split(col("salted_key"), "_").getItem(0)) \
    .groupBy("hot_user_id") \
    .agg(spark_sum("partial_count").alias("total"))

Backpressure Detection

Flink Web UI:

  1. Navigate to running job
  2. Click on “Backpressure” tab
  3. Trigger backpressure detection
  4. View results (OK, LOW, HIGH)

Flink REST API:

# Backpressure stats are reported per operator (vertex); the first request
# triggers sampling, so repeat it until results are available
curl http://flink-jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure

# Example response (abbreviated):
{
  "backpressure-level": "high",
  "subtasks": [
    {
      "subtask": 0,
      "backpressure-level": "high",
      "ratio": 0.95
    }
  ]
}

Flink Metrics:

backpressure_metrics:
  - name: "Mailbox size"
    metric: task.mailbox.size
    alert: "If > 1000"
  - name: "Backpressure ratio"
    metric: task.backpressure.ratio
    alert: "If > 0.5"
  - name: "Buffer pool usage"
    metric: task.buffer_pool.usage
    alert: "If > 0.8"
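
These metrics can also be pulled programmatically. The sketch below polls Flink's aggregated subtask metrics REST endpoint for the backPressuredTimeMsPerSecond metric (reported by recent Flink versions); the host, job ID, vertex ID, and the 500 ms/s threshold are placeholders.

import requests

FLINK_REST = "http://flink-jobmanager:8081"  # placeholder host
JOB_ID = "<job-id>"                          # list jobs via GET /jobs
VERTEX_ID = "<vertex-id>"                    # list vertices via GET /jobs/<job-id>

# Aggregated subtask metrics for one operator (vertex)
url = (f"{FLINK_REST}/jobs/{JOB_ID}/vertices/{VERTEX_ID}/subtasks/metrics"
       f"?get=backPressuredTimeMsPerSecond&agg=max")
resp = requests.get(url, timeout=10)
resp.raise_for_status()

for metric in resp.json():
    # 1000 ms/s means the task spends 100% of its time backpressured
    if float(metric.get("max", 0)) > 500:
        print(f"WARNING: {metric['id']} is backpressured "
              f"{float(metric['max']) / 10:.0f}% of the time")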

Spark Monitoring

# Spark Structured Streaming: monitor query progress
query = spark.streams.get("query-id")  # or keep the handle returned by start()

# Most recent micro-batch progress (dict)
progress = query.lastProgress

# Key metrics
print(f"Input rate: {progress['inputRowsPerSecond']:.2f} rows/sec")
print(f"Processing rate: {progress['processedRowsPerSecond']:.2f} rows/sec")
print(f"Batch duration: {progress['durationMs']['triggerExecution'] / 1000:.2f} seconds")

# Backpressure indicator: if the input rate consistently exceeds the
# processing rate, the query is falling behind

Spark UI:

  1. Navigate to the “Structured Streaming” tab
  2. View “Input Rate” vs “Processing Rate”
  3. If input > processing, backpressure exists
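
The same comparison can run as a small watchdog instead of a manual check. The sketch below polls lastProgress and flags sustained backpressure when the input rate outruns the processing rate; the 1.5x threshold and 30-second poll interval are illustrative.

import time

def watch_for_backpressure(query, threshold=1.5, interval_sec=30):
    """Warn when input rate exceeds processing rate by more than `threshold`x."""
    while query.isActive:
        progress = query.lastProgress
        if progress:
            input_rate = progress.get("inputRowsPerSecond") or 0.0
            proc_rate = progress.get("processedRowsPerSecond") or 0.0
            if proc_rate > 0 and input_rate / proc_rate > threshold:
                print(f"WARNING: backpressure suspected "
                      f"(input {input_rate:.0f} rows/s vs processed {proc_rate:.0f} rows/s)")
        time.sleep(interval_sec)

# Usage (assumes `query` is a running StreamingQuery handle):
# watch_for_backpressure(query)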

Backpressure Mitigation

Strategy 1: Scale Out

Flink:

// Increase parallelism
env.setParallelism(200); // increase from 100 to 200

// Or: operator-specific parallelism
dataStream.keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .sum("amount")
    .setParallelism(200); // only this operator

Spark:

# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "400") # Increase from 200
# Or: Repartition before expensive operation
events_df = events_df.repartition(400)
result = events_df.groupBy("user_id").agg(count("*"))

Trade-off: More resources = higher cost

Strategy 2: Optimize Operations

Optimization Techniques:

  1. Reduce State Size

// Bad: store the full event list in state
ListState<Event> events = getRuntimeContext().getListState(descriptor);

// Good: store only the aggregate
ValueState<Long> count = getRuntimeContext().getValueState(descriptor);

  2. Use Windowing

# Bad: unbounded aggregation over all data
events_df.groupBy("user_id").agg(count("*"))

# Good: windowed aggregation with a watermark to bound state
events_df \
    .withWatermark("timestamp", "1 hour") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

  3. Filter Early

# Bad: aggregate everything, then filter
events_df \
    .groupBy("user_id") \
    .agg(count("*").alias("count")) \
    .filter(col("count") > 100)

# Good: filter first to reduce load
events_df \
    .filter(col("event_type") == "important") \
    .groupBy("user_id") \
    .count()

Strategy 3: Buffer Management

Flink Buffer Configuration:

// Network buffers
config.setString("taskmanager.memory.network.min", "256m");
config.setString("taskmanager.memory.network.max", "1g");
config.setInteger("taskmanager.network.numberOfBuffers", 2048);
config.setInteger("taskmanager.network.bufferSize", 32768); // 32 KB

// Memory fractions reserved for network buffers (used when backpressure builds up)
config.setFloat("taskmanager.network.memory.fraction", 0.2f);
config.setFloat("taskmanager.memory.fraction", 0.7f);

Spark Buffer Configuration:

# Rate-based backpressure for the legacy DStream API (Structured Streaming
# uses maxOffsetsPerTrigger / maxFilesPerTrigger instead)
spark.conf.set("spark.streaming.backpressure.enabled", "true")
spark.conf.set("spark.streaming.backpressure.initialRate", "1000")
spark.conf.set("spark.streaming.backpressure.pid.proportional", "1.0")
spark.conf.set("spark.streaming.backpressure.pid.integral", "0.1")
spark.conf.set("spark.streaming.backpressure.pid.derived", "0.0")

Strategy 4: Rate Limiting

Kafka Consumer Rate Limiting:

# Limit the fetch rate: process in controlled micro-batches
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", "10000") \
    .load()  # at most 10K records per trigger

# Process every 10 seconds
query = events_df.writeStream \
    .trigger(processingTime='10 seconds') \
    .format("delta") \
    .start()

Kinesis Rate Limiting:

# Configure the KCL to limit the fetch rate
config = {
    'streamName': 'events-stream',
    'applicationName': 'event-processor',
    'maxRecords': 100,                    # at most 100 records per poll
    'idleTimeBetweenReadsInMillis': 1000  # wait 1 second between polls
}
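
Outside the KCL, a plain boto3 consumer can approximate the same behavior by capping each GetRecords call and sleeping between polls. In the sketch below the stream and shard names mirror the config above, and process() stands in for whatever handler the pipeline uses.

import time
import boto3

kinesis = boto3.client("kinesis")

shard_iterator = kinesis.get_shard_iterator(
    StreamName="events-stream",
    ShardId="shardId-000000000000",   # placeholder shard
    ShardIteratorType="LATEST",
)["ShardIterator"]

while True:
    # Cap each poll at 100 records, mirroring maxRecords above
    response = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
    for record in response["Records"]:
        process(record)               # hypothetical handler
    shard_iterator = response["NextShardIterator"]
    time.sleep(1)                     # idle time between reads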

Backpressure Prevention

Design for Throughput

Capacity Planning:

  1. Measure Peak Throughput

# Historical data: peak = 10K msg/s
# Design for 2x peak = 20K msg/s

  2. Design for Peak

// Flink: set parallelism for peak throughput with 2x headroom
int parallelism = (int) Math.ceil(peakThroughput / perTaskCapacity * 2);
env.setParallelism(parallelism);

  3. Auto-scaling

# Kubernetes: Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-job
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-job
  minReplicas: 4
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

Use Appropriate Operations

Operation Complexity:

| Operation            | State  | Backpressure Risk |
| -------------------- | ------ | ----------------- |
| Map, Filter          | None   | Low               |
| Window (tumbling)    | Light  | Low               |
| Window (session)     | Heavy  | High              |
| Join (stream-static) | Medium | Medium            |
| Join (stream-stream) | Heavy  | High              |

Prevention: Use lighter operations when possible
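
For example, when enrichment data changes slowly, a stream-static join avoids the state a stream-stream join must keep on both sides. A minimal PySpark sketch (the Delta table path is a placeholder):

# Heavy: stream-stream join buffers both sides in state until watermarks pass
# events_df.join(other_stream_df, "user_id")

# Lighter: stream-static join against a slowly changing dimension table
users_df = spark.read.format("delta").load("/tables/users")  # placeholder path
enriched_df = events_df.join(users_df, on="user_id", how="left")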

Monitor and Alert

backpressure_alerts:
  - name: "High backpressure ratio"
    metric: flink.backpressure.ratio
    threshold: 0.8   # 80%
    action: "Scale out or optimize"
  - name: "Input rate >> processing rate"
    metric: spark.input_rate / spark.processing_rate
    threshold: 2.0   # 2x difference
    action: "Add resources or rate limit"
  - name: "Buffer usage > 80%"
    metric: task.buffer_pool.usage
    threshold: 0.8
    action: "Increase buffer size or optimize"

Backpressure in Different Systems

Kafka

Kafka Consumer Lag:

# Measure consumer lag for one partition
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

tp = TopicPartition('events', 0)

# Last committed offset for this consumer group
committed_offset = consumer.committed(tp) or 0

# Latest offset in the partition
latest_offset = consumer.end_offsets([tp])[tp]

# Lag = messages produced but not yet processed
lag = latest_offset - committed_offset
print(f"Consumer lag: {lag} messages")
# Alert if lag > 100000

Flink

Flink Backpressure Handling:

  • Automatic: Flink handles backpressure natively
  • Monitoring: Use Flink UI to detect
  • Mitigation: Scale out, optimize operations

Spark Structured Streaming

Spark Backpressure Handling:

  • Automatic: Micro-batches handle backpressure
  • Monitoring: Check batch duration vs trigger interval
  • Mitigation: Increase trigger interval, optimize operations

Cost Implications

Backpressure and Cost

Cost Example:

| Scenario            | Compute Cost | Duration | Total Cost |
| ------------------- | ------------ | -------- | ---------- |
| No backpressure     | $10/hour     | 10 hours | $100       |
| With backpressure   | $10/hour     | 30 hours | $300       |
| Fixed backpressure  | $20/hour     | 10 hours | $200       |

Trade-off: Scale out (more cost) vs. Optimize (engineering time)


Best Practices

DO

// 1. Monitor for backpressure
// Use Flink UI or Spark metrics
// 2. Scale out when needed
env.setParallelism(200);
// 3. Optimize slow operations
// Filter early, reduce state, use windowing
// 4. Use rate limiting for uncontrolled sources
.option("maxOffsetsPerTrigger", "10000")
// 5. Design for peak throughput
// 2x peak throughput as headroom

DON’T

// 1. Don't ignore backpressure monitoring
// Leads to crashes and data loss
// 2. Don't scale out indefinitely
// Fix root cause (slow operations)
// 3. Don't use heavy state operations unnecessarily
// Prefer stateless operations
// 4. Don't buffer indefinitely
// Set appropriate buffer sizes
// 5. Don't process all data if not needed
// Filter early to reduce load

Key Takeaways

  1. Backpressure: Occurs when consumer cannot keep up with producer
  2. Detection: Monitor mailbox usage (Flink), batch duration (Spark), consumer lag (Kafka)
  3. Mitigation: Scale out, optimize operations, rate limiting, buffer management
  4. Prevention: Design for peak throughput, use appropriate operations, monitor continuously
  5. Cost: Backpressure increases cost 2-3x (longer processing time)
  6. Flink: Automatic backpressure handling, monitor via UI
  7. Spark: Micro-batches handle backpressure, monitor batch duration
  8. Kafka: Monitor consumer lag, alert on high lag
