DAG validity tests — DagBag, import errors, structural checks
Самый дешёвый и самый ценный тест в Airflow — это DAG validity test. Он проверяет, что ваши DAG-файлы вообще импортируются Python без ошибок. Один тест на 30 строк ловит 80% production-инцидентов: typo в импорте, забытая зависимость в requirements.txt, неправильный start_date, отсутствующий tag, циклическая зависимость.
В этом уроке — конкретный набор тестов на основе airflow.models.DagBag, который должен быть в каждом Airflow проекте. Запускается в CI на каждом PR, занимает ~10-30 секунд, не требует running Airflow.
Test pyramid для Airflow
Перед deep dive — карта где DAG validity tests располагаются в общей testing pyramid. Каждый уровень имеет свой trade-off между speed, coverage и cost.
Stratifу tests по этой pyramid — основа стратегии CI/CD (модуль 16.07). Чем выше уровень, тем шире coverage, медленнее, дороже. Чем ниже — быстрее, дешевле, более targeted. Большая часть значения извлекается из Layer 1.
pytest: test discovery, AAA pattern, plain assert Mocks и monkeypatch: изоляция от внешних зависимостейЗачем нужен DAG validity test
Без тестов поток выглядит так:
- Разработчик пушит DAG-файл в Git
- gitSync sidecar забирает его в pod
- DagFileProcessor пытается импортировать
- ImportError → DAG не появляется в UI
- Через сутки бизнес жалуется: «pipeline не работает!»
- Кто-то открывает
airflow dags list-import-errors→ traceback
С тестами поток:
- Разработчик открывает PR
- CI run
pytest tests/test_dag_validity.py - Test fail с понятным traceback → PR blocked
- Разработчик чинит до merge
DagBag — central abstraction для тестов
airflow.models.DagBag — это коллекция DAGs, parsed из указанной директории. Создание DagBag — это как scheduler делает DAG parsing, но в изолированном Python процессе.
from airflow.models import DagBag
dagbag = DagBag(
dag_folder='/opt/airflow/dags',
include_examples=False,
read_dags_from_db=False, # ВАЖНО — парсим .py файлы, не читаем из DB
)
# Проверки
print(f"Loaded {len(dagbag.dags)} DAGs")
print(f"Import errors: {dagbag.import_errors}")
for dag_id, dag in dagbag.dags.items():
print(f"{dag_id}: {len(dag.tasks)} tasks, tags={dag.tags}")
DagBag парсит все .py файлы в dag_folder, ловит ImportError/SyntaxError/AirflowException и складывает в import_errors.
Минимальный test suite
# tests/test_dag_validity.py
import pytest
from airflow.models import DagBag
@pytest.fixture(scope="session")
def dagbag():
"""DagBag fixture — parsed один раз per pytest session."""
return DagBag(
dag_folder="dags/"
include_examples=False,
read_dags_from_db=False,
)
def test_no_import_errors(dagbag):
"""Все DAG-файлы должны импортироваться без ошибок."""
errors = dagbag.import_errors
assert not errors, f"Import errors:\n" + "\n".join(
f"{path}:\n{msg}" for path, msg in errors.items()
)
def test_dags_loaded(dagbag):
"""Должна быть загружена хоть одна DAG."""
assert len(dagbag.dags) > 0, "No DAGs found in dag_folder"
Один test_no_import_errors ловит:
- Typo в
from airflow.providers.amazon.aws.operators.s3 import S3KeyOperator(provider не установлен → ImportError) - Неправильный синтаксис Python
- Reference на функцию которой нет
- Missing variable
Variable.get("foo")если backend strict ValueErrorв@dag(schedule=...)
Структурные тесты
После basic import check добавьте проверки на структуру:
# tests/test_dag_validity.py (continued)
ALLOWED_TAGS = {
"etl", "ml", "monitoring", "staging", "production",
"finance", "marketing", "data-science",
}
def test_dag_has_tags(dagbag):
"""Каждый DAG должен иметь хотя бы один tag."""
for dag_id, dag in dagbag.dags.items():
assert dag.tags, f"DAG {dag_id} missing tags"
def test_dag_tags_in_allowed_set(dagbag):
"""Tags должны быть из разрешённого набора."""
for dag_id, dag in dagbag.dags.items():
unknown = set(dag.tags) - ALLOWED_TAGS
assert not unknown, f"DAG {dag_id} has unknown tags: {unknown}"
def test_dag_has_owner(dagbag):
"""Каждый DAG должен иметь owner в default_args."""
for dag_id, dag in dagbag.dags.items():
owner = dag.default_args.get("owner")
assert owner and owner != "airflow", \
f"DAG {dag_id} missing real owner (got '{owner}')"
def test_dag_has_email_on_failure(dagbag):
"""Production DAGs должны иметь email/Slack на failure."""
for dag_id, dag in dagbag.dags.items():
if "production" in dag.tags:
has_alert = (
dag.default_args.get("email") or
dag.default_args.get("on_failure_callback") or
dag.has_on_failure_callback
)
assert has_alert, f"Production DAG {dag_id} missing failure alert"
def test_dag_catchup_false(dagbag):
"""catchup=True опасен в production. Заставляем явно выбирать."""
for dag_id, dag in dagbag.dags.items():
# Если хотите catchup=True — добавьте tag "allow-catchup"
if "allow-catchup" not in dag.tags:
assert not dag.catchup, \
f"DAG {dag_id} has catchup=True without 'allow-catchup' tag"
def test_dag_start_date_not_dynamic(dagbag):
"""start_date должен быть hardcoded, не datetime.now()."""
import inspect
for dag_id, dag in dagbag.dags.items():
# start_date должен быть в default_args или dag.start_date
sd = dag.start_date
assert sd is not None, f"DAG {dag_id} missing start_date"
# Проверка что start_date не в будущем (могло бы быть от datetime.now())
from datetime import datetime, timezone
assert sd < datetime.now(timezone.utc), \
f"DAG {dag_id} has start_date in future — likely datetime.now()"
def test_dag_no_orphan_tasks(dagbag):
"""Все tasks должны иметь хотя бы upstream или downstream (кроме single-task DAGs)."""
for dag_id, dag in dagbag.dags.items():
if len(dag.tasks) <= 1:
continue
for task in dag.tasks:
has_upstream = bool(task.upstream_task_ids)
has_downstream = bool(task.downstream_task_ids)
assert has_upstream or has_downstream, \
f"Task {dag_id}.{task.task_id} is orphan (no deps)"
def test_dag_id_naming_convention(dagbag):
"""dag_id должен следовать конвенции <team>_<purpose>_<frequency>."""
import re
pattern = re.compile(r"^[a-z][a-z0-9_]+_[a-z][a-z0-9_]+$")
for dag_id in dagbag.dags:
assert pattern.match(dag_id), \
f"DAG {dag_id} doesn't match naming convention <team>_<purpose>"
Cycle detection
Airflow сам отлавливает циклы при dag.cli() parse, но в DagBag через import_errors. Дополнительная проверка для уверенности:
def test_no_cycles(dagbag):
"""Проверка отсутствия циклов в DAG (defensive)."""
from airflow.exceptions import AirflowDagInconsistent
for dag_id, dag in dagbag.dags.items():
try:
dag.test_cycle() # Airflow built-in cycle detection
except AirflowDagInconsistent as e:
pytest.fail(f"DAG {dag_id} has cycle: {e}")
Serializability test
DAG должен быть serializable (scheduler сохраняет в serialized_dag table). Это ловит проблемы с unpickleable references, lambda functions, замыканиями.
def test_dag_serializable(dagbag):
"""DAGs должны быть JSON-serializable для serialized_dag table."""
from airflow.serialization.serialized_objects import SerializedDAG
for dag_id, dag in dagbag.dags.items():
try:
serialized = SerializedDAG.to_dict(dag)
SerializedDAG.from_dict(serialized) # round-trip
except Exception as e:
pytest.fail(f"DAG {dag_id} not serializable: {e}")
Это ловит:
lambdaвpython_callable(lambdas не сериализуются)- Замыкания на mutable объекты
- Operator с
**kwargsссылающимися на нерегулярные объекты
Speed test — parse time не должен взрываться
В production parse каждого DAG-файла должен занимать < 1 секунды. Если top-level code тяжёлый — это замедляет scheduler.
import time
def test_dag_parse_time(dagbag):
"""Каждый DAG-файл должен парситься меньше 2 секунд."""
file_times = {}
for filepath in dagbag.file_last_changed:
start = time.time()
DagBag(dag_folder=filepath, include_examples=False, read_dags_from_db=False)
elapsed = time.time() - start
file_times[filepath] = elapsed
slow = {fp: t for fp, t in file_times.items() if t > 2.0}
assert not slow, f"Slow-parsing DAG files (>2s):\n{slow}"
Альтернативно — Airflow built-in: airflow dags list-import-errors --output table показывает parse time per file.
Fixture для изолированного DagBag
Для тестов часто нужен DagBag только из одной папки или одного файла:
# tests/conftest.py
import pytest
from airflow.models import DagBag
@pytest.fixture
def make_dagbag(tmp_path):
"""Factory для DagBag из in-memory DAG."""
def _make(dag_code: str, filename: str = "test_dag.py"):
filepath = tmp_path / filename
filepath.write_text(dag_code)
return DagBag(dag_folder=str(tmp_path), include_examples=False)
return _make
# Использование:
def test_my_dag(make_dagbag):
dag_code = """
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl'])
def my_dag():
@task
def t1(): return 1
t1()
my_dag()
"""
dagbag = make_dagbag(dag_code)
assert "my_dag" in dagbag.dags
assert dagbag.dags["my_dag"].tags == ["etl"]
CI integration — GitHub Actions
# .github/workflows/dag-validity.yml
name: DAG Validity
on:
pull_request:
paths:
- 'dags/**'
- 'plugins/**'
- 'requirements.txt'
- '.github/workflows/dag-validity.yml'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install Airflow + providers
run: |
pip install "apache-airflow==2.10.5" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.11.txt"
pip install -r requirements.txt
- name: Initialize Airflow DB (SQLite — local for tests)
run: airflow db init
- name: Run DAG validity tests
run: pytest tests/test_dag_validity.py -v
- name: Run airflow dags list-import-errors
run: |
airflow dags list-import-errors
# Exit non-zero если есть errors
if [ -n "$(airflow dags list-import-errors --output json | jq -r '.[] | .filename')" ]; then
exit 1
fi
Constraints файл (constraints-2.10.5-3.11.txt) — official pin для известно-рабочих versions всех dependencies. Без него можно получить incompatible combinations. Always use constraints в CI.
Pre-commit hook
Если хотите ловить ещё раньше — pre-commit hook:
# .pre-commit-config.yaml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.0
hooks:
- id: ruff
args: [--fix]
- repo: local
hooks:
- id: airflow-dag-validity
name: Airflow DAG validity
entry: python -m pytest tests/test_dag_validity.py
language: system
files: ^dags/.*\.py$
pass_filenames: false
Тестирование DAG factory
DAG factory pattern (модуль 17.03) генерирует DAGs из конфига. Тестируем что generation работает на edge cases:
def test_dag_factory_empty_config():
"""Generator должен gracefully handle empty config."""
from dags.factory import generate_dags
dags = generate_dags(config=[])
assert dags == {}
def test_dag_factory_malformed_config():
"""Generator должен validate config + raise informative error."""
from dags.factory import generate_dags, ConfigError
with pytest.raises(ConfigError, match="missing required field 'schedule'"):
generate_dags(config=[{"id": "x"}])
def test_dag_factory_produces_valid_dags(dagbag):
"""После factory generation все DAGs валидны."""
# dagbag fixture уже включает factory-generated DAGs
factory_dags = [d for d in dagbag.dags.values() if "generated" in d.tags]
assert len(factory_dags) > 0
for dag in factory_dags:
assert len(dag.tasks) > 0
assert dag.start_date
Production gotchas
include_examples=True ломает CI. По default DagBag загружает example DAGs из Airflow. Они могут конфликтовать (одинаковые dag_id) или давать import errors на missing providers. Всегда include_examples=False.
Variable.get в top-level ломает CI. Если DAG делает Variable.get('foo') в top-level (а Variable не определена в test environment), DagBag получает import error. Fixes: (1) move в task callable; (2) set env var AIRFLOW_VAR_FOO=bar в CI; (3) mock через monkeypatch.setattr.
Connection use в top-level — то же. BaseHook.get_connection('postgres_default') в top-level требует DB connection в CI. Solution — переместить в task или mock.
DagBag в pytest fixture с scope=session — обязательно. Без scope каждый test парсит DagBag заново — медленно на 100+ DAGs. С scope=‘session’ — один parse за весь pytest run.
safe_mode=True в DagBag. По default DagBag ищет ключевые слова “airflow” и “dag” в файле перед polling — это speed optimization. В тестах используйте safe_mode=False если тестируете edge cases где DAG не содержит этих keywords.