Stream Processing: Flink and Patterns
Flink Architecture
Event Time vs Processing Time
Event Time: when the EVENT happened (timestamp in the data)
Processing Time: when the SYSTEM processed the event
Problem:
Event happened at 10:00:58 (event time)
Arrived at system at 10:01:03 (5 sec network delay)
Processed at 10:01:05 (2 sec queue wait)
Processing time window [10:00:00, 10:01:00) would MISS this event (it lands in [10:01:00, 10:02:00))!
Event time window [10:00:00, 10:01:00) correctly includes it [OK]
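A minimal sketch of tumbling-window assignment, using hypothetical timestamps: an event at 10:00:58 that reaches the operator 7 seconds later is assigned to different one-minute windows depending on which clock is used. Windows are epoch-aligned, as Flink's default tumbling windows are; `window_start` is an illustrative helper, not a Flink API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the epoch-aligned tumbling window [start, start + size) containing ts."""
    return ts - (ts - EPOCH) % size


size = timedelta(minutes=1)
event_time = datetime(2025, 1, 1, 10, 0, 58)                 # when it happened
processing_time = event_time + timedelta(seconds=5 + 2)      # network delay + queue wait

by_event_time = window_start(event_time, size)               # the 10:00:00 window
by_processing_time = window_start(processing_time, size)     # the 10:01:00 window
```

Any event close to a window boundary is exposed to this effect; the larger the delay, the more events are misassigned under processing time.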
Always use Event Time for analytics
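Processing time windows are unreliable: network delays, reprocessing, and consumer lag put events into the wrong windows. Event time + watermarks = correct results regardless of processing delays, as long as events arrive within the watermark's out-of-orderness bound (plus any allowed lateness).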
Watermarks: when a window is "closed"
Problem: how does Flink know that all events for window [10:00-10:01] have arrived?
Solution: a watermark W is an assertion that "all events with timestamp ≤ W have already arrived":
Watermark progression (3 sec out-of-orderness bound; t = max event timestamp seen so far):
t=10:00:03 → watermark = 10:00:00 (3 sec behind)
t=10:00:08 → watermark = 10:00:05
t=10:00:15 → watermark = 10:00:12
Window [10:00:00, 10:01:00] fires when watermark ≥ 10:01:00
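The progression above follows the bounded-out-of-orderness rule: watermark = max event time seen minus a fixed delay. A toy generator reproducing it, with hypothetical class and function names (Flink's own generator, created via `WatermarkStrategy.forBoundedOutOfOrderness`, additionally subtracts 1 ms and emits watermarks periodically rather than per event):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class BoundedOutOfOrdernessWatermarks:
    """Toy watermark generator: watermark = max event time seen - max_delay."""
    max_delay: timedelta
    max_ts: Optional[datetime] = None

    def on_event(self, ts: datetime) -> None:
        # Older (out-of-order) events never move the watermark backwards.
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts

    def current_watermark(self) -> Optional[datetime]:
        return None if self.max_ts is None else self.max_ts - self.max_delay


def window_fires(window_end: datetime, watermark: Optional[datetime]) -> bool:
    """An event-time window fires once the watermark reaches its end."""
    return watermark is not None and watermark >= window_end


gen = BoundedOutOfOrdernessWatermarks(max_delay=timedelta(seconds=3))
base = datetime(2025, 1, 1, 10, 0, 0)
history = []
for sec in [3, 8, 6, 15]:              # the event at +6 s arrives out of order
    gen.on_event(base + timedelta(seconds=sec))
    history.append(gen.current_watermark())
```

The out-of-order event at 10:00:06 is still accepted but leaves the watermark at 10:00:05; only an event at 10:01:03 or later would make the [10:00:00, 10:01:00) window fire.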
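Late events (arrive after the watermark has passed the window end):
→ within allowed lateness: window fires again with an updated result (accumulating mode)
→ beyond allowed lateness: dropped, or routed to a side output (late data handling)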
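The decision for a single event can be sketched as below, assuming a one-minute tumbling window and 30 seconds of allowed lateness (both hypothetical values). In the Flink DataStream API the corresponding knobs are `allowedLateness(...)` and `sideOutputLateData(...)` on a windowed stream; this sketch ignores Flink's 1 ms window max-timestamp offset.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def route_event(event_ts: datetime, watermark: datetime,
                size: timedelta, allowed_lateness: timedelta) -> str:
    """Decide what happens to an event given the current watermark (simplified)."""
    window_end = event_ts - (event_ts - EPOCH) % size + size
    if watermark < window_end:
        return "on_time"        # window has not fired yet
    if watermark < window_end + allowed_lateness:
        return "late_update"    # window fires again with an updated result
    return "side_output"        # beyond allowed lateness: dropped or sent to a side output


size = timedelta(minutes=1)
lateness = timedelta(seconds=30)
event = datetime(2025, 1, 1, 10, 0, 58)   # belongs to [10:00:00, 10:01:00)
```

Allowed lateness trades memory (window state is kept longer) for fewer events lost to the side output.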
Windowing Strategies
Checkpointing: exactly-once in Flink
Flink Checkpointing:
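1. The JobManager's checkpoint coordinator triggers the sources to inject a barrier into the stream
2. When the barrier reaches an operator, the operator snapshots its state to durable storage (S3, HDFS)
3. On failure: restore all operator state from the last completed checkpoint
4. Sources rewind to the offsets stored in that checkpoint, and processing resumes
Result: exactly-once state; end-to-end exactly-once output additionally requires transactional or idempotent sinks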
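The recovery steps can be illustrated with a toy, single-operator job (class and method names are hypothetical; there is no barrier alignment, parallelism, or sink). Because the snapshot stores the source offset and the operator state together, replaying events after a failure does not double-count them.

```python
import copy
from typing import Dict, List, Tuple


class CountingJob:
    """Toy single-operator job: counts events per key from a replayable log."""

    def __init__(self, log: List[str]) -> None:
        self.log = log                        # replayable source, e.g. one Kafka partition
        self.offset = 0                       # next position to read
        self.counts: Dict[str, int] = {}      # operator state
        self.checkpoint: Tuple[int, Dict[str, int]] = (0, {})  # last completed snapshot

    def process(self, n: int) -> None:
        for _ in range(n):
            if self.offset >= len(self.log):
                return
            key = self.log[self.offset]
            self.counts[key] = self.counts.get(key, 0) + 1
            self.offset += 1

    def take_checkpoint(self) -> None:
        # Barrier reached the operator: snapshot source offset and state together.
        self.checkpoint = (self.offset, copy.deepcopy(self.counts))

    def fail_and_restore(self) -> None:
        # Crash: in-memory progress is lost; restore offset AND state from the snapshot.
        offset, saved = self.checkpoint
        self.offset, self.counts = offset, copy.deepcopy(saved)


job = CountingJob(["a", "b", "a", "c", "a", "b"])
job.process(3)           # a, b, a
job.take_checkpoint()    # snapshot: offset 3, {"a": 2, "b": 1}
job.process(2)           # c, a (not yet covered by a checkpoint)
job.fail_and_restore()   # back to offset 3 with the matching state
job.process(10)          # replays c, a and continues with b
```

Rolling back only one of the two (offset or state) would either double-count or lose the events processed between the checkpoint and the crash.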
Config:
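checkpoint_interval = 60 sec (balance: frequent = more overhead, rare = more reprocessing after a failure)
checkpoint_timeout = 120 sec (a checkpoint not completed within this time is aborted)
min_pause = 30 sec (minimum gap between the end of one checkpoint and the start of the next)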
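How these three settings interact can be modeled roughly as follows, assuming at most one checkpoint in flight at a time. This is a simplification, and the function names are hypothetical rather than part of Flink's API.

```python
def next_checkpoint_start(last_start: float, last_end: float,
                          interval: float = 60, min_pause: float = 30) -> float:
    """Earliest start (seconds) of the next checkpoint under the simplified model."""
    # The interval is measured start-to-start; min_pause guarantees idle time
    # after a slow checkpoint finishes, so regular processing is not starved.
    return max(last_start + interval, last_end + min_pause)


def is_timed_out(start: float, now: float, timeout: float = 120) -> bool:
    """A checkpoint still running after `timeout` seconds is aborted."""
    return now - start > timeout
```

With a fast checkpoint (done at 10 s) the next one starts at 60 s; with a slow one (done at 50 s) min_pause pushes it to 80 s.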
Flink 2.0: Disaggregated State (2025)
Flink 2.0 (released March 24, 2025) is the first major release in 9 years. Its main architectural change is disaggregated state management via the ForSt key-value backend.
Flink 1.x (RocksDB local state):
TaskManager
├─ Operator state in RocksDB on local disk
├─ State size limited by local disk capacity
├─ Rescaling = state redistribution = reload from the checkpoint in S3
└─ Cluster sizing determined by max state size
Flink 2.0 (ForSt + S3/HDFS):
TaskManager
├─ ForSt = "For Streaming" KV store
├─ State stored in object storage (S3/HDFS)
├─ Local hybrid cache (e.g. 1 GB) for the hot working set
├─ Async execution model for SQL operators (joins, aggregates)
├─ State snapshot caching between checkpoints
└─ Rescaling = metadata pointer change (near-instant)
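A minimal sketch of why a small local cache can serve a hot working set while the full state lives remotely, assuming a simple LRU read-through/write-through policy. `RemoteStore` and `CachedState` are hypothetical stand-ins and do not reflect how ForSt is actually implemented.

```python
from collections import OrderedDict
from typing import Any, Dict


class RemoteStore:
    """Stand-in for object storage (S3/HDFS); counts reads to show the cache effect."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}
        self.reads = 0

    def get(self, key: str) -> Any:
        self.reads += 1
        return self.data.get(key)

    def put(self, key: str, value: Any) -> None:
        self.data[key] = value


class CachedState:
    """LRU cache in front of remote state (hypothetical, not ForSt's API)."""

    def __init__(self, remote: RemoteStore, capacity: int) -> None:
        self.remote = remote
        self.capacity = capacity
        self.cache: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Any:
        if key in self.cache:                 # hit: no remote round trip
            self.cache.move_to_end(key)
            return self.cache[key]
        value = self.remote.get(key)          # miss: read through to remote storage
        self._admit(key, value)
        return value

    def put(self, key: str, value: Any) -> None:
        self.remote.put(key, value)           # write-through: remote stays the source of truth
        self._admit(key, value)

    def _admit(self, key: str, value: Any) -> None:
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)    # evict the least recently used key


remote = RemoteStore()
state = CachedState(remote, capacity=2)
for i, key in enumerate(["u1", "u2", "u3"], start=1):
    state.put(key, i)                         # u1 is evicted from the local cache
```

Local disk size now bounds only the cache, not the state; misses cost remote reads, which is why throughput depends on how well the working set fits the cache.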
Implications:
- Unlimited state size (not bound by local disk)
- Fast recovery regardless of state size
- 75-120% of local-RocksDB throughput with 1.2-4.8 GB of state and a 1 GB cache
- Cluster sizing changes: fewer nodes, smaller disks
- Stateful Flink jobs become cloud-native, like stateless ones
When to move to Flink 2.0 ForSt. If your state regularly exceeds 100 GB per TaskManager, or you frequently rebalance the cluster as state grows, disaggregated state removes both constraints. If state is small (under 10 GB), local RocksDB is faster and migrating is not necessary. Cluster sizing for ForSt: fewer TaskManagers and cheaper disks, with S3 throughput becoming the bottleneck instead of local IOPS.
Streaming Design Patterns
Pattern 1: Enrichment (stream + lookup)
Stream: order events (user_id, product_id, amount)
Lookup: user dimension table (user_id → name, segment, country)
Enriched: order + user name, segment, country
Approaches:
a) Broadcast state: load small lookup table into each task (< 1 GB)
b) Async I/O: query external DB per event (adds latency)
c) Temporal join: join stream with versioned table (time-aware)
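Approach (a) boils down to a plain dictionary lookup against the latest row. Approach (c) is subtler: each event is joined with the version of the row that was valid at the event's time, so a user who upgraded at noon still shows the old segment for morning orders. A minimal in-memory sketch of those semantics, with hypothetical names (`VersionedTable`, `as_of`, `enrich`):

```python
import bisect
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple


class VersionedTable:
    """Keeps every version of each row, ordered by the time it became valid."""

    def __init__(self) -> None:
        self.versions: Dict[Any, Tuple[List[datetime], List[dict]]] = {}

    def upsert(self, key: Any, valid_from: datetime, row: dict) -> None:
        times, rows = self.versions.setdefault(key, ([], []))
        i = bisect.bisect_right(times, valid_from)    # keep versions sorted by time
        times.insert(i, valid_from)
        rows.insert(i, row)

    def as_of(self, key: Any, ts: datetime) -> Optional[dict]:
        """Row version valid at ts: the latest one with valid_from <= ts."""
        if key not in self.versions:
            return None
        times, rows = self.versions[key]
        i = bisect.bisect_right(times, ts)
        return rows[i - 1] if i > 0 else None


def enrich(order: dict, users: VersionedTable) -> dict:
    """Temporal join: attach the user attributes valid at the order's event time."""
    user = users.as_of(order["user_id"], order["event_time"])
    return {**order, **(user or {})}


users = VersionedTable()
users.upsert(42, datetime(2025, 1, 1, 9, 0), {"segment": "free", "country": "DE"})
users.upsert(42, datetime(2025, 1, 1, 12, 0), {"segment": "pro", "country": "DE"})

morning = enrich({"user_id": 42, "amount": 10, "event_time": datetime(2025, 1, 1, 10, 0)}, users)
evening = enrich({"user_id": 42, "amount": 25, "event_time": datetime(2025, 1, 1, 18, 0)}, users)
```

A real temporal join also has to bound how many old versions are kept, typically by dropping versions older than the watermark.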
Pattern 2: Deduplication
Deduplication in streaming:
Key: event_id
State: Set of seen event_ids (bounded by TTL)
if (event_id in seen_set) → skip (duplicate)
else → process + add to seen_set
TTL: clean old entries (e.g., 24 hours) to bound state size
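The pseudocode above can be made concrete as below, assuming the clock passed in never decreases (as with processing time, which is also what Flink's state TTL is based on). `Deduplicator` is a hypothetical name; in Flink the seen-set would live in keyed state rather than a Python dict.

```python
from collections import OrderedDict


class Deduplicator:
    """Drops repeated event_ids; each id is remembered for `ttl` seconds."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self.seen: "OrderedDict[str, float]" = OrderedDict()  # event_id -> first-seen time

    def _expire(self, now: float) -> None:
        # Insertion order equals time order because `now` never decreases.
        while self.seen:
            first_seen = next(iter(self.seen.values()))
            if now - first_seen < self.ttl:
                break
            self.seen.popitem(last=False)

    def accept(self, event_id: str, now: float) -> bool:
        """True if the event should be processed, False if it is a duplicate."""
        self._expire(now)
        if event_id in self.seen:
            return False
        self.seen[event_id] = now
        return True


d = Deduplicator(ttl=24 * 3600)
results = [d.accept("e1", 0), d.accept("e1", 10), d.accept("e2", 20), d.accept("e1", 24 * 3600)]
```

The last call shows the trade-off: a duplicate arriving after the TTL is processed again, so the TTL has to exceed the longest expected redelivery delay.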
Arbitrary State API v2 in Spark 4.0
Pattern 3: Dead Letter Queue (DLQ)
DLQ: events that failed processing → separate topic for investigation
Main flow: Kafka → Flink → process → Sink
Error flow: Flink → catch exception → DLQ topic
DLQ contains:
- Original event
- Error message
- Timestamp
- Retry count
Benefits: main pipeline doesn't block on bad events
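A minimal sketch of the error flow, assuming a fixed number of attempts per event and Python lists standing in for the sink and the DLQ topic (`run_with_dlq` and the record field names are hypothetical). Each failed event becomes a record with the four fields listed above, and the loop moves on instead of stopping the job.

```python
import time
from typing import Any, Callable, Dict, List


def run_with_dlq(events: List[Any], process: Callable[[Any], Any],
                 sink: List[Any], dlq: List[Dict[str, Any]],
                 max_attempts: int = 3) -> None:
    """Process each event; after max_attempts failures, send it to the DLQ."""
    for event in events:
        last_error = None
        for _ in range(max_attempts):
            try:
                sink.append(process(event))
                break                          # success: move on to the next event
            except Exception as exc:           # a real job would retry only transient errors
                last_error = exc
        else:
            # All attempts failed: record the event and keep the pipeline running.
            dlq.append({
                "original_event": event,
                "error": repr(last_error),
                "timestamp": time.time(),
                "retry_count": max_attempts - 1,
            })


sink: List[Any] = []
dlq: List[Dict[str, Any]] = []
run_with_dlq(["1", "oops", "3"], int, sink, dlq)
```

Retrying a deterministic failure such as a parse error only adds latency; distinguishing transient from permanent errors before retrying keeps the DLQ path cheap.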