Learning Platform
Глоссарий Troubleshooting
Урок 18.01 · 25 мин
Начальный
idempotencymergeupsertdeduplicationretries

Зачем вообще нужна идемпотентность

Представь: ночной pipeline загружает заказы из CRM в DWH. В 03:14 коннект к CRM падает на середине задачи. Через пять минут Airflow делает retry. Через два часа аналитик открывает дашборд и видит: продажи выросли на 47%. Менеджмент пишет восторженные сообщения в Slack. К утру обнаруживается, что 47% — это не рост, а дубли из двойной загрузки.

Это классическая история — и она почти всегда про отсутствие идемпотентности. Идемпотентность — свойство операции, при котором повтор не меняет результат. Один раз выполнил, два раза, десять раз — состояние данных одинаковое. В мире распределённых систем, где сбои случаются постоянно (сеть моргнула, под убили, диск переполнился), идемпотентность — не “nice to have”, а базовое требование.

В обычной разработке часто можно отделаться “тестами и логами”. В DE нельзя: данные имеют память. Если ты в 03:14 вставил 50 000 дублей, они там и останутся, пока ты их не уберёшь руками — а это значит ночные звонки, грязный rollback, потеря доверия бизнеса.

Что значит “идемпотентно”

Формально: функция f идемпотентна, если f(f(x)) = f(x). В DE-практике это значит: выполнение задачи дважды (или десять раз) с одним и тем же входом приводит к одному и тому же конечному состоянию хранилища.

Идемпотентность: ожидаемое vs реальное поведение

Без идемпотентности retry удваивает данные, с ней — финальное состояние одинаково

Первый запуск: задача отработала корректно, в таблице 1000 строк
Падение на 50%: 500 строк committed, остальные — нет. В типичном INSERT это означает, что половина данных в DWH
Без идемпотентности retry добавляет все 1000 строк заново — финал 1500 строк, из них 500 дубликатов
То же первое выполнение
Та же половина committed, та же ошибка
С MERGE/UPSERT retry перепишет существующие строки и добавит недостающие — финал ровно 1000 строк, как и должно быть

Важно отличать идемпотентность от корректности. Идемпотентная задача с багом даст консистентно неправильный результат — повторы не сломают данные, но и не исправят логику. Идемпотентность защищает от retries, корректность — от багов. Нужны оба.

Источники неидемпотентности в DE

Большинство багов с дублями приходят из трёх типов операций:

  1. Чистый INSERT без проверки на дубли. Самый частый антипаттерн. Любой retry — это +N строк.
  2. APPEND в файлы или партиции без перезаписи. Положить файл part-001.parquet в папку, при retry — part-001-retry.parquet рядом.
  3. Side effects во внешние системы: отправить email, дернуть webhook, списать деньги. Здесь идемпотентность нужна особенно строгая (через ключи идемпотентности).

Как достичь идемпотентности

SQL MERGE и INSERT ON CONFLICT: синтаксис upsert-операций для идемпотентных загрузок

Паттерн 1: MERGE / UPSERT в SQL

Замена INSERT на операцию, которая знает, что делать при конфликте по ключу. Стандарт SQL — MERGE:

MERGE INTO orders AS tgt
USING staging_orders AS src
  ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET
  amount = src.amount,
  status = src.status,
  updated_at = src.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, amount, status, created_at)
  VALUES (src.order_id, src.amount, src.status, src.created_at);

MERGE поддерживается в Snowflake, BigQuery, Postgres 15+, SQL Server, Oracle. В Postgres до 15 используется идиома INSERT ... ON CONFLICT:

INSERT INTO orders (order_id, amount, status, created_at)
SELECT order_id, amount, status, created_at FROM staging_orders
ON CONFLICT (order_id) DO UPDATE SET
  amount = EXCLUDED.amount,
  status = EXCLUDED.status,
  updated_at = NOW();

Ключевое условие: на таргет-таблице должен быть unique constraint или primary key на колонке, по которой делается конфликт. Без него ON CONFLICT не сработает.

Паттерн 2: DELETE + INSERT в одной транзакции

