DAG factory pattern — генерация из конфига, pitfalls, examples
DAG factory — паттерн где один кусок кода генерирует множество DAGs из конфигурации. Например, 50 ETL pipelines с identical structure, отличающихся только source/target table. Pattern экономит huge amount of duplicate code, но требует discipline — bug в factory мгновенно ломает все generated DAGs.
Этот урок — production-ready factory pattern: structure, validation, common pitfalls (top-level network calls, mutable defaults, non-deterministic generation), real-world examples и interaction с testing (модуль 16.06).
Когда использовать factory pattern
| Use case | Factory подходит? |
|---|---|
| 50+ identical ETL pipelines | Да — основной use case |
| Multiple DAGs varying только schedule/tables | Да |
| DAGs из multiple namespace/team configs | Да |
| Один DAG с 50 tasks | Нет — это Dynamic Task Mapping (модуль 07) |
| Real-time DAG generation на основе runtime events | Нет — это anti-pattern |
| 2-3 DAGs с slight variations | Нет — copy-paste чище |
Heuristic: если разница между DAGs покрывается несколькими parameters — factory. Если структуры разные — отдельные DAG-файлы.
Class-based декораторы: __init__ + __call__Базовая структура factory
# dags/factories/etl_factory.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
class ConfigError(Exception):
pass
def validate_config(cfg: dict) -> None:
"""Validate single DAG config."""
required = ["id", "source", "target", "schedule"]
missing = [f for f in required if f not in cfg]
if missing:
raise ConfigError(f"DAG '{cfg.get('id', '?')}': missing fields {missing}")
def make_etl_dag(cfg: dict):
"""Factory function — returns DAG из config."""
validate_config(cfg)
# Capture в locals — избежать closure issues
dag_id = cfg["id"]
source = cfg["source"]
target = cfg["target"]
schedule = cfg["schedule"]
team = cfg.get("team", "data-eng")
@dag(
dag_id=dag_id,
schedule=schedule,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "factory", team],
default_args={
"owner": cfg.get("owner", "data-eng"),
"retries": cfg.get("retries", 3),
"retry_delay": timedelta(minutes=5),
},
max_active_runs=1,
doc_md=f"Auto-generated ETL: {source} → {target}",
)
def generated():
@task
def extract() -> str:
return f"SELECT * FROM {source}"
@task
def transform(sql: str) -> str:
return sql + " WHERE active = TRUE"
@task
def load(sql: str):
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="warehouse")
hook.run(f"""
MERGE INTO {target} tgt
USING ({sql}) src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
""")
load(transform(extract()))
return generated()
def generate_dags(config_path: Path = None) -> dict:
"""Generate DAGs из YAML config."""
if config_path is None:
config_path = Path(__file__).parent / "../config/etl_dags.yaml"
with open(config_path) as f:
configs = yaml.safe_load(f)
if not isinstance(configs, list):
raise ConfigError("Config must be list of dicts")
# Detect duplicates
ids = [c["id"] for c in configs if "id" in c]
if len(ids) != len(set(ids)):
duplicates = [i for i in ids if ids.count(i) > 1]
raise ConfigError(f"Duplicate dag_ids: {set(duplicates)}")
return {cfg["id"]: make_etl_dag(cfg) for cfg in configs}
# Top-level — assign generated DAGs в globals
for dag_id, dag_obj in generate_dags().items():
globals()[dag_id] = dag_obj
И YAML config:
# dags/config/etl_dags.yaml
- id: orders_etl
source: staging.orders
target: prod.orders
schedule: "@daily"
team: data-eng
owner: [email protected]
- id: customers_etl
source: staging.customers
target: prod.customers
schedule: "@hourly"
team: data-eng
- id: finance_revenue
source: staging.revenue
target: prod.daily_revenue
schedule: "0 6 * * *"
team: finance
owner: [email protected]
retries: 5
Pitfall 1: Top-level network calls
Худший анти-паттерн:
# ❌ КАТАСТРОФА
import requests
def load_configs():
"""Делает HTTP call для получения configs."""
return requests.get("https://config-api.example.com/etl-configs").json()
for cfg in load_configs(): # Network call на каждом parse!
globals()[cfg["id"]] = make_etl_dag(cfg)
Проблемы:
- DagFileProcessor parses файл каждые ~30s — 30 network calls per minute per file
- API outage → ImportError → ALL generated DAGs пропадают из UI
- Latency parse-а растёт — scheduler tick slow
- Inconsistent results если API cached / load-balanced
Fix: bake config в Git. Если configs приходят из database — periodic export → committed YAML в Git.
# ✅ ХОРОШО
def load_configs():
"""Read committed YAML — fast, deterministic."""
config_path = Path(__file__).parent / "../config/etl_dags.yaml"
with open(config_path) as f:
return yaml.safe_load(f)
Periodic refresh из external system — отдельный DAG который генерирует YAML и commits в Git via CI:
# dags/maintenance/sync_configs.py
@dag(schedule="@daily", ...)
def sync_etl_configs():
@task
def fetch_from_api():
configs = requests.get(...).json()
yaml_str = yaml.dump(configs)
return yaml_str
@task
def commit_to_git(yaml_str: str):
# Write to dags/config/etl_dags.yaml, git commit, git push
...
Pitfall 2: Mutable default arguments
# ❌ ПЛОХО
def make_etl_dag(cfg, _stats={"calls": 0}): # mutable default!
_stats["calls"] += 1
print(f"Generated {_stats['calls']} DAGs")
...
Python evaluates default value один раз при function definition. Все calls share same dict. Mutation в одном call visible в next.
В factory это catastrophe — state leak между generations, non-deterministic output.
# ✅ ХОРОШО
def make_etl_dag(cfg, _stats=None):
_stats = _stats or {"calls": 0}
...
Test для catch (модуль 16.06):
def test_make_etl_dag_no_state_leak(mocker):
# Multiple invocations должны быть independent
dag1 = make_etl_dag({"id": "a", ...})
dag2 = make_etl_dag({"id": "b", ...})
# Здесь не должно быть state передачи между calls
Pitfall 3: Closures over cfg
# ❌ ПЛОХО — closure over cfg может ломаться
def make_etl_dag(cfg):
@dag(dag_id=cfg["id"], ...)
def generated():
@task
def extract():
# Closure! Если cfg mutate-ется where else — баг
return f"SELECT * FROM {cfg['source']}"
extract()
return generated()
Если позже кто-то делает cfg['source'] = 'new_table' — closure видит updated value. В parsing context это normally не происходит, но subtle bug возможен.
# ✅ ХОРОШО — capture в local vars
def make_etl_dag(cfg):
source = cfg["source"] # Capture now
target = cfg["target"]
@dag(...)
def generated():
@task
def extract():
return f"SELECT * FROM {source}" # Uses local capture
extract()
return generated()
Pitfall 4: globals() forgotten
# ❌ ПЛОХО — DAGs НЕ появятся в UI
def generate_dags():
return [make_etl_dag(cfg) for cfg in configs]
dags = generate_dags() # Локальная variable!
Airflow scans module’s globals() для DAG instances. Если DAGs в local variable — invisible.
# ✅ ХОРОШО
for dag_id, dag in generate_dags().items():
globals()[dag_id] = dag
Или используйте ** для assignment:
# Альтернативный pattern
for cfg in load_configs():
dag = make_etl_dag(cfg)
globals()[dag.dag_id] = dag
Pitfall 5: Duplicate dag_ids
# Если config содержит:
# - id: dag_a
# - id: dag_a # duplicate!
# Без проверки — last wins (silent overwrite!)
for cfg in configs:
globals()[cfg["id"]] = make_etl_dag(cfg)
# Second dag_a overwrites first — у второго другой config!
Always validate:
def generate_dags():
configs = load_configs()
ids = [c["id"] for c in configs]
duplicates = {i for i in ids if ids.count(i) > 1}
if duplicates:
raise ConfigError(f"Duplicate dag_ids: {duplicates}")
return {cfg["id"]: make_etl_dag(cfg) for cfg in configs}
Pitfall 6: Heavy parse time
Factory который делает heavy work во время parsing — замедляет scheduler.
# ❌ ПЛОХО
def make_etl_dag(cfg):
import tensorflow as tf # 2-3s import per DAG!
# Use TF for schema inference (top-level)
schema = tf.estimator.classifier(...).get_feature_columns()
@dag(...)
def generated():
...
tensorflow import на module level — каждый DAG parse 2-3s. С 50 generated DAGs — 100-150s parse time. Scheduler tick делает import errors warning.
Fix: lazy imports внутри tasks:
def make_etl_dag(cfg):
@dag(...)
def generated():
@task
def use_tf():
import tensorflow as tf # Import только при run
...
use_tf()
return generated()
Pitfall 7: Validation отсутствует
Factory без validation = silent failures на malformed configs.
# Validate eagerly — fail fast
def validate_config(cfg: dict) -> None:
required = ["id", "source", "target", "schedule"]
missing = [f for f in required if f not in cfg]
if missing:
raise ConfigError(f"DAG '{cfg.get('id', '?')}': missing {missing}")
if not isinstance(cfg["id"], str) or not cfg["id"]:
raise ConfigError(f"Invalid id: {cfg['id']!r}")
# Validate schedule (basic)
valid_schedules = {"@daily", "@hourly", "@weekly", "@monthly"}
if cfg["schedule"] not in valid_schedules:
# Try cron parsing
from croniter import croniter
if not croniter.is_valid(cfg["schedule"]):
raise ConfigError(f"Invalid schedule: {cfg['schedule']!r}")
С validation: malformed config → loud ImportError с указанием dag_id. Without — DAG silently invalid в UI.
Pitfall 8: Factory size grows linearly
50 DAGs — OK. 500 — questionable. 5000 — clearly anti-pattern.
| # DAGs | Status | Подход |
|---|---|---|
| 1-50 | OK | Factory pattern |
| 50-200 | OK с discipline | Factory + careful testing |
| 200-1000 | Questionable | Подумать о Dynamic Task Mapping |
| 1000+ | Anti-pattern | Не factory — нужен другой design |
С 1000+ generated DAGs:
- UI overload (DAG list scroll forever)
- Parse time 5-10 минут
- Operational overhead — кто owns каждый?
- alerting noise
При большом scale пересмотрите дизайн: один DAG с Dynamic Task Mapping (модуль 07) может обработать N items без N DAGs.
Production examples
Example 1: Per-tenant ETLs
- id: tenant_acme_etl
source: tenant_acme.raw_events
target: tenant_acme.processed_events
schedule: "@hourly"
team: tenant-services
- id: tenant_globex_etl
source: tenant_globex.raw_events
target: tenant_globex.processed_events
schedule: "@hourly"
team: tenant-services
20-100 tenants × identical structure → factory perfect fit.
Example 2: Per-source ingestion
- id: ingest_postgres_orders
source_type: postgres
source_conn: postgres_orders
source_query: "SELECT * FROM orders WHERE updated_at > '{{ data_interval_start }}'"
target: raw.orders
- id: ingest_kafka_events
source_type: kafka
source_conn: kafka_events
source_topic: events
target: raw.events
Каждый source = один generated DAG.
Example 3: Per-metric aggregations
- id: agg_daily_revenue
metric: revenue
granularity: daily
source: prod.orders
target: prod.revenue_daily
- id: agg_hourly_signups
metric: signups
granularity: hourly
source: prod.users
target: prod.signups_hourly
Factory with parametrized TaskGroups
Для более сложных flows используйте parameterized TaskGroups:
from airflow.utils.task_group import TaskGroup
def make_etl_with_validation(cfg):
@dag(dag_id=cfg["id"], ...)
def generated():
@task
def extract(): ...
with TaskGroup(group_id="validate") as validation:
@task
def check_row_count(): ...
@task
def check_schema(): ...
check_row_count() >> check_schema()
@task
def load(): ...
extract() >> validation >> load()
return generated()
Production gotchas
Config file path — relative к DAG file. open("config/etl.yaml") использует cwd which для DagFileProcessor может быть unexpected. Use Path(__file__).parent / "config" / "etl.yaml".
Caching configs in memory. @functools.lru_cache на load_configs — может cache stale data между parsings. Avoid — каждый parse reads fresh file (it’s local file, fast).
Generated DAGs не должны cross-reference. DAG A trigger DAG B — OK, но если B имеет hard dependency на A’s exact code — debugging кошмар. Use Datasets (модуль 08) для loose coupling.
Team ownership в tags. tags=["factory", team] — упрощает фильтрацию в UI per team. Plus per-DAG access_control (модуль 15.08) для real isolation.
Document factory. README в dags/factories/ объясняющий какой YAML schema, как добавить DAG, какие validation rules. Future-self благодарит.