Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 22 мин
Продвинутый
Async OperatorsForStDB SupportOperator CoverageConfigurationLimitations

Operators поддерживающие ForstDB

ForstDB как state backend работает для любого Flink job — это просто другой backend. Но async state access (которая даёт реальную performance в disaggregated setup) требует, чтобы оператор был переписан под async API. К Flink 2.2 (май 2026) coverage SQL operators хорошее, но не 100%. Понимание, какие операторы async-ready, какие нет — критично для production decisions.

В этом уроке мы разбираем actual coverage в Flink 2.2: какие SQL operators поддерживают async state, какие fallback на sync, какие конфиги управляют поведением, и что делать когда нужный оператор не поддерживается.

Joins и lookup в Flink SQL

Два уровня поддержки

Уровень 1: ForstDB как backend
  - Любой Flink job может использовать ForstDB
  - Config: state.backend.type: forst
  - Sync state access работает, но slow (10 ms на cache miss)
  - Approach: использовать вместе с большим cache

Уровень 2: Async state access
  - Operator должен быть переписан под State V2 API
  - Config: table.exec.async-state.enabled: true
  - True performance benefits disaggregation
  - Coverage растёт по мере development

Большинство users хотят Уровень 2 — иначе ForstDB не имеет смысла. Поэтому coverage operators это главная gating статья миграции.

Streaming operators (async-ready):

  -- Aggregate operators
  StreamPhysicalGroupAggregate          YES (async)
  StreamPhysicalLocalGroupAggregate     YES (async)
  StreamPhysicalGlobalGroupAggregate    YES (async)
  StreamPhysicalIncrementalGroupAggregate YES
  StreamPhysicalGroupWindowAggregate    YES (async с Flink 2.1)
  StreamPhysicalGroupTableAggregate     YES

  -- Deduplicate / Rank
  StreamPhysicalDeduplicate             YES
  StreamPhysicalRank                    YES (с Flink 2.1)
  StreamPhysicalSortLimit               YES

  -- Joins
  StreamPhysicalJoin (regular)          YES (с Flink 2.1)
  StreamPhysicalIntervalJoin            YES
  StreamPhysicalTemporalJoin            YES
  StreamPhysicalLookupJoin              YES (был async всегда)
  StreamPhysicalWindowJoin              YES
  StreamPhysicalDeltaJoin (2.1+)        YES (native async)
  StreamPhysicalMultiJoin (2.2)         YES (native async)

  -- Simple operators
  StreamPhysicalCalc                    YES (stateless, всегда работает)
  StreamPhysicalExchange                YES (stateless)
  StreamPhysicalSink                    зависит от sink

  -- ChangelogNormalize
  StreamPhysicalChangelogNormalize      YES

Operators ещё НЕ поддерживающие async

Streaming operators без async support:

  StreamPhysicalMatchRecognize          NO (CEP, в работе)
  StreamPhysicalSortAggregate           NO (legacy, deprecated)
  StreamPhysicalProcessTableFunction    NO (custom function)
  Custom UDF with state                  NO (user code в sync style)
  StreamPhysicalSortMergeJoin            NO (legacy, deprecated)

Что делать если не поддерживается:
  - Job всё равно работает (sync fallback)
  - Performance degraded на ForstDB (cache miss = 10 ms)
  - Mitigation: больший cache, RocksDB для этого job
  - Future: coverage extends каждый release

SQL examples — что async, что нет

-- ASYNC: GROUP BY aggregate
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;
-- StreamPhysicalGroupAggregate — async-ready

-- ASYNC: window aggregate
SELECT user_id, TUMBLE_START(ts, INTERVAL '1' HOUR) AS w, COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' HOUR);
-- StreamPhysicalGroupWindowAggregate — async

-- ASYNC: regular join
SELECT *
FROM orders o JOIN payments p ON o.id = p.order_id;
-- StreamPhysicalJoin — async (с 2.1)

-- ASYNC: deduplicate
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS rn
  FROM events
) WHERE rn = 1;
-- StreamPhysicalDeduplicate — async

-- ASYNC: CDC source + ChangelogNormalize
SELECT * FROM cdc_orders;
-- StreamPhysicalChangelogNormalize — async

