Зачем нужен Async I/O
В production Flink джобы регулярно делают enrichment — обогащение событий из внешнего источника. Транзакция приходит с user_id, и нужно получить user_profile из Redis. Order приходит с product_id, и нужно подтянуть product_metadata из PostgreSQL. Заявка на займ требует scoring через ML model в gRPC сервисе. Все эти операции — блокирующие I/O.
Наивная реализация через обычный map/flatMap приведёт к тому, что один медленный subtask остановит весь stream. В этом уроке разберёмся, почему sync-lookup убивает throughput, и как Async I/O решает эту проблему — на интуитивном уровне, без deep dive в детали API.
Проблема: sync lookup в map
Представим, что нам нужно обогатить Transaction событие user profile из Redis:
DataStream<EnrichedTransaction> enriched = transactions
.map(tx -> {
UserProfile profile = redisClient.get(tx.getUserId()); // <- блокирующий вызов
return new EnrichedTransaction(tx, profile);
});
Каждый вызов redisClient.get(...) блокирует thread исполнения subtask’а. Пока Redis отвечает (даже 1-5 ms latency), subtask не может обработать следующее событие.
Математика throughput’а subtask’а:
- Latency Redis = 5 ms.
- Один subtask = один thread = последовательное исполнение.
- Maximum throughput одного subtask = 1 / 0.005 = 200 событий/сек.
Если в потоке 100 000 событий/сек, нужен parallelism = 500. Это безумно. И мы не учли, что Redis тоже не бесконечно масштабируется.
t=0: Event 1 received
Subtask получает Event 1 в момент t=0.t=0 to 5ms: blocked
Subtask блокирован: ждёт Redis. Не обрабатывает другие события 5 ms.t=5: Event 2 received
Subtask получает Event 2 в момент t=5ms — сразу после возврата из Redis.t=5 to 10ms: blocked
Subtask снова блокирован — ждёт Redis для Event 2. Ещё 5 ms простоя.t=100ms: 20 events processed
После 100 ms обработано 20 событий. Throughput = 200 событий/сек на subtask.Throughput = 200 events/sec
Subtask тратит 100% времени в блокирующем ожидании Redis. CPU практически простаивает. Throughput ограничен latency Redis.Решение: async I/O — много lookups одновременно
Async I/O позволяет отправить много запросов параллельно, не блокируя subtask thread. Пока 100 запросов в Redis обрабатываются, subtask может принимать новые события и отправлять новые запросы.
DataStream<EnrichedTransaction> enriched = AsyncDataStream
.unorderedWait(
transactions,
new RedisEnrichmentFunction(),
1000, // timeout ms
TimeUnit.MILLISECONDS,
100 // capacity — сколько запросов параллельно
);
Что меняется:
- Subtask отправляет до
capacityзапросов одновременно (например, 100). - Каждый запрос — non-blocking (использует асинхронный клиент Redis).
- Когда любой запрос завершается — соответствующее событие отправляется в downstream.
- Subtask thread свободен между отправкой запросов и обработкой ответов.
t=0: send 100 requests
В момент t=0 subtask отправляет 100 запросов в Redis параллельно. Не ждёт ни один из них — продолжает обработку.Redis processes in parallel
Redis обрабатывает 100 запросов параллельно. Каждый завершается через ~5 ms (но они независимы — overall время = ~5-10 ms).t=5: 100 responses
В момент t=5ms приходят ответы. Subtask обрабатывает и сразу отправляет следующую пачку из 100 запросов.t=5: send 100 more
Subtask отправляет вторую пачку из 100 запросов. Цикл продолжается.t=100: 10000 events
За 100 ms обработано 100 пачек по 100 = 10 000 событий. Throughput = 100 000 событий/сек на subtask.Throughput = 100000 events/sec
Throughput выросла в 500 раз. Бутылочное горлышко уехало с одного subtask thread на Redis cluster.Та же математика, async:
- Latency Redis = 5 ms (одного запроса).
- Capacity = 100 — параллельные запросы.
- Throughput одного subtask = 100 / 0.005 = 20 000 событий/сек.
- Прирост: в 100 раз по сравнению с sync.
Что должен поддерживать клиент
Async I/O работает только если ваш клиент к внешней системе предоставляет async API. Если клиент только синхронный — async I/O бесполезен (это просто sync в обёртке).
Примеры:
| Внешняя система | Async client (Java) | Async client (Python) |
|---|---|---|
| Redis | Lettuce (RedisAsyncCommands) | aioredis |
| PostgreSQL | r2dbc-postgres | asyncpg |
| HTTP | OkHttp Async, AsyncHttpClient | aiohttp |
| gRPC | StubAsyncCallable | aio.grpc |
| Kafka | KafkaProducer.send returns CompletableFuture | aiokafka |
Никогда не оборачивайте sync клиент в CompletableFuture.supplyAsync(...), чтобы «сделать его async». Это просто переносит блокировку в другой thread pool, который тоже исчерпается. Используйте библиотеки, которые делают non-blocking I/O на уровне сокета (через Netty, epoll, async-await).
Чек-лист: подходит ли вам async I/O
| Условие | Async I/O нужен |
|---|---|
| Latency внешней системы > 1 ms | Да |
| Internal: lookup в Flink keyed state | Нет (keyed state локален) |
| External: lookup в Redis, RDBMS, gRPC | Да |
| Запросы можно отправить параллельно (нет глобальных зависимостей) | Да |
| Каждый запрос зависит от результата предыдущего | Нет (тут нужен иной подход) |
| Внешняя система имеет async client | Да |
| Только sync client | Не имеет смысла — сначала меняйте client |
Альтернативы async I/O
Иногда async I/O — не лучший инструмент. Альтернативы:
- Broadcast state (модуль 08): если справочник маленький (тысячи записей) — храните его в state, никаких lookups.
- Keyed state с side input: если событие приходит с
user_id, а profile тоже приходит как отдельный stream, можно сделатьcoGroup/join— никаких внешних lookups. - Cache в operator state: если можно кешировать результаты lookups на стороне subtask (с TTL), это убирает 80% нагрузки на внешнюю систему.
- Lookup join в Flink Table API/SQL (модуль 12): декларативный синтаксис, под капотом всё равно async I/O.
Async I/O — мощный инструмент, но не panacea. Если можно избежать внешних вызовов через broadcast или state — это всегда быстрее.
Попробуй сам
Сделай бенчмарк sync vs async на 10 000 событий:
- Подними локальный Redis:
docker run -p 6379:6379 redis:7-alpine. - Заполни Redis 10 000 ключей:
for i in $(seq 0 9999); do redis-cli set user:$i '{"name":"u'$i'"}'; done. - Напиши Flink job, который читает 10 000 событий из collection source, делает sync-lookup в Redis в
map, и измерь время total. - Напиши тот же job с async I/O (Lettuce/aioredis), capacity=100. Измерь время.
- Сравни. Должно быть как минимум в 10x быстрее, чаще — в 50x-100x.
Ключевые выводы
- Sync lookup в обычном
mapблокирует subtask thread. Throughput ограничен1 / latency_externalсобытий/сек на subtask. - Async I/O позволяет отправить до
capacityзапросов параллельно. Throughput растёт доcapacity / latency_external. - Прирост обычно 50x-500x, в зависимости от capacity и latency внешней системы.
- Работает только с async клиентами (Lettuce, asyncpg, aiohttp, aio.grpc). Оборачивать sync client в CompletableFuture бесполезно.
- Альтернативы: broadcast state для малых справочников, keyed state + co-stream для side input, lookup join в Table API.