Learning Platform
Глоссарий Troubleshooting
Урок 18.03 · 22 мин
Продвинутый
DAG FactoryDynamic DAGsYAML ConfigAnti-Patterns

DAG factory pattern — генерация из конфига, pitfalls, examples

DAG factory — паттерн где один кусок кода генерирует множество DAGs из конфигурации. Например, 50 ETL pipelines с identical structure, отличающихся только source/target table. Pattern экономит huge amount of duplicate code, но требует discipline — bug в factory мгновенно ломает все generated DAGs.

Этот урок — production-ready factory pattern: structure, validation, common pitfalls (top-level network calls, mutable defaults, non-deterministic generation), real-world examples и interaction с testing (модуль 16.06).


Когда использовать factory pattern

Use caseFactory подходит?
50+ identical ETL pipelinesДа — основной use case
Multiple DAGs varying только schedule/tablesДа
DAGs из multiple namespace/team configsДа
Один DAG с 50 tasksНет — это Dynamic Task Mapping (модуль 07)
Real-time DAG generation на основе runtime eventsНет — это anti-pattern
2-3 DAGs с slight variationsНет — copy-paste чище

Heuristic: если разница между DAGs покрывается несколькими parameters — factory. Если структуры разные — отдельные DAG-файлы.

Class-based декораторы: __init__ + __call__
Factory architecture: configs → factory function → globals() registration
dags/config/etl_dags.yamlCommitted в Git YAML файл. List of dicts с params для каждого DAG: id, source, target, schedule, team, owner, retries. Single source of truth. Если configs приходят из external system — periodic export через separate DAG что commits YAML в Git via CI.
yaml.safe_load (Path(__file__).parent / config)
generate_dags() orchestratorTop-level function: reads YAML, validates list structure (not dict), iterates через configs. Detect duplicates dag_ids — raise ConfigError с list duplicates перед factory call. Catches malformed YAML, missing fields, invalid types eagerly — fail fast с informative error.
for each cfg in configs
validate_config(cfg)Eager per-config validation. Required fields (id, source, target, schedule). Type checks (id is non-empty str). Schedule whitelist (@daily/@hourly/cron via croniter.is_valid). Schema validation через jsonschema или pydantic — catches errors на edit, не runtime.
make_etl_dag(cfg) factoryCapture cfg values в local vars: source = cfg['source']; target = cfg['target']. Избегает closure issues (если cfg mutates где-то). Define @dag(...) с dag_id, schedule, tags, default_args. Inside — @task functions used local captures (not cfg). Return generated() instance.
DAG instance
{dag_id: DAG}Result generate_dags — dict mapping. Deterministic: same configs → same dict each call. Test: call twice with identical input, assert keys identical. Property-based hypothesis test для random configs — invariant: len(result) == len(unique_ids).
CRITICAL: globals() assignment
for dag_id, dag in result.items(): globals()[dag_id] = dagTop-level code в DAG file. Airflow scans module's globals() для DAG instances. Если результат остался в local var (dags = generate_dags()) — DAGs invisible в UI. Common pitfall — забыть это и удивляться где DAGs пропали.
DagFileProcessor parses file
serialized_dag tableКаждый generated DAG сериализуется в serialized_dag row. Scheduler читает оттуда для UI и task scheduling. ВАЖНО: NO top-level network calls в factory — каждый parse (~30s) делал бы HTTP request. API outage → ImportError → ALL generated DAGs пропадают.
UI: 100 DAGs visibleUI отображает все generated DAGs с правильными tags (etl, factory, team). Filtering по team tag показывает только relevant DAGs. С 100+ DAGs UI start to overload — reconsider design (Dynamic Task Mapping вместо factory).

Базовая структура factory

# dags/factories/etl_factory.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator

class ConfigError(Exception):
    pass

def validate_config(cfg: dict) -> None:
    """Validate single DAG config."""
    required = ["id", "source", "target", "schedule"]
    missing = [f for f in required if f not in cfg]
    if missing:
        raise ConfigError(f"DAG '{cfg.get('id', '?')}': missing fields {missing}")

