Learning Platform
Глоссарий Troubleshooting
Урок 17.06 · 22 мин
Продвинутый
DAG FactoryDynamic DAGsParametrizedYAML ConfigValidation

DAG factory testing — dynamic generation, parametrized DAGs, edge cases

DAG factory pattern (модуль 17.03) — генерирует множество DAGs из одного куска кода + конфигурация. Например, 20 ETL pipelines с identical structure, отличающимися только source table / target table. Это powerful pattern, но он многократно увеличивает blast radius bug — одна ошибка в factory ломает все 20 DAGs одновременно.

Поэтому testing DAG factory — это критический layer. Этот урок — конкретные паттерны: validation config, parametrized testing edge cases (empty configs, malformed inputs), generation correctness, integration с DAG validity tests.


pytest.mark.parametrize и property-based testing — основа factory tests

Anatomy DAG factory

Базовый pattern:

# dags/factory.py
import yaml
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator

class ConfigError(Exception):
    """Raised when factory config is invalid."""

def validate_config(cfg: dict) -> None:
    """Validate single DAG config — raises ConfigError on invalid."""
    required = ["id", "source_table", "target_table", "schedule"]
    missing = [f for f in required if f not in cfg]
    if missing:
        raise ConfigError(f"Missing required fields: {missing}")

    if not isinstance(cfg["id"], str) or not cfg["id"]:
        raise ConfigError(f"Invalid 'id': {cfg['id']!r}")

    valid_schedules = {"@daily", "@hourly", "@weekly"}
    if cfg["schedule"] not in valid_schedules and not cfg["schedule"].startswith("0 "):
        raise ConfigError(f"Invalid schedule: {cfg['schedule']!r}")

