Зачем вообще нужна идемпотентность
Представь: ночной pipeline загружает заказы из CRM в DWH. В 03:14 коннект к CRM падает на середине задачи. Через пять минут Airflow делает retry. Через два часа аналитик открывает дашборд и видит: продажи выросли на 47%. Менеджмент пишет восторженные сообщения в Slack. К утру обнаруживается, что 47% — это не рост, а дубли из двойной загрузки.
Это классическая история — и она почти всегда про отсутствие идемпотентности. Идемпотентность — свойство операции, при котором повтор не меняет результат. Один раз выполнил, два раза, десять раз — состояние данных одинаковое. В мире распределённых систем, где сбои случаются постоянно (сеть моргнула, под убили, диск переполнился), идемпотентность — не “nice to have”, а базовое требование.
В обычной разработке часто можно отделаться “тестами и логами”. В DE нельзя: данные имеют память. Если ты в 03:14 вставил 50 000 дублей, они там и останутся, пока ты их не уберёшь руками — а это значит ночные звонки, грязный rollback, потеря доверия бизнеса.
Что значит “идемпотентно”
Формально: функция f идемпотентна, если f(f(x)) = f(x). В DE-практике это значит: выполнение задачи дважды (или десять раз) с одним и тем же входом приводит к одному и тому же конечному состоянию хранилища.
Без идемпотентности retry удваивает данные, с ней — финальное состояние одинаково
Важно отличать идемпотентность от корректности. Идемпотентная задача с багом даст консистентно неправильный результат — повторы не сломают данные, но и не исправят логику. Идемпотентность защищает от retries, корректность — от багов. Нужны оба.
Источники неидемпотентности в DE
Большинство багов с дублями приходят из трёх типов операций:
- Чистый INSERT без проверки на дубли. Самый частый антипаттерн. Любой retry — это +N строк.
- APPEND в файлы или партиции без перезаписи. Положить файл
part-001.parquetв папку, при retry —part-001-retry.parquetрядом. - Side effects во внешние системы: отправить email, дернуть webhook, списать деньги. Здесь идемпотентность нужна особенно строгая (через ключи идемпотентности).
Как достичь идемпотентности
SQL MERGE и INSERT ON CONFLICT: синтаксис upsert-операций для идемпотентных загрузокПаттерн 1: MERGE / UPSERT в SQL
Замена INSERT на операцию, которая знает, что делать при конфликте по ключу. Стандарт SQL — MERGE:
MERGE INTO orders AS tgt
USING staging_orders AS src
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET
amount = src.amount,
status = src.status,
updated_at = src.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, amount, status, created_at)
VALUES (src.order_id, src.amount, src.status, src.created_at);
MERGE поддерживается в Snowflake, BigQuery, Postgres 15+, SQL Server, Oracle. В Postgres до 15 используется идиома INSERT ... ON CONFLICT:
INSERT INTO orders (order_id, amount, status, created_at)
SELECT order_id, amount, status, created_at FROM staging_orders
ON CONFLICT (order_id) DO UPDATE SET
amount = EXCLUDED.amount,
status = EXCLUDED.status,
updated_at = NOW();
Ключевое условие: на таргет-таблице должен быть unique constraint или primary key на колонке, по которой делается конфликт. Без него ON CONFLICT не сработает.
Паттерн 2: DELETE + INSERT в одной транзакции
Когда грейн данных — это партиция (дата, регион, тенант), часто проще удалить старое и вставить заново:
BEGIN;
DELETE FROM orders WHERE event_date = '2026-05-17';
INSERT INTO orders
SELECT * FROM staging_orders WHERE event_date = '2026-05-17';
COMMIT;
Главное — всё в одной транзакции. Если упадёт между DELETE и INSERT, то commit не пройдёт, старые данные не пропадут. Подход проще MERGE, работает в любой SQL-БД, идеален для batch-loads “по дате”.
Паттерн 3: Deterministic file paths
Для файловых пайплайнов (Spark, dump в S3) — пиши в путь, который детерминируется входом:
output_path = f"s3://lake/orders/dt={ds}/data.parquet"
df.write.mode("overwrite").parquet(output_path)
Здесь ds — execution date из Airflow (например, 2026-05-17). При retry путь тот же, mode("overwrite") перепишет данные, орфан-файлов не остаётся. Антипаттерн — использовать datetime.now(): каждый retry создаст новый файл рядом, и через месяц в lake накопится мусор.
Никогда не используй datetime.now() или случайные UUID в путях файлов или ID записей в data pipeline. Это убивает идемпотентность. Используй execution_date / ds / data_interval_start из контекста оркестратора.
Паттерн 4: Deduplication на чтении
Если источник сам присылает дубли (например, Kafka с at-least-once семантикой), дедуп — на стороне трансформации:
WITH ranked AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingested_at DESC) AS rn
FROM raw_events
)
SELECT * FROM ranked WHERE rn = 1;
ROW_NUMBER + PARTITION BY по бизнес-ключу — самый частый идиом дедупа. Берём свежую запись, отбрасываем дубликаты.
Паттерн 5: Idempotency key в side effects
Когда нужно вызвать внешний API (Stripe charge, send email, webhook), сервис должен принять Idempotency-Key. Тот же ключ — тот же результат:
import requests
response = requests.post(
"https://api.stripe.com/v1/charges",
headers={"Idempotency-Key": f"order-{order_id}-charge-v1"},
data={"amount": 5000, "currency": "usd", "customer": customer_id}
)
Stripe гарантирует: при втором запросе с тем же ключом не будет второго списания, вернётся тот же результат. Похожий паттерн есть в AWS SQS (MessageDeduplicationId), Twilio, Sendgrid.
По типу операции — свой паттерн
MERGE / UPSERTMERGE/UPSERT в DWH — стандарт SQL для update-or-insert. Требует unique key. Snowflake, BigQuery, Postgres 15+ — нативная поддержка
DELETE + INSERTDELETE+INSERT в транзакции — для batch-load по партиции. Грейн совпадает с партицией, типично date или date+tenant
Deterministic path + overwriteDetereministic path с overwrite — для файловых сиситем. Spark, Pandas, любой объектный сторэдж. Ключ — execution_date в пути
Idempotency-Key headerIdempotency key — для side effects во внешние сервисы. Stripe, payment, email, webhook
Полный пример: идемпотентный Airflow task
Соберём всё вместе. Задача: каждый день грузить заказы из API в Postgres-DWH, без дублей при retry.
from airflow.decorators import task
from datetime import datetime
import requests
import psycopg2
@task
def load_orders(ds: str):
# 1. Тащим из API за конкретную дату — detеrministic input
response = requests.get(
f"https://api.partner.com/orders?date={ds}",
headers={"Authorization": "Bearer ..."}
)
orders = response.json()
conn = psycopg2.connect(...)
cur = conn.cursor()
# 2. Грузим в staging — overwrite по партиции
cur.execute("DELETE FROM stg_orders WHERE event_date = %s", (ds,))
cur.executemany(
"INSERT INTO stg_orders VALUES (%(order_id)s, %(amount)s, %(event_date)s)",
orders
)
# 3. MERGE из staging в orders — идемпотентно по order_id
cur.execute("""
INSERT INTO orders (order_id, amount, event_date)
SELECT order_id, amount, event_date FROM stg_orders
WHERE event_date = %s
ON CONFLICT (order_id) DO UPDATE SET
amount = EXCLUDED.amount,
updated_at = NOW()
""", (ds,))
conn.commit()
Эта задача — идемпотентна. Запусти её десять раз с ds='2026-05-17', в orders будут те же строки. Падение на любом шаге не приведёт к дублям: staging чистится перед загрузкой, MERGE по order_id исключает дублирование в финальной таблице.
Идемпотентность стоит проверять автоматически. Простой паттерн: airflow tasks test -> запустить дважды -> assert count неизменен. Если упало — значит, где-то под капотом INSERT без conflict handling. Это лучший unit test для DE.
Чек-лист для каждой задачи
Перед коммитом в продакшен спроси себя про каждый task:
- Что произойдёт, если эта задача упадёт на 50%?
- Что произойдёт, если её перезапустить три раза подряд?
- Что произойдёт при backfill за прошлую дату?
- Где здесь side effects во внешние системы? Есть ли idempotency keys?
- Детерминированы ли пути файлов и ID записей?
Если на любой вопрос ответ “не знаю” или “будут дубли” — задача неидемпотентна, нужно править.
Попробуй сам
- Открой свой текущий pipeline (или любой open-source DAG на GitHub). Найди INSERT-запросы. Для каждого пройдись по чек-листу выше.
- Возьми Postgres, создай таблицу
orders(order_id PK, amount, event_date). Напиши Python-скрипт, который вставляет 100 строк, ломается на 50-й черезraise Exception. Запусти дважды. Сравни количество строк — будет 150 (дубли). Перепиши наON CONFLICT DO UPDATE— запусти ещё раз, будет 100. - Найди в документации Stripe (или любого payment-провайдера) раздел про
Idempotency-Key. Подумай, какой ключ ты бы использовал для своего pipeline.
Идемпотентность + backfill
Airflow backfill: как запустить DAG за прошлые даты с гарантией идемпотентностиИдемпотентность критична именно для backfill — массового перезапуска задач за прошлые даты. Сценарий: вчера выяснилось, что в SQL был баг две недели подряд. Нужно прогнать pipeline за каждый день из этих двух недель — и получить тот же результат, который получился бы, если бы он отработал штатно. Если задача идемпотентна (MERGE по ключу или DELETE+INSERT по партиции), backfill — это просто airflow tasks backfill ... на N дат, безопасно даже при параллельном запуске. Без идемпотентности backfill = дубли и грязные данные.
Тот же принцип лежит в основе incremental processing (грузим только дельту по watermark): incremental-задача обязана быть идемпотентной по своему окну, иначе любой retry или повтор окна порождает дубли. Полный набор pipeline-паттернов (backfill стратегии, incremental processing patterns, recovery playbook) разбираем в будущем advanced-de-patterns deep-dive.