def make_etl_dag(cfg: dict):
    """Factory function — returns DAG из config."""
    validate_config(cfg)

    # Capture в locals — избежать closure issues
    dag_id = cfg["id"]
    source = cfg["source"]
    target = cfg["target"]
    schedule = cfg["schedule"]
    team = cfg.get("team", "data-eng")

    @dag(
        dag_id=dag_id,
        schedule=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["etl", "factory", team],
        default_args={
            "owner": cfg.get("owner", "data-eng"),
            "retries": cfg.get("retries", 3),
            "retry_delay": timedelta(minutes=5),
        },
        max_active_runs=1,
        doc_md=f"Auto-generated ETL: {source}{target}",
    )
    def generated():
        @task
        def extract() -> str:
            return f"SELECT * FROM {source}"

        @task
        def transform(sql: str) -> str:
            return sql + " WHERE active = TRUE"

        @task
        def load(sql: str):
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            hook = PostgresHook(postgres_conn_id="warehouse")
            hook.run(f"""
            MERGE INTO {target} tgt
            USING ({sql}) src ON tgt.id = src.id
            WHEN MATCHED THEN UPDATE SET ...
            WHEN NOT MATCHED THEN INSERT ...;
            """)

        load(transform(extract()))

    return generated()

def generate_dags(config_path: Path = None) -> dict:
    """Generate DAGs из YAML config."""
    if config_path is None:
        config_path = Path(__file__).parent / "../config/etl_dags.yaml"

    with open(config_path) as f:
        configs = yaml.safe_load(f)

    if not isinstance(configs, list):
        raise ConfigError("Config must be list of dicts")

    # Detect duplicates
    ids = [c["id"] for c in configs if "id" in c]
    if len(ids) != len(set(ids)):
        duplicates = [i for i in ids if ids.count(i) > 1]
        raise ConfigError(f"Duplicate dag_ids: {set(duplicates)}")

    return {cfg["id"]: make_etl_dag(cfg) for cfg in configs}

# Top-level — assign generated DAGs в globals
for dag_id, dag_obj in generate_dags().items():
    globals()[dag_id] = dag_obj

И YAML config:

# dags/config/etl_dags.yaml
- id: orders_etl
  source: staging.orders
  target: prod.orders
  schedule: "@daily"
  team: data-eng
  owner: [email protected]

- id: customers_etl
  source: staging.customers
  target: prod.customers
  schedule: "@hourly"
  team: data-eng

- id: finance_revenue
  source: staging.revenue
  target: prod.daily_revenue
  schedule: "0 6 * * *"
  team: finance
  owner: [email protected]
  retries: 5

Pitfall 1: Top-level network calls

Худший анти-паттерн:

# ❌ КАТАСТРОФА
import requests

def load_configs():
    """Делает HTTP call для получения configs."""
    return requests.get("https://config-api.example.com/etl-configs").json()

for cfg in load_configs():  # Network call на каждом parse!
    globals()[cfg["id"]] = make_etl_dag(cfg)

Проблемы:

  • DagFileProcessor parses файл каждые ~30s — 30 network calls per minute per file
  • API outage → ImportError → ALL generated DAGs пропадают из UI
  • Latency parse-а растёт — scheduler tick slow
  • Inconsistent results если API cached / load-balanced

Fix: bake config в Git. Если configs приходят из database — periodic export → committed YAML в Git.

# ✅ ХОРОШО
def load_configs():
    """Read committed YAML — fast, deterministic."""
    config_path = Path(__file__).parent / "../config/etl_dags.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)

Periodic refresh из external system — отдельный DAG который генерирует YAML и commits в Git via CI:

# dags/maintenance/sync_configs.py
@dag(schedule="@daily", ...)
def sync_etl_configs():
    @task
    def fetch_from_api():
        configs = requests.get(...).json()
        yaml_str = yaml.dump(configs)
        return yaml_str

    @task
    def commit_to_git(yaml_str: str):
        # Write to dags/config/etl_dags.yaml, git commit, git push
        ...

Pitfall 2: Mutable default arguments

# ❌ ПЛОХО
def make_etl_dag(cfg, _stats={"calls": 0}):  # mutable default!
    _stats["calls"] += 1
    print(f"Generated {_stats['calls']} DAGs")
    ...

Python evaluates default value один раз при function definition. Все calls share same dict. Mutation в одном call visible в next.

В factory это catastrophe — state leak между generations, non-deterministic output.

# ✅ ХОРОШО
def make_etl_dag(cfg, _stats=None):
    _stats = _stats or {"calls": 0}
    ...

Test для catch (модуль 16.06):

def test_make_etl_dag_no_state_leak(mocker):
    # Multiple invocations должны быть independent
    dag1 = make_etl_dag({"id": "a", ...})
    dag2 = make_etl_dag({"id": "b", ...})
    # Здесь не должно быть state передачи между calls

Pitfall 3: Closures over cfg