Когда грейн данных — это партиция (дата, регион, тенант), часто проще удалить старое и вставить заново:

BEGIN;
  DELETE FROM orders WHERE event_date = '2026-05-17';
  INSERT INTO orders
  SELECT * FROM staging_orders WHERE event_date = '2026-05-17';
COMMIT;

Главное — всё в одной транзакции. Если упадёт между DELETE и INSERT, то commit не пройдёт, старые данные не пропадут. Подход проще MERGE, работает в любой SQL-БД, идеален для batch-loads “по дате”.

Паттерн 3: Deterministic file paths

Для файловых пайплайнов (Spark, dump в S3) — пиши в путь, который детерминируется входом:

output_path = f"s3://lake/orders/dt={ds}/data.parquet"
df.write.mode("overwrite").parquet(output_path)

Здесь ds — execution date из Airflow (например, 2026-05-17). При retry путь тот же, mode("overwrite") перепишет данные, орфан-файлов не остаётся. Антипаттерн — использовать datetime.now(): каждый retry создаст новый файл рядом, и через месяц в lake накопится мусор.

WARNING

Никогда не используй datetime.now() или случайные UUID в путях файлов или ID записей в data pipeline. Это убивает идемпотентность. Используй execution_date / ds / data_interval_start из контекста оркестратора.

Паттерн 4: Deduplication на чтении

Если источник сам присылает дубли (например, Kafka с at-least-once семантикой), дедуп — на стороне трансформации:

WITH ranked AS (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingested_at DESC) AS rn
  FROM raw_events
)
SELECT * FROM ranked WHERE rn = 1;

ROW_NUMBER + PARTITION BY по бизнес-ключу — самый частый идиом дедупа. Берём свежую запись, отбрасываем дубликаты.

Паттерн 5: Idempotency key в side effects

Когда нужно вызвать внешний API (Stripe charge, send email, webhook), сервис должен принять Idempotency-Key. Тот же ключ — тот же результат:

import requests

response = requests.post(
    "https://api.stripe.com/v1/charges",
    headers={"Idempotency-Key": f"order-{order_id}-charge-v1"},
    data={"amount": 5000, "currency": "usd", "customer": customer_id}
)

Stripe гарантирует: при втором запросе с тем же ключом не будет второго списания, вернётся тот же результат. Похожий паттерн есть в AWS SQS (MessageDeduplicationId), Twilio, Sendgrid.

Карта паттернов идемпотентности

По типу операции — свой паттерн

SQL load
MERGE / UPSERTMERGE/UPSERT в DWH — стандарт SQL для update-or-insert. Требует unique key. Snowflake, BigQuery, Postgres 15+ — нативная поддержка
Partition reload
DELETE + INSERTDELETE+INSERT в транзакции — для batch-load по партиции. Грейн совпадает с партицией, типично date или date+tenant
File write
Deterministic path + overwriteDetereministic path с overwrite — для файловых сиситем. Spark, Pandas, любой объектный сторэдж. Ключ — execution_date в пути
External API
Idempotency-Key headerIdempotency key — для side effects во внешние сервисы. Stripe, payment, email, webhook

Полный пример: идемпотентный Airflow task

Соберём всё вместе. Задача: каждый день грузить заказы из API в Postgres-DWH, без дублей при retry.

from airflow.decorators import task
from datetime import datetime
import requests
import psycopg2

@task
def load_orders(ds: str):
    # 1. Тащим из API за конкретную дату — detеrministic input
    response = requests.get(
        f"https://api.partner.com/orders?date={ds}",
        headers={"Authorization": "Bearer ..."}
    )
    orders = response.json()

    conn = psycopg2.connect(...)
    cur = conn.cursor()

    # 2. Грузим в staging — overwrite по партиции
    cur.execute("DELETE FROM stg_orders WHERE event_date = %s", (ds,))
    cur.executemany(
        "INSERT INTO stg_orders VALUES (%(order_id)s, %(amount)s, %(event_date)s)",
        orders
    )

    # 3. MERGE из staging в orders — идемпотентно по order_id
    cur.execute("""
        INSERT INTO orders (order_id, amount, event_date)
        SELECT order_id, amount, event_date FROM stg_orders
        WHERE event_date = %s
        ON CONFLICT (order_id) DO UPDATE SET
            amount = EXCLUDED.amount,
            updated_at = NOW()
    """, (ds,))

    conn.commit()

