Learning Platform
Глоссарий Troubleshooting
Урок 10.03 · 16 мин
Средний
AsyncRetryStrategyExponential BackoffDead Letter QueueResilience

AsyncRetryStrategy: retry с backoff и DLQ

В production async lookups регулярно завершаются ошибкой: external system перегружен (HTTP 503), сетевой джиттер, временные degraded periods Redis cluster. Базовая стратегия из урока 2 — упасть в timeout() или эмитить fallback — недостаточна для transient ошибок. Часто проблема решается простым повтором через 100-500 ms.

Flink 1.16+ предоставляет встроенный AsyncRetryStrategy для декларативного retry’я с backoff’ом. В этом уроке разберём, как его настроить, когда использовать fixedDelay vs exponentialBackoff, и где граница между «нужен retry» и «пора в DLQ».


API: AsyncDataStream.unorderedWaitWithRetry

В Flink 1.16+ появился unorderedWaitWithRetry и orderedWaitWithRetry:

AsyncRetryStrategy<EnrichedEvent> retryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<EnrichedEvent>(
        3,         // максимум 3 попытки
        500        // 500 ms между попытками
    )
    .ifResult(result -> result.isEmpty())  // retry если результат пуст
    .ifException(ex -> ex instanceof TimeoutException)  // retry на timeout
    .build();

DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWaitWithRetry(
    inputStream,
    new MyAsyncFunction(),
    5,
    TimeUnit.SECONDS,
    100,
    retryStrategy
);

Flink сам выполнит retry внутри (не нужно вручную в asyncInvoke). Если все попытки исчерпаны — вызывается timeout() метод вашей функции.


FixedDelay vs ExponentialBackoff

Две основные стратегии:

FixedDelay: одинаковая пауза между попытками

AsyncRetryStrategy strategy = new AsyncRetryStrategies
    .FixedDelayRetryStrategyBuilder(3, 500)  // 3 попытки, 500ms
    .ifException(ex -> isRetryable(ex))
    .build();

Timeline:

  • Попытка 1: 0 ms
  • Попытка 2: 500 ms (после первой ошибки)
  • Попытка 3: 1000 ms (после второй ошибки)
  • Итог: ошибка через ~1500 ms или результат

Когда: transient errors с предсказуемым recovery — Redis temporarily slow, network jitter.

ExponentialBackoff: пауза растёт

AsyncRetryStrategy strategy = new AsyncRetryStrategies
    .ExponentialBackoffDelayRetryStrategyBuilder(
        5,        // max 5 попыток
        100,      // initial delay 100 ms
        5000,     // max delay 5000 ms
        2.0       // multiplier — пауза удваивается
    )
    .ifException(ex -> isRetryable(ex))
    .build();

Timeline:

  • Попытка 1: 0 ms
  • Попытка 2: 100 ms
  • Попытка 3: 200 ms
  • Попытка 4: 400 ms
  • Попытка 5: 800 ms
  • Итог: ошибка через ~1500 ms или результат

Когда: external system overloaded — не хотим бомбить, даём время восстановиться. Стандартная практика для микросервисов.

TIP

Exponential backoff с jitter — золотой стандарт в распределённых системах. Flink не добавляет jitter автоматически — если у вас 1000 subtask’ов одновременно делают retry, они синхронизируются и создадут «громовое стадо». Добавляйте jitter вручную в asyncInvoke (например, Thread.sleep(random.nextInt(50)) перед запросом).


Что считать retry-кейсом

Не все ошибки заслуживают retry. Различайте:

ОшибкаRetry?Действие
TimeoutExceptionДаСкорее всего transient
HTTP 503 Service UnavailableДаExternal overloaded
HTTP 429 Too Many RequestsДа (с backoff)Rate limited
HTTP 504 Gateway TimeoutДаUpstream timeout
HTTP 500 Internal Server ErrorИногдаМожет быть transient, может быть bug
HTTP 401 UnauthorizedНетToken issue — не починится
HTTP 404 Not FoundНетНе существует — DLQ или fallback
HTTP 400 Bad RequestНетMalformed — DLQ
ClassCastExceptionНетBug в коде
Network timeoutДаTransient
Connection refusedИногдаExternal down или firewall

