Learning Platform
Глоссарий Troubleshooting
Урок 13.03 · 22 мин
Начальный
airflowdag exampleapis3dwh

В прошлом уроке мы разобрались с концепциями Airflow. Теперь — конкретный, рабочий end-to-end пример DAG. Этот DAG делает обычную DE-задачу: ежедневно забирает данные из внешнего API, сохраняет в S3, загружает в DWH, и проверяет качество. Это типичный production-сценарий.

Цель — показать, как абстрактные понятия (DAG, Task, Operator, XCom, Sensor) складываются в реальный пайплайн.

Бизнес-задача

Компания продаёт онлайн-курсы. Платежи идут через Stripe. Финансовый отдел хочет каждое утро видеть в Snowflake таблицу stripe_charges_daily с агрегатами по вчерашним платежам: количество, сумма, средний чек, валюта.

Источник — Stripe API. Целевое хранилище — Snowflake. Расписание — ежедневно в 06:00 UTC (буфер 6 часов для late-arriving).

Архитектура пайплайна

DAG: Stripe -> S3 -> Snowflake -> quality check

Пять задач: extract из Stripe API, валидация ответа, загрузка raw в S3, merge в Snowflake, проверка качества.

extract_from_stripeAPI callPython-задача делает запрос к Stripe API за вчерашний период и сохраняет ответ в /tmp.
validate_extractrow count checkПроверка, что получили хоть какие-то данные. Если 0 строк — fail, потому что это сигнал о проблеме.
upload_to_s3raw layerЗагрузка JSON в S3 в раздел s3://my-bucket/raw/stripe/dt=YYYY-MM-DD/.
load_to_snowflakeCOPY INTOSnowflake-операция COPY INTO загружает данные из S3 в staging-таблицу, потом MERGE в финальную.
quality_checkdbt testФинальная проверка: not_null, валюта известная, amount > 0. Если падает — алерт в Slack.
successDAG run doneDAG run помечается успешным, в UI зелёная галочка, метрика 'успешные runs' растёт.

Это типичный shape DE-пайплайна: extract -> validate -> land -> load -> quality. Пять задач, понятные зависимости.

Полный код DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
import json
import os
import stripe


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email': ['[email protected]'],
}