-- NOT ASYNC: MATCH_RECOGNIZE (CEP)
SELECT * FROM events
MATCH_RECOGNIZE (
  PARTITION BY user_id
  PATTERN (A B+ C)
  DEFINE A AS A.type = 'login', B AS B.type = 'attempt', C AS C.type = 'fail'
);
-- StreamPhysicalMatchRecognize — sync fallback

Async configuration

# flink-conf.yaml для async state

# Главный switch
table.exec.async-state.enabled: true

# Tunable parameters
table.exec.async-state.in-flight.size: 100
  # Сколько одновременных в полёте requests на operator
  # Больше = больше throughput, но больше heap
  # Default 6, increase для high latency S3

table.exec.async-state.buffer-timeout: 100ms
  # Maximum wait для batching state requests
  # Lower = lower latency, less batching
  # Higher = better batching, higher latency

table.exec.async-state.cache.size: 1000
  # Per-operator local result cache
  # Помогает для repeated lookups same key

# State backend configs
state.backend.type: forst
state.backend.forst.local-cache.size: 50gb
state.backend.forst.block-cache.size: 1gb

Performance impact матрицы

Performance impact: operator x backend x async
Operator
RocksDB sync
ForStDB sync
ForStDB async

Recovery / startup behaviour

При starting job с ForstDB:

Cold start (новый job):
  - No state to recover
  - Operators initialize empty
  - Цех warming up cache по мере чтения данных

Restart from checkpoint:
  - Read snapshot manifest (small, MB)
  - Operators initialize, state references S3 manifest
  - НЕ download SST files
  - Resume processing
  - Cache cold для несколько minutes

Restart from savepoint (savepoint = explicit user-triggered checkpoint):
  - Same as checkpoint
  - Manifest можно portable между clusters
  - Useful для migrations

Rescaling (parallelism change):
  - Old state по key groups в S3
  - New TaskManagers lookup keys в S3 lazy
  - НЕ требует pre-download
  - Resume processing после ~30 sec setup

Все эти scenarios теперь O(sec), не O(min/hour).

Сравнение с RocksDB defaults

Чтобы понимать impact, сравним defaults:

RocksDB backend defaults (Flink 1.x/2.x):
  state.backend.rocksdb.block.cache-size: 8MB
  state.backend.rocksdb.write.buffer.size: 64MB
  state.backend.rocksdb.compaction.level.max-size-level-base: 256MB
  state.backend.rocksdb.predefined-options: SPINNING_DISK (или FLASH_SSD)
  
  Optimal для large state: tune вверх
  state.backend.rocksdb.block.cache-size: 1GB
  state.backend.rocksdb.write.buffer.size: 256MB

ForstDB backend defaults:
  state.backend.forst.block-cache.size: 256MB
  state.backend.forst.local-cache.size: 0 (disabled by default)
  state.backend.forst.write-buffer.size: 64MB
  state.backend.forst.async-execution: true

  Optimal для large state:
  state.backend.forst.local-cache.size: 50GB
  state.backend.forst.block-cache.size: 1GB
  state.backend.forst.async-in-flight: 200

Critically: ForstDB local cache disabled by default. Это обязательно нужно tune для production — без cache cache miss rate ~100% и performance страдает.

Operational considerations

1. Monitoring необходим
   Metrics для tracking:
     - state.backend.forst.cache.hit_rate (целить > 80%)
     - state.backend.forst.s3.requests.put / get / list
     - state.backend.forst.compaction.bytes
     - operator.async-state.in-flight.count
     - operator.async-state.buffer.timeout.count

   Alerts:
     - Cache hit rate < 50% -> increase cache
     - S3 errors > threshold -> connectivity issue
     - Buffer timeouts > threshold -> increase in-flight

2. Cost management
   S3 PUT/GET: $0.005-0.0004 per 1000 requests
   For job делающий 100K state ops/sec:
     - 100K * 86400 = 8.6B ops/day
     - При 20% cache miss = 1.7B S3 requests
     - 1.7B / 1000 * 0.0004 = $680/day только S3 requests
   
   Optimization:
     - Larger cache -> fewer S3 requests
     - VPC endpoint -> eliminate cross-AZ traffic costs
     - S3 storage tier (Intelligent Tiering) -> save on cold storage

3. Job migration timing
   Не мигрировать ALL jobs сразу. Stages:
     a. Pilot job (низкий risk, dev/staging)
     b. Cooldown period (1-2 недели monitoring)
     c. Migrate non-critical production jobs
     d. Migrate critical jobs последними
     e. Keep some RocksDB jobs для baseline comparison

