Operators поддерживающие ForstDB
ForstDB как state backend работает для любого Flink job — это просто другой backend. Но async state access (которая даёт реальную performance в disaggregated setup) требует, чтобы оператор был переписан под async API. К Flink 2.2 (май 2026) coverage SQL operators хорошее, но не 100%. Понимание, какие операторы async-ready, какие нет — критично для production decisions.
В этом уроке мы разбираем actual coverage в Flink 2.2: какие SQL operators поддерживают async state, какие fallback на sync, какие конфиги управляют поведением, и что делать когда нужный оператор не поддерживается.
Joins и lookup в Flink SQLДва уровня поддержки
Уровень 1: ForstDB как backend
- Любой Flink job может использовать ForstDB
- Config: state.backend.type: forst
- Sync state access работает, но slow (10 ms на cache miss)
- Approach: использовать вместе с большим cache
Уровень 2: Async state access
- Operator должен быть переписан под State V2 API
- Config: table.exec.async-state.enabled: true
- True performance benefits disaggregation
- Coverage растёт по мере development
Большинство users хотят Уровень 2 — иначе ForstDB не имеет смысла. Поэтому coverage operators это главная gating статья миграции.
Operators поддерживающие async state в Flink 2.2
Streaming operators (async-ready):
-- Aggregate operators
StreamPhysicalGroupAggregate YES (async)
StreamPhysicalLocalGroupAggregate YES (async)
StreamPhysicalGlobalGroupAggregate YES (async)
StreamPhysicalIncrementalGroupAggregate YES
StreamPhysicalGroupWindowAggregate YES (async с Flink 2.1)
StreamPhysicalGroupTableAggregate YES
-- Deduplicate / Rank
StreamPhysicalDeduplicate YES
StreamPhysicalRank YES (с Flink 2.1)
StreamPhysicalSortLimit YES
-- Joins
StreamPhysicalJoin (regular) YES (с Flink 2.1)
StreamPhysicalIntervalJoin YES
StreamPhysicalTemporalJoin YES
StreamPhysicalLookupJoin YES (был async всегда)
StreamPhysicalWindowJoin YES
StreamPhysicalDeltaJoin (2.1+) YES (native async)
StreamPhysicalMultiJoin (2.2) YES (native async)
-- Simple operators
StreamPhysicalCalc YES (stateless, всегда работает)
StreamPhysicalExchange YES (stateless)
StreamPhysicalSink зависит от sink
-- ChangelogNormalize
StreamPhysicalChangelogNormalize YES
Operators ещё НЕ поддерживающие async
Streaming operators без async support:
StreamPhysicalMatchRecognize NO (CEP, в работе)
StreamPhysicalSortAggregate NO (legacy, deprecated)
StreamPhysicalProcessTableFunction NO (custom function)
Custom UDF with state NO (user code в sync style)
StreamPhysicalSortMergeJoin NO (legacy, deprecated)
Что делать если не поддерживается:
- Job всё равно работает (sync fallback)
- Performance degraded на ForstDB (cache miss = 10 ms)
- Mitigation: больший cache, RocksDB для этого job
- Future: coverage extends каждый release
SQL examples — что async, что нет
-- ASYNC: GROUP BY aggregate
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;
-- StreamPhysicalGroupAggregate — async-ready
-- ASYNC: window aggregate
SELECT user_id, TUMBLE_START(ts, INTERVAL '1' HOUR) AS w, COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' HOUR);
-- StreamPhysicalGroupWindowAggregate — async
-- ASYNC: regular join
SELECT *
FROM orders o JOIN payments p ON o.id = p.order_id;
-- StreamPhysicalJoin — async (с 2.1)
-- ASYNC: deduplicate
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS rn
FROM events
) WHERE rn = 1;
-- StreamPhysicalDeduplicate — async
-- ASYNC: CDC source + ChangelogNormalize
SELECT * FROM cdc_orders;
-- StreamPhysicalChangelogNormalize — async
-- NOT ASYNC: MATCH_RECOGNIZE (CEP)
SELECT * FROM events
MATCH_RECOGNIZE (
PARTITION BY user_id
PATTERN (A B+ C)
DEFINE A AS A.type = 'login', B AS B.type = 'attempt', C AS C.type = 'fail'
);
-- StreamPhysicalMatchRecognize — sync fallback
Async configuration
# flink-conf.yaml для async state
# Главный switch
table.exec.async-state.enabled: true
# Tunable parameters
table.exec.async-state.in-flight.size: 100
# Сколько одновременных в полёте requests на operator
# Больше = больше throughput, но больше heap
# Default 6, increase для high latency S3
table.exec.async-state.buffer-timeout: 100ms
# Maximum wait для batching state requests
# Lower = lower latency, less batching
# Higher = better batching, higher latency
table.exec.async-state.cache.size: 1000
# Per-operator local result cache
# Помогает для repeated lookups same key
# State backend configs
state.backend.type: forst
state.backend.forst.local-cache.size: 50gb
state.backend.forst.block-cache.size: 1gb
Performance impact матрицы
Recovery / startup behaviour
При starting job с ForstDB:
Cold start (новый job):
- No state to recover
- Operators initialize empty
- Цех warming up cache по мере чтения данных
Restart from checkpoint:
- Read snapshot manifest (small, MB)
- Operators initialize, state references S3 manifest
- НЕ download SST files
- Resume processing
- Cache cold для несколько minutes
Restart from savepoint (savepoint = explicit user-triggered checkpoint):
- Same as checkpoint
- Manifest можно portable между clusters
- Useful для migrations
Rescaling (parallelism change):
- Old state по key groups в S3
- New TaskManagers lookup keys в S3 lazy
- НЕ требует pre-download
- Resume processing после ~30 sec setup
Все эти scenarios теперь O(sec), не O(min/hour).
Сравнение с RocksDB defaults
Чтобы понимать impact, сравним defaults:
RocksDB backend defaults (Flink 1.x/2.x):
state.backend.rocksdb.block.cache-size: 8MB
state.backend.rocksdb.write.buffer.size: 64MB
state.backend.rocksdb.compaction.level.max-size-level-base: 256MB
state.backend.rocksdb.predefined-options: SPINNING_DISK (или FLASH_SSD)
Optimal для large state: tune вверх
state.backend.rocksdb.block.cache-size: 1GB
state.backend.rocksdb.write.buffer.size: 256MB
ForstDB backend defaults:
state.backend.forst.block-cache.size: 256MB
state.backend.forst.local-cache.size: 0 (disabled by default)
state.backend.forst.write-buffer.size: 64MB
state.backend.forst.async-execution: true
Optimal для large state:
state.backend.forst.local-cache.size: 50GB
state.backend.forst.block-cache.size: 1GB
state.backend.forst.async-in-flight: 200
Critically: ForstDB local cache disabled by default. Это обязательно нужно tune для production — без cache cache miss rate ~100% и performance страдает.
Operational considerations
1. Monitoring необходим
Metrics для tracking:
- state.backend.forst.cache.hit_rate (целить > 80%)
- state.backend.forst.s3.requests.put / get / list
- state.backend.forst.compaction.bytes
- operator.async-state.in-flight.count
- operator.async-state.buffer.timeout.count
Alerts:
- Cache hit rate < 50% -> increase cache
- S3 errors > threshold -> connectivity issue
- Buffer timeouts > threshold -> increase in-flight
2. Cost management
S3 PUT/GET: $0.005-0.0004 per 1000 requests
For job делающий 100K state ops/sec:
- 100K * 86400 = 8.6B ops/day
- При 20% cache miss = 1.7B S3 requests
- 1.7B / 1000 * 0.0004 = $680/day только S3 requests
Optimization:
- Larger cache -> fewer S3 requests
- VPC endpoint -> eliminate cross-AZ traffic costs
- S3 storage tier (Intelligent Tiering) -> save on cold storage
3. Job migration timing
Не мигрировать ALL jobs сразу. Stages:
a. Pilot job (низкий risk, dev/staging)
b. Cooldown period (1-2 недели monitoring)
c. Migrate non-critical production jobs
d. Migrate critical jobs последними
e. Keep some RocksDB jobs для baseline comparison
4. Schema evolution
Async state не меняет schema evolution rules
State Processor API работает с ForstDB
Savepoints portable между sync/async
Конфиг для типичных production setups
# Setup 1: Production analytics, large state (1 TB), cost-sensitive
state.backend.type: forst
state.backend.forst.local-cache.size: 200gb # большой cache
state.backend.forst.block-cache.size: 4gb # больше RAM cache
state.backend.forst.compaction.style: LEVEL
state.backend.forst.wal.enabled: false # checkpoint достаточно
table.exec.async-state.enabled: true
table.exec.async-state.in-flight.size: 200 # high concurrency
execution.checkpointing.interval: 60s
execution.checkpointing.unaligned: true # для slow operators
# Setup 2: Low-latency, smaller state (100 GB), latency-critical
state.backend.type: forst # либо rocksdb
state.backend.forst.local-cache.size: 100gb # cache всё state
state.backend.forst.block-cache.size: 8gb # огромный RAM cache
state.backend.forst.wal.enabled: false
table.exec.async-state.enabled: true
table.exec.async-state.in-flight.size: 50 # less concurrency для лучшей order
execution.checkpointing.interval: 10s # частые для recovery
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 200ms
table.exec.mini-batch.size: 10000
# Setup 3: Multi-region, DR-focused
state.backend.type: forst
state.backend.forst.local-cache.size: 50gb
state.checkpoints.dir: s3://flink-state/region-a/checkpoints
# Cross-region replication configured на bucket level
# Standby cluster в region B читает с replicated bucket
table.exec.async-state.enabled: true
execution.checkpointing.interval: 30s
execution.checkpointing.tolerable-failed-checkpoints: 5
Edge cases и known issues
1. Custom serializers
Custom serializers должны быть thread-safe для async I/O
Не использовать ThreadLocal не carefully
2. State migration
При schema change state migration работает sync
Может быть slow для большого state
Mitigation: incremental migration через State Processor API
3. Hot keys
Очень skewed data hits cache thrashing
Hot key state может быть recached multiple times
Mitigation: SplitAggregateRule для distinct aggregates
4. Async timer handling
Timers async не имеют такого же ordering как processElement
Не использовать timers для exactly-once delivery (используйте 2PC sink)
5. Network partitions
S3 unreachable = job stops processing
В sync model state writes блокировались
В async: requests buffer, fail после timeout
Result: similar behavior, но faster failure detection
ForstDB не silver bullet. Для small state (менее 50 GB), stable workloads без rescaling, latency-sensitive applications (sub-ms) — RocksDB local может оставаться лучшим выбором. ForstDB shines для large state (более 100 GB), dynamic environments (frequent rescaling/recovery), cloud-native deployments. Решение должно быть осознанным после benchmarking на actual workload.
Не-SQL APIs
DataStream API и Process Function:
// DataStream API с ForstDB
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.setString("state.backend.type", "forst");
config.setString("state.backend.forst.local-cache.size", "50gb");
env.configure(config);
// Operators могут использовать V1 или V2 API
// V2 обязательна для full async benefits
// V1 ProcessFunction (sync, slow на ForstDB)
DataStream<Event> events = ...;
events.keyBy(e -> e.key)
.process(new MyProcessFunction()); // V1
// V2 ProcessFunction (async, fast)
events.keyBy(e -> e.key)
.process(new MyProcessFunctionV2()); // V2 (нужен async runtime)
Чтение source
Flink source:
flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
AsyncKeyedProcessOperator.java -- main async operator
AsyncWindowAggregateOperator.java
AsyncDeduplicateOperator.java
AsyncRankOperator.java
AsyncJoinOperator.java
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/asyncprocessing/
AsyncProcessingFunction.java
AsyncExecutionController.java
flink-state-backends/flink-statebackend-forst/
src/main/java/org/apache/flink/state/forst/
ForStStateBackend.java
ForStOptions.java -- все конфиги
FLIP документы:
FLIP-423: Disaggregated State Management Framework
FLIP-424: Asynchronous State APIs
FLIP-425: Asynchronous Execution Model
FLIP-451: ForSt Local Disk Cache