В прошлом уроке мы разобрались с концепциями Airflow. Теперь — конкретный, рабочий end-to-end пример DAG. Этот DAG делает обычную DE-задачу: ежедневно забирает данные из внешнего API, сохраняет в S3, загружает в DWH, и проверяет качество. Это типичный production-сценарий.
Цель — показать, как абстрактные понятия (DAG, Task, Operator, XCom, Sensor) складываются в реальный пайплайн.
Бизнес-задача
Компания продаёт онлайн-курсы. Платежи идут через Stripe. Финансовый отдел хочет каждое утро видеть в Snowflake таблицу stripe_charges_daily с агрегатами по вчерашним платежам: количество, сумма, средний чек, валюта.
Источник — Stripe API. Целевое хранилище — Snowflake. Расписание — ежедневно в 06:00 UTC (буфер 6 часов для late-arriving).
Архитектура пайплайна
Пять задач: extract из Stripe API, валидация ответа, загрузка raw в S3, merge в Snowflake, проверка качества.
Это типичный shape DE-пайплайна: extract -> validate -> land -> load -> quality. Пять задач, понятные зависимости.
Полный код DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
import json
import os
import stripe
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=10),
'email_on_failure': True,
'email': ['[email protected]'],
}
with DAG(
dag_id='stripe_charges_daily',
description='Ежедневная выгрузка платежей Stripe в Snowflake',
schedule_interval='0 6 * * *',
start_date=datetime(2026, 5, 1),
catchup=False,
max_active_runs=1,
default_args=default_args,
tags=['finance', 'stripe', 'daily'],
) as dag:
# -------- Task 1: extract from Stripe API --------
def extract_from_stripe(**context):
execution_date = context['ds']
ds_nodash = context['ds_nodash']
# дата начала и конца окна
start_ts = int(datetime.strptime(execution_date, '%Y-%m-%d').timestamp()) - 86400
end_ts = start_ts + 86400
stripe.api_key = os.environ['STRIPE_API_KEY']
charges = stripe.Charge.list(
created={'gte': start_ts, 'lt': end_ts},
limit=100,
)
records = []
for charge in charges.auto_paging_iter():
records.append({
'id': charge.id,
'amount': charge.amount,
'currency': charge.currency,
'status': charge.status,
'created': charge.created,
'customer_id': charge.customer,
})
local_path = f'/tmp/stripe_charges_{ds_nodash}.json'
with open(local_path, 'w') as f:
json.dump(records, f)
# push в XCom путь к файлу и количество строк
context['ti'].xcom_push(key='local_path', value=local_path)
context['ti'].xcom_push(key='row_count', value=len(records))
print(f"Extracted {len(records)} charges to {local_path}")
extract = PythonOperator(
task_id='extract_from_stripe',
python_callable=extract_from_stripe,
)
# -------- Task 2: validate extract --------
def validate_extract(**context):
ti = context['ti']
row_count = ti.xcom_pull(task_ids='extract_from_stripe', key='row_count')
if row_count == 0:
raise ValueError("Stripe API returned 0 rows — likely an issue with API or window")
# ожидаемый минимум — 10 платежей в день, иначе подозрительно
if row_count < 10:
print(f"WARN: only {row_count} rows, but proceeding")
print(f"Validated: {row_count} rows OK")
validate = PythonOperator(
task_id='validate_extract',
python_callable=validate_extract,
)
# -------- Task 3: upload to S3 --------
upload_to_s3 = LocalFilesystemToS3Operator(
task_id='upload_to_s3',
filename="{{ ti.xcom_pull(task_ids='extract_from_stripe', key='local_path') }}",
dest_bucket='my-data-bucket',
dest_key="raw/stripe/dt={{ ds }}/charges.json",
aws_conn_id='aws_default',
replace=True,
)
# -------- Task 4: load to Snowflake --------
load_to_snowflake = SnowflakeOperator(
task_id='load_to_snowflake',
snowflake_conn_id='snowflake_prod',
sql="""
-- staging table
CREATE OR REPLACE TEMPORARY TABLE stage_stripe_charges AS
SELECT
$1:id::STRING AS charge_id,
$1:amount::INT AS amount_cents,
$1:currency::STRING AS currency,
$1:status::STRING AS status,
TO_TIMESTAMP($1:created) AS created_at,
$1:customer_id::STRING AS customer_id,
'{{ ds }}'::DATE AS dt
FROM @my_s3_stage/raw/stripe/dt={{ ds }}/charges.json
(FILE_FORMAT => 'json_format');
-- merge в финальную
MERGE INTO raw.stripe_charges AS target
USING stage_stripe_charges AS source
ON target.charge_id = source.charge_id
WHEN MATCHED THEN UPDATE SET
amount_cents = source.amount_cents,
status = source.status,
updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN INSERT
(charge_id, amount_cents, currency, status, created_at, customer_id, dt, loaded_at)
VALUES
(source.charge_id, source.amount_cents, source.currency, source.status,
source.created_at, source.customer_id, source.dt, CURRENT_TIMESTAMP);
""",
)
# -------- Task 5: quality check --------
quality_check = BashOperator(
task_id='quality_check',
bash_command='cd /opt/dbt_project && dbt test --select stripe_charges',
)
# -------- зависимости --------
extract >> validate >> upload_to_s3 >> load_to_snowflake >> quality_check
Это полностью рабочий DAG. Разберём каждую задачу по порядку.
Task 1: extract_from_stripe
Python-функция делает запрос к Stripe API за вчерашний период (между двумя timestamp-ами), итерирует по страницам результата (auto_paging_iter()), собирает в список словарей и пишет JSON-файл в /tmp.
Через XCom передаются: путь к файлу (local_path) и количество строк (row_count). Downstream-задачи используют эти значения через Jinja-шаблоны ({{ ti.xcom_pull(...) }}) или Python (ti.xcom_pull).
Важно: API-ключ Stripe берётся из переменной окружения, а не из кода. Это правильный pattern — секреты должны быть в Airflow Variables, в env, или в Vault/Secrets Manager.
Task 2: validate_extract
Простая проверка: 0 строк — fail. Это сигнал о проблеме (API недоступен, неправильное окно, пустой день — стоит проверить). Меньше 10 — warning.
Эта задача спасает от молчаливых ошибок: если бы DAG продолжил с пустыми данными, мы бы заметили это только утром, когда CFO открыл бы пустой дашборд.
Task 3: upload_to_s3
Готовый LocalFilesystemToS3Operator загружает файл из /tmp в S3 в раздел raw/stripe/dt=YYYY-MM-DD/charges.json. Партиционирование по дате — стандартная практика для raw-слоя.
Параметр replace=True означает, что если файл уже есть (например, после retry или backfill), он перезаписывается. Это поддерживает идемпотентность.
Task 4: load_to_snowflake
SnowflakeOperator выполняет SQL: создаёт temporary staging-таблицу из JSON в S3, потом делает MERGE в финальную raw.stripe_charges. MERGE по charge_id — идемпотентный: повторный запуск не создаст дубликаты.
{{ ds }} — Jinja-переменная Airflow, подставляется на дату DAG run (YYYY-MM-DD). Это позволяет backfill: при запуске за прошлую дату подставится та дата.
В Airflow есть встроенные Jinja-макросы: ds (дата запуска), ds_nodash (без дефисов), prev_ds, next_ds, execution_date, data_interval_start и многие другие — все они оборачиваются в двойные фигурные скобки в SQL и других шаблонах. Это критически важно для параметризации DAG по дате — без них backfill не работает.
Task 5: quality_check
BashOperator запускает dbt test --select stripe_charges. dbt прогоняет тесты на колонки: not_null, accepted_values для валюты, expression_is_true: amount > 0. Если хоть один тест падает — задача fail, в UI красная, алерт в Slack.
Это обязательный шаг в production-DE: данные нельзя считать загруженными, пока не пройдены тесты качества. Иначе ты узнаешь о проблеме от бизнеса.
dbt schema tests: not_null, unique, accepted_values — то, что запускает quality_check таскаЗависимости
extract >> validate >> upload_to_s3 >> load_to_snowflake >> quality_check
Линейная цепочка: каждая следующая стартует после успеха предыдущей. Если extract упал — никто дальше не стартует. Если validate поймал 0 строк — нет смысла грузить пустоту в S3.
Retries и SLA
default_args = {
'retries': 2,
'retry_delay': timedelta(minutes=10),
'email_on_failure': True,
}
Каждая задача имеет 2 попытки с интервалом 10 минут. Если упала и при ретрае — email DE-команде. Это спасает от транзиентных проблем: API недоступен 5 минут — ретрай решит.
Можно добавить sla=timedelta(hours=2) на DAG или на конкретные задачи. Тогда при превышении SLA — alert.
Что произойдёт при сбое
Реальные сценарии:
Stripe API упал на 30 секунд. extract упадёт, через 10 минут ретрай — Stripe уже доступен, всё хорошо.
API вернул 0 строк (например, был баг в Stripe). validate поймает, DAG run упадёт с понятной ошибкой. DE утром разберётся.
S3 временно недоступен. upload_to_s3 упадёт, ретрай через 10 минут — обычно решает.
MERGE в Snowflake упал из-за блокировки. load_to_snowflake упадёт, ретрай через 10 минут — конкурент закончил, всё ок.
dbt-тест упал, например, amount = NULL у нескольких строк. quality_check падает, DAG run помечается failed. DE-команда получает alert и разбирается, что в исходных данных не так.
Во всех сценариях оркестратор обеспечивает: автоматический retry где возможно, понятный failure где невозможно, observability через UI, alert при проблемах.
Что осталось за кадром
Этот пример упрощённый. В production добавилось бы:
- Sensor на готовность data lake (например, что предыдущий DAG завершён).
- Branching — разная логика для разных дней (выходные vs будни).
- Dynamic task generation — выгрузить из 10 API параллельно, не дублируя код.
- Data quality framework (Great Expectations, Soda) — более сложные проверки.
- OpenLineage integration — автоматический lineage для всего пайплайна.
- Cost tracking — мониторинг затрат на каждую задачу.
Для глубокого изучения этих тем — airflow-course на платформе.
Airflow production-архитектура: Scheduler, Workers, Executor, XCom под капотомХорошая привычка — всегда оборачивать пайплайн в три зоны: extract (получение данных), load (запись в storage), test (проверка качества). Без последней зоны ты не знаешь, корректны ли данные. dbt + Great Expectations + ручные SQL-проверки — стандартный набор.
Попробуй сам
Возьми любой API, который тебе интересен (Twitter, GitHub, погодный сервис), и набросай схему такого же DAG: extract -> validate -> upload -> load -> quality. Какие задачи будут? Какие у них retries? Какие тесты на финальные данные? Это упражнение даёт навык проектировать пайплайны end-to-end.