with DAG(
    dag_id='stripe_charges_daily',
    description='Ежедневная выгрузка платежей Stripe в Snowflake',
    schedule_interval='0 6 * * *',
    start_date=datetime(2026, 5, 1),
    catchup=False,
    max_active_runs=1,
    default_args=default_args,
    tags=['finance', 'stripe', 'daily'],
) as dag:

    # -------- Task 1: extract from Stripe API --------
    def extract_from_stripe(**context):
        execution_date = context['ds']
        ds_nodash = context['ds_nodash']

        # дата начала и конца окна
        start_ts = int(datetime.strptime(execution_date, '%Y-%m-%d').timestamp()) - 86400
        end_ts = start_ts + 86400

        stripe.api_key = os.environ['STRIPE_API_KEY']
        charges = stripe.Charge.list(
            created={'gte': start_ts, 'lt': end_ts},
            limit=100,
        )

        records = []
        for charge in charges.auto_paging_iter():
            records.append({
                'id': charge.id,
                'amount': charge.amount,
                'currency': charge.currency,
                'status': charge.status,
                'created': charge.created,
                'customer_id': charge.customer,
            })

        local_path = f'/tmp/stripe_charges_{ds_nodash}.json'
        with open(local_path, 'w') as f:
            json.dump(records, f)

        # push в XCom путь к файлу и количество строк
        context['ti'].xcom_push(key='local_path', value=local_path)
        context['ti'].xcom_push(key='row_count', value=len(records))

        print(f"Extracted {len(records)} charges to {local_path}")


    extract = PythonOperator(
        task_id='extract_from_stripe',
        python_callable=extract_from_stripe,
    )

    # -------- Task 2: validate extract --------
    def validate_extract(**context):
        ti = context['ti']
        row_count = ti.xcom_pull(task_ids='extract_from_stripe', key='row_count')

        if row_count == 0:
            raise ValueError("Stripe API returned 0 rows — likely an issue with API or window")

        # ожидаемый минимум — 10 платежей в день, иначе подозрительно
        if row_count < 10:
            print(f"WARN: only {row_count} rows, but proceeding")

        print(f"Validated: {row_count} rows OK")


    validate = PythonOperator(
        task_id='validate_extract',
        python_callable=validate_extract,
    )

    # -------- Task 3: upload to S3 --------
    upload_to_s3 = LocalFilesystemToS3Operator(
        task_id='upload_to_s3',
        filename="{{ ti.xcom_pull(task_ids='extract_from_stripe', key='local_path') }}",
        dest_bucket='my-data-bucket',
        dest_key="raw/stripe/dt={{ ds }}/charges.json",
        aws_conn_id='aws_default',
        replace=True,
    )

    # -------- Task 4: load to Snowflake --------
    load_to_snowflake = SnowflakeOperator(
        task_id='load_to_snowflake',
        snowflake_conn_id='snowflake_prod',
        sql="""
            -- staging table
            CREATE OR REPLACE TEMPORARY TABLE stage_stripe_charges AS
            SELECT
                $1:id::STRING                AS charge_id,
                $1:amount::INT               AS amount_cents,
                $1:currency::STRING          AS currency,
                $1:status::STRING            AS status,
                TO_TIMESTAMP($1:created)     AS created_at,
                $1:customer_id::STRING       AS customer_id,
                '{{ ds }}'::DATE             AS dt
            FROM @my_s3_stage/raw/stripe/dt={{ ds }}/charges.json
                (FILE_FORMAT => 'json_format');

            -- merge в финальную
            MERGE INTO raw.stripe_charges AS target
            USING stage_stripe_charges AS source
            ON target.charge_id = source.charge_id
            WHEN MATCHED THEN UPDATE SET
                amount_cents = source.amount_cents,
                status = source.status,
                updated_at = CURRENT_TIMESTAMP
            WHEN NOT MATCHED THEN INSERT
                (charge_id, amount_cents, currency, status, created_at, customer_id, dt, loaded_at)
            VALUES
                (source.charge_id, source.amount_cents, source.currency, source.status,
                 source.created_at, source.customer_id, source.dt, CURRENT_TIMESTAMP);
        """,
    )

    # -------- Task 5: quality check --------
    quality_check = BashOperator(
        task_id='quality_check',
        bash_command='cd /opt/dbt_project && dbt test --select stripe_charges',
    )

    # -------- зависимости --------
    extract >> validate >> upload_to_s3 >> load_to_snowflake >> quality_check

Это полностью рабочий DAG. Разберём каждую задачу по порядку.

Task 1: extract_from_stripe

Python-функция делает запрос к Stripe API за вчерашний период (между двумя timestamp-ами), итерирует по страницам результата (auto_paging_iter()), собирает в список словарей и пишет JSON-файл в /tmp.

Через XCom передаются: путь к файлу (local_path) и количество строк (row_count). Downstream-задачи используют эти значения через Jinja-шаблоны ({{ ti.xcom_pull(...) }}) или Python (ti.xcom_pull).

Важно: API-ключ Stripe берётся из переменной окружения, а не из кода. Это правильный pattern — секреты должны быть в Airflow Variables, в env, или в Vault/Secrets Manager.

Task 2: validate_extract

Простая проверка: 0 строк — fail. Это сигнал о проблеме (API недоступен, неправильное окно, пустой день — стоит проверить). Меньше 10 — warning.

Эта задача спасает от молчаливых ошибок: если бы DAG продолжил с пустыми данными, мы бы заметили это только утром, когда CFO открыл бы пустой дашборд.

Task 3: upload_to_s3

Готовый LocalFilesystemToS3Operator загружает файл из /tmp в S3 в раздел raw/stripe/dt=YYYY-MM-DD/charges.json. Партиционирование по дате — стандартная практика для raw-слоя.

Параметр replace=True означает, что если файл уже есть (например, после retry или backfill), он перезаписывается. Это поддерживает идемпотентность.

Task 4: load_to_snowflake

SnowflakeOperator выполняет SQL: создаёт temporary staging-таблицу из JSON в S3, потом делает MERGE в финальную raw.stripe_charges. MERGE по charge_id — идемпотентный: повторный запуск не создаст дубликаты.

{{ ds }} — Jinja-переменная Airflow, подставляется на дату DAG run (YYYY-MM-DD). Это позволяет backfill: при запуске за прошлую дату подставится та дата.

NOTE

В Airflow есть встроенные Jinja-макросы: ds (дата запуска), ds_nodash (без дефисов), prev_ds, next_ds, execution_date, data_interval_start и многие другие — все они оборачиваются в двойные фигурные скобки в SQL и других шаблонах. Это критически важно для параметризации DAG по дате — без них backfill не работает.

