Зачем нужны retry
В мире распределённых систем сбои — норма. Сетевой запрос в API может упасть из-за ребалансировки контейнера. Запрос в DWH может упасть из-за того, что warehouse сейчас перезагружается после деплоя. Чтение из S3 может вернуть 503 при кратковременной перегрузке региона. Большинство этих сбоев — транзиентные: попробуй через минуту, всё заработает.
Без retry pipeline становится хрупким: одна моргнувшая сеть — failed DAG, ночные звонки, ручные перезапуски. С retry — система самовосстанавливается. Это базовая практика resiliency.
Но retry — не магия. Бессмысленный retry в цикле без задержек может усугубить проблему (DDoS-эффект на падающий сервис), привести к каскадным сбоям, или вообще не помочь, если ошибка — не транзиентная (например, баг в SQL или невалидные данные).
Когда retry оправдан, а когда нет
Retry помогает только при транзиентных сбоях; для остальных нужны другие механики
Network blip, 503, timeout
RETRY с backoffTransient: сеть моргнула, временный 503, timeout, lock contention в БД, rate limit. Помогает retry с задержкой
SQL error, schema mismatch
FAIL FAST, fix codeLogical: баг в коде, неправильный SQL, schema mismatch, division by zero. Retry бесполезен — следующий запуск даст ту же ошибку
Bad data, null, invalid
QUARANTINE в DLQData quality: невалидные данные, null в required-поле, нарушение constraint. Не падай и не повторяй: квиз/quarantine невалидное
404, 403, auth expired
ALERT, manual fixPermanent: 404, удалили ресурс, нет прав, истёк токен. Retry не поможет — обнови права или удали задачу
Правило: retry — только для транзиентных сбоев. Distinguish через HTTP-код, тип exception, или явный whitelist:
TRANSIENT_HTTP_CODES = {408, 429, 500, 502, 503, 504}
def should_retry(exception):
if isinstance(exception, ConnectionError):
return True
if isinstance(exception, requests.HTTPError):
return exception.response.status_code in TRANSIENT_HTTP_CODES
return False
На 4xx ошибки (кроме 408/429) retry смысла не имеет — это баг или невалидный запрос.
Стратегии retry
Linear backoff
Самая простая стратегия: повторять с фиксированной задержкой.
attempt 1 -> wait 5s -> attempt 2 -> wait 5s -> attempt 3 -> wait 5s -> fail
Плюсы: предсказуемо. Минусы: если сервис восстанавливается медленно (минуты), 5-секундный интервал бесполезен. Если же сервис перегружен — N клиентов одновременно бьют по нему каждые 5 секунд, ускоряя коллапс. Linear backoff редко идеален.
Exponential backoff
Задержка растёт экспоненциально: 1s, 2s, 4s, 8s, 16s, 32s…
import time
def retry_exponential(func, max_attempts=5, base=1.0):
for attempt in range(max_attempts):
try:
return func()
except TransientError:
if attempt == max_attempts - 1:
raise
delay = base * (2 ** attempt)
time.sleep(delay)
Преимущество: даёт сервису время восстановиться. Если сервис лежит из-за перегрузки, экспоненциальное “отступление” клиентов помогает разгрузить его. Это стандарт в AWS SDK (boto3), Google Cloud, Stripe SDK — везде, где client SDK ходит в сетевой API.
Exponential backoff + jitter
Проблема “thundering herd”: если 1000 клиентов одновременно ушли в exponential backoff после общего сбоя, они все вернутся одновременно (через 1с, 2с, 4с). Чтобы избежать пиков, добавляют jitter — случайную составляющую:
import random
def retry_exponential_jitter(func, max_attempts=5, base=1.0):
for attempt in range(max_attempts):
try:
return func()
except TransientError:
if attempt == max_attempts - 1:
raise
delay = base * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
С jitter возвраты клиентов размазываются по времени, нагрузка распределяется равномерно. Это де-факто стандарт. AWS-документ “Exponential Backoff And Jitter” — must-read для DE.
Linear vs exponential vs exponential+jitter — поведение во времени
Retry в Airflow
Airflow — самый популярный оркестратор в DE, в нём retry настраивается на уровне Task:
from airflow.decorators import task
from datetime import timedelta
@task(
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
)
def load_orders():
...
retries=3— три попытки после первого fail (всего 4 запуска).retry_delay— базовая задержка.retry_exponential_backoff=True— задержки растут экспоненциально.max_retry_delay— ограничивает максимальную паузу.
Без retry_exponential_backoff Airflow ждёт ровно retry_delay между попытками. С ним — retry_delay * 2^(attempt-1).
Production-default в Airflow: retries=2, retry_delay=5min, retry_exponential_backoff=True. Для critical-задач можно повысить до 4-5 retries с max_retry_delay=1h. Если задача всё равно фейлится после 4 попыток за час — это не транзиентный сбой, нужно человеческое вмешательство.
Dead Letter Queue (DLQ)
Что делать, если retry исчерпан, или ошибка — не транзиентная? Если задача обрабатывает поток событий, нельзя ронять весь поток из-за одного “плохого” события. Решение — Dead Letter Queue: складывать проблемные записи в отдельную “карантинную” очередь/таблицу, идти дальше.
def process_event(event):
try:
validate(event)
load_to_dwh(event)
except ValidationError as e:
# Не retry — данные битые. Отправляем в DLQ
dlq.send({
"event": event,
"error": str(e),
"timestamp": datetime.utcnow().isoformat(),
"source_topic": "orders"
})
except TransientError:
raise # пусть retry обработает
DLQ — это очередь или таблица, куда складываются (плохое сообщение, причина ошибки, метаданные). По ней потом аналитик/инженер разбирается: чинит данные в источнике, обновляет схему, удаляет невалидное. Без DLQ либо ты теряешь данные молча, либо весь поток встаёт.
Native DLQ есть в Kafka (через consumer error handler), AWS SQS (RedrivePolicy с maxReceiveCount), RabbitMQ, Google Pub/Sub. В Airflow паттерн ручной: пишешь “ошибочные” строки в отдельную таблицу events_failed.
Circuit Breaker
Если внешний сервис лежит, нет смысла делать retry в течение часа — все 100 попыток упадут, ты только нагружаешь упавший сервис. Circuit breaker — паттерн, в котором клиент “отключает” вызовы к сервису после N подряд ошибок и пробует снова только через таймаут.
Три состояния:
- Closed — нормальная работа, все запросы идут.
- Open — превысили threshold ошибок, запросы блокируются без вызова сервиса.
- Half-open — после таймаута пробуем один запрос. Успех -> Closed, ошибка -> Open.
В Python — библиотека pybreaker, в Java — Resilience4j, в Go — sony/gobreaker. На уровне инфраструктуры — sidecar Envoy/Istio.
В DE-пайплайнах circuit breaker применяется реже, чем в micro-services, потому что pipeline обычно выполняется по расписанию, а не в realtime. Но если задача дергает внешний сервис (API партнёра) в цикле — breaker может предотвратить долгое выполнение задачи на сломанном сервисе.
Когда retry бесполезен и опасен
- Side effects без идемпотентности. Retry платежа без
Idempotency-Key= двойное списание. Сначала идемпотентность, потом retry. - Long-running tasks без чекпоинтов. Retry 8-часового Spark-job = ещё 8 часов с нуля. Решение — incremental progress + checkpoint.
- Cascading failures. Если upstream-сервис сломан, ваш retry удержит его в плохом состоянии. Circuit breaker лучше.
- Bad data в источнике. Retry парсинга невалидного JSON даст ту же ошибку. Нужен DLQ, а не retry.
Реальный production-config
Типичная стратегия для задачи “загрузка из API в DWH”:
retry_policy:
max_attempts: 4
base_delay: 30s
max_delay: 30min
strategy: exponential_with_jitter
retryable_errors:
- ConnectionError
- TimeoutError
- HTTPError: [408, 429, 500, 502, 503, 504]
dead_letter:
table: events_failed
retention: 30d
alerting: slack#data-alerts
circuit_breaker:
threshold: 10 errors / 60s
timeout: 5min
Это разумная база. Точная настройка зависит от SLA (если SLA 1 час, не можешь backoff 30 минут), от частоты сбоев upstream, от стоимости false alerts.
Не ставь retries=10 “на всякий случай” — это анти-паттерн. После 4-5 попыток нужно либо чинить root cause, либо тушить пожар руками. Чем больше retry, тем дольше скрывается реальная проблема.
Попробуй сам
- Найди в open-source DAG (например, в репо
airflow-providers) задачу сretries=N. Посмотри, есть лиretry_exponential_backoff. Если нет — подумай, какие сбои не покрываются. - Напиши Python-функцию
retry_with_jitter(func, max_attempts, base). Протестируй: вызов с моком, который кидаетConnectionError3 раза, затем возвращает успех. Лог задержек. - Найди в документации AWS SQS, что такое
RedrivePolicyиmaxReceiveCount. Это нативный DLQ.