Learning Platform
Глоссарий Troubleshooting
Урок 17.02 · 22 мин
Продвинутый
TestingDagBagImport ErrorsStructural Testspytest

DAG validity tests — DagBag, import errors, structural checks

Самый дешёвый и самый ценный тест в Airflow — это DAG validity test. Он проверяет, что ваши DAG-файлы вообще импортируются Python без ошибок. Один тест на 30 строк ловит 80% production-инцидентов: typo в импорте, забытая зависимость в requirements.txt, неправильный start_date, отсутствующий tag, циклическая зависимость.

В этом уроке — конкретный набор тестов на основе airflow.models.DagBag, который должен быть в каждом Airflow проекте. Запускается в CI на каждом PR, занимает ~10-30 секунд, не требует running Airflow.


Test pyramid для Airflow

Перед deep dive — карта где DAG validity tests располагаются в общей testing pyramid. Каждый уровень имеет свой trade-off между speed, coverage и cost.

Test pyramid: DAG validity → unit → integration → E2E
Layer 1: DAG validityDagBag().import_errors — единственный test, который ловит ~70% production-инцидентов 'DAG не появился в UI'. Не требует running Airflow, занимает 10-30 секунд на 100 DAGs. Запускается на каждый PR commit, blocking merge. Ловит import errors, typo, missing providers, неправильный start_date.
если import ok — двигаемся глубже
Layer 2: Unit testspytest tests/unit/ с mocked Hooks, Connections, Variables. Тестирует business logic одного operator/function в изоляции. SQLite backend, UNIT_TEST_MODE=True, no scheduler. <100ms per test. Ловит logic bugs, edge cases, retry behavior. Запускается на каждый PR commit.
logic корректна — проверяем integration
Layer 3: Integration testsairflow tasks test / airflow dags test с real Airflow init (SQLite или Postgres через testcontainers). Real Connections, real XCom, real scheduler logic. Mocked external services через moto/wiremock. 1-30s per test. Ловит integration bugs (Hook + Connection, XCom flow, trigger_rules). Запускается post-merge или nightly.
integration ok — full pipeline check
Layer 4: E2E testsПолный pipeline на staging Airflow с real connections к sample data sources. 5-10 critical DAGs через full lifecycle. Slow (минуты), expensive (real cloud resources). Запускается nightly или manually before release. Ловит configuration drift, infra issues, real external service integration bugs.

Stratifу tests по этой pyramid — основа стратегии CI/CD (модуль 16.07). Чем выше уровень, тем шире coverage, медленнее, дороже. Чем ниже — быстрее, дешевле, более targeted. Большая часть значения извлекается из Layer 1.

pytest: test discovery, AAA pattern, plain assert Mocks и monkeypatch: изоляция от внешних зависимостей

Зачем нужен DAG validity test

Без тестов поток выглядит так:

  1. Разработчик пушит DAG-файл в Git
  2. gitSync sidecar забирает его в pod
  3. DagFileProcessor пытается импортировать
  4. ImportError → DAG не появляется в UI
  5. Через сутки бизнес жалуется: «pipeline не работает!»
  6. Кто-то открывает airflow dags list-import-errors → traceback

С тестами поток:

  1. Разработчик открывает PR
  2. CI run pytest tests/test_dag_validity.py
  3. Test fail с понятным traceback → PR blocked
  4. Разработчик чинит до merge

DagBag — central abstraction для тестов

airflow.models.DagBag — это коллекция DAGs, parsed из указанной директории. Создание DagBag — это как scheduler делает DAG parsing, но в изолированном Python процессе.

from airflow.models import DagBag

dagbag = DagBag(
    dag_folder='/opt/airflow/dags',
    include_examples=False,
    read_dags_from_db=False,  # ВАЖНО — парсим .py файлы, не читаем из DB
)

# Проверки
print(f"Loaded {len(dagbag.dags)} DAGs")
print(f"Import errors: {dagbag.import_errors}")

for dag_id, dag in dagbag.dags.items():
    print(f"{dag_id}: {len(dag.tasks)} tasks, tags={dag.tags}")