# ❌ ПЛОХО — closure over cfg может ломаться
def make_etl_dag(cfg):
    @dag(dag_id=cfg["id"], ...)
    def generated():
        @task
        def extract():
            # Closure! Если cfg mutate-ется where else — баг
            return f"SELECT * FROM {cfg['source']}"
        extract()
    return generated()

Если позже кто-то делает cfg['source'] = 'new_table' — closure видит updated value. В parsing context это normally не происходит, но subtle bug возможен.

# ✅ ХОРОШО — capture в local vars
def make_etl_dag(cfg):
    source = cfg["source"]  # Capture now
    target = cfg["target"]

    @dag(...)
    def generated():
        @task
        def extract():
            return f"SELECT * FROM {source}"  # Uses local capture
        extract()
    return generated()

Pitfall 4: globals() forgotten

# ❌ ПЛОХО — DAGs НЕ появятся в UI
def generate_dags():
    return [make_etl_dag(cfg) for cfg in configs]

dags = generate_dags()  # Локальная variable!

Airflow scans module’s globals() для DAG instances. Если DAGs в local variable — invisible.

# ✅ ХОРОШО
for dag_id, dag in generate_dags().items():
    globals()[dag_id] = dag

Или используйте ** для assignment:

# Альтернативный pattern
for cfg in load_configs():
    dag = make_etl_dag(cfg)
    globals()[dag.dag_id] = dag

Pitfall 5: Duplicate dag_ids

# Если config содержит:
# - id: dag_a
# - id: dag_a   # duplicate!

# Без проверки — last wins (silent overwrite!)
for cfg in configs:
    globals()[cfg["id"]] = make_etl_dag(cfg)
# Second dag_a overwrites first — у второго другой config!

Always validate:

def generate_dags():
    configs = load_configs()
    ids = [c["id"] for c in configs]
    duplicates = {i for i in ids if ids.count(i) > 1}
    if duplicates:
        raise ConfigError(f"Duplicate dag_ids: {duplicates}")
    return {cfg["id"]: make_etl_dag(cfg) for cfg in configs}

Pitfall 6: Heavy parse time

Factory который делает heavy work во время parsing — замедляет scheduler.

# ❌ ПЛОХО
def make_etl_dag(cfg):
    import tensorflow as tf  # 2-3s import per DAG!
    # Use TF for schema inference (top-level)
    schema = tf.estimator.classifier(...).get_feature_columns()
    @dag(...)
    def generated():
        ...

tensorflow import на module level — каждый DAG parse 2-3s. С 50 generated DAGs — 100-150s parse time. Scheduler tick делает import errors warning.

Fix: lazy imports внутри tasks:

def make_etl_dag(cfg):
    @dag(...)
    def generated():
        @task
        def use_tf():
            import tensorflow as tf  # Import только при run
            ...
        use_tf()
    return generated()

Pitfall 7: Validation отсутствует

Factory без validation = silent failures на malformed configs.

# Validate eagerly — fail fast
def validate_config(cfg: dict) -> None:
    required = ["id", "source", "target", "schedule"]
    missing = [f for f in required if f not in cfg]
    if missing:
        raise ConfigError(f"DAG '{cfg.get('id', '?')}': missing {missing}")

    if not isinstance(cfg["id"], str) or not cfg["id"]:
        raise ConfigError(f"Invalid id: {cfg['id']!r}")

    # Validate schedule (basic)
    valid_schedules = {"@daily", "@hourly", "@weekly", "@monthly"}
    if cfg["schedule"] not in valid_schedules:
        # Try cron parsing
        from croniter import croniter
        if not croniter.is_valid(cfg["schedule"]):
            raise ConfigError(f"Invalid schedule: {cfg['schedule']!r}")

С validation: malformed config → loud ImportError с указанием dag_id. Without — DAG silently invalid в UI.


Pitfall 8: Factory size grows linearly

50 DAGs — OK. 500 — questionable. 5000 — clearly anti-pattern.

# DAGsStatusПодход
1-50OKFactory pattern
50-200OK с disciplineFactory + careful testing
200-1000QuestionableПодумать о Dynamic Task Mapping
1000+Anti-patternНе factory — нужен другой design

С 1000+ generated DAGs:

  • UI overload (DAG list scroll forever)
  • Parse time 5-10 минут
  • Operational overhead — кто owns каждый?
  • alerting noise

При большом scale пересмотрите дизайн: один DAG с Dynamic Task Mapping (модуль 07) может обработать N items без N DAGs.


Production examples

Example 1: Per-tenant ETLs

- id: tenant_acme_etl
  source: tenant_acme.raw_events
  target: tenant_acme.processed_events
  schedule: "@hourly"
  team: tenant-services

