Learning Platform
Глоссарий Troubleshooting
Урок 11.03 · 25 мин
Продвинутый
State V2 APIAsync StateStateFutureFLIP-424MigrationAsync Execution

State V2 API — async доступ к state

В предыдущем уроке мы видели, что ForStDB требует async I/O для performance. Sync state access на disaggregated state приводит к 1000x slowdown в случае cache miss. State V1 API (классический Flink 1.x) — синхронный. Поэтому для Flink 2.0 был введён State V2 API (FLIP-424) — переписанный с нуля async-first.

В этом уроке мы разбираем State V2 API: что меняется в коде операторов, как работает StateFuture, какие правила использования, и как мигрировать существующий код с V1 на V2.

State primitives через DataStream API

State V1 API: что не так

Классический State API Flink 1.x — синхронный:

// State V1 API (synchronous)
public class MyOperator extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> counter;

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Long.class)
        );
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        Long c = counter.value();        // SYNC blocking call
        c = (c == null ? 1L : c + 1L);
        counter.update(c);                // SYNC blocking call
        out.collect(new Result(e.key, c));
    }
}

Проблема в counter.value(): на RocksDB local это ~10 µs, на ForstDB cache miss — 10 ms. Operator thread блокируется на 10 ms, в это время никакая другая обработка не происходит. Если средний record processing time был 50 µs, теперь стал 10 ms — throughput падает в 200 раз.

State V1 API не лечится — она фундаментально синхронная.

State V2 API: async-first

State V2 (FLIP-424) переписан async:

// State V2 API (async)
public class MyOperatorV2 extends KeyedProcessFunctionV2<String, Event, Result> {
    private ValueState<Long> counter;  // Тот же интерфейс, но другие методы

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Long.class)
        );
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) {
        counter.asyncValue().thenAccept(c -> {
            long newCount = (c == null ? 1L : c + 1L);
            counter.asyncUpdate(newCount).thenAccept(unused -> {
                out.collect(new Result(e.key, newCount));
            });
        });
        // Не блокируем; processElement возвращается сразу
    }
}

Главные различия:

  1. Async методы: asyncValue(), asyncUpdate(), asyncGet(), asyncPut(), asyncRemove() вместо sync.
  2. StateFuture<T>: всё возвращает StateFuture (аналог CompletableFuture, но интегрирован с runtime).
  3. thenAccept/thenApply/thenCompose: для chaining операций.
  4. processElement возвращается сразу: не блокируется на state access.

StateFuture: что это и как работает

StateFuture<T> — это Flink-specific Future, который:

Свойства StateFuture:
  - Lightweight (не Java CompletableFuture)
  - Интегрирован с Flink runtime threading model
  - Гарантирует in-order processing per key
  - Не allocate heavy objects (object pool)
  - Может быть chain'ed (thenAccept, thenApply, thenCompose)

API:
  StateFuture<T>.thenAccept(Consumer<T>)         -- execute on completion, no result
  StateFuture<T>.thenApply(Function<T,R>)        -- transform result, return StateFuture<R>
  StateFuture<T>.thenCompose(Function<T,StateFuture<R>>) -- chain async ops
  StateFuture<T>.thenCombine(other, BiFunction)  -- combine two futures

Что под капотом:

StateRequestBuffer:
  - Per-operator buffer для async state requests
  - Aggregates запросы для batching (если возможно)
  - Submits на background I/O thread
  - Returns StateFuture immediately

Background I/O thread:
  - Polls request buffer
  - Issues I/O (S3 read, RocksDB lookup)
  - On completion: fires StateFuture callbacks
  - Callbacks queued back в operator thread для in-order processing

Operator thread:
  - Sequential per key
  - Processes callbacks в order
  - Не блокируется на I/O

Critical rule: no sync on async

Самое важное правило: нельзя смешивать sync и async state access в одном operator:

// WRONG: sync .value() на async state
counter.asyncValue().thenAccept(c -> {
    // ...
});
Long bad = counter.value();  // ERROR: sync call недопустим

Причина: async state работает через очередь requests. Sync call блокировал бы thread и привёл бы к deadlock. Runtime выкинет UnsupportedOperationException: Synchronous state access is not supported in async context.

Если оператор использует State V2 — все state ops должны быть async.

Async order guarantees

Async усложняет порядок, но Flink гарантирует:

Per-key ordering guarantees:

1. processElement для одного key не запускается, пока previous не закончил
   (включая все его StateFuture chains)

2. State updates от processElement_N visible в processElement_N+1
   для того же key

3. Timer fires (onTimer) после processElement обработан полностью

4. Watermark advance после processElement chains complete

Между разными keys:
  - Параллельное выполнение возможно
  - State не shared между keys (keyed state)
  - Async I/O может pipeline разные keys

То есть для одного key всё детерминированно как в sync. Для разных keys — concurrent. Это даёт thread-safety без user code locks.

Sync vs Async state execution model
Sync (V1)Sync model: один record processing blocks thread целиком, включая state I/O. Throughput = 1 / (processing_time + state_io_time).
processElement
Blocked 10 ms (S3 read)state.value() блокирует на ~10 ms (S3 read). Operator thread idle во время I/O.
value returns
ProcessСейчас может обрабатывать. Compute ~10 µs. Throughput 1 / 10 ms = 100 events/sec.
state.update
Blocked 10 ms (S3 write)state.update() блокирует ещё ~10 ms. Total 20 ms per record. Throughput 50 events/sec.
Async (V2)Async model: processElement возвращается сразу, async chain работает на background thread. Operator thread свободен для next event.
asyncValue
Submit + returnSubmit request, get StateFuture. Operator thread свободен. Throughput не bounded I/O latency.
next event
Pipeline 100 in-flightBackground I/O thread обрабатывает 100+ pending requests параллельно. S3 connection pool max concurrency.
callbacks
Process + emitOn completion: thenAccept callback executes, emit result. Throughput ~10,000+ events/sec.

Async timers

Timers тоже async:

// Async timers V2
public void processElement(Event e, Context ctx, Collector<Result> out) {
    counter.asyncValue().thenAccept(c -> {
        if (c == null) {
            ctx.timerService().registerEventTimeTimer(e.timestamp + 10000);
        }
        counter.asyncUpdate(c + 1L);
    });
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
    // onTimer тоже может использовать async state
    counter.asyncValue().thenAccept(c -> {
        if (c != null && c > 100) {
            out.collect(new Result("threshold exceeded"));
            counter.asyncClear();
        }
    });
}

Timer callbacks queued in correct order with processElement callbacks per key.

Watermark coordination

Watermarks тоже должны coordinate с async state:

Watermark advance с async state:

1. New watermark W received
2. operator must ensure no in-flight processElement < W
3. Wait until все StateFutures для events < W complete
4. Только потом emit watermark downstream

Это handled automatically Flink runtime через WatermarkCoordinator.
User code не должен явно tracking — async runtime знает все in-flight requests.

Output ordering

Outputs могут быть emit в любом порядке внутри async chain:

public void processElement(Event e, Context ctx, Collector<Result> out) {
    counter.asyncValue().thenAccept(c -> {
        out.collect(new Result(e, c));   // emit может быть в любом order vs other keys
    });
}

Per-key order гарантирован (event N выйдет до event N+1 для того же key). Across keys — нет гарантии. Это same as sync model — keyed processing не гарантирует cross-key order.

Migration: V1 -> V2 пример

Сценарий: real operator на V1, мигрируем на V2.