4. Schema evolution
   Async state не меняет schema evolution rules
   State Processor API работает с ForstDB
   Savepoints portable между sync/async

Конфиг для типичных production setups

# Setup 1: Production analytics, large state (1 TB), cost-sensitive
state.backend.type: forst
state.backend.forst.local-cache.size: 200gb         # большой cache
state.backend.forst.block-cache.size: 4gb           # больше RAM cache
state.backend.forst.compaction.style: LEVEL
state.backend.forst.wal.enabled: false              # checkpoint достаточно
table.exec.async-state.enabled: true
table.exec.async-state.in-flight.size: 200          # high concurrency
execution.checkpointing.interval: 60s
execution.checkpointing.unaligned: true              # для slow operators

# Setup 2: Low-latency, smaller state (100 GB), latency-critical
state.backend.type: forst                            # либо rocksdb
state.backend.forst.local-cache.size: 100gb          # cache всё state
state.backend.forst.block-cache.size: 8gb            # огромный RAM cache
state.backend.forst.wal.enabled: false
table.exec.async-state.enabled: true
table.exec.async-state.in-flight.size: 50            # less concurrency для лучшей order
execution.checkpointing.interval: 10s                # частые для recovery
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 200ms
table.exec.mini-batch.size: 10000

# Setup 3: Multi-region, DR-focused
state.backend.type: forst
state.backend.forst.local-cache.size: 50gb
state.checkpoints.dir: s3://flink-state/region-a/checkpoints
# Cross-region replication configured на bucket level
# Standby cluster в region B читает с replicated bucket
table.exec.async-state.enabled: true
execution.checkpointing.interval: 30s
execution.checkpointing.tolerable-failed-checkpoints: 5

Edge cases и known issues

1. Custom serializers
   Custom serializers должны быть thread-safe для async I/O
   Не использовать ThreadLocal не carefully
   
2. State migration
   При schema change state migration работает sync
   Может быть slow для большого state
   Mitigation: incremental migration через State Processor API

3. Hot keys
   Очень skewed data hits cache thrashing
   Hot key state может быть recached multiple times
   Mitigation: SplitAggregateRule для distinct aggregates

4. Async timer handling
   Timers async не имеют такого же ordering как processElement
   Не использовать timers для exactly-once delivery (используйте 2PC sink)

5. Network partitions
   S3 unreachable = job stops processing
   В sync model state writes блокировались
   В async: requests buffer, fail после timeout
   Result: similar behavior, но faster failure detection
WARNING

ForstDB не silver bullet. Для small state (менее 50 GB), stable workloads без rescaling, latency-sensitive applications (sub-ms) — RocksDB local может оставаться лучшим выбором. ForstDB shines для large state (более 100 GB), dynamic environments (frequent rescaling/recovery), cloud-native deployments. Решение должно быть осознанным после benchmarking на actual workload.

Не-SQL APIs

DataStream API и Process Function:

// DataStream API с ForstDB
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();
config.setString("state.backend.type", "forst");
config.setString("state.backend.forst.local-cache.size", "50gb");
env.configure(config);

// Operators могут использовать V1 или V2 API
// V2 обязательна для full async benefits

// V1 ProcessFunction (sync, slow на ForstDB)
DataStream<Event> events = ...;
events.keyBy(e -> e.key)
      .process(new MyProcessFunction());  // V1

// V2 ProcessFunction (async, fast)
events.keyBy(e -> e.key)
      .process(new MyProcessFunctionV2());  // V2 (нужен async runtime)

Чтение source

Flink source:
  flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
    AsyncKeyedProcessOperator.java         -- main async operator
    AsyncWindowAggregateOperator.java
    AsyncDeduplicateOperator.java
    AsyncRankOperator.java
    AsyncJoinOperator.java

  flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/asyncprocessing/
    AsyncProcessingFunction.java
    AsyncExecutionController.java

  flink-state-backends/flink-statebackend-forst/
    src/main/java/org/apache/flink/state/forst/
      ForStStateBackend.java
      ForStOptions.java                     -- все конфиги

FLIP документы:
  FLIP-423: Disaggregated State Management Framework
  FLIP-424: Asynchronous State APIs
  FLIP-425: Asynchronous Execution Model
  FLIP-451: ForSt Local Disk Cache
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие SQL operators в Flink 2.2 поддерживают async state access?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5