- id: tenant_globex_etl
  source: tenant_globex.raw_events
  target: tenant_globex.processed_events
  schedule: "@hourly"
  team: tenant-services

20-100 tenants × identical structure → factory perfect fit.

Example 2: Per-source ingestion

- id: ingest_postgres_orders
  source_type: postgres
  source_conn: postgres_orders
  source_query: "SELECT * FROM orders WHERE updated_at > '{{ data_interval_start }}'"
  target: raw.orders

- id: ingest_kafka_events
  source_type: kafka
  source_conn: kafka_events
  source_topic: events
  target: raw.events

Каждый source = один generated DAG.

Example 3: Per-metric aggregations

- id: agg_daily_revenue
  metric: revenue
  granularity: daily
  source: prod.orders
  target: prod.revenue_daily

- id: agg_hourly_signups
  metric: signups
  granularity: hourly
  source: prod.users
  target: prod.signups_hourly

Factory with parametrized TaskGroups

Для более сложных flows используйте parameterized TaskGroups:

from airflow.utils.task_group import TaskGroup

def make_etl_with_validation(cfg):
    @dag(dag_id=cfg["id"], ...)
    def generated():
        @task
        def extract(): ...

        with TaskGroup(group_id="validate") as validation:
            @task
            def check_row_count(): ...

            @task
            def check_schema(): ...

            check_row_count() >> check_schema()

        @task
        def load(): ...

        extract() >> validation >> load()

    return generated()

Production gotchas

Config file path — relative к DAG file. open("config/etl.yaml") использует cwd which для DagFileProcessor может быть unexpected. Use Path(__file__).parent / "config" / "etl.yaml".

Caching configs in memory. @functools.lru_cache на load_configs — может cache stale data между parsings. Avoid — каждый parse reads fresh file (it’s local file, fast).

Generated DAGs не должны cross-reference. DAG A trigger DAG B — OK, но если B имеет hard dependency на A’s exact code — debugging кошмар. Use Datasets (модуль 08) для loose coupling.

Team ownership в tags. tags=["factory", team] — упрощает фильтрацию в UI per team. Plus per-DAG access_control (модуль 15.08) для real isolation.

Document factory. README в dags/factories/ объясняющий какой YAML schema, как добавить DAG, какие validation rules. Future-self благодарит.


Проверка знанийKnowledge check
Production используется DAG factory который генерирует 100 DAGs из YAML. За 6 месяцев discovered несколько issues: некоторые duplicate dag_ids (silent overwrite), один DAG имел top-level requests.get что timed out → все DAGs пропали из UI на 15 минут, debug factory bugs тяжёлый. Что должно быть в production-grade factory чтобы избежать этих pitfalls?
ОтветAnswer
Production-grade factory checklist: (1) **Eager validation** в generate_dags(): assert no duplicate dag_ids (raise ConfigError с list duplicates), validate каждый cfg fully (missing required fields, invalid types, invalid schedule via croniter); fail fast с informative error. (2) **No top-level I/O** beyond reading local YAML file. NO network calls, NO database queries, NO third-party imports requiring auth. Если configs приходят из external system — separate sync DAG который exports в Git-committed YAML. (3) **Local variable captures** instead closures over cfg: `source = cfg['source']` перед @dag definition, use `source` в task bodies. (4) **No mutable defaults**: `def make_dag(cfg, _opts=None): _opts = _opts or {}`. (5) **Heavy imports — lazy внутри tasks**: tensorflow/pandas/heavy libs import inside task functions, не module level. Parse time остаётся <100ms per DAG. (6) **Comprehensive test suite** (модуль 16.06): unit tests для validate_config (pytest.mark.parametrize для edge cases), test_make_etl_dag produces correct structure, test_no_network_call (mock requests + reload module + assert call_count == 0), property-based test через hypothesis для random configs, test deterministic output (call twice — identical result). (7) **Schema validation для YAML** через jsonschema/pydantic — отлавливает schema errors на edit, не runtime. (8) **`globals()` assignment explicit** — `for id, dag in generate_dags().items(): globals()[id] = dag` — НЕ `dags = generate_dags()` (local var, invisible). (9) **Documentation**: README с YAML schema, examples, troubleshooting. (10) **Monitoring**: alert на airflow.dag_processing.import_errors > 0 — instant catch если parse breaks. С этим setup factory становится reliable foundation; без него — accident waiting to happen на 100+ DAGs scale. Сравнение: один день на правильную factory setup vs недели incident response на bad factory.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Factory делает requests.get('https://config-api/...') в top-level — почему катастрофа?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8