Learning Platform
Глоссарий Troubleshooting
Урок 10.02 · 22 мин
Средний
AsyncFunctionRichAsyncFunctionCapacityTimeoutOrdered Unordered

AsyncFunction API: capacity, timeout, ordered/unordered

В предыдущем уроке мы выяснили, зачем нужен async I/O — sync lookups убивают throughput. В этом уроке смотрим как написать AsyncFunction в Flink: API, обязательные параметры (capacity, timeout), выбор между ordered и unordered output, и production-готовые примеры с Redis и PostgreSQL.


API: AsyncDataStream и RichAsyncFunction

Async I/O в Flink — это пара компонентов:

  1. AsyncDataStream — статический фабричный класс. Превращает обычный DataStream в async-обогащённый.
  2. AsyncFunction<IN, OUT> (или RichAsyncFunction для доступа к open/close) — ваша функция, которая делает async lookup.

Базовая структура:

DataStream<Enriched> enriched = AsyncDataStream.unorderedWait(
    inputStream,                  // источник
    new MyAsyncFunction(),         // ваша функция
    5,                             // timeout
    TimeUnit.SECONDS,              // единица timeout
    100                            // capacity
);
# PyFlink 2.x
from pyflink.datastream import AsyncDataStream

enriched = AsyncDataStream.unordered_wait(
    input_stream,
    MyAsyncFunction(),
    5000,         # timeout ms
    100           # capacity
)

RichAsyncFunction: lifecycle и async invoke

RichAsyncFunction имеет три ключевых метода:

public abstract class RichAsyncFunction<IN, OUT>
        extends AbstractRichFunction
        implements AsyncFunction<IN, OUT> {

    // Инициализация ресурсов: connection pools, async clients
    @Override
    public void open(Configuration parameters) throws Exception {}

    // Главный метод: отправить запрос асинхронно и завершить ResultFuture
    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture)
        throws Exception;

    // Вызывается при timeout — можно вернуть default или эмитить null
    @Override
    public void timeout(IN input, ResultFuture<OUT> resultFuture)
        throws Exception {
        resultFuture.complete(Collections.emptyList());
    }

    // Закрытие ресурсов
    @Override
    public void close() throws Exception {}
}

Ключевая идея asyncInvoke: функция получает ResultFuture и должна вызвать .complete(...) когда async операция завершится. До вызова .complete() Flink держит элемент «в полёте» и считает его незавершённым.

Source API v2 изнутри: как Flink управляет async операциями

Пример: Redis enrichment с Lettuce

Lettuce — Reactive Redis-клиент с native async API.

public class RedisEnrichmentFunction extends RichAsyncFunction<Transaction, Enriched> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisAsyncCommands<String, String> async;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = RedisClient.create("redis://redis-cluster:6379");
        connection = client.connect();
        async = connection.async();
    }

    @Override
    public void asyncInvoke(Transaction tx, ResultFuture<Enriched> resultFuture) {
        // Lettuce возвращает RedisFuture (наследует CompletableFuture)
        RedisFuture<String> future = async.get("user:" + tx.getUserId());

        // thenAccept — non-blocking callback
        future.thenAccept(json -> {
            UserProfile profile = parseJson(json);
            resultFuture.complete(Collections.singletonList(
                new Enriched(tx, profile)
            ));
        }).exceptionally(err -> {
            // Ошибка — эмитить пустой результат и логировать
            log.error("Redis lookup failed for {}", tx.getUserId(), err);
            resultFuture.complete(Collections.emptyList());
            return null;
        });
    }

    @Override
    public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture)
            throws Exception {
        // Timeout — эмитить без enrichment вместо падения
        resultFuture.complete(Collections.singletonList(
            new Enriched(tx, UserProfile.UNKNOWN)
        ));
    }

    @Override
    public void close() throws Exception {
        if (connection != null) connection.close();
        if (client != null) client.shutdown();
    }
}
# PyFlink с asyncpg для PostgreSQL
import asyncio
import asyncpg
from pyflink.datastream.functions import AsyncFunction