def make_etl_dag(cfg: dict):
    """Factory function — returns DAG built from config."""
    validate_config(cfg)

    @dag(
        dag_id=cfg["id"],
        schedule=cfg["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["etl", "factory-generated", cfg.get("team", "data-eng")],
        default_args={
            "owner": cfg.get("owner", "data-eng"),
            "retries": cfg.get("retries", 3),
            "retry_delay": timedelta(minutes=5),
        },
    )
    def generated_dag():
        @task
        def extract():
            return f"SELECT * FROM {cfg['source_table']}"

        @task
        def transform(sql: str):
            return sql + " WHERE active = TRUE"

        @task
        def load(sql: str):
            return f"INSERT INTO {cfg['target_table']} {sql}"

        load(transform(extract()))

    return generated_dag()

def generate_dags(config_path: str = "config/etl_dags.yaml") -> dict:
    """Read YAML, generate DAGs, return {dag_id: DAG}."""
    with open(config_path) as f:
        configs = yaml.safe_load(f)

    if not isinstance(configs, list):
        raise ConfigError(f"Config must be a list of dicts, got {type(configs)}")

    result = {}
    for cfg in configs:
        try:
            dag = make_etl_dag(cfg)
            result[cfg["id"]] = dag
        except ConfigError as e:
            # В factory можно либо raise (fail loudly), либо log + skip (resilient)
            raise ConfigError(f"DAG {cfg.get('id', 'unknown')}: {e}") from e

    return result

# Top-level — выполняется при DAG parse
for dag_id, dag_obj in generate_dags().items():
    globals()[dag_id] = dag_obj

Что тестировать:

  1. validate_config — корректно отвергает invalid configs
  2. make_etl_dag — производит правильный DAG объект для valid config
  3. generate_dags — обрабатывает edge cases (empty list, malformed YAML, partial fails)
  4. Integration — generated DAGs проходят DAG validity tests
Factory function flow: config → factory → generated DAGs
YAML configconfig/etl_dags.yaml — list of dicts с required fields (id, source_table, target_table, schedule). Committed в Git для deterministic generation. Test point: pytest fixture создаёт temp YAML с разным содержимым (empty, single, multiple, duplicate, malformed).
yaml.safe_load
generate_dags(config_path)Top-level orchestrator. Reads YAML, validates list structure, detects duplicates dag_ids (raise ConfigError если есть), iterates через configs, вызывает make_etl_dag для каждого. Returns dict[dag_id, DAG]. Test edge cases: empty list ([]), not-a-list (dict), malformed YAML.
for each cfg
validate_config(cfg)Eager validation — fail fast с informative error. Required fields check, type check, schedule whitelist. Test через pytest.mark.parametrize: каждое required field missing → ConfigError, invalid id types ([], None, 123) → ConfigError, valid schedules pass.
make_etl_dag(cfg)Factory function — returns DAG instance. Captures cfg values в local vars (избегать closure issues), builds @dag(...) с params, defines tasks внутри, calls generated() для instantiate. Test: dag.dag_id == cfg['id'], len(dag.tasks) == 3, tags содержат team, dependencies правильные.
result dict
{dag_id: DAG, ...}Return value generate_dags. Test point: assert set(result.keys()) == expected_ids, len(result) == len(configs) если no duplicates. Property-based test через hypothesis: random configs → call dice → identical output (deterministic).
critical: globals() assignment
for dag_id, dag in generate_dags().items(): globals()[dag_id] = dagCritical step. Airflow scans module's globals() для DAG instances. Если результат остался в local var (dags = generate_dags()) — DAGs invisible в UI. Test: import factory module, assert hasattr(factory_module, dag_id) для каждого expected dag_id.
DAG parse → serialized_dag table
Generated DAGs visible в UIFinal state: scheduler видит DAGs в globals(), serialize в metadata DB, UI рендерит. Integration test: dagbag = DagBag('dags/'); assert все expected dag_ids в dagbag.dags; для каждого: SerializedDAG.to_dict + from_dict round-trip без errors.

Test 1 — validate_config

# tests/unit/test_dag_factory.py
import pytest
from dags.factory import validate_config, ConfigError

def test_validate_config_valid():
    """Valid config — no exception."""
    cfg = {
        "id": "orders_etl",
        "source_table": "staging.orders",
        "target_table": "prod.orders",
        "schedule": "@daily",
    }
    validate_config(cfg)  # no exception

@pytest.mark.parametrize("missing_field", ["id", "source_table", "target_table", "schedule"])
def test_validate_config_missing_required_field(missing_field):
    """Каждое required field обязательно."""
    cfg = {
        "id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"
    }
    del cfg[missing_field]
    with pytest.raises(ConfigError, match=f"Missing required fields.*{missing_field}"):
        validate_config(cfg)

@pytest.mark.parametrize("invalid_id", ["", None, 123, [], {}])
def test_validate_config_invalid_id(invalid_id):
    cfg = {
        "id": invalid_id, "source_table": "s", "target_table": "t", "schedule": "@daily"
    }
    with pytest.raises(ConfigError, match="Invalid 'id'"):
        validate_config(cfg)

@pytest.mark.parametrize("schedule", ["@daily", "@hourly", "@weekly", "0 0 * * *", "0 12 * * 1"])
def test_validate_config_valid_schedules(schedule):
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": schedule}
    validate_config(cfg)

@pytest.mark.parametrize("schedule", ["invalid", "now", "", None, "* * *"])
def test_validate_config_invalid_schedule(schedule):
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": schedule}
    with pytest.raises(ConfigError, match="Invalid schedule"):
        validate_config(cfg)

pytest.mark.parametrize — best practice для variations. Каждый параметр — отдельный test, информативное failure (показывает который param failed).


Test 2 — make_etl_dag

def test_make_etl_dag_produces_dag():
    cfg = {
        "id": "test_etl",
        "source_table": "staging.test",
        "target_table": "prod.test",
        "schedule": "@daily",
    }
    dag = make_etl_dag(cfg)

    assert dag.dag_id == "test_etl"
    assert dag.schedule_interval == "@daily" or str(dag.timetable) == "@daily"
    assert "etl" in dag.tags
    assert "factory-generated" in dag.tags
    assert len(dag.tasks) == 3  # extract, transform, load

def test_make_etl_dag_uses_default_team_tag():
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    dag = make_etl_dag(cfg)
    assert "data-eng" in dag.tags  # default team

def test_make_etl_dag_custom_team_tag():
    cfg = {
        "id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily",
        "team": "finance",
    }
    dag = make_etl_dag(cfg)
    assert "finance" in dag.tags

def test_make_etl_dag_dependencies():
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    dag = make_etl_dag(cfg)
    extract = dag.get_task("extract")
    transform = dag.get_task("transform")
    load = dag.get_task("load")

    assert transform.upstream_task_ids == {"extract"}
    assert load.upstream_task_ids == {"transform"}
    assert extract.downstream_task_ids == {"transform"}

def test_make_etl_dag_default_args():
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    dag = make_etl_dag(cfg)

    extract = dag.get_task("extract")
    assert extract.retries == 3  # default
    assert extract.owner == "data-eng"  # default

def test_make_etl_dag_custom_retries():
    cfg = {
        "id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily",
        "retries": 5,
    }
    dag = make_etl_dag(cfg)
    assert dag.get_task("extract").retries == 5

def test_make_etl_dag_invalid_config_raises():
    cfg = {"id": "x"}  # missing required
    with pytest.raises(ConfigError):
        make_etl_dag(cfg)

Test 3 — generate_dags edge cases

import tempfile
from pathlib import Path
from dags.factory import generate_dags, ConfigError

@pytest.fixture
def config_file(tmp_path):
    """Factory для создания temp YAML config."""
    def _make(content: str) -> str:
        cfg_path = tmp_path / "config.yaml"
        cfg_path.write_text(content)
        return str(cfg_path)
    return _make

def test_generate_dags_empty_list(config_file):
    """Empty list — no DAGs generated, no error."""
    path = config_file("[]")
    result = generate_dags(path)
    assert result == {}

def test_generate_dags_single_dag(config_file):
    yaml_content = """
- id: orders_etl
  source_table: staging.orders
  target_table: prod.orders
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    assert "orders_etl" in result
    assert len(result["orders_etl"].tasks) == 3

def test_generate_dags_multiple(config_file):
    yaml_content = """
- id: orders_etl
  source_table: staging.orders
  target_table: prod.orders
  schedule: "@daily"
- id: customers_etl
  source_table: staging.customers
  target_table: prod.customers
  schedule: "@hourly"
- id: products_etl
  source_table: staging.products
  target_table: prod.products
  schedule: "@weekly"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    assert set(result.keys()) == {"orders_etl", "customers_etl", "products_etl"}

def test_generate_dags_malformed_yaml(config_file):
    path = config_file("this: is: not: valid: yaml: at: all")
    import yaml
    with pytest.raises((yaml.YAMLError, ConfigError)):
        generate_dags(path)

def test_generate_dags_not_a_list(config_file):
    """Config должен быть list, не dict."""
    path = config_file("id: x\nsource_table: s\ntarget_table: t\nschedule: '@daily'")
    with pytest.raises(ConfigError, match="must be a list"):
        generate_dags(path)

def test_generate_dags_invalid_dag_raises_with_id(config_file):
    """Error должен указать какой DAG failed."""
    yaml_content = """
- id: good_dag
  source_table: s
  target_table: t
  schedule: "@daily"
- id: bad_dag
  source_table: s
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    with pytest.raises(ConfigError, match="bad_dag.*Missing required"):
        generate_dags(path)

def test_generate_dags_duplicate_ids(config_file):
    """Duplicate dag_id — последний выигрывает, или должен raise?"""
    yaml_content = """
- id: dup
  source_table: s1
  target_table: t1
  schedule: "@daily"
- id: dup
  source_table: s2
  target_table: t2
  schedule: "@daily"
"""
    path = config_file(yaml_content)
    result = generate_dags(path)
    # В нашей реализации — last wins
    assert len(result) == 1
    # Но возможно лучше fail loudly — add explicit check в generate_dags

Test 4 — generated DAGs валидны

После factory все generated DAGs должны проходить DAG validity tests:

def test_generated_dags_have_unique_task_ids():
    """Все generated DAGs должны иметь unique task_ids внутри DAG."""
    configs = [
        {"id": f"dag_{i}", "source_table": f"s_{i}", "target_table": f"t_{i}", "schedule": "@daily"}
        for i in range(10)
    ]
    for cfg in configs:
        dag = make_etl_dag(cfg)
        task_ids = [t.task_id for t in dag.tasks]
        assert len(task_ids) == len(set(task_ids)), f"Duplicate task_ids in {cfg['id']}"

def test_generated_dags_pass_validity_checks():
    """Generated DAGs должны иметь tags, owner, valid start_date."""
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    dag = make_etl_dag(cfg)

    assert dag.tags
    assert dag.default_args["owner"] != "airflow"  # Not default
    assert dag.start_date.year >= 2024
    assert not dag.catchup  # catchup=False enforced

def test_generated_dag_serializable():
    """Generated DAG must be JSON-serializable."""
    from airflow.serialization.serialized_objects import SerializedDAG
    cfg = {"id": "x", "source_table": "s", "target_table": "t", "schedule": "@daily"}
    dag = make_etl_dag(cfg)
    serialized = SerializedDAG.to_dict(dag)
    SerializedDAG.from_dict(serialized)

Property-based testing с hypothesis

Для глубокой проверки edge cases — hypothesis:

# pip install hypothesis
from hypothesis import given, strategies as st

@given(
    dag_id=st.text(min_size=1, max_size=50,
                   alphabet=st.characters(whitelist_categories=("Ll", "Lu", "Nd"), whitelist_characters="_")),
    source=st.text(min_size=1, max_size=50),
    target=st.text(min_size=1, max_size=50),
    schedule=st.sampled_from(["@daily", "@hourly", "@weekly", "0 0 * * *"]),
)
def test_make_etl_dag_property(dag_id, source, target, schedule):
    """Для любого valid config — make_etl_dag не должна крашиться."""
    cfg = {
        "id": dag_id,
        "source_table": source,
        "target_table": target,
        "schedule": schedule,
    }
    dag = make_etl_dag(cfg)
    assert dag.dag_id == dag_id
    assert len(dag.tasks) == 3

Hypothesis генерирует сотни variations input. Если operator падает на каком-то — hypothesis даёт minimal failing example.


Pitfalls в factory pattern

Production gotchas, которые tests должны ловить:

Pitfall 1 — top-level network call

# ❌ ПЛОХО — выполняется на каждом DAG parse
def load_configs():
    import requests
    return requests.get("https://config-api/etl-configs").json()

for cfg in load_configs():  # network call каждые 30s
    make_etl_dag(cfg)

Test:

def test_no_network_call_in_factory(mocker):
    """Factory не должна делать network calls на module-level."""
    mock_get = mocker.patch("requests.get")
    # Reimport module
    import importlib
    from dags import factory
    importlib.reload(factory)

    # Если был network call — assert fails
    assert mock_get.call_count == 0, "Factory makes network call on import"

Pitfall 2 — mutable default arguments

# ❌ ПЛОХО
def make_dag(cfg, args={"retries": 3}):  # default mutable!
    args["dag_id"] = cfg["id"]  # mutates default

Tests с hypothesis ловят это — multiple calls показывают что state shared.

Pitfall 3 — top-level imports heavy

import tensorflow as tf  # imports 2-3s
import pandas as pd

def make_dag(cfg):
    # ...

Test parse time:

def test_module_import_fast():
    import time
    start = time.time()
    import importlib
    from dags import factory
    importlib.reload(factory)
    assert (time.time() - start) < 1.0, "Factory module imports >1s — heavy top-level imports?"

CI integration

Factory test suite в CI:

# .github/workflows/test-factory.yml
jobs:
  test-factory:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt pytest pytest-mock hypothesis

      - name: Unit tests factory logic
        run: pytest tests/unit/test_dag_factory.py -v

      - name: Property-based tests
        run: pytest tests/unit/test_dag_factory_property.py -v --hypothesis-show-statistics

      - name: Validate generated DAGs
        run: |
          # Run DAG validity tests against the actual config
          pytest tests/test_dag_validity.py -v

Production gotchas

globals()[dag_id] = dag_obj обязательно. Airflow находит DAGs только в module-level globals. Если factory возвращает dict but не assign в globals — DAGs не появятся.

Config file path должен быть relative к DAG file. open("config/etl.yaml") относительно cwd, который у DagFileProcessor может быть unexpected. Use Path(__file__).parent / "config" / "etl.yaml".

Unique dag_id mandatory. Если factory создаёт duplicate dag_ids — Airflow logs warning и одна DAG silently overwrites другую. Add assertion в generate_dags чтобы fail loudly.

Closures in @task lambdas. cfg["source_table"] в extract() — это closure. Если cfg mutate-нется между parse и execute — баг. Best practice — copy values в local variables:

def make_etl_dag(cfg):
    source = cfg["source_table"]  # capture now
    target = cfg["target_table"]

    @dag(...)
    def generated_dag():
        @task
        def extract():
            return f"SELECT * FROM {source}"  # uses captured local

Factory size — discipline matters. 10 DAGs из factory — ок. 100 — questionable. 1000 — anti-pattern (parse time, UI overload). При 50+ generated DAGs — задумайтесь, может ли это быть один DAG с Dynamic Task Mapping (модуль 07).


Проверка знанийKnowledge check
Команда настраивает DAG factory который генерирует 30 ETL DAGs из YAML config. На production через 2 дня видят: некоторые DAGs (~5 из 30) пропадают из UI после рандомного DAG processor restart, затем возвращаются. Что происходит и как тестировать чтобы catch?
ОтветAnswer
Это classic factory pattern issue с **mutable shared state**. Возможные причины: (1) `globals()[dag_id] = dag_obj` overwriting — если config содержит duplicate dag_ids, одна DAG silently заменяет другую при каждом parse, в зависимости от order chunks могут показать одну или другую; (2) Top-level network call в `generate_dags` — если REST API возвращает inconsistent results (caching на server side, race condition), различные parses producing разные configs; (3) Mutable default argument bug — `def make_dag(cfg, _cache={}):` shared между calls; (4) Generator function вместо list — если `generate_dags` yields, повторное iteration пустое; (5) Race condition с file IO — два DAG processors одновременно читают config.yaml пока он updating. Tests для catch: (1) **Property test через hypothesis** — generate 30 random configs, run generate_dags() 100 раз, assert deterministic output (set of dag_ids identical); (2) **Test no network calls в module import** — mock requests/urllib + reload factory module + assert call_count == 0; (3) **Test uniqueness of dag_ids** — `assert len(result) == len(unique_ids)` в generate_dags + raise если duplicate; (4) **Test mutability** — generate_dags(); modify returned dict; generate_dags() again — assert second result not affected; (5) **Test no shared mutable state** — два независимых calls должны иметь identical fields на returned DAGs. Fixes: replace top-level network с baked config (commit YAML в Git, no runtime fetching); add `assert len(set(dag_ids)) == len(dag_ids), 'Duplicate dag_ids'`; use local variables вместо closures over cfg; pin Python defaults (`def f(_cache=None): _cache = _cache or {}`). Этот пример показывает: factory pattern requires extra discipline вокруг deterministic generation. One bad mutation = chaos для 30 DAGs одновременно.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. DAG factory генерирует 30 DAGs из YAML config. Critical edge case для test:

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7