Стандартный фильтр:

private boolean isRetryable(Throwable ex) {
    return ex instanceof TimeoutException
        || ex instanceof IOException  // network errors
        || (ex instanceof HttpException
            && ((HttpException) ex).getStatusCode() >= 500)
        || (ex instanceof HttpException
            && ((HttpException) ex).getStatusCode() == 429);
}

Когда retry не работает: DLQ

Если все retry исчерпаны, событие попадает в timeout() метод. Здесь у вас три варианта:

1. Эмитить fallback значение

@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
    resultFuture.complete(Collections.singletonList(
        new Enriched(tx, UserProfile.UNKNOWN)
    ));
}

Хорошо, когда отсутствие enrichment не критично (downstream может обработать default).

2. Side output -> DLQ

private final OutputTag<Transaction> DLQ_TAG =
    new OutputTag<Transaction>("dlq") {};

// В timeout не можем эмитить в side output напрямую — нужен иной паттерн.
// Решение: эмитить специальный wrapper и фильтровать downstream.

@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture) {
    // Эмитим failed marker
    resultFuture.complete(Collections.singletonList(
        new Enriched(tx, UserProfile.FAILED, /* isFailed= */ true)
    ));
}

// Downstream
SingleOutputStreamOperator<Enriched> processed = enriched.process(
    new ProcessFunction<Enriched, Enriched>() {
        @Override
        public void processElement(Enriched value, Context ctx, Collector<Enriched> out) {
            if (value.isFailed()) {
                ctx.output(DLQ_TAG, value.getTransaction());
            } else {
                out.collect(value);
            }
        }
    }
);

processed.getSideOutput(DLQ_TAG).sinkTo(kafkaSink("dlq-transactions"));

DLQ позволяет асинхронно расследовать failed events: команда поддержки видит, что не обработалось, может сделать manual retry или fix.

3. Бросить exception -> job restart

@Override
public void timeout(Transaction tx, ResultFuture<Enriched> resultFuture)
        throws Exception {
    throw new RuntimeException("All retries exhausted for " + tx.getId());
}

Job упадёт и Flink сам сделает restart from checkpoint. Подходит для случаев, когда лучше остановить, чем потерять данные. Но обычно это последнее средство — restart дорогой.


Архитектура с retry и DLQ

Retry chain с fallback в DLQ

Transaction in

Транзакция приходит в async I/O.

Attempt 1

Первая попытка lookup в Redis.

Error 503

Ошибка retryable. Flink ждёт backoff и повторяет.
wait 100ms

Attempt 2

Вторая попытка.

Error 503

Опять ошибка. Удваиваем backoff.
wait 200ms

Attempt 3

Третья попытка.

All exhausted

Все попытки исчерпаны. Вызывается timeout() метод.

DLQ side output

Event направляется в DLQ через side output. Команда видит в Kafka topic dlq-transactions.

Idempotency: важно для retry

Retry безопасен только для идемпотентных операций. Lookup в Redis (GET) — идемпотентен: повторение даст тот же результат. Запись в БД (INSERT) — нет: повтор создаст дубликат.

Для async sink с retry:

  • INSERT INTO orders -> UPSERT (ON CONFLICT DO UPDATE).
  • POST /api/payments -> используйте Idempotency-Key header (стандарт Stripe, Square).
  • Send email -> не делайте retry email-send’а вообще, используйте message queue.
WARNING

Async I/O в Flink хорошо подходит для lookup операций (read), а не для write. Если делаете write через async I/O — обязательно идемпотентность через UPSERT, idempotency keys или дедупликация на стороне target system.


Local cache + retry: лучше backoff

