Windowing Strategies

Tumbling, Sliding, Session Windows and Watermarks


Overview

Windowing is fundamental to streaming analytics. It enables aggregations over finite subsets of infinite data streams. This document covers windowing strategies, watermarks for handling late data, and implementation in both Flink and Spark Structured Streaming.


Window Types


Tumbling Windows

Concept

Characteristics:

  • Fixed-size, non-overlapping windows
  • Each event belongs to exactly one window
  • Deterministic (easy to reason about)
  • Most common window type

Use Cases

  • Per-minute/hour/day metrics
  • Periodic reporting
  • Simple aggregations

Flink Implementation

DataStream<Event> events = ...;

// Tumbling window of 5 minutes
DataStream<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AggregateFunction()); // placeholder for your AggregateFunction implementation

// With an offset the alignment shifts: a 1-minute offset makes
// the 5-minute windows run :01-:06, :06-:11, ...
DataStream<Aggregated> shifted = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .aggregate(new AggregateFunction());

Spark Implementation

from pyspark.sql.functions import window, col, count, sum

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()
# (assumes the Kafka payload has been parsed into `timestamp`, `key`, and numeric `value` columns)

# Tumbling window of 5 minutes
windowed = df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("key")
).agg(
    count("*").alias("count"),
    sum("value").alias("total")
)

Sliding Windows

Concept

Characteristics:

  • Fixed-size, overlapping windows
  • Each event can belong to multiple windows
  • Slide interval controls overlap
  • Higher computational cost

Use Cases

  • Moving averages
  • Trend analysis
  • Anomaly detection over time windows

Flink Implementation

// Sliding window: 5 minute size, 1 minute slide
DataStream<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),  // Window size
        Time.minutes(1))) // Slide interval
    .aggregate(new AggregateFunction());
// Note: each event is assigned to 5 windows (size / slide), so this holds
// roughly 5x the window state of the equivalent tumbling window.

Spark Implementation

# Sliding window: 5 minute size, 1 minute slide.
# Spark's window() accepts an optional slide duration as its third argument.
from pyspark.sql.functions import window, col, count

windowed = df.groupBy(
    window(
        col("timestamp"),
        "5 minutes",  # window size
        "1 minute"    # slide interval
    ),
    col("key")
).agg(count("*").alias("count"))

Note: Spark implements sliding windows by copying each row into every window it overlaps (5 copies here), so large size/slide ratios get expensive. Keep the ratio modest, or consider Flink for heavily overlapping windows.


Session Windows

Concept

Characteristics:

  • Dynamic windows based on activity
  • Gap timeout determines session boundaries
  • Captures user sessions naturally
  • Variable window sizes

Use Cases

  • User session analytics
  • Website/app session tracking
  • Activity-based grouping
  • Clickstream analysis

Flink Implementation

// Session window with a 15 minute gap
DataStream<Aggregated> result = events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
    .aggregate(new AggregateFunction());
// Session windows merge whenever new events arrive within the gap

Spark Implementation

from pyspark.sql.functions import session_window, col, count, min, max

# Session window with a 15 minute gap (requires Spark 3.2+)
# Streaming session-window aggregation also requires a watermark on `timestamp`.
windowed = df.groupBy(
    session_window(col("timestamp"), "15 minutes"),
    col("user_id")
).agg(
    count("*").alias("event_count"),
    min("timestamp").alias("session_start"),
    max("timestamp").alias("session_end")
)

Watermarks and Late Data

The Problem of Late Data

In event-time processing, events routinely arrive out of order: an event stamped 10:04 may show up after the 10:00-10:05 window has already produced its result. Without a rule for how long to wait, a window either closes too early and drops data, or never closes at all.

Watermark Strategy

Watermark: A timestamp that flows with the stream and asserts that no more events with earlier timestamps are expected. When the watermark passes the end of a window (plus any allowed lateness), the window can be finalized.

Lateness tolerance: How long to keep waiting for out-of-order events before closing a window (the bounded out-of-orderness in the code below).

Flink Implementation

// Watermark strategy: 5 minute out-of-orderness tolerance
DataStream<Event> events = env
    .addSource(kafkaSource)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withTimestampAssigner((event, timestamp) -> event.getEventTime())
            .withIdleness(Duration.ofMinutes(1)) // Keep watermarks advancing on idle partitions
    );

// Side output for late data
OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

SingleOutputStreamOperator<Aggregated> result = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateTag) // Capture events that arrive after the window closed
    .aggregate(new AggregateFunction());

// Get the late events from the side output
DataStream<Event> lateData = result.getSideOutput(lateTag);

Spark Implementation

from pyspark.sql.functions import window, col, count

# Define a watermark with 5 minutes of lateness tolerance
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load() \
    .withWatermark("timestamp", "5 minutes")  # Lateness tolerance

# Events later than the watermark are dropped from the aggregation
windowed = df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("key")
).agg(
    count("*").alias("count")
)

# Update mode re-emits a window's row whenever late-but-within-watermark data
# changes its result; append mode emits each window once, after the watermark
# passes its end.
query = windowed.writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoint") \
    .start("/delta/results")

Window Size Selection

Choosing Window Size

Guidelines

| Latency | Window Size | Type | Use Case |
| --- | --- | --- | --- |
| Seconds | 5-30 s | Sliding/Tumbling | Real-time dashboards |
| Minutes | 1-15 min | Tumbling | Near-real-time analytics |
| Hours | 1-24 h | Tumbling | Hourly/daily reporting |

Trade-off: Smaller windows deliver results sooner (lower latency) but create more windows to compute and keep in state; larger windows are cheaper but delay results.
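
As a rough worked example of that trade-off, the sketch below (plain Java, not tied to any framework) estimates how many window states a sliding-window job holds at once. The window size, slide, lateness, key count, and the class name WindowSizing are all hypothetical values to swap for your own.

public class WindowSizing {
    public static void main(String[] args) {
        // Hypothetical inputs: 5-minute windows, 1-minute slide,
        // 5 minutes of lateness tolerance, 1 million active keys.
        long windowMs   = 5 * 60_000L;
        long slideMs    = 60_000L;
        long latenessMs = 5 * 60_000L;
        long activeKeys = 1_000_000L;

        long windowsPerEvent   = windowMs / slideMs;                // each event lands in 5 windows
        long liveWindowsPerKey = (windowMs + latenessMs) / slideMs; // ~10 windows still open per key
        long totalLiveWindows  = liveWindowsPerKey * activeKeys;    // ~10M window states to keep

        System.out.printf("windows/event=%d, open windows/key=%d, total open windows=%d%n",
                windowsPerEvent, liveWindowsPerKey, totalLiveWindows);
    }
}

Halving the slide interval doubles both per-key numbers, which is why the guidelines above favor tumbling windows unless overlap is genuinely needed.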


State Size Considerations

Window State Explosion

Every open window for every key is a separate piece of state. Sliding windows multiply that state by the size/slide ratio, and session windows on high-cardinality keys can grow without bound until a gap timeout or TTL closes them.

State Management Strategies

| Strategy | Description | Use Case |
| --- | --- | --- |
| TTL on state | Evict old state automatically | Session windows |
| State partitioning | Distribute state across keys/tasks | High-cardinality keys |
| RocksDB state backend | Spill state to local disk | Large state |
| Mini-batch aggregation | Reduce per-record state updates | High-velocity data |
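
For the TTL row, here is a minimal Flink sketch. It assumes custom keyed state (a hypothetical per-user counter) managed in your own KeyedProcessFunction; Flink's built-in window operators purge their own state when windows fire, so TTL mainly matters for user-defined state like this.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL config: entries untouched for 30 minutes become eligible for cleanup
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.minutes(30))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)            // refresh TTL on writes
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // hide expired entries
    .build();

// Hypothetical per-user counter state used inside a KeyedProcessFunction
ValueStateDescriptor<Long> perUserCount =
    new ValueStateDescriptor<>("per-user-count", Long.class);
perUserCount.enableTimeToLive(ttlConfig);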

Advanced Patterns

Custom Watermark Generators

// Custom periodic watermark generator (note: Flink tracks watermarks per
// source split / parallel instance, not per key)
WatermarkStrategy.<Event>forGenerator(context -> new WatermarkGenerator<Event>() {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = 5000; // 5 seconds

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
    }
});

Dynamic Window Sizes

# Dynamic windowing based on data characteristics
from pyspark.sql.functions import window, col, count

# Example: different window sizes by user tier
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Split the stream and apply different windowing logic to each branch
premium = df.filter(col("user_tier") == "premium")
regular = df.filter(col("user_tier") == "regular")

# Premium: 1-minute windows (lower latency)
premium_agg = premium.groupBy(
    window(col("timestamp"), "1 minute"),
    col("user_id")
).agg(count("*").alias("count"))

# Regular: 5-minute windows (lower cost)
regular_agg = regular.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("user_id")
).agg(count("*").alias("count"))

Cost Implications

State Storage Cost

| Window Type | State Size | Storage Cost | Compute Cost |
| --- | --- | --- | --- |
| Tumbling | Controlled | Low | Low |
| Sliding | Size × overlap | Medium | Medium |
| Session | Unbounded | High (needs TTL) | High |

State Backend Selection

For small, latency-critical state, Flink's default heap-based backend keeps everything in JVM memory; once state outgrows memory, the RocksDB backend spills to local disk and supports incremental checkpoints at the cost of serialization overhead. Spark Structured Streaming similarly offers a RocksDB state store provider as an alternative to its default in-memory store.
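
A minimal Flink sketch of switching backends, assuming Flink 1.13+ with the flink-statebackend-rocksdb dependency on the classpath; the checkpoint bucket path is a placeholder.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Default: heap-backed state, fastest access, must fit in memory
// env.setStateBackend(new HashMapStateBackend());

// Large state: RocksDB spills to local disk; `true` enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints"); // placeholder path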


Best Practices

DO

# 1. Use tumbling windows when possible (lowest cost)
windowed = df.groupBy(window(col("timestamp"), "5 minutes"))

# 2. Set appropriate watermarks (balance latency vs. completeness)
df = df.withWatermark("timestamp", "5 minutes")

# 3. Handle late data explicitly
#    (Flink: sideOutputLateData; Spark: update output mode)

# 4. Monitor state size
#    (Flink: checkpoint/state-size metrics; Spark: stateOperators in the query progress)

# 5. Bound session state with a gap timeout
#    (Flink: EventTimeSessionWindows.withGap(Time.minutes(15));
#     Spark: session_window(col("timestamp"), "15 minutes"))

DON’T

# 1. Don't use heavily overlapping sliding windows (large size/slide ratio)
#    Each row lands in size/slide windows; prefer tumbling when overlap isn't needed
# 2. Don't forget watermarks (late data issues)
#    Always define watermarks for event-time processing
# 3. Don't use very small windows for high-volume data
#    1-second windows over 1M events/sec get expensive fast
# 4. Don't ignore state size (it can cause OOM)
#    Monitor state and plan its storage
# 5. Don't mix processing time and event time
#    Be explicit about which time semantics you're using

Key Takeaways

  1. Tumbling windows: Default choice, lowest cost, non-overlapping
  2. Sliding windows: Overlapping; state and compute cost grow with the size/slide ratio
  3. Session windows: Dynamic size, unbounded without a gap timeout or TTL, use for user sessions
  4. Watermarks: Essential for handling out-of-order and late data
  5. State management: Plan for state size and pick an appropriate backend
  6. Spark caveat: Sliding windows duplicate each row across overlapping windows; keep the size/slide ratio modest or consider Flink
  7. Cost awareness: The windowing strategy directly drives state and compute costs
