Apache Flink Deep Dive
Stateful Stream Processing at Scale
Overview
Apache Flink is a distributed stream-processing engine designed for real-time analytics over stateful computations. It provides exactly-once processing semantics, sophisticated windowing, and first-class state management, which is why it is often treated as the gold standard for complex streaming applications.
Flink Architecture Deep Dive
Job Architecture
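A Flink cluster consists of one JobManager, which schedules tasks and coordinates checkpoints, and one or more TaskManagers, which execute the parallel operator subtasks in their task slots. The client translates your program into a dataflow graph and submits it to the JobManager, which distributes the subtasks across the available slots.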
State Backends
State Backend Selection:
| Backend | Max State | Latency | Use Case |
|---|---|---|---|
| Memory (legacy, deprecated) | Small (checkpoints to JobManager heap) | Lowest | Local testing only |
| HashMap | Limited by TaskManager heap | Lowest | Small-to-medium state |
| RocksDB | 10TB+ (bounded by local disk) | Low-Medium | Production, large state |
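Selecting a backend is one call on the execution environment. A minimal sketch against the Flink 1.17 API (the RocksDB backend requires the flink-statebackend-rocksdb dependency):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap-based backend: fastest access, but state must fit in TaskManager memory
env.setStateBackend(new HashMapStateBackend());

// RocksDB backend: state spills to local disk; "true" enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
```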
DataStream API
Basic DataStream
```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a Kafka source (the FlinkKafkaConsumer API is deprecated since Flink 1.14)
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka1:9092,kafka2:9092,kafka3:9092")
    .setTopics("events")
    .setGroupId("event-processor")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new EventDeserializationSchema()) // your DeserializationSchema<Event>
    .build();

// Keyed stream with event-time timestamps and bounded-out-of-orderness watermarks,
// which the event-time windows below depend on
KeyedStream<Event, String> events = env
    .fromSource(kafkaSource,
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp()),
        "kafka-events")
    .keyBy(Event::getUserId); // Keyed stream
```
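These examples assume an Event POJO along these lines (hypothetical; only the accessors used in this module are shown):

```java
// Hypothetical Event type used throughout these examples
public class Event {
    private String userId;
    private String eventType;
    private double amount;
    private long timestamp; // epoch milliseconds

    public Event() {} // Flink's POJO serializer needs a no-arg constructor

    public String getUserId() { return userId; }
    public double getAmount() { return amount; }
    public long getTimestamp() { return timestamp; }
    // setters omitted for brevity
}
```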
Windowing
```java
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Tumbling window (1 minute)
DataStream<Aggregated> tumbling = events
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregator());

// Sliding window (1 minute size, 30 second slide)
DataStream<Aggregated> sliding = events
    .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
    .aggregate(new CountAggregator());

// Session window (5 minute inactivity gap)
DataStream<Aggregated> session = events
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .aggregate(new CountAggregator());
```
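The calls above use a CountAggregator that the section never defines. A minimal AggregateFunction that would satisfy them might look like this (assuming Aggregated simply wraps a count):

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical aggregator: counts events per window
public class CountAggregator implements AggregateFunction<Event, Long, Aggregated> {
    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(Event value, Long accumulator) { return accumulator + 1; }

    @Override
    public Aggregated getResult(Long accumulator) { return new Aggregated(accumulator); }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}
```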
Stateful Processing
```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedProcessFunction for stateful per-key operations
public class UserFunction extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Long> eventCount;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("eventCount", Long.class);
        eventCount = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Update state
        long currentCount = eventCount.value() == null ? 0L : eventCount.value();
        eventCount.update(currentCount + 1);

        // Register an event-time timer for state cleanup (here: 1 hour after the event)
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3_600_000L);

        out.collect(new Result(event.getUserId(), currentCount + 1));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        // Clear state once the timer fires, acting as a simple TTL
        eventCount.clear();
    }
}
```
Checkpointing
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Enable checkpointing every 60 seconds (1 minute)
env.enableCheckpointing(60_000);

// Configure checkpointing; the config object is obtained from the environment,
// there is no CheckpointConfig.builder()
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointStorage("s3://flink/checkpoints/"); // Checkpoint directory
config.setMinPauseBetweenCheckpoints(30_000); // 30 seconds
```
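A related production setting, not shown above: retain externalized checkpoints when a job is cancelled, so it can still be restarted from the last checkpoint. A sketch:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

// Keep the last completed checkpoint even if the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```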
Streaming SQL
Table API
```sql
-- Create a Kafka-backed table
CREATE TABLE events (
    user_id BIGINT,
    event_type STRING,
    amount DOUBLE,  -- referenced by SUM(amount) below
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'format' = 'json'
);

-- Create an aggregated view
CREATE VIEW user_metrics AS
SELECT
    user_id,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```
UDF in SQL
```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Register the UDF; createTemporaryFunction takes a name and the function class
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryFunction("enrich_user", EnrichUser.class);
```
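EnrichUser is not defined in the original; a minimal scalar function consistent with the registration above might look like this (the lookup logic is a placeholder):

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical scalar UDF: maps a user id to an enrichment string
public class EnrichUser extends ScalarFunction {
    public String eval(Long userId) {
        // Placeholder: a real implementation would consult a cache or lookup table
        return "user-" + userId;
    }
}
```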
```sql
-- Use the UDF in SQL
SELECT user_id, enrich_user(user_id) AS user_data
FROM events;
```
Savepoints
Creating Savepoints
Savepoints are triggered from outside the running job, via the Flink CLI or the JobManager REST API; there is no execution-environment method for it. Note that the REST API identifies jobs by their JobID, not their name:

```bash
# Trigger a savepoint via the CLI
flink savepoint <job-id> s3://flink/savepoints/

# Trigger a savepoint via the REST API (the request field is "target-directory")
curl -X POST http://flink-jobmanager:8081/jobs/<job-id>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://flink/savepoints/", "cancel-job": false}'
```
Restore from Savepoint
Restoring happens at submission time; pass the savepoint path with the -s flag when resubmitting the job:

```bash
# Resubmit the job, restoring its state from the savepoint
flink run -s s3://flink/savepoints/my-savepoint/ my-job.jar
```

```java
// The pipeline code itself is unchanged; Flink maps saved state back onto
// operators by their IDs, so give stateful operators stable uid()s
DataStream<Event> events = env
    .fromSource(kafkaSource, watermarkStrategy, "kafka-events") // as defined earlier
    .uid("kafka-source")
    .keyBy(Event::getUserId);
```
Performance Tuning
Configuration
```java
import org.apache.flink.configuration.Configuration;

// Flink configuration
Configuration config = new Configuration();

// Memory: total process memory per TaskManager (shared by all of its slots)
config.setString("taskmanager.memory.process.size", "4096m"); // 4 GB per TaskManager
config.setInteger("taskmanager.numberOfTaskSlots", 16);       // 16 slots per TM

// Parallelism
config.setInteger("parallelism.default", 200);

// Checkpointing
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "s3://flink/checkpoints/");
config.setInteger("state.checkpoints.num-retained", 3); // Retain 3 completed checkpoints

// REST endpoint (host and port are separate options)
config.setString("rest.address", "flink-jobmanager");
config.setInteger("rest.port", 8081);
```
Backpressure Handling
```java
// Memory fractions are cluster-level options, not ExecutionConfig settings
config.setFloat("taskmanager.memory.network.fraction", 0.2f); // more network buffer memory
config.setFloat("taskmanager.memory.managed.fraction", 0.7f); // more managed memory (used by RocksDB)

// Unaligned checkpoints let barriers overtake in-flight data under backpressure
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
```
Operations
Deployment
```yaml
# Kubernetes deployment (simplified sketch)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-job
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-job
  template:
    metadata:
      labels:
        app: flink-job
    spec:
      containers:
        - name: flink-job
          image: flink:1.17
          command: ["flink", "run", "-d"]
          args: ["s3://flink/jobs/my-job.jar"]
          env:
            - name: FLINK_PROPERTIES
              value: "jobmanager.rpc.address: flink-jobmanager"
            - name: TASK_MANAGER_MEMORY
              value: "4096m" # 4 GB
            - name: TASK_MANAGER_CPU
              value: "2.0"
```
Monitoring
```yaml
monitoring:
  metrics:
    - name: "Checkpoint duration"
      metric: flink checkpoint duration
      alert: "If > 1 minute"
    - name: "Checkpoint failure rate"
      metric: flink checkpoint failures
      alert: "If > 0"
    - name: "Backpressure ratio"
      metric: flink backpressure ratio
      alert: "If > 0.8"
    - name: "State size"
      metric: flink state size
      alert: "If > 10GB"
```
- name: "State size" metric: flink state size alert: "If > 10GB"Senior Level Considerations
Anti-Patterns
Anti-Pattern 1: Ignoring state size
```java
// Bad: state is written on every event and never cleared, so it grows unbounded
eventCount.update(eventCount.value() + 1);

// Good: attach a TTL to the state descriptor so stale entries are cleaned up
// (Time here is org.apache.flink.api.common.time.Time)
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .build();
descriptor.enableTimeToLive(ttlConfig);
```
Anti-Pattern 2: Not tuning memory
```java
// Bad: default-sized TaskManagers for a state-heavy job
config.setString("taskmanager.memory.process.size", "1024m"); // too small

// Good: right-size memory for the state the job actually carries
config.setString("taskmanager.memory.process.size", "4096m"); // 4 GB
```
Anti-Pattern 3: No checkpointing
```java
// Bad: no checkpointing, so a single failure loses all accumulated state

// Good: enable periodic checkpoints to durable storage
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointStorage("s3://flink/checkpoints/");
```
Key Takeaways
- Stateful processing: Core strength for complex stateful computations
- Exactly-once: Built-in support with checkpointing
- Windowing: Most flexible windowing options (tumbling, sliding, session)
- State backends: RocksDB for production state
- Savepoints: Enable job upgrades without state loss
- Performance: Tune memory, parallelism, checkpointing
- Operations: Requires monitoring and alerting
- Cost: Use spot instances for stateless operations
Back to Module 2