Learning Platform
Глоссарий Troubleshooting
Урок 10.01 · 15 мин
Средний
Async I/OThroughputEnrichmentLatency

Зачем нужен Async I/O

В production Flink джобы регулярно делают enrichment — обогащение событий из внешнего источника. Транзакция приходит с user_id, и нужно получить user_profile из Redis. Order приходит с product_id, и нужно подтянуть product_metadata из PostgreSQL. Заявка на займ требует scoring через ML model в gRPC сервисе. Все эти операции — блокирующие I/O.

Наивная реализация через обычный map/flatMap приведёт к тому, что один медленный subtask остановит весь stream. В этом уроке разберёмся, почему sync-lookup убивает throughput, и как Async I/O решает эту проблему — на интуитивном уровне, без deep dive в детали API.

Паттерны обогащения потока: sync, async, cache

Проблема: sync lookup в map

Представим, что нам нужно обогатить Transaction событие user profile из Redis:

DataStream<EnrichedTransaction> enriched = transactions
    .map(tx -> {
        UserProfile profile = redisClient.get(tx.getUserId());  // <- блокирующий вызов
        return new EnrichedTransaction(tx, profile);
    });

Каждый вызов redisClient.get(...) блокирует thread исполнения subtask’а. Пока Redis отвечает (даже 1-5 ms latency), subtask не может обработать следующее событие.

Математика throughput’а subtask’а:

  • Latency Redis = 5 ms.
  • Один subtask = один thread = последовательное исполнение.
  • Maximum throughput одного subtask = 1 / 0.005 = 200 событий/сек.

Если в потоке 100 000 событий/сек, нужен parallelism = 500. Это безумно. И мы не учли, что Redis тоже не бесконечно масштабируется.

Sync lookup: timeline блокирующих вызовов

t=0: Event 1 received

Subtask получает Event 1 в момент t=0.
lookup Redis

t=0 to 5ms: blocked

Subtask блокирован: ждёт Redis. Не обрабатывает другие события 5 ms.

t=5: Event 2 received

Subtask получает Event 2 в момент t=5ms — сразу после возврата из Redis.
lookup Redis

t=5 to 10ms: blocked

Subtask снова блокирован — ждёт Redis для Event 2. Ещё 5 ms простоя.

t=100ms: 20 events processed

После 100 ms обработано 20 событий. Throughput = 200 событий/сек на subtask.
итог

Throughput = 200 events/sec

Subtask тратит 100% времени в блокирующем ожидании Redis. CPU практически простаивает. Throughput ограничен latency Redis.

Решение: async I/O — много lookups одновременно

Async I/O позволяет отправить много запросов параллельно, не блокируя subtask thread. Пока 100 запросов в Redis обрабатываются, subtask может принимать новые события и отправлять новые запросы.

DataStream<EnrichedTransaction> enriched = AsyncDataStream
    .unorderedWait(
        transactions,
        new RedisEnrichmentFunction(),
        1000,  // timeout ms
        TimeUnit.MILLISECONDS,
        100    // capacity — сколько запросов параллельно
    );

Что меняется:

  • Subtask отправляет до capacity запросов одновременно (например, 100).
  • Каждый запрос — non-blocking (использует асинхронный клиент Redis).
  • Когда любой запрос завершается — соответствующее событие отправляется в downstream.
  • Subtask thread свободен между отправкой запросов и обработкой ответов.
Async lookup: параллельные запросы

t=0: send 100 requests

В момент t=0 subtask отправляет 100 запросов в Redis параллельно. Не ждёт ни один из них — продолжает обработку.
async

Redis processes in parallel

Redis обрабатывает 100 запросов параллельно. Каждый завершается через ~5 ms (но они независимы — overall время = ~5-10 ms).

t=5: 100 responses

В момент t=5ms приходят ответы. Subtask обрабатывает и сразу отправляет следующую пачку из 100 запросов.
continue

t=5: send 100 more

Subtask отправляет вторую пачку из 100 запросов. Цикл продолжается.

t=100: 10000 events

За 100 ms обработано 100 пачек по 100 = 10 000 событий. Throughput = 100 000 событий/сек на subtask.
итог

Throughput = 100000 events/sec

Throughput выросла в 500 раз. Бутылочное горлышко уехало с одного subtask thread на Redis cluster.

Та же математика, async:

  • Latency Redis = 5 ms (одного запроса).
  • Capacity = 100 — параллельные запросы.
  • Throughput одного subtask = 100 / 0.005 = 20 000 событий/сек.
  • Прирост: в 100 раз по сравнению с sync.

Что должен поддерживать клиент