DagBag парсит все .py файлы в dag_folder, ловит ImportError/SyntaxError/AirflowException и складывает в import_errors.


Минимальный test suite

# tests/test_dag_validity.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dagbag():
    """DagBag fixture — parsed один раз per pytest session."""
    return DagBag(
        dag_folder="dags/"
        include_examples=False,
        read_dags_from_db=False,
    )

def test_no_import_errors(dagbag):
    """Все DAG-файлы должны импортироваться без ошибок."""
    errors = dagbag.import_errors
    assert not errors, f"Import errors:\n" + "\n".join(
        f"{path}:\n{msg}" for path, msg in errors.items()
    )

def test_dags_loaded(dagbag):
    """Должна быть загружена хоть одна DAG."""
    assert len(dagbag.dags) > 0, "No DAGs found in dag_folder"

Один test_no_import_errors ловит:

  • Typo в from airflow.providers.amazon.aws.operators.s3 import S3KeyOperator (provider не установлен → ImportError)
  • Неправильный синтаксис Python
  • Reference на функцию которой нет
  • Missing variable Variable.get("foo") если backend strict
  • ValueError в @dag(schedule=...)

Структурные тесты

После basic import check добавьте проверки на структуру:

# tests/test_dag_validity.py (continued)
ALLOWED_TAGS = {
    "etl", "ml", "monitoring", "staging", "production",
    "finance", "marketing", "data-science",
}

def test_dag_has_tags(dagbag):
    """Каждый DAG должен иметь хотя бы один tag."""
    for dag_id, dag in dagbag.dags.items():
        assert dag.tags, f"DAG {dag_id} missing tags"

def test_dag_tags_in_allowed_set(dagbag):
    """Tags должны быть из разрешённого набора."""
    for dag_id, dag in dagbag.dags.items():
        unknown = set(dag.tags) - ALLOWED_TAGS
        assert not unknown, f"DAG {dag_id} has unknown tags: {unknown}"

def test_dag_has_owner(dagbag):
    """Каждый DAG должен иметь owner в default_args."""
    for dag_id, dag in dagbag.dags.items():
        owner = dag.default_args.get("owner")
        assert owner and owner != "airflow", \
            f"DAG {dag_id} missing real owner (got '{owner}')"

def test_dag_has_email_on_failure(dagbag):
    """Production DAGs должны иметь email/Slack на failure."""
    for dag_id, dag in dagbag.dags.items():
        if "production" in dag.tags:
            has_alert = (
                dag.default_args.get("email") or
                dag.default_args.get("on_failure_callback") or
                dag.has_on_failure_callback
            )
            assert has_alert, f"Production DAG {dag_id} missing failure alert"

def test_dag_catchup_false(dagbag):
    """catchup=True опасен в production. Заставляем явно выбирать."""
    for dag_id, dag in dagbag.dags.items():
        # Если хотите catchup=True — добавьте tag "allow-catchup"
        if "allow-catchup" not in dag.tags:
            assert not dag.catchup, \
                f"DAG {dag_id} has catchup=True without 'allow-catchup' tag"

def test_dag_start_date_not_dynamic(dagbag):
    """start_date должен быть hardcoded, не datetime.now()."""
    import inspect
    for dag_id, dag in dagbag.dags.items():
        # start_date должен быть в default_args или dag.start_date
        sd = dag.start_date
        assert sd is not None, f"DAG {dag_id} missing start_date"
        # Проверка что start_date не в будущем (могло бы быть от datetime.now())
        from datetime import datetime, timezone
        assert sd < datetime.now(timezone.utc), \
            f"DAG {dag_id} has start_date in future — likely datetime.now()"

def test_dag_no_orphan_tasks(dagbag):
    """Все tasks должны иметь хотя бы upstream или downstream (кроме single-task DAGs)."""
    for dag_id, dag in dagbag.dags.items():
        if len(dag.tasks) <= 1:
            continue
        for task in dag.tasks:
            has_upstream = bool(task.upstream_task_ids)
            has_downstream = bool(task.downstream_task_ids)
            assert has_upstream or has_downstream, \
                f"Task {dag_id}.{task.task_id} is orphan (no deps)"