class PostgresEnrichment(AsyncFunction):
    def open(self, ctx):
        self.loop = asyncio.new_event_loop()
        self.pool = self.loop.run_until_complete(
            asyncpg.create_pool(
                host='postgres', user='flink', database='users',
                min_size=10, max_size=50
            )
        )

    def async_invoke(self, input_, result_future):
        future = asyncio.run_coroutine_threadsafe(
            self._lookup(input_), self.loop
        )
        future.add_done_callback(
            lambda f: result_future.complete([f.result()])
        )

    async def _lookup(self, tx):
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT name, country FROM users WHERE id = $1',
                tx['user_id']
            )
            return {**tx, 'name': row['name'], 'country': row['country']}

    def close(self):
        self.loop.run_until_complete(self.pool.close())
        self.loop.close()
WARNING

В asyncInvoke нельзя ждать результата (например, через .get() или await вне coroutine’ы). Это блокирует subtask thread и убивает смысл async. Всегда регистрируйте callback (thenAccept, add_done_callback), который вызовет resultFuture.complete().


Capacity: сколько «в полёте» одновременно

capacity — максимальное число async запросов, которые могут быть отправлены одним subtask’ом одновременно без получения ответа. Это back-pressure mechanism.

Когда capacity исчерпан, Flink блокирует subtask thread — новые входящие события ждут, пока какой-то из in-flight запросов завершится.

Capacity как backpressure механизм

100 in-flight, capacity 100

Subtask получил 100 событий, отправил 100 запросов (заполнил capacity).

101 event blocked

Подходит 101-е событие. Subtask блокирован — не может принять новое событие, пока хоть один запрос не завершится.

Request done, slot freed

Один из запросов завершился — освободил слот в capacity. Subtask принимает следующее событие, отправляет запрос.

Backpressure propagates

Backpressure распространяется upstream: source замедляется, события буферизуются в Kafka.

Как выбирать capacity:

  • Маленький (10-50): для медленных или нестабильных внешних систем — снижает нагрузку, защищает от overwhelming.
  • Средний (100-500): обычный случай для здоровых external API.
  • Большой (1000+): для очень быстрых, masively parallel систем (Redis cluster, DynamoDB).

Capacity также влияет на размер state, который Flink держит в OperatorState для recovery — каждый in-flight запрос сохраняется, чтобы при рестарте можно было повторить.


Timeout: что делать с зависшими запросами

timeout — максимальное время, через которое Flink считает запрос «зависшим» и вызывает timeout(input, resultFuture) метод вашей функции.

@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
    // Вариант 1: эмитить событие без enrichment
    resultFuture.complete(Collections.singletonList(
        new Enriched(tx, UserProfile.UNKNOWN)
    ));

    // Вариант 2: отбросить событие
    // resultFuture.complete(Collections.emptyList());

    // Вариант 3: бросить exception -> job упадёт, restart from checkpoint
    // throw new RuntimeException("Lookup failed");
}
WARNING

По умолчанию timeout бросает TimeoutException, которое роняет job. В production это редко нужно — лучше эмитить с fallback (например, UNKNOWN_PROFILE) или отправлять в side output для DLQ.


Ordered vs unordered: порядок выхода

Async I/O предлагает два варианта вывода результатов:

orderedWait: сохранение порядка

AsyncDataStream.orderedWait(input, func, 5, TimeUnit.SECONDS, 100);

Flink буферизует завершённые запросы, чтобы выдать их в том же порядке, в каком пришли события. Если событие #5 завершилось раньше, чем #4 — Flink ждёт #4 перед эмитом.

Минусы: медленнее, требует буфера. Один медленный запрос задерживает все последующие.

unorderedWait: порядок не важен

AsyncDataStream.unorderedWait(input, func, 5, TimeUnit.SECONDS, 100);

События эмитятся в downstream в порядке завершения async запросов. Если #5 завершился раньше #4 — #5 выходит сначала.

Плюсы: максимальный throughput, low latency. Минусы: downstream должен быть толерантен к out-of-order событиям.