Часто более эффективно, чем retry — снизить нагрузку на external system через cache. Если 70% запросов попадают в одни и те же ключи, добавьте in-memory cache (Caffeine для Java, lru_cache для Python) с TTL.

private transient Cache<String, UserProfile> cache;

@Override
public void open(Configuration parameters) {
    cache = Caffeine.newBuilder()
        .maximumSize(100_000)
        .expireAfterWrite(Duration.ofMinutes(10))
        .build();
    // ... init Redis client
}

@Override
public void asyncInvoke(Transaction tx, ResultFuture<Enriched> resultFuture) {
    UserProfile cached = cache.getIfPresent(tx.getUserId());
    if (cached != null) {
        resultFuture.complete(Collections.singletonList(
            new Enriched(tx, cached)
        ));
        return;
    }

    async.get("user:" + tx.getUserId()).thenAccept(json -> {
        UserProfile profile = parseJson(json);
        cache.put(tx.getUserId(), profile);
        resultFuture.complete(Collections.singletonList(
            new Enriched(tx, profile)
        ));
    });
}

Cache + retry = резильентность к временным сбоям без бомбардировки external system.

Transactional outbox паттерн: надёжная запись без дублей

Попробуй сам

  1. Возьми async I/O job из урока 2 (Redis lookup).
  2. Добавь AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder(5, 100, 5000, 2.0).
  3. Симулируй ошибки Redis: docker pause redis (Redis встанет на 5 секунд).
  4. Посмотри в логах, как Flink делает 5 попыток с backoff 100ms, 200ms, 400ms, 800ms, 1600ms.
  5. Затем удалите retry, поставь fallback (UNKNOWN_PROFILE) в timeout. Сравни: что лучше для downstream — задержки, но enriched, или мгновенный fallback?

Ключевые выводы

  1. AsyncRetryStrategy — встроенный механизм retry в Flink 1.16+. unorderedWaitWithRetry принимает стратегию.
  2. FixedDelay — одинаковая пауза. ExponentialBackoff — пауза удваивается. Стандартный выбор — exponential для overload-сценариев.
  3. Что retry’ить: TimeoutException, 5xx, 429, network errors. Не retry’ить: 4xx (кроме 429), bugs.
  4. Если retry не помог: эмитить fallback, side output -> DLQ, или поднимать exception (последнее средство).
  5. Idempotency — retry безопасен только для идемпотентных операций (lookup). Для write — UPSERT или idempotency-key.
  6. Cache перед retry: снижает нагрузку на external system на 80%+, часто эффективнее retry.
Проверка знанийKnowledge check
Ваш Flink job делает async lookup в HTTP сервис, который иногда отвечает 503 Service Unavailable, иногда 404 Not Found. Команда хочет, чтобы 503 обрабатывались через retry, а 404 — отбрасывались. Как настроить AsyncRetryStrategy? Что произойдёт, если событие даст 503, потом снова 503, потом 200 OK?
ОтветAnswer
Настройка: ExponentialBackoffDelayRetryStrategyBuilder(maxAttempts=3, initialDelay=200, maxDelay=2000, multiplier=2.0) с предикатом ifException(ex -> ex instanceof HttpException && ((HttpException) ex).getStatusCode() == 503). 404 НЕ попадает в этот предикат — для него предусмотреть отдельную логику в asyncInvoke: при получении 404 не бросать exception, а вызвать resultFuture.complete(emptyList()) — событие пропадёт без retry. Альтернатива: side output -> DLQ, чтобы потом расследовать пропущенные. Для последовательности 503 -> 503 -> 200 OK: первая попытка вернула 503 -> retry через 200ms. Вторая попытка вернула 503 -> retry через 400ms. Третья попытка вернула 200 OK -> resultFuture.complete с результатом. Событие успешно обработано, total latency ~600ms (200+400 backoff + 3 request times).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В чём ключевое различие FixedDelayRetryStrategy и ExponentialBackoffDelayRetryStrategy, и когда какой использовать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3