AsyncFunction API: capacity, timeout, ordered/unordered
В предыдущем уроке мы выяснили, зачем нужен async I/O — sync lookups убивают throughput. В этом уроке смотрим как написать AsyncFunction в Flink: API, обязательные параметры (capacity, timeout), выбор между ordered и unordered output, и production-готовые примеры с Redis и PostgreSQL.
API: AsyncDataStream и RichAsyncFunction
Async I/O в Flink — это пара компонентов:
AsyncDataStream— статический фабричный класс. Превращает обычный DataStream в async-обогащённый.AsyncFunction<IN, OUT>(илиRichAsyncFunctionдля доступа кopen/close) — ваша функция, которая делает async lookup.
Базовая структура:
DataStream<Enriched> enriched = AsyncDataStream.unorderedWait(
inputStream, // источник
new MyAsyncFunction(), // ваша функция
5, // timeout
TimeUnit.SECONDS, // единица timeout
100 // capacity
);
# PyFlink 2.x
from pyflink.datastream import AsyncDataStream
enriched = AsyncDataStream.unordered_wait(
input_stream,
MyAsyncFunction(),
5000, # timeout ms
100 # capacity
)
RichAsyncFunction: lifecycle и async invoke
RichAsyncFunction имеет три ключевых метода:
public abstract class RichAsyncFunction<IN, OUT>
extends AbstractRichFunction
implements AsyncFunction<IN, OUT> {
// Инициализация ресурсов: connection pools, async clients
@Override
public void open(Configuration parameters) throws Exception {}
// Главный метод: отправить запрос асинхронно и завершить ResultFuture
@Override
public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture)
throws Exception;
// Вызывается при timeout — можно вернуть default или эмитить null
@Override
public void timeout(IN input, ResultFuture<OUT> resultFuture)
throws Exception {
resultFuture.complete(Collections.emptyList());
}
// Закрытие ресурсов
@Override
public void close() throws Exception {}
}
Ключевая идея asyncInvoke: функция получает ResultFuture и должна вызвать .complete(...) когда async операция завершится. До вызова .complete() Flink держит элемент «в полёте» и считает его незавершённым.
Пример: Redis enrichment с Lettuce
Lettuce — Reactive Redis-клиент с native async API.
public class RedisEnrichmentFunction extends RichAsyncFunction<Transaction, Enriched> {
private transient RedisClient client;
private transient StatefulRedisConnection<String, String> connection;
private transient RedisAsyncCommands<String, String> async;
@Override
public void open(Configuration parameters) throws Exception {
client = RedisClient.create("redis://redis-cluster:6379");
connection = client.connect();
async = connection.async();
}
@Override
public void asyncInvoke(Transaction tx, ResultFuture<Enriched> resultFuture) {
// Lettuce возвращает RedisFuture (наследует CompletableFuture)
RedisFuture<String> future = async.get("user:" + tx.getUserId());
// thenAccept — non-blocking callback
future.thenAccept(json -> {
UserProfile profile = parseJson(json);
resultFuture.complete(Collections.singletonList(
new Enriched(tx, profile)
));
}).exceptionally(err -> {
// Ошибка — эмитить пустой результат и логировать
log.error("Redis lookup failed for {}", tx.getUserId(), err);
resultFuture.complete(Collections.emptyList());
return null;
});
}
@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture)
throws Exception {
// Timeout — эмитить без enrichment вместо падения
resultFuture.complete(Collections.singletonList(
new Enriched(tx, UserProfile.UNKNOWN)
));
}
@Override
public void close() throws Exception {
if (connection != null) connection.close();
if (client != null) client.shutdown();
}
}
# PyFlink с asyncpg для PostgreSQL
import asyncio
import asyncpg
from pyflink.datastream.functions import AsyncFunction
class PostgresEnrichment(AsyncFunction):
def open(self, ctx):
self.loop = asyncio.new_event_loop()
self.pool = self.loop.run_until_complete(
asyncpg.create_pool(
host='postgres', user='flink', database='users',
min_size=10, max_size=50
)
)
def async_invoke(self, input_, result_future):
future = asyncio.run_coroutine_threadsafe(
self._lookup(input_), self.loop
)
future.add_done_callback(
lambda f: result_future.complete([f.result()])
)
async def _lookup(self, tx):
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT name, country FROM users WHERE id = $1',
tx['user_id']
)
return {**tx, 'name': row['name'], 'country': row['country']}
def close(self):
self.loop.run_until_complete(self.pool.close())
self.loop.close()
В asyncInvoke нельзя ждать результата (например, через .get() или await вне coroutine’ы). Это блокирует subtask thread и убивает смысл async. Всегда регистрируйте callback (thenAccept, add_done_callback), который вызовет resultFuture.complete().
Capacity: сколько «в полёте» одновременно
capacity — максимальное число async запросов, которые могут быть отправлены одним subtask’ом одновременно без получения ответа. Это back-pressure mechanism.
Когда capacity исчерпан, Flink блокирует subtask thread — новые входящие события ждут, пока какой-то из in-flight запросов завершится.
100 in-flight, capacity 100
Subtask получил 100 событий, отправил 100 запросов (заполнил capacity).101 event blocked
Подходит 101-е событие. Subtask блокирован — не может принять новое событие, пока хоть один запрос не завершится.Request done, slot freed
Один из запросов завершился — освободил слот в capacity. Subtask принимает следующее событие, отправляет запрос.Backpressure propagates
Backpressure распространяется upstream: source замедляется, события буферизуются в Kafka.Как выбирать capacity:
- Маленький (10-50): для медленных или нестабильных внешних систем — снижает нагрузку, защищает от overwhelming.
- Средний (100-500): обычный случай для здоровых external API.
- Большой (1000+): для очень быстрых, masively parallel систем (Redis cluster, DynamoDB).
Capacity также влияет на размер state, который Flink держит в OperatorState для recovery — каждый in-flight запрос сохраняется, чтобы при рестарте можно было повторить.
Timeout: что делать с зависшими запросами
timeout — максимальное время, через которое Flink считает запрос «зависшим» и вызывает timeout(input, resultFuture) метод вашей функции.
@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
// Вариант 1: эмитить событие без enrichment
resultFuture.complete(Collections.singletonList(
new Enriched(tx, UserProfile.UNKNOWN)
));
// Вариант 2: отбросить событие
// resultFuture.complete(Collections.emptyList());
// Вариант 3: бросить exception -> job упадёт, restart from checkpoint
// throw new RuntimeException("Lookup failed");
}
По умолчанию timeout бросает TimeoutException, которое роняет job. В production это редко нужно — лучше эмитить с fallback (например, UNKNOWN_PROFILE) или отправлять в side output для DLQ.
Ordered vs unordered: порядок выхода
Async I/O предлагает два варианта вывода результатов:
orderedWait: сохранение порядка
AsyncDataStream.orderedWait(input, func, 5, TimeUnit.SECONDS, 100);
Flink буферизует завершённые запросы, чтобы выдать их в том же порядке, в каком пришли события. Если событие #5 завершилось раньше, чем #4 — Flink ждёт #4 перед эмитом.
Минусы: медленнее, требует буфера. Один медленный запрос задерживает все последующие.
unorderedWait: порядок не важен
AsyncDataStream.unorderedWait(input, func, 5, TimeUnit.SECONDS, 100);
События эмитятся в downstream в порядке завершения async запросов. Если #5 завершился раньше #4 — #5 выходит сначала.
Плюсы: максимальный throughput, low latency. Минусы: downstream должен быть толерантен к out-of-order событиям.
Когда какой выбирать
| Сценарий | Выбор |
|---|---|
| Простое обогащение, downstream stateless | unorderedWait |
| Sink in Kafka с key-ordering требованием | orderedWait (но лучше keyBy перед sink) |
| Event time processing | unorderedWait (watermarks всё равно гарантируют корректность окон) |
| ProcessFunction с timer, зависящим от порядка | orderedWait |
| Maximum throughput | unorderedWait всегда быстрее |
В подавляющем большинстве случаев — unorderedWait. Flink использует watermarks для гарантии корректности event-time processing независимо от порядка.
Watermarks и async
Async I/O корректно обрабатывает watermarks. Между event’ами и watermark’ами Flink гарантирует:
- Watermark не обгонит ни один из in-flight событий с timestamp меньше него.
- Watermark распространяется вниз только после того, как все события с меньшим timestamp эмиттены.
Это значит, что async I/O безопасно для event-time агрегаций. Можно ставить unorderedWait перед window aggregation — watermarks гарантируют корректность.
Метрики async I/O
Flink экспортирует ключевые метрики для AsyncFunction:
numAsyncRequestsInFlight— текущее число in-flight запросов (если близко к capacity -> backpressure).numAsyncRequestsCompleted— total completed.numAsyncTimeouts— total timeouts (если растёт — увеличьте timeout или fix downstream).
Мониторьте inflight/capacity ratio в Grafana — высокий ratio (>80%) сигнализирует, что вы упёрлись в external system.
Попробуй сам
- Возьми job из урока 1 (Redis enrichment).
- Запусти с
capacity=10, timeout=1s. Замерь throughput. - Увеличь до
capacity=200, timeout=1s. Сравни throughput — должен вырасти. - Поставь Redis в degraded режим:
tc qdisc add dev lo root netem delay 100ms(добавляет 100 ms latency). Посмотри, как изменитсяnumAsyncRequestsInFlight(должен подскочить). - Переключи
unorderedWaitнаorderedWait— посмотри, как изменится latency.
Ключевые выводы
AsyncDataStream.unorderedWait/orderedWait— два варианта API. В 90% случаев нуженunorderedWait.RichAsyncFunction.asyncInvokeдолжен отправить запрос и не блокировать subtask thread. Регистрируйте callback, который вызоветresultFuture.complete().capacity— max in-flight запросов на subtask. Backpressure механизм при превышении.timeout— время до вызоваtimeout()метода. По умолчанию роняет job; в production лучше эмитить fallback.orderedблокирует выход, ожидая запросы в порядке;unorderedэмитит как завершилось — быстрее и предпочтительнее.- Watermarks корректно обрабатываются в async — безопасно перед window aggregation.