DAG factory testing — dynamic generation, parametrized DAGs, edge cases
DAG factory pattern (модуль 17.03) — генерирует множество DAGs из одного куска кода + конфигурации. Например, 20 ETL pipelines с identical structure, отличающихся только source table / target table. Это powerful pattern, но он многократно увеличивает blast radius любого бага — одна ошибка в factory ломает все 20 DAGs одновременно.
Поэтому testing DAG factory — это критический layer. Этот урок — конкретные паттерны: validation config, parametrized testing edge cases (empty configs, malformed inputs), generation correctness, integration с DAG validity tests.
pytest.mark.parametrize и property-based testing — основа factory tests
Anatomy DAG factory
Базовый pattern:
# dags/factory.py
import yaml
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
class ConfigError(Exception):
    """Raised when factory config is invalid."""


def validate_config(cfg: dict) -> None:
    """Validate a single DAG config.

    Checks, in order: that ``cfg`` is a dict, that every required key is
    present, that ``id`` is a non-empty string, and that ``schedule`` is
    either one of the supported presets or a string starting with ``"0 "``.

    Args:
        cfg: Config for one DAG (normally one entry of the YAML list).

    Raises:
        ConfigError: On the first check that fails. Non-dict configs and
            non-string schedules are reported as ConfigError too, so callers
            only ever have to handle one exception type.
    """
    if not isinstance(cfg, dict):
        raise ConfigError(f"Config entry must be a dict, got {type(cfg).__name__}")

    required = ["id", "source_table", "target_table", "schedule"]
    missing = [f for f in required if f not in cfg]
    if missing:
        raise ConfigError(f"Missing required fields: {missing}")

    if not isinstance(cfg["id"], str) or not cfg["id"]:
        raise ConfigError(f"Invalid 'id': {cfg['id']!r}")

    schedule = cfg["schedule"]
    valid_schedules = {"@daily", "@hourly", "@weekly"}
    # Type check first: None / int have no .startswith and unhashable values
    # cannot be looked up in a set, so both must be rejected before use.
    if not isinstance(schedule, str) or not (
        schedule in valid_schedules or schedule.startswith("0 ")
    ):
        raise ConfigError(f"Invalid schedule: {schedule!r}")
def make_etl_dag(cfg: dict):
    """Build one extract -> transform -> load DAG from a config dict.

    Args:
        cfg: DAG config. Required keys are checked by ``validate_config``;
            optional keys read here are ``team`` (extra tag), ``owner`` and
            ``retries`` (default_args), each with a fallback default.

    Returns:
        The object returned by calling the ``@dag``-decorated function.

    Raises:
        ConfigError: Propagated from ``validate_config`` for an invalid config.
    """
    validate_config(cfg)

    # Snapshot the table names now. The task functions below are closures;
    # reading cfg[...] inside them would pick up any later mutation of the
    # caller's dict instead of the values that were validated.
    source_table = cfg["source_table"]
    target_table = cfg["target_table"]

    @dag(
        dag_id=cfg["id"],
        schedule=cfg["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["etl", "factory-generated", cfg.get("team", "data-eng")],
        default_args={
            "owner": cfg.get("owner", "data-eng"),
            "retries": cfg.get("retries", 3),
            "retry_delay": timedelta(minutes=5),
        },
    )
    def generated_dag():
        @task
        def extract():
            return f"SELECT * FROM {source_table}"

        @task
        def transform(sql: str):
            return sql + " WHERE active = TRUE"

        @task
        def load(sql: str):
            return f"INSERT INTO {target_table} {sql}"

        # Chain the three tasks: extract -> transform -> load.
        load(transform(extract()))

    return generated_dag()
def generate_dags(config_path: str = "config/etl_dags.yaml") -> dict:
    """Read a YAML config and build one DAG per entry.

    Args:
        config_path: Path to a YAML file whose top level is a list of DAG
            config dicts. A relative path is resolved by ``open`` against the
            current working directory.

    Returns:
        ``{dag_id: DAG}``. If two entries share an ``id``, the later one
        replaces the earlier one.

    Raises:
        ConfigError: If the top level is not a list, if an entry is not a
            dict, or if an entry fails validation (the message is prefixed
            with the failing entry's id).
    """
    with open(config_path, encoding="utf-8") as f:
        configs = yaml.safe_load(f)

    if not isinstance(configs, list):
        raise ConfigError(f"Config must be a list of dicts, got {type(configs)}")

    result = {}
    for index, cfg in enumerate(configs):
        # Reject non-dict entries up front: the error handler below calls
        # cfg.get(), which would otherwise fail with AttributeError.
        if not isinstance(cfg, dict):
            raise ConfigError(
                f"DAG entry #{index}: expected a dict, got {type(cfg).__name__}"
            )
        try:
            built_dag = make_etl_dag(cfg)
        except ConfigError as e:
            # A factory can either raise (fail loudly) or log + skip
            # (resilient); this one fails loudly and names the bad entry.
            raise ConfigError(f"DAG {cfg.get('id', 'unknown')}: {e}") from e
        result[cfg["id"]] = built_dag
    return result
# Top-level — runs on every DAG parse (i.e. whenever this module is imported).
# Each generated DAG is bound to a module-level name equal to its dag_id.
for dag_id, dag_obj in generate_dags().items():
    globals()[dag_id] = dag_obj
Что тестировать:
- validate_config — корректно отвергает invalid configs
- make_etl_dag — производит правильный DAG объект для valid config
- generate_dags — обрабатывает edge cases (empty list, malformed YAML, partial fails)
- Integration — generated DAGs проходят DAG validity tests
Test 1 — validate_config
# tests/unit/test_dag_factory.py
import pytest

# make_etl_dag is exercised by the tests below, so it has to be imported too.
from dags.factory import ConfigError, make_etl_dag, validate_config
def test_validate_config_valid():
    """A config with every required field is accepted without raising."""
    complete_cfg = dict(
        id="orders_etl",
        source_table="staging.orders",
        target_table="prod.orders",
        schedule="@daily",
    )
    # The test passes simply by this call returning normally.
    validate_config(complete_cfg)
@pytest.mark.parametrize("missing_field", ["id", "source_table", "target_table", "schedule"])
def test_validate_config_missing_required_field(missing_field):
    """Every required field is mandatory; the error names the missing one."""
    full_cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    partial_cfg = {key: value for key, value in full_cfg.items() if key != missing_field}
    expected_message = f"Missing required fields.*{missing_field}"
    with pytest.raises(ConfigError, match=expected_message):
        validate_config(partial_cfg)
@pytest.mark.parametrize("invalid_id", ["", None, 123, [], {}])
def test_validate_config_invalid_id(invalid_id):
    """An empty or non-string id is rejected with an "Invalid 'id'" error."""
    other_fields = {"source_table": "s", "target_table": "t", "schedule": "@daily"}
    cfg = {"id": invalid_id, **other_fields}
    with pytest.raises(ConfigError, match="Invalid 'id'"):
        validate_config(cfg)
@pytest.mark.parametrize("schedule", ["@daily", "@hourly", "@weekly", "0 0 * * *", "0 12 * * 1"])
def test_validate_config_valid_schedules(schedule):
    """Supported presets and cron strings starting with "0 " are accepted."""
    validate_config(
        {
            "id": "x",
            "source_table": "s",
            "target_table": "t",
            "schedule": schedule,
        }
    )
@pytest.mark.parametrize("schedule", ["invalid", "now", "", None, "* * *"])
def test_validate_config_invalid_schedule(schedule):
    """Unsupported schedule values are rejected with "Invalid schedule"."""
    bad_cfg = dict(id="x", source_table="s", target_table="t", schedule=schedule)
    with pytest.raises(ConfigError, match="Invalid schedule"):
        validate_config(bad_cfg)
pytest.mark.parametrize — best practice для variations. Каждый параметр — отдельный test, информативное failure (показывает который param failed).
Test 2 — make_etl_dag
def test_make_etl_dag_produces_dag():
    """A valid config yields a DAG with the expected id, schedule, tags, tasks."""
    built = make_etl_dag(
        {
            "id": "test_etl",
            "source_table": "staging.test",
            "target_table": "prod.test",
            "schedule": "@daily",
        }
    )
    assert built.dag_id == "test_etl"
    assert built.schedule_interval == "@daily" or str(built.timetable) == "@daily"
    for expected_tag in ("etl", "factory-generated"):
        assert expected_tag in built.tags
    # extract, transform, load
    assert len(built.tasks) == 3
def test_make_etl_dag_uses_default_team_tag():
    """Without a "team" key the DAG is tagged with the default team."""
    minimal_cfg = dict(id="x", source_table="s", target_table="t", schedule="@daily")
    tags = make_etl_dag(minimal_cfg).tags
    assert "data-eng" in tags
def test_make_etl_dag_custom_team_tag():
    """A "team" value from the config shows up among the DAG's tags."""
    required = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    built = make_etl_dag({**required, "team": "finance"})
    assert "finance" in built.tags
def test_make_etl_dag_dependencies():
    """Tasks are wired extract -> transform -> load."""
    built = make_etl_dag(
        {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    )
    by_id = {name: built.get_task(name) for name in ("extract", "transform", "load")}
    assert by_id["transform"].upstream_task_ids == {"extract"}
    assert by_id["load"].upstream_task_ids == {"transform"}
    assert by_id["extract"].downstream_task_ids == {"transform"}
def test_make_etl_dag_default_args():
    """Without overrides, tasks get the factory's default retries and owner."""
    minimal_cfg = dict(id="x", source_table="s", target_table="t", schedule="@daily")
    extract_task = make_etl_dag(minimal_cfg).get_task("extract")
    # Both values are the factory defaults.
    assert extract_task.retries == 3
    assert extract_task.owner == "data-eng"
def test_make_etl_dag_custom_retries():
    """A "retries" value from the config overrides the default on tasks."""
    required = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    built = make_etl_dag({**required, "retries": 5})
    extract_task = built.get_task("extract")
    assert extract_task.retries == 5
def test_make_etl_dag_invalid_config_raises():
    """The factory refuses a config that lacks required fields."""
    incomplete_cfg = dict(id="x")  # every other required field is absent
    with pytest.raises(ConfigError):
        make_etl_dag(incomplete_cfg)
Test 3 — generate_dags edge cases
import tempfile
from pathlib import Path
from dags.factory import generate_dags, ConfigError
@pytest.fixture
def config_file(tmp_path):
    """Return a helper that writes YAML text to a temp file and returns its path."""
    target = tmp_path / "config.yaml"

    def _make(content: str) -> str:
        # Every call writes to the same file inside this test's tmp_path.
        target.write_text(content)
        return str(target)

    return _make
def test_generate_dags_empty_list(config_file):
    """An empty list produces no DAGs and raises nothing."""
    empty_config = config_file("[]")
    assert generate_dags(empty_config) == {}
def test_generate_dags_single_dag(config_file):
    """A one-entry config yields that DAG with its three tasks."""
    # Keys after the first must be indented under the "- " item; at column 0
    # the document is not a valid list of mappings.
    yaml_content = """
- id: orders_etl
  source_table: staging.orders
  target_table: prod.orders
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    assert "orders_etl" in result
    assert len(result["orders_etl"].tasks) == 3
def test_generate_dags_multiple(config_file):
    """Each entry of a multi-entry config becomes a DAG keyed by its id."""
    # Keys after the first must be indented under the "- " item; at column 0
    # the document is not a valid list of mappings.
    yaml_content = """
- id: orders_etl
  source_table: staging.orders
  target_table: prod.orders
  schedule: "@daily"
- id: customers_etl
  source_table: staging.customers
  target_table: prod.customers
  schedule: "@hourly"
- id: products_etl
  source_table: staging.products
  target_table: prod.products
  schedule: "@weekly"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    assert set(result.keys()) == {"orders_etl", "customers_etl", "products_etl"}
def test_generate_dags_malformed_yaml(config_file):
    """Unparseable YAML surfaces as a YAML error or a ConfigError."""
    import yaml

    broken_config = config_file("this: is: not: valid: yaml: at: all")
    acceptable_errors = (yaml.YAMLError, ConfigError)
    with pytest.raises(acceptable_errors):
        generate_dags(broken_config)
def test_generate_dags_not_a_list(config_file):
    """The config's top level must be a list, not a dict."""
    mapping_lines = [
        "id: x",
        "source_table: s",
        "target_table: t",
        "schedule: '@daily'",
    ]
    mapping_config = config_file("\n".join(mapping_lines))
    with pytest.raises(ConfigError, match="must be a list"):
        generate_dags(mapping_config)
def test_generate_dags_invalid_dag_raises_with_id(config_file):
    """The error must say which DAG entry failed."""
    # Keys after the first must be indented under the "- " item; at column 0
    # the document is not a valid list of mappings. bad_dag deliberately
    # omits target_table.
    yaml_content = """
- id: good_dag
  source_table: s
  target_table: t
  schedule: "@daily"
- id: bad_dag
  source_table: s
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    with pytest.raises(ConfigError, match="bad_dag.*Missing required"):
        generate_dags(path)
def test_generate_dags_duplicate_ids(config_file):
    """Duplicate dag_id — does the last one win, or should it raise?"""
    # Keys after the first must be indented under the "- " item; at column 0
    # the document is not a valid list of mappings.
    yaml_content = """
- id: dup
  source_table: s1
  target_table: t1
  schedule: "@daily"
- id: dup
  source_table: s2
  target_table: t2
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    # In this implementation the last entry wins.
    assert len(result) == 1
    # Failing loudly may be better — that needs an explicit check in generate_dags.
Test 4 — generated DAGs валидны
После factory все generated DAGs должны проходить DAG validity tests:
def test_generated_dags_have_unique_task_ids():
    """Within every generated DAG, task_ids must be unique."""
    for i in range(10):
        cfg = dict(
            id=f"dag_{i}",
            source_table=f"s_{i}",
            target_table=f"t_{i}",
            schedule="@daily",
        )
        built = make_etl_dag(cfg)
        task_ids = [t.task_id for t in built.tasks]
        distinct_ids = set(task_ids)
        assert len(task_ids) == len(distinct_ids), f"Duplicate task_ids in {cfg['id']}"
def test_generated_dags_pass_validity_checks():
    """Generated DAGs have tags, a non-default owner and a valid start_date."""
    built = make_etl_dag(
        {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    )
    assert built.tags
    # The owner must not be left at "airflow".
    owner = built.default_args["owner"]
    assert owner != "airflow"
    assert built.start_date.year >= 2024
    # catchup=False is enforced by the factory.
    assert not built.catchup
def test_generated_dag_serializable():
    """A generated DAG survives a SerializedDAG to_dict / from_dict round trip."""
    from airflow.serialization.serialized_objects import SerializedDAG

    built = make_etl_dag(
        dict(id="x", source_table="s", target_table="t", schedule="@daily")
    )
    # Either call raising is what fails the test; the result is not inspected.
    as_dict = SerializedDAG.to_dict(built)
    SerializedDAG.from_dict(as_dict)
Property-based testing с hypothesis
Для глубокой проверки edge cases — hypothesis:
# pip install hypothesis
from hypothesis import given, strategies as st
@given(
    dag_id=st.text(
        alphabet=st.characters(
            whitelist_categories=("Ll", "Lu", "Nd"),
            whitelist_characters="_",
        ),
        min_size=1,
        max_size=50,
    ),
    source=st.text(min_size=1, max_size=50),
    target=st.text(min_size=1, max_size=50),
    schedule=st.sampled_from(["@daily", "@hourly", "@weekly", "0 0 * * *"]),
)
def test_make_etl_dag_property(dag_id, source, target, schedule):
    """For any valid config, make_etl_dag must not crash."""
    generated = make_etl_dag(
        dict(id=dag_id, source_table=source, target_table=target, schedule=schedule)
    )
    assert generated.dag_id == dag_id
    assert len(generated.tasks) == 3
Hypothesis генерирует сотни variations input. Если factory падает на каком-то из них — hypothesis даёт minimal failing example.
Pitfalls в factory pattern
Production gotchas, которые tests должны ловить:
Pitfall 1 — top-level network call
# ❌ BAD — this runs on every DAG parse
def load_configs():
    # Fetches the configs over HTTP each time it is called.
    import requests
    return requests.get("https://config-api/etl-configs").json()

# Called at module level, so the request happens at import time.
for cfg in load_configs():  # network call every 30s
    make_etl_dag(cfg)
Test:
def test_no_network_call_in_factory(mocker):
    """The factory must not make network calls at module level."""
    import importlib

    requests_get = mocker.patch("requests.get")

    # Re-execute the module body while requests.get is patched.
    from dags import factory
    importlib.reload(factory)

    # Any call recorded here happened during import.
    calls_made = requests_get.call_count
    assert calls_made == 0, "Factory makes network call on import"
Pitfall 2 — mutable default arguments
# ❌ BAD
def make_dag(cfg, args={"retries": 3}):  # mutable default: one dict shared by all calls
    args["dag_id"] = cfg["id"]  # mutates that shared default
Tests с hypothesis ловят это — multiple calls показывают что state shared.
Pitfall 3 — top-level imports heavy
import tensorflow as tf # imports 2-3s
import pandas as pd
def make_dag(cfg):
# ...
Test parse time:
def test_module_import_fast():
    """Importing and reloading the factory module must take under one second."""
    import importlib
    import time

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() can.
    start = time.perf_counter()
    from dags import factory
    importlib.reload(factory)
    elapsed = time.perf_counter() - start

    # NOTE(review): if dags.factory and its dependencies are already in
    # sys.modules, only the module body is re-executed here — confirm this
    # still catches heavy top-level imports in your test run order.
    assert elapsed < 1.0, "Factory module imports >1s — heavy top-level imports?"
CI integration
Factory test suite в CI:
# .github/workflows/test-factory.yml
jobs:
  test-factory:
    runs-on: ubuntu-latest
    steps:
      # Check out the repository and set up Python 3.11.
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      # Project requirements plus the test tooling used by the suites below.
      - run: pip install -r requirements.txt pytest pytest-mock hypothesis
      - name: Unit tests factory logic
        run: pytest tests/unit/test_dag_factory.py -v
      - name: Property-based tests
        run: pytest tests/unit/test_dag_factory_property.py -v --hypothesis-show-statistics
      - name: Validate generated DAGs
        run: |
          # Run DAG validity tests against the actual config
          pytest tests/test_dag_validity.py -v
Production gotchas
globals()[dag_id] = dag_obj обязательно. Airflow находит DAGs только в module-level globals. Если factory возвращает dict but не assign в globals — DAGs не появятся.
Config file path должен быть relative к DAG file. open("config/etl.yaml") относительно cwd, который у DagFileProcessor может быть unexpected. Use Path(__file__).parent / "config" / "etl.yaml".
Unique dag_id mandatory. Если factory создаёт duplicate dag_ids — Airflow logs warning и один DAG silently overwrites другой. Add explicit check в generate_dags (raise ConfigError), чтобы fail loudly.
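Closures в @task-функциях. cfg["source_table"] в extract() — это closure над dict cfg: значение читается в момент вызова extract(), а не в момент создания DAG. Если cfg mutate-нется после вызова factory — task увидит изменённое значение. Best practice — copy values в local variables: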
def make_etl_dag(cfg):
    # Read the values once, while the factory runs.
    source = cfg["source_table"]  # capture now
    target = cfg["target_table"]

    @dag(...)
    def generated_dag():
        @task
        def extract():
            # Closes over the local string, not over the cfg dict.
            return f"SELECT * FROM {source}"  # uses captured local
Factory size — discipline matters. 10 DAGs из factory — ок. 100 — questionable. 1000 — anti-pattern (parse time, UI overload). При 50+ generated DAGs — задумайтесь, может ли это быть один DAG с Dynamic Task Mapping (модуль 07).