Когда какой выбирать

СценарийВыбор
Простое обогащение, downstream statelessunorderedWait
Sink in Kafka с key-ordering требованиемorderedWait (но лучше keyBy перед sink)
Event time processingunorderedWait (watermarks всё равно гарантируют корректность окон)
ProcessFunction с timer, зависящим от порядкаorderedWait
Maximum throughputunorderedWait всегда быстрее

В подавляющем большинстве случаев — unorderedWait. Flink использует watermarks для гарантии корректности event-time processing независимо от порядка.


Watermarks и async

Async I/O корректно обрабатывает watermarks. Между event’ами и watermark’ами Flink гарантирует:

  • Watermark не обгонит ни один из in-flight событий с timestamp меньше него.
  • Watermark распространяется вниз только после того, как все события с меньшим timestamp эмиттены.

Это значит, что async I/O безопасно для event-time агрегаций. Можно ставить unorderedWait перед window aggregation — watermarks гарантируют корректность.


Метрики async I/O

Flink экспортирует ключевые метрики для AsyncFunction:

  • numAsyncRequestsInFlight — текущее число in-flight запросов (если близко к capacity -> backpressure).
  • numAsyncRequestsCompleted — total completed.
  • numAsyncTimeouts — total timeouts (если растёт — увеличьте timeout или fix downstream).

Мониторьте inflight/capacity ratio в Grafana — высокий ratio (>80%) сигнализирует, что вы упёрлись в external system.


Попробуй сам

  1. Возьми job из урока 1 (Redis enrichment).
  2. Запусти с capacity=10, timeout=1s. Замерь throughput.
  3. Увеличь до capacity=200, timeout=1s. Сравни throughput — должен вырасти.
  4. Поставь Redis в degraded режим: tc qdisc add dev lo root netem delay 100ms (добавляет 100 ms latency). Посмотри, как изменится numAsyncRequestsInFlight (должен подскочить).
  5. Переключи unorderedWait на orderedWait — посмотри, как изменится latency.

Ключевые выводы

  1. AsyncDataStream.unorderedWait/orderedWait — два варианта API. В 90% случаев нужен unorderedWait.
  2. RichAsyncFunction.asyncInvoke должен отправить запрос и не блокировать subtask thread. Регистрируйте callback, который вызовет resultFuture.complete().
  3. capacity — max in-flight запросов на subtask. Backpressure механизм при превышении.
  4. timeout — время до вызова timeout() метода. По умолчанию роняет job; в production лучше эмитить fallback.
  5. ordered блокирует выход, ожидая запросы в порядке; unordered эмитит как завершилось — быстрее и предпочтительнее.
  6. Watermarks корректно обрабатываются в async — безопасно перед window aggregation.
Проверка знанийKnowledge check
Вы поставили AsyncFunction с capacity=500, timeout=5s. Через час работы Flink метрики показывают: numAsyncRequestsInFlight=500 (постоянно), numAsyncTimeouts растёт по 100/мин. Что происходит, и какие три действия имеет смысл предпринять?
ОтветAnswer
Происходит: external system не справляется с нагрузкой. Subtask постоянно заполнен 500 in-flight запросами (полная capacity), запросы массово таймаутятся. Backpressure распространяется upstream, источники замедляются. Действия: (1) Снизить нагрузку на external system — добавить local cache в operator state с TTL (например, через CaffeineCache), чтобы часть запросов закрывалась локально. (2) Проверить external system: возможно, нужно горизонтальное масштабирование Redis cluster или увеличение PostgreSQL connection pool. (3) Снизить capacity до 100-200: текущая capacity=500 в одном subtask может топить external system, лучше дать меньше параллельных запросов, но более стабильно. Также проверить timeout метод — должен ли он эмитить fallback или отправлять в DLQ через side output, а не падать с exception.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В asyncInvoke вы вызываете redisFuture.get() (блокирующий wait) и потом resultFuture.complete(result). Job работает, но throughput низкий, как при sync lookup. Почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3