Эта задача — идемпотентна. Запусти её десять раз с ds='2026-05-17', в orders будут те же строки. Падение на любом шаге не приведёт к дублям: staging чистится перед загрузкой, MERGE по order_id исключает дублирование в финальной таблице.

TIP

Идемпотентность стоит проверять автоматически. Простой паттерн: airflow tasks test -> запустить дважды -> assert count неизменен. Если упало — значит, где-то под капотом INSERT без conflict handling. Это лучший unit test для DE.

Чек-лист для каждой задачи

Перед коммитом в продакшен спроси себя про каждый task:

  1. Что произойдёт, если эта задача упадёт на 50%?
  2. Что произойдёт, если её перезапустить три раза подряд?
  3. Что произойдёт при backfill за прошлую дату?
  4. Где здесь side effects во внешние системы? Есть ли idempotency keys?
  5. Детерминированы ли пути файлов и ID записей?

Если на любой вопрос ответ “не знаю” или “будут дубли” — задача неидемпотентна, нужно править.

Попробуй сам

  1. Открой свой текущий pipeline (или любой open-source DAG на GitHub). Найди INSERT-запросы. Для каждого пройдись по чек-листу выше.
  2. Возьми Postgres, создай таблицу orders(order_id PK, amount, event_date). Напиши Python-скрипт, который вставляет 100 строк, ломается на 50-й через raise Exception. Запусти дважды. Сравни количество строк — будет 150 (дубли). Перепиши на ON CONFLICT DO UPDATE — запусти ещё раз, будет 100.
  3. Найди в документации Stripe (или любого payment-провайдера) раздел про Idempotency-Key. Подумай, какой ключ ты бы использовал для своего pipeline.

Идемпотентность + backfill

Airflow backfill: как запустить DAG за прошлые даты с гарантией идемпотентности

Идемпотентность критична именно для backfill — массового перезапуска задач за прошлые даты. Сценарий: вчера выяснилось, что в SQL был баг две недели подряд. Нужно прогнать pipeline за каждый день из этих двух недель — и получить тот же результат, который получился бы, если бы он отработал штатно. Если задача идемпотентна (MERGE по ключу или DELETE+INSERT по партиции), backfill — это просто airflow tasks backfill ... на N дат, безопасно даже при параллельном запуске. Без идемпотентности backfill = дубли и грязные данные.

Тот же принцип лежит в основе incremental processing (грузим только дельту по watermark): incremental-задача обязана быть идемпотентной по своему окну, иначе любой retry или повтор окна порождает дубли. Полный набор pipeline-паттернов (backfill стратегии, incremental processing patterns, recovery playbook) разбираем в будущем advanced-de-patterns deep-dive.

Проверка знанийKnowledge check
Pipeline пишет ежедневный CSV в s3://lake/orders/dt=2026-05-17/data.csv через путь, генерируемый как f"s3://lake/orders/{datetime.now()}/data.csv". Какие проблемы это создаёт для идемпотентности и retry? Как починить?
ОтветAnswer
Проблема: datetime.now() — недетерминированный. При каждом запуске и retry создаётся уникальный путь, поэтому: 1) старые orphan-файлы накапливаются и не перезаписываются — мусор в lake; 2) downstream-задачи, ожидающие фиксированный путь, его не находят; 3) backfill за прошлую дату пишет в "сегодняшний" путь, что неправильно. Починить: использовать execution_date / ds из Airflow контекста — f"s3://lake/orders/dt={ds}/data.parquet", и писать с overwrite-mode. Тогда retry перезапишет файл по тому же пути, backfill пишет в свой партиционный префикс, орфанов нет.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Pipeline делает INSERT INTO orders SELECT FROM staging без conflict handling. Первый запуск прошёл, retry после сбоя — тоже. В orders теперь 200% от ожидаемого. В чём первопричина и как починить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3