Async I/O работает только если ваш клиент к внешней системе предоставляет async API. Если клиент только синхронный — async I/O бесполезен (это просто sync в обёртке).

Примеры:

Внешняя системаAsync client (Java)Async client (Python)
RedisLettuce (RedisAsyncCommands)aioredis
PostgreSQLr2dbc-postgresasyncpg
HTTPOkHttp Async, AsyncHttpClientaiohttp
gRPCStubAsyncCallableaio.grpc
KafkaKafkaProducer.send returns CompletableFutureaiokafka
WARNING

Никогда не оборачивайте sync клиент в CompletableFuture.supplyAsync(...), чтобы «сделать его async». Это просто переносит блокировку в другой thread pool, который тоже исчерпается. Используйте библиотеки, которые делают non-blocking I/O на уровне сокета (через Netty, epoll, async-await).


Чек-лист: подходит ли вам async I/O

УсловиеAsync I/O нужен
Latency внешней системы > 1 msДа
Internal: lookup в Flink keyed stateНет (keyed state локален)
External: lookup в Redis, RDBMS, gRPCДа
Запросы можно отправить параллельно (нет глобальных зависимостей)Да
Каждый запрос зависит от результата предыдущегоНет (тут нужен иной подход)
Внешняя система имеет async clientДа
Только sync clientНе имеет смысла — сначала меняйте client

Альтернативы async I/O

Иногда async I/O — не лучший инструмент. Альтернативы:

  1. Broadcast state (модуль 08): если справочник маленький (тысячи записей) — храните его в state, никаких lookups.
  2. Keyed state с side input: если событие приходит с user_id, а profile тоже приходит как отдельный stream, можно сделать coGroup/join — никаких внешних lookups.
  3. Cache в operator state: если можно кешировать результаты lookups на стороне subtask (с TTL), это убирает 80% нагрузки на внешнюю систему.
  4. Lookup join в Flink Table API/SQL (модуль 12): декларативный синтаксис, под капотом всё равно async I/O.

Async I/O — мощный инструмент, но не panacea. Если можно избежать внешних вызовов через broadcast или state — это всегда быстрее.


Попробуй сам

Сделай бенчмарк sync vs async на 10 000 событий:

  1. Подними локальный Redis: docker run -p 6379:6379 redis:7-alpine.
  2. Заполни Redis 10 000 ключей: for i in $(seq 0 9999); do redis-cli set user:$i '{"name":"u'$i'"}'; done.
  3. Напиши Flink job, который читает 10 000 событий из collection source, делает sync-lookup в Redis в map, и измерь время total.
  4. Напиши тот же job с async I/O (Lettuce/aioredis), capacity=100. Измерь время.
  5. Сравни. Должно быть как минимум в 10x быстрее, чаще — в 50x-100x.

Ключевые выводы

  1. Sync lookup в обычном map блокирует subtask thread. Throughput ограничен 1 / latency_external событий/сек на subtask.
  2. Async I/O позволяет отправить до capacity запросов параллельно. Throughput растёт до capacity / latency_external.
  3. Прирост обычно 50x-500x, в зависимости от capacity и latency внешней системы.
  4. Работает только с async клиентами (Lettuce, asyncpg, aiohttp, aio.grpc). Оборачивать sync client в CompletableFuture бесполезно.
  5. Альтернативы: broadcast state для малых справочников, keyed state + co-stream для side input, lookup join в Table API.
Проверка знанийKnowledge check
У вас Flink job обрабатывает 50 000 событий/сек. Каждое событие требует lookup в PostgreSQL с latency 10 ms через psycopg2 (sync клиент). Какой parallelism теоретически нужен с sync подходом? Какой подход лучше в этом сценарии?
ОтветAnswer
С sync клиентом throughput одного subtask = 1/0.010 = 100 событий/сек. Чтобы обработать 50 000 событий/сек, нужно parallelism = 50 000 / 100 = 500 subtask. Это нереально: 500 thread'ов в JVM, 500 connection'ов в PostgreSQL (БД упадёт от такого pool). Лучший подход: использовать асинхронный клиент (asyncpg в PyFlink или r2dbc в Java) и AsyncDataStream.unorderedWait с capacity=200. Тогда throughput одного subtask = 200 / 0.010 = 20 000 событий/сек, нужно parallelism = 3 (запас). Suggest также: добавить local cache в operator state с TTL 10 минут — если 80% запросов попадают в одни и те же ключи, нагрузка на PostgreSQL упадёт ещё в 5 раз. И поставить PostgreSQL за PgBouncer для управления connection pool.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Sync lookup в обычном map function блокирует subtask thread. Какой теоретический максимальный throughput одного subtask, если внешняя система отвечает за 20 ms?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3