def test_dag_id_naming_convention(dagbag):
    """dag_id должен следовать конвенции <team>_<purpose>_<frequency>."""
    import re
    pattern = re.compile(r"^[a-z][a-z0-9_]+_[a-z][a-z0-9_]+$")
    for dag_id in dagbag.dags:
        assert pattern.match(dag_id), \
            f"DAG {dag_id} doesn't match naming convention <team>_<purpose>"

Cycle detection

Airflow сам отлавливает циклы при dag.cli() parse, но в DagBag через import_errors. Дополнительная проверка для уверенности:

def test_no_cycles(dagbag):
    """Проверка отсутствия циклов в DAG (defensive)."""
    from airflow.exceptions import AirflowDagInconsistent
    for dag_id, dag in dagbag.dags.items():
        try:
            dag.test_cycle()  # Airflow built-in cycle detection
        except AirflowDagInconsistent as e:
            pytest.fail(f"DAG {dag_id} has cycle: {e}")

Serializability test

DAG должен быть serializable (scheduler сохраняет в serialized_dag table). Это ловит проблемы с unpickleable references, lambda functions, замыканиями.

def test_dag_serializable(dagbag):
    """DAGs должны быть JSON-serializable для serialized_dag table."""
    from airflow.serialization.serialized_objects import SerializedDAG
    for dag_id, dag in dagbag.dags.items():
        try:
            serialized = SerializedDAG.to_dict(dag)
            SerializedDAG.from_dict(serialized)  # round-trip
        except Exception as e:
            pytest.fail(f"DAG {dag_id} not serializable: {e}")

Это ловит:

  • lambda в python_callable (lambdas не сериализуются)
  • Замыкания на mutable объекты
  • Operator с **kwargs ссылающимися на нерегулярные объекты

Speed test — parse time не должен взрываться

В production parse каждого DAG-файла должен занимать < 1 секунды. Если top-level code тяжёлый — это замедляет scheduler.

import time

def test_dag_parse_time(dagbag):
    """Каждый DAG-файл должен парситься меньше 2 секунд."""
    file_times = {}
    for filepath in dagbag.file_last_changed:
        start = time.time()
        DagBag(dag_folder=filepath, include_examples=False, read_dags_from_db=False)
        elapsed = time.time() - start
        file_times[filepath] = elapsed

    slow = {fp: t for fp, t in file_times.items() if t > 2.0}
    assert not slow, f"Slow-parsing DAG files (>2s):\n{slow}"

Альтернативно — Airflow built-in: airflow dags list-import-errors --output table показывает parse time per file.


Fixture для изолированного DagBag

Для тестов часто нужен DagBag только из одной папки или одного файла:

# tests/conftest.py
import pytest
from airflow.models import DagBag

@pytest.fixture
def make_dagbag(tmp_path):
    """Factory для DagBag из in-memory DAG."""
    def _make(dag_code: str, filename: str = "test_dag.py"):
        filepath = tmp_path / filename
        filepath.write_text(dag_code)
        return DagBag(dag_folder=str(tmp_path), include_examples=False)
    return _make

# Использование:
def test_my_dag(make_dagbag):
    dag_code = """
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl'])
def my_dag():
    @task
    def t1(): return 1
    t1()

my_dag()
"""
    dagbag = make_dagbag(dag_code)
    assert "my_dag" in dagbag.dags
    assert dagbag.dags["my_dag"].tags == ["etl"]

CI integration — GitHub Actions

# .github/workflows/dag-validity.yml
name: DAG Validity

on:
  pull_request:
    paths:
      - 'dags/**'
      - 'plugins/**'
      - 'requirements.txt'
      - '.github/workflows/dag-validity.yml'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install Airflow + providers
        run: |
          pip install "apache-airflow==2.10.5" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.11.txt"
          pip install -r requirements.txt

      - name: Initialize Airflow DB (SQLite — local for tests)
        run: airflow db init

      - name: Run DAG validity tests
        run: pytest tests/test_dag_validity.py -v

      - name: Run airflow dags list-import-errors
        run: |
          airflow dags list-import-errors
          # Exit non-zero если есть errors
          if [ -n "$(airflow dags list-import-errors --output json | jq -r '.[] | .filename')" ]; then
            exit 1
          fi
