Windowing Strategies
Tumbling, Sliding, Session Windows and Watermarks
Overview
Windowing is fundamental to streaming analytics. It enables aggregations over finite subsets of infinite data streams. This document covers windowing strategies, watermarks for handling late data, and implementation in both Flink and Spark Structured Streaming.
Window Types
Tumbling Windows
Concept
Characteristics:
- Fixed-size, non-overlapping windows
- Each event belongs to exactly one window
- Deterministic (easy to reason about)
- Most common window type
Use Cases
- Per-minute/hour/day metrics
- Periodic reporting
- Simple aggregations
Flink Implementation
```java
DataStream<Event> events = ...;

// Tumbling window of 5 minutes
DataStream<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AggregateFunction());

// With an offset (e.g., windows start at :00, :05, :10),
// replace the window assigner with:
// .window(TumblingEventTimeWindows.of(
//     Time.minutes(5),    // Window size
//     Time.minutes(0)))   // Offset
```
Spark Implementation
```python
from pyspark.sql.functions import window, col, count, sum

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Tumbling window of 5 minutes
windowed = df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("key")
).agg(
    count("*").alias("count"),
    sum("value").alias("total")
)
```
Sliding Windows
Concept
Characteristics:
- Fixed-size, overlapping windows
- Each event can belong to multiple windows
- Slide interval controls overlap
- Higher computational cost
Use Cases
- Moving averages
- Trend analysis
- Anomaly detection over time windows
Flink Implementation
```java
// Sliding window: 5 minute size, 1 minute slide
DataStream<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),    // Window size
        Time.minutes(1)))   // Slide interval
    .aggregate(new AggregateFunction());

// Note: this creates 5x the windows of tumbling!
```
Spark Implementation
```python
from pyspark.sql.functions import window, col, count

# Sliding window: 5 minute size, 1 minute slide
# Spark's window() accepts an optional slide duration as its third argument
windowed = df.groupBy(
    window(col("timestamp"), "5 minutes", "1 minute"),
    col("key")
).agg(count("*").alias("count"))
```
Note: Spark copies each event into every overlapping window (here 5 copies per event), so, as with Flink above, state and compute scale with the window-to-slide ratio. Keep that ratio small for high-volume streams.
Session Windows
Concept
Characteristics:
- Dynamic windows based on activity
- Gap timeout determines session boundaries
- Captures user sessions naturally
- Variable window sizes
Use Cases
- User session analytics
- Website/app session tracking
- Activity-based grouping
- Clickstream analysis
Flink Implementation
```java
// Session window with a 15 minute gap
DataStream<Aggregated> result = events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
    .aggregate(new AggregateFunction());

// Session windows merge when events arrive within the gap
```
Spark Implementation
```python
from pyspark.sql.functions import session_window, col, count, min, max

# Session window with a 15 minute gap
windowed = df.groupBy(
    session_window(col("timestamp"), "15 minutes"),
    col("user_id")
).agg(
    count("*").alias("event_count"),
    min("timestamp").alias("session_start"),
    max("timestamp").alias("session_end")
)
```
Watermarks and Late Data
The Problem of Late Data
In event-time processing, events can arrive out of order: an event that belongs to a window may show up after that window's result has already been computed. The engine needs a rule for how long to keep a window open, and for what to do with events that arrive even later.
Watermark Strategy
Watermark: A timestamp asserting that all events with earlier event times are believed to have arrived; once the watermark passes the end of a window, that window can be finalized.
Lateness tolerance: How long to keep waiting for late events before a window is finalized and its state dropped.
Flink Implementation
```java
// Watermark strategy: 5 minute lateness tolerance
DataStream<Event> events = env
    .addSource(kafkaSource)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withTimestampAssigner((event, timestamp) -> event.getEventTime())
            .withIdleness(Duration.ofMinutes(1))  // Handle idle partitions
    );

// Side output for late data
OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

SingleOutputStreamOperator<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateTag)  // Capture late data
    .aggregate(new AggregateFunction());

// Get late data
DataStream<Event> lateData = result.getSideOutput(lateTag);
```
Spark Implementation
```python
from pyspark.sql.functions import window, col, count

# Define a watermark with 5 minute lateness tolerance
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load() \
    .withWatermark("timestamp", "5 minutes")  # Lateness tolerance

# Events arriving later than the watermark allows are dropped;
# late-but-in-time events update existing window results
windowed = df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("key")
).agg(count("*").alias("count"))

# Update output mode emits a row each time a window's result changes
query = windowed.writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoint") \
    .start("/delta/results")
```
Window Size Selection
Choosing Window Size
Guidelines
| Latency | Window Size | Type | Use Case |
|---|---|---|---|
| Seconds | 5-30s | Sliding/Tumbling | Real-time dashboards |
| Minutes | 1-15min | Tumbling | Near-real-time analytics |
| Hours | 1-24h | Tumbling | Hourly/daily reporting |
Trade-off: Smaller windows give fresher results (lower latency) but higher overhead, since there are more windows to create, maintain, and emit.
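Because the window duration is just a parameter to window(), the same aggregation can be moved along the table above without structural changes. A minimal sketch, reusing the df stream from the earlier Spark examples; the windowed_counts helper and the specific durations are illustrative, not part of any API:

```python
from pyspark.sql.functions import window, col, count

def windowed_counts(df, window_size: str):
    """Same aggregation at a different latency/cost point, set by window_size."""
    return df.groupBy(
        window(col("timestamp"), window_size),
        col("key")
    ).agg(count("*").alias("count"))

dashboard_counts = windowed_counts(df, "30 seconds")  # real-time dashboard tier
reporting_counts = windowed_counts(df, "1 hour")      # hourly reporting tier
```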
State Size Considerations
Window State Explosion
Every open window holds per-key state until the watermark allows it to close. Overlapping (sliding) windows, long lateness tolerances, and high-cardinality keys all multiply the number of windows open at once, and with them the state the job must hold.
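As a rough illustration (the key count and per-window state size below are assumed numbers, not measurements), the overlap factor of a sliding window multiplies open-window state directly:

```python
# Back-of-envelope estimate of live window state (all numbers are illustrative)
keys = 1_000_000               # distinct keys (assumption)
bytes_per_window = 100         # per-key, per-window aggregate state (assumption)

window_minutes, slide_minutes = 5, 1
overlap = window_minutes // slide_minutes      # each event falls into 5 windows

open_windows = keys * overlap
print(f"~{open_windows * bytes_per_window / 1e6:.0f} MB of live window state")  # ~500 MB
```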
State Management Strategies
| Strategy | Description | Use Case |
|---|---|---|
| TTL on state | Evict old state | Session windows |
| State partitioning | Distribute state | High-cardinality keys |
| RocksDB state backend | Spill to disk | Large state |
| Mini-batch aggregation | Reduce updates | High-velocity data |
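For the "TTL on state" row in Spark terms: state cleanup for built-in windowed aggregations is driven by the watermark rather than an explicit TTL setting, since a session's state can be evicted once the watermark passes the session's end. A minimal sketch, reusing df from the earlier examples; the 30-minute tolerance is an assumed value:

```python
from pyspark.sql.functions import session_window, col, count

# The watermark acts as the eviction policy for session state: once it passes
# a session's end, the session is finalized and its state can be dropped.
sessions = df.withWatermark("timestamp", "30 minutes") \
    .groupBy(
        session_window(col("timestamp"), "15 minutes"),
        col("user_id")
    ).agg(count("*").alias("event_count"))
```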
Advanced Patterns
Per-Key Watermarks
Note that Flink tracks watermarks per parallel source instance (and, with Kafka sources, per partition), not per key; a custom WatermarkGenerator controls how the watermark advances for each of those instances:

```java
// Custom watermark generator (advanced)
WatermarkStrategy
    .<Event>forGenerator(context -> new WatermarkGenerator<Event>() {
        private long maxTimestamp = Long.MIN_VALUE;
        private final long maxOutOfOrderness = 5000; // 5 seconds

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
        }
    });
```
Dynamic Window Sizes
```python
# Dynamic windowing based on data characteristics
from pyspark.sql.functions import window, col, count

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Apply different windowing logic per user tier
premium = df.filter(col("user_tier") == "premium")
regular = df.filter(col("user_tier") == "regular")

# Premium: 1 minute windows (lower latency)
premium_agg = premium.groupBy(
    window(col("timestamp"), "1 minute"),
    col("user_id")
).agg(count("*").alias("count"))

# Regular: 5 minute windows (lower cost)
regular_agg = regular.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("user_id")
).agg(count("*").alias("count"))
```
Cost Implications
State Storage Cost
| Window Type | State Size | Storage Cost | Compute Cost |
|---|---|---|---|
| Tumbling | Controlled | Low | Low |
| Sliding | Size × overlap | Medium | Medium |
| Session | Unbounded | High (need TTL) | High |
State Backend Selection
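The state-management table above names RocksDB as the option for large state. In Flink that is the RocksDB (EmbeddedRocksDBStateBackend) state backend; in Spark Structured Streaming the analogous switch is the state store provider (the RocksDB provider ships with Spark 3.2+). A minimal sketch of the Spark setting; the application name is illustrative:

```python
from pyspark.sql import SparkSession

# RocksDB-backed state store: streaming state is kept on local disk (with
# snapshots in the checkpoint location) instead of on the executor JVM heap.
spark = SparkSession.builder \
    .appName("windowed-aggregations") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()
```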
Best Practices
DO
```python
# 1. Use tumbling windows when possible (lowest cost)
windowed = df.groupBy(window(col("timestamp"), "5 minutes"))

# 2. Set an appropriate watermark (balance latency vs. completeness)
df = df.withWatermark("timestamp", "5 minutes")

# 3. Handle late data explicitly
#    (Flink: sideOutputLateData(lateTag); Spark: watermark plus update output mode)

# 4. Monitor state size
#    (Flink and Spark UIs/metrics report state-store size per operator/query)

# 5. Use a TTL / gap to bound session-window state
#    (Flink: EventTimeSessionWindows.withGap(Time.minutes(15)); Spark: the watermark)
```
DON'T
```python
# 1. Don't use tight slide intervals for sliding windows in Spark
#    (each event is copied into every overlapping window)

# 2. Don't forget watermarks (unbounded state and mishandled late data)
#    Always define a watermark for event-time aggregations

# 3. Don't use very small windows for high-volume data
#    (1-second windows at 1M events/sec get expensive fast)

# 4. Don't ignore state size (it can cause OOM)
#    Monitor state and plan where it is stored

# 5. Don't mix processing time and event time
#    Be explicit about which time semantics you're using
```
Key Takeaways
- Tumbling windows: Default choice, lowest cost, non-overlapping
- Sliding windows: Overlapping, 2-10x state and compute cost
- Session windows: Dynamic, requires TTL, use for user sessions
- Watermarks: Essential for late data handling
- State management: Plan for state size, use appropriate backend
- Sliding windows in Spark: supported via window()'s slide argument, but each event is copied into every overlapping window; keep the overlap factor small
- Cost awareness: Windowing strategy directly impacts state and compute costs