Task 5: quality_check

BashOperator запускает dbt test --select stripe_charges. dbt прогоняет тесты на колонки: not_null, accepted_values для валюты, expression_is_true: amount > 0. Если хоть один тест падает — задача fail, в UI красная, алерт в Slack.

Это обязательный шаг в production-DE: данные нельзя считать загруженными, пока не пройдены тесты качества. Иначе ты узнаешь о проблеме от бизнеса.

dbt schema tests: not_null, unique, accepted_values — то, что запускает quality_check таска

Зависимости

extract >> validate >> upload_to_s3 >> load_to_snowflake >> quality_check

Линейная цепочка: каждая следующая стартует после успеха предыдущей. Если extract упал — никто дальше не стартует. Если validate поймал 0 строк — нет смысла грузить пустоту в S3.

Retries и SLA

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
}

Каждая задача имеет 2 попытки с интервалом 10 минут. Если упала и при ретрае — email DE-команде. Это спасает от транзиентных проблем: API недоступен 5 минут — ретрай решит.

Можно добавить sla=timedelta(hours=2) на DAG или на конкретные задачи. Тогда при превышении SLA — alert.

Что произойдёт при сбое

Реальные сценарии:

Stripe API упал на 30 секунд. extract упадёт, через 10 минут ретрай — Stripe уже доступен, всё хорошо.

API вернул 0 строк (например, был баг в Stripe). validate поймает, DAG run упадёт с понятной ошибкой. DE утром разберётся.

S3 временно недоступен. upload_to_s3 упадёт, ретрай через 10 минут — обычно решает.

MERGE в Snowflake упал из-за блокировки. load_to_snowflake упадёт, ретрай через 10 минут — конкурент закончил, всё ок.

dbt-тест упал, например, amount = NULL у нескольких строк. quality_check падает, DAG run помечается failed. DE-команда получает alert и разбирается, что в исходных данных не так.

Во всех сценариях оркестратор обеспечивает: автоматический retry где возможно, понятный failure где невозможно, observability через UI, alert при проблемах.

Что осталось за кадром

Этот пример упрощённый. В production добавилось бы:

  • Sensor на готовность data lake (например, что предыдущий DAG завершён).
  • Branching — разная логика для разных дней (выходные vs будни).
  • Dynamic task generation — выгрузить из 10 API параллельно, не дублируя код.
  • Data quality framework (Great Expectations, Soda) — более сложные проверки.
  • OpenLineage integration — автоматический lineage для всего пайплайна.
  • Cost tracking — мониторинг затрат на каждую задачу.

Для глубокого изучения этих тем — airflow-course на платформе.

Airflow production-архитектура: Scheduler, Workers, Executor, XCom под капотом
TIP

Хорошая привычка — всегда оборачивать пайплайн в три зоны: extract (получение данных), load (запись в storage), test (проверка качества). Без последней зоны ты не знаешь, корректны ли данные. dbt + Great Expectations + ручные SQL-проверки — стандартный набор.

Попробуй сам

Возьми любой API, который тебе интересен (Twitter, GitHub, погодный сервис), и набросай схему такого же DAG: extract -> validate -> upload -> load -> quality. Какие задачи будут? Какие у них retries? Какие тесты на финальные данные? Это упражнение даёт навык проектировать пайплайны end-to-end.

Проверка знанийKnowledge check
Зачем в production-DAG нужны задачи validate_extract и quality_check, если основная логика уже есть в extract и load?
ОтветAnswer
Задачи validate_extract и quality_check защищают от молчаливых ошибок. Без validate_extract пайплайн может пройти полностью успешно на 0 строк (если API вернул пустой ответ из-за бага или сбоя) и записать пустоту в DWH — и проблема обнаружится только утром, когда CFO откроет пустой дашборд. Без quality_check данные могут быть загружены с NULL в критичных полях, неправильной валютой, нелогичными значениями (amount меньше 0) — и аналитики работают на сломанных данных не подозревая. Эти две задачи — fail-fast механизмы: ловят проблему как можно раньше в пайплайне, превращая молчаливые ошибки в явные failures с alert. Это базовая инженерная гигиена production-DE.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Зачем в production-DAG нужна отдельная задача validate_extract, если данные уже извлечены в предыдущем шаге?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4