// Before (V1)
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    private ValueState<Boolean> flagged;
    private MapState<String, Long> recentAttempts;
    private ListState<Long> recentTimestamps;

    @Override
    public void open(Configuration config) {
        flagged = getRuntimeContext().getState(new ValueStateDescriptor<>("flagged", Boolean.class));
        recentAttempts = getRuntimeContext().getMapState(new MapStateDescriptor<>("attempts", String.class, Long.class));
        recentTimestamps = getRuntimeContext().getListState(new ListStateDescriptor<>("timestamps", Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
        Boolean isFlagged = flagged.value();    // SYNC
        if (Boolean.TRUE.equals(isFlagged)) return;

        Long attempts = recentAttempts.get(tx.merchantId);   // SYNC
        if (attempts != null && attempts > 10) {
            flagged.update(true);                            // SYNC
            out.collect(new Alert(tx, "Too many attempts"));
            return;
        }
        recentAttempts.put(tx.merchantId, (attempts == null ? 1L : attempts + 1L));  // SYNC
        recentTimestamps.add(tx.timestamp);                  // SYNC
    }
}

Async версия:

// After (V2)
public class FraudDetectorV2 extends KeyedProcessFunctionV2<String, Transaction, Alert> {
    private ValueState<Boolean> flagged;
    private MapState<String, Long> recentAttempts;
    private ListState<Long> recentTimestamps;

    @Override
    public void open(Configuration config) {
        // Открытие state идентично
        flagged = getRuntimeContext().getState(new ValueStateDescriptor<>("flagged", Boolean.class));
        recentAttempts = getRuntimeContext().getMapState(new MapStateDescriptor<>("attempts", String.class, Long.class));
        recentTimestamps = getRuntimeContext().getListState(new ListStateDescriptor<>("timestamps", Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out) {
        flagged.asyncValue().thenCompose(isFlagged -> {
            if (Boolean.TRUE.equals(isFlagged)) {
                return StateFutureUtils.completedVoidFuture();
            }
            return recentAttempts.asyncGet(tx.merchantId).thenCompose(attempts -> {
                if (attempts != null && attempts > 10) {
                    return flagged.asyncUpdate(true).thenAccept(unused -> {
                        out.collect(new Alert(tx, "Too many attempts"));
                    });
                }
                return recentAttempts.asyncPut(tx.merchantId, (attempts == null ? 1L : attempts + 1L))
                    .thenCompose(unused -> recentTimestamps.asyncAdd(tx.timestamp));
            });
        });
    }
}

Заметные изменения:

  • processElement теперь returning void без throws Exception (errors через StateFuture).
  • Используется thenCompose для chaining.
  • StateFutureUtils.completedVoidFuture() для early exits.
  • Логика та же, но в callback style.

Migration: shortcuts через wrappers

Чтобы не переписывать весь код, Flink предоставляет wrappers:

// AsyncProcessingFunction wrapper (упрощение для миграции)
public class FraudDetectorWrapped extends AsyncKeyedProcessFunctionAdapter<String, Transaction, Alert> {
    // Логика по сути как V1, но runtime автоматически wraps в async
}

// Можно также использовать sync-on-async helper:
public class FraudDetectorHybrid extends KeyedProcessFunctionV2<String, Transaction, Alert> {
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out) {
        StateFutureUtils.sequential(() -> {
            // Внутри этого блока разрешён "sync-style"
            // Runtime автоматически handles async I/O behind the scenes
            // Но всё равно нужны async setters/getters
        });
    }
}

Эти wrappers — compromise: они дают V1-like syntax, но всё equivalent async runtime. Throughput может быть ниже чем true async код из-за overhead.

Когда V1 ещё работает

State V1 API остаётся для:

1. Heap-based state backends
   - HashMapStateBackend
   - In-memory state быстрый, async не нужен
   - V1 продолжает работать

2. RocksDB backend (Flink 2.x continues support)
   - RocksDB local достаточно быстрый
   - V1 работает, V2 тоже работает
   - V2 даёт небольшой performance boost через pipelining

3. ForstDB backend (требуется V2 для full benefit)
   - V1 работает, но slow
   - V2 strongly recommended
   - Config: table.exec.async-state.enabled: true

Для всего нового кода рекомендуется V2 как future-proof. State V1 в long-term будет deprecated.

SQL и Table API: автоматический async

В SQL/Table API пользователь не пишет операторы. Async handled планировщиком:

# Включить async state для SQL/Table API
table.exec.async-state.enabled: true

# Tunable parameters
table.exec.async-state.in-flight.size: 100  # parallel I/O requests
table.exec.async-state.buffer-timeout: 100ms

Flink планировщик автоматически генерит async-capable операторы через codegen. Это работает только для supported операторов — список в уроке 4 этого модуля.

Performance benchmarks

Из VLDB paper и release notes:

Word count, 1.2 GB state:

Sync V1 + RocksDB local: 100% baseline (50K events/sec)
Sync V1 + ForstDB no cache: 5% (2.5K events/sec — 20x slower)
Sync V1 + ForstDB 1GB cache: 70% (~35K events/sec)
Async V2 + ForstDB no cache: 65% (~32K events/sec)
Async V2 + ForstDB 1GB cache: 120% (~60K events/sec — FASTER!)

Объяснение:
  - Sync V1 + cache miss = 10 ms blocking per record
  - Async V2 pipelines requests, скрывает latency
  - С cache async + warm cache даёт boost через better parallelism

Nexmark Q20 (filter join):
  Sync V1 + RocksDB: 100%
  Async V2 + ForstDB: 95% (parity)
  Async V2 + ForstDB + MultiJoin: 115% (улучшение благодаря меньшему state)

Limitations и pitfalls

1. Не все операторы V2-ready (Flink 2.0)
   - Rank, Deduplicate, GroupAggregate, JoinV2, Window — YES
   - Custom UDF без re-engineering — partial
   - Подробнее в уроке 4

2. Async может скрывать errors
   - StateFuture.thenAccept exception swallowed silently
   - Recommended: всегда .exceptionally() для logging
   - Или global handler

3. Debugging complex async chains
   - Stack traces не показывают callback chain
   - Helpful: include event identifiers в log statements
   - Tools: async profilers (async-profiler, Java Flight Recorder)

4. State requests batch может delay
   - Если low input rate, batching adds latency
   - Tune: table.exec.async-state.buffer-timeout = 10ms

5. Memory для in-flight requests
   - Each StateFuture allocates (mostly pooled)
   - При very high concurrency может pressure heap
   - Monitor: heap usage, GC pause
WARNING

Если operator имеет complex business logic с многими state operations, миграция на V2 не тривиальна. Recommended approach: 1) start with wrapper (AsyncKeyedProcessFunctionAdapter), 2) measure performance, 3) только если узкое место — переписать вручную в true async. Часто wrapper достаточно для 80% benefits.

SQL hints для async tuning

SELECT /*+ STATE_TTL('orders' = '1d') */
       o.id, sum(p.amount)
FROM orders o JOIN payments p ON o.id = p.order_id
GROUP BY o.id;

STATE_TTL hint работает для async state так же как для sync. TTL evaluation тоже async — не блокирует процессинг.

Чтение source

Flink source для State V2:
  flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/
    AsyncExecutionController.java       -- coordinator async execution
    StateRequestBuffer.java              -- буфер requests
    StateExecutor.java                   -- background I/O thread

  flink-core/src/main/java/org/apache/flink/api/common/state/v2/
    ValueState.java                       -- async ValueState interface
    MapState.java
    ListState.java
    StateFuture.java                      -- future interface

  flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
    AsyncKeyedProcessOperator.java
    AsyncWindowAggregateOperator.java
    AsyncKeyedCoProcessOperator.java

FLIP документы:
  FLIP-424: Asynchronous State APIs
  FLIP-425: Asynchronous Execution Model for Flink SQL/Table

  FLIP-440: Async Source/Sink V2
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Что такое StateFuture<T> в State V2 API и чем отличается от Java CompletableFuture?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5