AsyncRetryStrategy: retry с backoff и DLQ
В production async lookups регулярно завершаются ошибкой: external system перегружен (HTTP 503), сетевой джиттер, временные degraded periods Redis cluster. Базовая стратегия из урока 2 — упасть в timeout() или эмитить fallback — недостаточна для transient ошибок. Часто проблема решается простым повтором через 100-500 ms.
Flink 1.16+ предоставляет встроенный AsyncRetryStrategy для декларативного retry’я с backoff’ом. В этом уроке разберём, как его настроить, когда использовать fixedDelay vs exponentialBackoff, и где граница между «нужен retry» и «пора в DLQ».
API: AsyncDataStream.unorderedWaitWithRetry
В Flink 1.16+ появился unorderedWaitWithRetry и orderedWaitWithRetry:
AsyncRetryStrategy<EnrichedEvent> retryStrategy =
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<EnrichedEvent>(
3, // максимум 3 попытки
500 // 500 ms между попытками
)
.ifResult(result -> result.isEmpty()) // retry если результат пуст
.ifException(ex -> ex instanceof TimeoutException) // retry на timeout
.build();
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWaitWithRetry(
inputStream,
new MyAsyncFunction(),
5,
TimeUnit.SECONDS,
100,
retryStrategy
);
Flink сам выполнит retry внутри (не нужно вручную в asyncInvoke). Если все попытки исчерпаны — вызывается timeout() метод вашей функции.
FixedDelay vs ExponentialBackoff
Две основные стратегии:
FixedDelay: одинаковая пауза между попытками
AsyncRetryStrategy strategy = new AsyncRetryStrategies
.FixedDelayRetryStrategyBuilder(3, 500) // 3 попытки, 500ms
.ifException(ex -> isRetryable(ex))
.build();
Timeline:
- Попытка 1: 0 ms
- Попытка 2: 500 ms (после первой ошибки)
- Попытка 3: 1000 ms (после второй ошибки)
- Итог: ошибка через ~1500 ms или результат
Когда: transient errors с предсказуемым recovery — Redis temporarily slow, network jitter.
ExponentialBackoff: пауза растёт
AsyncRetryStrategy strategy = new AsyncRetryStrategies
.ExponentialBackoffDelayRetryStrategyBuilder(
5, // max 5 попыток
100, // initial delay 100 ms
5000, // max delay 5000 ms
2.0 // multiplier — пауза удваивается
)
.ifException(ex -> isRetryable(ex))
.build();
Timeline:
- Попытка 1: 0 ms
- Попытка 2: 100 ms
- Попытка 3: 200 ms
- Попытка 4: 400 ms
- Попытка 5: 800 ms
- Итог: ошибка через ~1500 ms или результат
Когда: external system overloaded — не хотим бомбить, даём время восстановиться. Стандартная практика для микросервисов.
Exponential backoff с jitter — золотой стандарт в распределённых системах. Flink не добавляет jitter автоматически — если у вас 1000 subtask’ов одновременно делают retry, они синхронизируются и создадут «громовое стадо». Добавляйте jitter вручную в asyncInvoke (например, Thread.sleep(random.nextInt(50)) перед запросом).
Что считать retry-кейсом
Не все ошибки заслуживают retry. Различайте:
| Ошибка | Retry? | Действие |
|---|---|---|
TimeoutException | Да | Скорее всего transient |
HTTP 503 Service Unavailable | Да | External overloaded |
HTTP 429 Too Many Requests | Да (с backoff) | Rate limited |
HTTP 504 Gateway Timeout | Да | Upstream timeout |
HTTP 500 Internal Server Error | Иногда | Может быть transient, может быть bug |
HTTP 401 Unauthorized | Нет | Token issue — не починится |
HTTP 404 Not Found | Нет | Не существует — DLQ или fallback |
HTTP 400 Bad Request | Нет | Malformed — DLQ |
ClassCastException | Нет | Bug в коде |
Network timeout | Да | Transient |
Connection refused | Иногда | External down или firewall |
Стандартный фильтр:
private boolean isRetryable(Throwable ex) {
return ex instanceof TimeoutException
|| ex instanceof IOException // network errors
|| (ex instanceof HttpException
&& ((HttpException) ex).getStatusCode() >= 500)
|| (ex instanceof HttpException
&& ((HttpException) ex).getStatusCode() == 429);
}
Когда retry не работает: DLQ
Если все retry исчерпаны, событие попадает в timeout() метод. Здесь у вас три варианта:
1. Эмитить fallback значение
@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
resultFuture.complete(Collections.singletonList(
new Enriched(tx, UserProfile.UNKNOWN)
));
}
Хорошо, когда отсутствие enrichment не критично (downstream может обработать default).
2. Side output -> DLQ
private final OutputTag<Transaction> DLQ_TAG =
new OutputTag<Transaction>("dlq") {};
// В timeout не можем эмитить в side output напрямую — нужен иной паттерн.
// Решение: эмитить специальный wrapper и фильтровать downstream.
@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
// Эмитим failed marker
resultFuture.complete(Collections.singletonList(
new Enriched(tx, UserProfile.FAILED, /* isFailed= */ true)
));
}
// Downstream
SingleOutputStreamOperator<Enriched> processed = enriched.process(
new ProcessFunction<Enriched, Enriched>() {
@Override
public void processElement(Enriched value, Context ctx, Collector<Enriched> out) {
if (value.isFailed()) {
ctx.output(DLQ_TAG, value.getTransaction());
} else {
out.collect(value);
}
}
}
);
processed.getSideOutput(DLQ_TAG).sinkTo(kafkaSink("dlq-transactions"));
DLQ позволяет асинхронно расследовать failed events: команда поддержки видит, что не обработалось, может сделать manual retry или fix.
3. Бросить exception -> job restart
@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture)
throws Exception {
throw new RuntimeException("All retries exhausted for " + tx.getId());
}
Job упадёт и Flink сам сделает restart from checkpoint. Подходит для случаев, когда лучше остановить, чем потерять данные. Но обычно это последнее средство — restart дорогой.
Архитектура с retry и DLQ
Transaction in
Транзакция приходит в async I/O.Attempt 1
Первая попытка lookup в Redis.Error 503
Ошибка retryable. Flink ждёт backoff и повторяет.Attempt 2
Вторая попытка.Error 503
Опять ошибка. Удваиваем backoff.Attempt 3
Третья попытка.All exhausted
Все попытки исчерпаны. Вызывается timeout() метод.DLQ side output
Event направляется в DLQ через side output. Команда видит в Kafka topic dlq-transactions.Idempotency: важно для retry
Retry безопасен только для идемпотентных операций. Lookup в Redis (GET) — идемпотентен: повторение даст тот же результат. Запись в БД (INSERT) — нет: повтор создаст дубликат.
Для async sink с retry:
- INSERT INTO orders -> UPSERT (
ON CONFLICT DO UPDATE). - POST /api/payments -> используйте
Idempotency-Keyheader (стандарт Stripe, Square). - Send email -> не делайте retry email-send’а вообще, используйте message queue.
Async I/O в Flink хорошо подходит для lookup операций (read), а не для write. Если делаете write через async I/O — обязательно идемпотентность через UPSERT, idempotency keys или дедупликация на стороне target system.
Local cache + retry: лучше backoff
Часто более эффективно, чем retry — снизить нагрузку на external system через cache. Если 70% запросов попадают в одни и те же ключи, добавьте in-memory cache (Caffeine для Java, lru_cache для Python) с TTL.
private transient Cache<String, UserProfile> cache;
@Override
public void open(Configuration parameters) {
cache = Caffeine.newBuilder()
.maximumSize(100_000)
.expireAfterWrite(Duration.ofMinutes(10))
.build();
// ... init Redis client
}
@Override
public void asyncInvoke(Transaction tx, ResultFuture<Enriched> resultFuture) {
UserProfile cached = cache.getIfPresent(tx.getUserId());
if (cached != null) {
resultFuture.complete(Collections.singletonList(
new Enriched(tx, cached)
));
return;
}
async.get("user:" + tx.getUserId()).thenAccept(json -> {
UserProfile profile = parseJson(json);
cache.put(tx.getUserId(), profile);
resultFuture.complete(Collections.singletonList(
new Enriched(tx, profile)
));
});
}
Cache + retry = резильентность к временным сбоям без бомбардировки external system.
Transactional outbox паттерн: надёжная запись без дублейПопробуй сам
- Возьми async I/O job из урока 2 (Redis lookup).
- Добавь
AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder(5, 100, 5000, 2.0). - Симулируй ошибки Redis:
docker pause redis(Redis встанет на 5 секунд). - Посмотри в логах, как Flink делает 5 попыток с backoff 100ms, 200ms, 400ms, 800ms, 1600ms.
- Затем удалите retry, поставь fallback (
UNKNOWN_PROFILE) вtimeout. Сравни: что лучше для downstream — задержки, но enriched, или мгновенный fallback?
Ключевые выводы
AsyncRetryStrategy— встроенный механизм retry в Flink 1.16+.unorderedWaitWithRetryпринимает стратегию.FixedDelay— одинаковая пауза.ExponentialBackoff— пауза удваивается. Стандартный выбор — exponential для overload-сценариев.- Что retry’ить: TimeoutException, 5xx, 429, network errors. Не retry’ить: 4xx (кроме 429), bugs.
- Если retry не помог: эмитить fallback, side output -> DLQ, или поднимать exception (последнее средство).
- Idempotency — retry безопасен только для идемпотентных операций (lookup). Для write — UPSERT или idempotency-key.
- Cache перед retry: снижает нагрузку на external system на 80%+, часто эффективнее retry.