
Apache Flink Deep Dive

Stateful Stream Processing at Scale


Overview

Apache Flink is a distributed stream-processing engine designed for real-time analytics over stateful computations. Flink provides exactly-once state consistency, sophisticated event-time windowing, and first-class state management, making it the gold standard for complex streaming applications.


Job Architecture

State Backends

State Backend Selection:

Backend               Max State                  Latency      Use Case
HashMap (on-heap)     Bounded by JVM heap        Lowest       Testing, small state
RocksDB (embedded)    Local disk, 10TB+          Low-Medium   Production, large state

(Flink 1.13 deprecated the legacy MemoryStateBackend and FsStateBackend in favor of HashMapStateBackend plus a separate checkpoint-storage setting.)
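
The backend can also be chosen per job. A minimal sketch for Flink 1.13+, assuming the flink-statebackend-rocksdb dependency is on the classpath:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// HashMap (heap) is the default; switch to RocksDB for large state.
// The flag enables incremental checkpoints (upload only changed SST files).
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));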

DataStream API

Basic DataStream

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a Kafka source (FlinkKafkaConsumer is deprecated since Flink 1.14)
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka1:9092,kafka2:9092,kafka3:9092")
    .setTopics("events")
    .setGroupId("event-processor")
    .setStartingOffsets(OffsetsInitializer.latest())
    // EventDeserializationSchema is a user-supplied DeserializationSchema<Event>
    .setValueOnlyDeserializer(new EventDeserializationSchema())
    .build();

// Keyed stream with event-time timestamps and bounded-out-of-orderness watermarks
KeyedStream<Event, String> events = env
    .fromSource(
        kafkaSource,
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp()),
        "kafka-events")
    .keyBy(Event::getUserId); // Keyed stream

Windowing

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Tumbling window (1 minute)
DataStream<Aggregated> tumbling = events
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregator());

// Sliding window (1 minute size, 30 second slide)
DataStream<Aggregated> sliding = events
    .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
    .aggregate(new CountAggregator());

// Session window (window closes after a 5 minute gap in activity)
DataStream<Aggregated> session = events
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .aggregate(new CountAggregator());
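
The window examples above rely on a CountAggregator. A minimal sketch, assuming Aggregated is a simple POJO that wraps the final count:

import org.apache.flink.api.common.functions.AggregateFunction;

// Incrementally counts events per key and window
public class CountAggregator implements AggregateFunction<Event, Long, Aggregated> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Event event, Long acc) {
        return acc + 1;
    }

    @Override
    public Aggregated getResult(Long acc) {
        return new Aggregated(acc); // hypothetical constructor taking the count
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b; // required when session windows merge
    }
}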

Stateful Processing

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedProcessFunction for stateful operations
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {

    private static final long STATE_TTL_MS = 3_600_000L; // clear idle state after 1 hour

    private transient ValueState<Long> eventCount;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("eventCount", Long.class);
        eventCount = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out)
            throws Exception {
        // Update state
        long currentCount = eventCount.value() == null ? 0L : eventCount.value();
        eventCount.update(currentCount + 1);

        // Register an event-time timer to clean up idle state
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + STATE_TTL_MS);

        out.collect(new Result(event.getUserId(), currentCount + 1));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
            throws Exception {
        // Clear state after TTL
        eventCount.clear();
    }
}
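
Attaching the function to the keyed stream from the DataStream example:

DataStream<Result> results = events.process(new UserFunction());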

Checkpointing

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Enable checkpointing every 60 seconds with exactly-once semantics
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// Configure checkpointing
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("s3://flink/checkpoints/"); // checkpoint directory
checkpointConfig.setMinPauseBetweenCheckpoints(30_000);           // 30 seconds between checkpoints
checkpointConfig.setCheckpointTimeout(600_000);                   // abort checkpoints slower than 10 minutes
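
One setting worth enabling in production, assuming Flink 1.15+ for this method name: retain externalized checkpoints on cancellation, so a job can be restarted from its last checkpoint rather than only from a savepoint.

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

// Keep checkpoint data when the job is cancelled (manual cleanup required)
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);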

Streaming SQL

Flink SQL

-- Create table backed by Kafka
CREATE TABLE events (
  user_id BIGINT,
  event_type STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'format' = 'json'
);

-- Create aggregated view (1-minute tumbling windows)
CREATE VIEW user_metrics AS
SELECT
  user_id,
  TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
  COUNT(*) AS event_count,
  SUM(amount) AS total_amount
FROM events
GROUP BY
  user_id,
  TUMBLE(event_time, INTERVAL '1' MINUTE);
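
The DDL above can be submitted through the SQL client or programmatically. A minimal sketch of the programmatic route:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Submit the table definition from above
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
    "CREATE TABLE events ("
    + " user_id BIGINT, event_type STRING, amount DOUBLE, event_time TIMESTAMP(3),"
    + " WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND"
    + ") WITH ("
    + " 'connector' = 'kafka', 'topic' = 'events',"
    + " 'properties.bootstrap.servers' = 'kafka1:9092', 'format' = 'json')");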

UDF in SQL

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Register UDF (EnrichUser extends ScalarFunction, sketched below)
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryFunction("enrich_user", EnrichUser.class);

// Use in SQL
Table enriched = tableEnv.sqlQuery(
    "SELECT user_id, enrich_user(user_id) AS user_data FROM events");
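
A minimal sketch of the UDF itself; the eval body here is hypothetical (a real implementation might consult a cache or lookup table):

import org.apache.flink.table.functions.ScalarFunction;

// Scalar UDF: called once per row, returns one value
public class EnrichUser extends ScalarFunction {
    public String eval(Long userId) {
        return "profile:" + userId; // placeholder enrichment
    }
}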

Savepoints

Creating Savepoints

# Trigger a savepoint via the Flink CLI (get the job id from `flink list`)
flink savepoint <job-id> s3://flink/savepoints/

# Trigger a savepoint via the REST API
curl -X POST http://flink-jobmanager:8081/jobs/<job-id>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://flink/savepoints/", "cancel-job": false}'

Restore from Savepoint

# Restore a job from a savepoint via the Flink CLI; the job code itself
# is unchanged and resumes with the state captured in the savepoint
flink run -s s3://flink/savepoints/my-savepoint/ my-job.jar
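
On restore, Flink maps savepoint state to operators by their uid, so stateful operators should be given stable uids before the first savepoint is taken. A sketch using the source and function from earlier:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

WatermarkStrategy<Event> watermarks =
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(5));

DataStream<Result> results = env
    .fromSource(kafkaSource, watermarks, "kafka-events")
    .uid("kafka-source")            // stable across job versions
    .keyBy(Event::getUserId)
    .process(new UserFunction())
    .uid("user-event-counter");     // state stays mapped to this operator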

Performance Tuning

Configuration

import org.apache.flink.configuration.Configuration;

// Flink configuration
Configuration config = new Configuration();

// Memory management
config.setString("taskmanager.memory.process.size", "4g"); // total memory per TaskManager
config.setInteger("taskmanager.numberOfTaskSlots", 16);    // 16 slots per TM

// Parallelism
config.setInteger("parallelism.default", 200);

// State and checkpointing
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");
config.setInteger("state.checkpoints.num-retained", 3); // retain 3 checkpoints

// REST endpoint
config.setString("rest.address", "flink-jobmanager");
config.setInteger("rest.port", 8081);

Backpressure Handling

// Give the network stack more buffer memory to absorb short bursts
// (cluster-level options, set on the Configuration from above)
config.setFloat("taskmanager.memory.network.fraction", 0.2f);
config.setFloat("taskmanager.memory.managed.fraction", 0.7f);

// Unaligned checkpoints let barriers overtake buffered records, so
// checkpoints still complete quickly under backpressure
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
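
A related option, assuming Flink 1.14+: buffer debloating automatically shrinks in-flight network buffers under backpressure, which also keeps aligned checkpoints fast.

// Resize network buffers to match the observed throughput
config.setBoolean("taskmanager.network.memory.buffer-debloat.enabled", true);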

Operations

Deployment

# Kubernetes deployment (a per-job client that submits to an existing cluster)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-job
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-job
  template:
    metadata:
      labels:
        app: flink-job
    spec:
      containers:
        - name: flink-job
          image: flink:1.17
          command: ["flink", "run", "-d"]
          args: ["s3://flink/jobs/my-job.jar"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: flink-jobmanager
                taskmanager.memory.process.size: 4096m
          resources:
            limits:
              cpu: "2"
              memory: 4Gi

Monitoring

monitoring:
  metrics:
    - name: "Checkpoint duration"
      metric: lastCheckpointDuration
      alert: "If > 1 minute"
    - name: "Checkpoint failures"
      metric: numberOfFailedCheckpoints
      alert: "If increasing"
    - name: "Backpressure"
      metric: backPressuredTimeMsPerSecond
      alert: "If > 800 (task backpressured > 80% of the time)"
    - name: "State size"
      metric: lastCheckpointSize
      alert: "If > 10GB"

Senior Level Considerations

Anti-Patterns

Anti-Pattern 1: Ignoring state size

// Bad: state is updated forever and never cleaned up
eventCount.update(eventCount.value() + 1);

// Good: configure a TTL so idle state is eventually dropped
ValueStateDescriptor<Long> descriptor =
    new ValueStateDescriptor<>("eventCount", Long.class);
descriptor.enableTimeToLive(
    StateTtlConfig.newBuilder(Time.hours(1)).build());

Anti-Pattern 2: Not tuning memory

// Bad: default memory settings (~1.7 GB per TaskManager) for a large-state job
// Good: right-size the TaskManager for its state
config.setString("taskmanager.memory.process.size", "4g");

Anti-Pattern 3: No checkpointing

// Bad: no checkpointing (a job failure loses all state)
// Good:
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointStorage("s3://flink/checkpoints/");

Key Takeaways

  1. Stateful processing: Flink's core strength is complex computations over keyed state
  2. Exactly-once: built-in via checkpointing and barrier alignment
  3. Windowing: flexible tumbling, sliding, and session windows over event time
  4. State backends: RocksDB for large production state
  5. Savepoints: enable job upgrades without state loss
  6. Performance: tune memory, parallelism, and checkpointing together
  7. Operations: requires monitoring and alerting on checkpoints and backpressure
  8. Cost: spot instances suit stateless stages; keep stateful workloads on stable nodes

Back to Module 2