NOTE

Constraints файл (constraints-2.10.5-3.11.txt) — official pin для известно-рабочих versions всех dependencies. Без него можно получить incompatible combinations. Always use constraints в CI.


Pre-commit hook

Если хотите ловить ещё раньше — pre-commit hook:

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.0
    hooks:
      - id: ruff
        args: [--fix]

  - repo: local
    hooks:
      - id: airflow-dag-validity
        name: Airflow DAG validity
        entry: python -m pytest tests/test_dag_validity.py
        language: system
        files: ^dags/.*\.py$
        pass_filenames: false

Тестирование DAG factory

DAG factory pattern (модуль 17.03) генерирует DAGs из конфига. Тестируем что generation работает на edge cases:

def test_dag_factory_empty_config():
    """Generator должен gracefully handle empty config."""
    from dags.factory import generate_dags
    dags = generate_dags(config=[])
    assert dags == {}

def test_dag_factory_malformed_config():
    """Generator должен validate config + raise informative error."""
    from dags.factory import generate_dags, ConfigError
    with pytest.raises(ConfigError, match="missing required field 'schedule'"):
        generate_dags(config=[{"id": "x"}])

def test_dag_factory_produces_valid_dags(dagbag):
    """После factory generation все DAGs валидны."""
    # dagbag fixture уже включает factory-generated DAGs
    factory_dags = [d for d in dagbag.dags.values() if "generated" in d.tags]
    assert len(factory_dags) > 0
    for dag in factory_dags:
        assert len(dag.tasks) > 0
        assert dag.start_date

Production gotchas

include_examples=True ломает CI. По default DagBag загружает example DAGs из Airflow. Они могут конфликтовать (одинаковые dag_id) или давать import errors на missing providers. Всегда include_examples=False.

Variable.get в top-level ломает CI. Если DAG делает Variable.get('foo') в top-level (а Variable не определена в test environment), DagBag получает import error. Fixes: (1) move в task callable; (2) set env var AIRFLOW_VAR_FOO=bar в CI; (3) mock через monkeypatch.setattr.

Connection use в top-level — то же. BaseHook.get_connection('postgres_default') в top-level требует DB connection в CI. Solution — переместить в task или mock.

DagBag в pytest fixture с scope=session — обязательно. Без scope каждый test парсит DagBag заново — медленно на 100+ DAGs. С scope=‘session’ — один parse за весь pytest run.

safe_mode=True в DagBag. По default DagBag ищет ключевые слова “airflow” и “dag” в файле перед polling — это speed optimization. В тестах используйте safe_mode=False если тестируете edge cases где DAG не содержит этих keywords.


Проверка знанийKnowledge check
Команда хочет настроить минимально работающий DAG validity test suite. Какие 3 теста — must-have, который ловит 80% production-багов?
ОтветAnswer
Три must-have теста: (1) **test_no_import_errors** — самый важный. Один SELECT через DagBag().import_errors ловит typo в imports, missing providers в requirements.txt, syntax errors, неправильный start_date типа datetime.now(). Ловит ~70% production-инцидентов 'DAG не появился'. (2) **test_dag_has_tags** + **test_dag_has_owner** — структурные тесты. Без owner alert не дойдёт до правильного человека; без tags impossible filter в UI и difficult organize по командам. Ловит organizational discipline issues. (3) **test_no_cycles** или **test_dag_serializable** — defensive проверка что DAG валиден на уровне graph theory + serialization. Ловит lambda в python_callable, циклические dependencies (хоть Airflow сам detect, лучше catch в CI). Setup: pytest fixture dagbag with scope='session' (один parse), CI через GitHub Actions с airflow constraints file. Total runtime ~10-30s. Это даёт самый высокий ROI среди всех тестов — занимает день setup, экономит часы incident response. Дополнительно: тест на parse time (<2s per file) и tag whitelist для governance — после трёх основных.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Самый дешёвый и полезный test для Airflow project — что это?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7