State V2 API — async доступ к state
В предыдущем уроке мы видели, что ForStDB требует async I/O для performance. Sync state access на disaggregated state приводит к 1000x slowdown в случае cache miss. State V1 API (классический Flink 1.x) — синхронный. Поэтому для Flink 2.0 был введён State V2 API (FLIP-424) — переписанный с нуля async-first.
В этом уроке мы разбираем State V2 API: что меняется в коде операторов, как работает StateFuture, какие правила использования, и как мигрировать существующий код с V1 на V2.
State primitives через DataStream APIState V1 API: что не так
Классический State API Flink 1.x — синхронный:
// State V1 API (synchronous)
public class MyOperator extends KeyedProcessFunction<String, Event, Result> {
private ValueState<Long> counter;
@Override
public void open(Configuration config) {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Long.class)
);
}
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
Long c = counter.value(); // SYNC blocking call
c = (c == null ? 1L : c + 1L);
counter.update(c); // SYNC blocking call
out.collect(new Result(e.key, c));
}
}
Проблема в counter.value(): на RocksDB local это ~10 µs, на ForstDB cache miss — 10 ms. Operator thread блокируется на 10 ms, в это время никакая другая обработка не происходит. Если средний record processing time был 50 µs, теперь стал 10 ms — throughput падает в 200 раз.
State V1 API не лечится — она фундаментально синхронная.
State V2 API: async-first
State V2 (FLIP-424) переписан async:
// State V2 API (async)
public class MyOperatorV2 extends KeyedProcessFunctionV2<String, Event, Result> {
private ValueState<Long> counter; // Тот же интерфейс, но другие методы
@Override
public void open(Configuration config) {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Long.class)
);
}
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) {
counter.asyncValue().thenAccept(c -> {
long newCount = (c == null ? 1L : c + 1L);
counter.asyncUpdate(newCount).thenAccept(unused -> {
out.collect(new Result(e.key, newCount));
});
});
// Не блокируем; processElement возвращается сразу
}
}
Главные различия:
- Async методы:
asyncValue(),asyncUpdate(),asyncGet(),asyncPut(),asyncRemove()вместо sync. - StateFuture
<T>: всё возвращает StateFuture (аналог CompletableFuture, но интегрирован с runtime). thenAccept/thenApply/thenCompose: для chaining операций.processElementвозвращается сразу: не блокируется на state access.
StateFuture: что это и как работает
StateFuture<T> — это Flink-specific Future, который:
Свойства StateFuture:
- Lightweight (не Java CompletableFuture)
- Интегрирован с Flink runtime threading model
- Гарантирует in-order processing per key
- Не allocate heavy objects (object pool)
- Может быть chain'ed (thenAccept, thenApply, thenCompose)
API:
StateFuture<T>.thenAccept(Consumer<T>) -- execute on completion, no result
StateFuture<T>.thenApply(Function<T,R>) -- transform result, return StateFuture<R>
StateFuture<T>.thenCompose(Function<T,StateFuture<R>>) -- chain async ops
StateFuture<T>.thenCombine(other, BiFunction) -- combine two futures
Что под капотом:
StateRequestBuffer:
- Per-operator buffer для async state requests
- Aggregates запросы для batching (если возможно)
- Submits на background I/O thread
- Returns StateFuture immediately
Background I/O thread:
- Polls request buffer
- Issues I/O (S3 read, RocksDB lookup)
- On completion: fires StateFuture callbacks
- Callbacks queued back в operator thread для in-order processing
Operator thread:
- Sequential per key
- Processes callbacks в order
- Не блокируется на I/O
Critical rule: no sync on async
Самое важное правило: нельзя смешивать sync и async state access в одном operator:
// WRONG: sync .value() на async state
counter.asyncValue().thenAccept(c -> {
// ...
});
Long bad = counter.value(); // ERROR: sync call недопустим
Причина: async state работает через очередь requests. Sync call блокировал бы thread и привёл бы к deadlock. Runtime выкинет UnsupportedOperationException: Synchronous state access is not supported in async context.
Если оператор использует State V2 — все state ops должны быть async.
Async order guarantees
Async усложняет порядок, но Flink гарантирует:
Per-key ordering guarantees:
1. processElement для одного key не запускается, пока previous не закончил
(включая все его StateFuture chains)
2. State updates от processElement_N visible в processElement_N+1
для того же key
3. Timer fires (onTimer) после processElement обработан полностью
4. Watermark advance после processElement chains complete
Между разными keys:
- Параллельное выполнение возможно
- State не shared между keys (keyed state)
- Async I/O может pipeline разные keys
То есть для одного key всё детерминированно как в sync. Для разных keys — concurrent. Это даёт thread-safety без user code locks.
Async timers
Timers тоже async:
// Async timers V2
public void processElement(Event e, Context ctx, Collector<Result> out) {
counter.asyncValue().thenAccept(c -> {
if (c == null) {
ctx.timerService().registerEventTimeTimer(e.timestamp + 10000);
}
counter.asyncUpdate(c + 1L);
});
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
// onTimer тоже может использовать async state
counter.asyncValue().thenAccept(c -> {
if (c != null && c > 100) {
out.collect(new Result("threshold exceeded"));
counter.asyncClear();
}
});
}
Timer callbacks queued in correct order with processElement callbacks per key.
Watermark coordination
Watermarks тоже должны coordinate с async state:
Watermark advance с async state:
1. New watermark W received
2. operator must ensure no in-flight processElement < W
3. Wait until все StateFutures для events < W complete
4. Только потом emit watermark downstream
Это handled automatically Flink runtime через WatermarkCoordinator.
User code не должен явно tracking — async runtime знает все in-flight requests.
Output ordering
Outputs могут быть emit в любом порядке внутри async chain:
public void processElement(Event e, Context ctx, Collector<Result> out) {
counter.asyncValue().thenAccept(c -> {
out.collect(new Result(e, c)); // emit может быть в любом order vs other keys
});
}
Per-key order гарантирован (event N выйдет до event N+1 для того же key). Across keys — нет гарантии. Это same as sync model — keyed processing не гарантирует cross-key order.
Migration: V1 -> V2 пример
Сценарий: real operator на V1, мигрируем на V2.
// Before (V1)
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
private ValueState<Boolean> flagged;
private MapState<String, Long> recentAttempts;
private ListState<Long> recentTimestamps;
@Override
public void open(Configuration config) {
flagged = getRuntimeContext().getState(new ValueStateDescriptor<>("flagged", Boolean.class));
recentAttempts = getRuntimeContext().getMapState(new MapStateDescriptor<>("attempts", String.class, Long.class));
recentTimestamps = getRuntimeContext().getListState(new ListStateDescriptor<>("timestamps", Long.class));
}
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
Boolean isFlagged = flagged.value(); // SYNC
if (Boolean.TRUE.equals(isFlagged)) return;
Long attempts = recentAttempts.get(tx.merchantId); // SYNC
if (attempts != null && attempts > 10) {
flagged.update(true); // SYNC
out.collect(new Alert(tx, "Too many attempts"));
return;
}
recentAttempts.put(tx.merchantId, (attempts == null ? 1L : attempts + 1L)); // SYNC
recentTimestamps.add(tx.timestamp); // SYNC
}
}
Async версия:
// After (V2)
public class FraudDetectorV2 extends KeyedProcessFunctionV2<String, Transaction, Alert> {
private ValueState<Boolean> flagged;
private MapState<String, Long> recentAttempts;
private ListState<Long> recentTimestamps;
@Override
public void open(Configuration config) {
// Открытие state идентично
flagged = getRuntimeContext().getState(new ValueStateDescriptor<>("flagged", Boolean.class));
recentAttempts = getRuntimeContext().getMapState(new MapStateDescriptor<>("attempts", String.class, Long.class));
recentTimestamps = getRuntimeContext().getListState(new ListStateDescriptor<>("timestamps", Long.class));
}
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) {
flagged.asyncValue().thenCompose(isFlagged -> {
if (Boolean.TRUE.equals(isFlagged)) {
return StateFutureUtils.completedVoidFuture();
}
return recentAttempts.asyncGet(tx.merchantId).thenCompose(attempts -> {
if (attempts != null && attempts > 10) {
return flagged.asyncUpdate(true).thenAccept(unused -> {
out.collect(new Alert(tx, "Too many attempts"));
});
}
return recentAttempts.asyncPut(tx.merchantId, (attempts == null ? 1L : attempts + 1L))
.thenCompose(unused -> recentTimestamps.asyncAdd(tx.timestamp));
});
});
}
}
Заметные изменения:
processElementтеперь returning void безthrows Exception(errors через StateFuture).- Используется
thenComposeдля chaining. StateFutureUtils.completedVoidFuture()для early exits.- Логика та же, но в callback style.
Migration: shortcuts через wrappers
Чтобы не переписывать весь код, Flink предоставляет wrappers:
// AsyncProcessingFunction wrapper (упрощение для миграции)
public class FraudDetectorWrapped extends AsyncKeyedProcessFunctionAdapter<String, Transaction, Alert> {
// Логика по сути как V1, но runtime автоматически wraps в async
}
// Можно также использовать sync-on-async helper:
public class FraudDetectorHybrid extends KeyedProcessFunctionV2<String, Transaction, Alert> {
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) {
StateFutureUtils.sequential(() -> {
// Внутри этого блока разрешён "sync-style"
// Runtime автоматически handles async I/O behind the scenes
// Но всё равно нужны async setters/getters
});
}
}
Эти wrappers — compromise: они дают V1-like syntax, но всё equivalent async runtime. Throughput может быть ниже чем true async код из-за overhead.
Когда V1 ещё работает
State V1 API остаётся для:
1. Heap-based state backends
- HashMapStateBackend
- In-memory state быстрый, async не нужен
- V1 продолжает работать
2. RocksDB backend (Flink 2.x continues support)
- RocksDB local достаточно быстрый
- V1 работает, V2 тоже работает
- V2 даёт небольшой performance boost через pipelining
3. ForstDB backend (требуется V2 для full benefit)
- V1 работает, но slow
- V2 strongly recommended
- Config: table.exec.async-state.enabled: true
Для всего нового кода рекомендуется V2 как future-proof. State V1 в long-term будет deprecated.
SQL и Table API: автоматический async
В SQL/Table API пользователь не пишет операторы. Async handled планировщиком:
# Включить async state для SQL/Table API
table.exec.async-state.enabled: true
# Tunable parameters
table.exec.async-state.in-flight.size: 100 # parallel I/O requests
table.exec.async-state.buffer-timeout: 100ms
Flink планировщик автоматически генерит async-capable операторы через codegen. Это работает только для supported операторов — список в уроке 4 этого модуля.
Performance benchmarks
Из VLDB paper и release notes:
Word count, 1.2 GB state:
Sync V1 + RocksDB local: 100% baseline (50K events/sec)
Sync V1 + ForstDB no cache: 5% (2.5K events/sec — 20x slower)
Sync V1 + ForstDB 1GB cache: 70% (~35K events/sec)
Async V2 + ForstDB no cache: 65% (~32K events/sec)
Async V2 + ForstDB 1GB cache: 120% (~60K events/sec — FASTER!)
Объяснение:
- Sync V1 + cache miss = 10 ms blocking per record
- Async V2 pipelines requests, скрывает latency
- С cache async + warm cache даёт boost через better parallelism
Nexmark Q20 (filter join):
Sync V1 + RocksDB: 100%
Async V2 + ForstDB: 95% (parity)
Async V2 + ForstDB + MultiJoin: 115% (улучшение благодаря меньшему state)
Limitations и pitfalls
1. Не все операторы V2-ready (Flink 2.0)
- Rank, Deduplicate, GroupAggregate, JoinV2, Window — YES
- Custom UDF без re-engineering — partial
- Подробнее в уроке 4
2. Async может скрывать errors
- StateFuture.thenAccept exception swallowed silently
- Recommended: всегда .exceptionally() для logging
- Или global handler
3. Debugging complex async chains
- Stack traces не показывают callback chain
- Helpful: include event identifiers в log statements
- Tools: async profilers (async-profiler, Java Flight Recorder)
4. State requests batch может delay
- Если low input rate, batching adds latency
- Tune: table.exec.async-state.buffer-timeout = 10ms
5. Memory для in-flight requests
- Each StateFuture allocates (mostly pooled)
- При very high concurrency может pressure heap
- Monitor: heap usage, GC pause
Если operator имеет complex business logic с многими state operations, миграция на V2 не тривиальна. Recommended approach: 1) start with wrapper (AsyncKeyedProcessFunctionAdapter), 2) measure performance, 3) только если узкое место — переписать вручную в true async. Часто wrapper достаточно для 80% benefits.
SQL hints для async tuning
SELECT /*+ STATE_TTL('orders' = '1d') */
o.id, sum(p.amount)
FROM orders o JOIN payments p ON o.id = p.order_id
GROUP BY o.id;
STATE_TTL hint работает для async state так же как для sync. TTL evaluation тоже async — не блокирует процессинг.
Чтение source
Flink source для State V2:
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/
AsyncExecutionController.java -- coordinator async execution
StateRequestBuffer.java -- буфер requests
StateExecutor.java -- background I/O thread
flink-core/src/main/java/org/apache/flink/api/common/state/v2/
ValueState.java -- async ValueState interface
MapState.java
ListState.java
StateFuture.java -- future interface
flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
AsyncKeyedProcessOperator.java
AsyncWindowAggregateOperator.java
AsyncKeyedCoProcessOperator.java
FLIP документы:
FLIP-424: Asynchronous State APIs
FLIP-425: Asynchronous Execution Model for Flink SQL/Table
FLIP-440: Async Source/Sink V2