DAG object anatomy
Когда вы пишете @dag(schedule="@daily", ...), что происходит на самом деле? Это не просто синтаксический сахар. Decorator создаёт factory function, которая инстанцирует airflow.models.dag.DAG object с specific конфигурацией, регистрирует его в module-level globals, и при dag.cli() запускает DAG processor logic для парсинга. Понимание этой механики — ключ к написанию maintainable DAG-ов.
Этот урок препарирует DAG object изнутри: какие поля, как они инициализируются, какие callbacks доступны, какие edge cases возникают при parsing.
Anatomy DAG object
airflow.models.dag.DAG — основной класс. Его инстанс хранит:
class DAG:
# Identity
dag_id: str # Unique ID
description: str | None
tags: list[str]
# Scheduling
schedule: str | Timetable | None # cron, timedelta, "@daily", None, Dataset, list[Dataset]
start_date: datetime
end_date: datetime | None
catchup: bool = True # Внимание: True by default
max_active_runs: int = 16 # Concurrency
# Tasks
task_dict: dict[str, BaseOperator] # Все tasks в DAG
task_group: TaskGroup # Root TaskGroup
# Defaults
default_args: dict # Inheritance к tasks
# Hooks
on_success_callback: Callable | None
on_failure_callback: Callable | None
sla_miss_callback: Callable | None
# Behavior
render_template_as_native_obj: bool = False # Jinja → Python objects
user_defined_macros: dict | None
user_defined_filters: dict | None
template_searchpath: str | list[str] | None
# Internals
fileloc: str # Путь к .py файлу
last_loaded: datetime # Когда parsed последний раз
timetable: Timetable # Resolved из schedule
@dag decorator vs прямой DAG()
Современный TaskFlow стиль:
from airflow.decorators import dag, task
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl"],
)
def my_dag():
@task
def hello():
return "world"
hello()
my_dag() # ← invocation создаёт DAG object
Под капотом — это эквивалентно классическому стилю:
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id="my_dag"
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl"],
) as dag:
PythonOperator(
task_id="hello"
python_callable=lambda: "world",
)
Два важных нюанса:
- Function name становится
dag_id(можно override черезdag_id="custom_id"в decorator). - Invocation
my_dag()обязателен — без него DAG не зарегистрируется в module globals и DagFileProcessor его не найдёт.
default_args — наследование к tasks
default_args — словарь, который применяется ко всем tasks в DAG (если task не override):
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["[email protected]"],
"execution_timeout": timedelta(hours=1),
"sla": timedelta(hours=2),
},
)
def my_dag():
@task # Inherits all default_args
def light_task(): pass
@task(retries=0) # Override только retries
def heavy_task(): pass
my_dag()
Inheritance применяется во время инстанцирования BaseOperator. Конкретные task args переопределяют default.
Common default_args keys
| Key | Type | Описание |
|---|---|---|
owner | str | Owner для UI и audit |
retries | int | Сколько повторов при failure |
retry_delay | timedelta | Задержка между retries |
retry_exponential_backoff | bool | Экспоненциально растущая задержка |
max_retry_delay | timedelta | Cap для exponential backoff |
start_date | datetime | Если не в DAG, можно в default_args |
email_on_failure | bool | SMTP notification |
email_on_retry | bool | На каждый retry |
email | list[str] | Recipients |
execution_timeout | timedelta | Hard timeout для task |
sla | timedelta | SLA для sla_miss_callback |
priority_weight | int | Default 1, для priority sorting |
pool | str | Default pool |
pool_slots | int | Default slots в pool |
depends_on_past | bool | Запускать только если previous run success |
wait_for_downstream | bool | Ждать что downstream предыдущего run завершился |
trigger_rule | str | all_success / one_success / all_done / … |
Pitfall: mutable default_args
# ❌ ПЛОХО — все DAGs шарят один dict
default_args = {"retries": 3}
@dag(default_args=default_args)
def my_dag(): ...
# Если другой DAG модифицирует default_args.update({"retries": 5}) —
# это аффектит все DAGs (Python mutable default).
# ✅ ХОРОШО — каждый DAG свой dict
@dag(default_args={"retries": 3, "retry_delay": timedelta(minutes=5)})
def my_dag(): ...
Lifecycle hooks
DAG может определить callbacks для key lifecycle events:
def notify_success(context):
"""Called when DagRun succeeds."""
send_to_slack(f"DAG {context['dag'].dag_id} succeeded")
def notify_failure(context):
"""Called when DagRun fails."""
send_to_pagerduty(f"DAG {context['dag'].dag_id} failed!")
def check_sla(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Called when SLA missed."""
log_sla_miss(dag.dag_id, [t.task_id for t in task_list])
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
on_success_callback=notify_success,
on_failure_callback=notify_failure,
sla_miss_callback=check_sla,
)
def my_dag(): ...
Также task-level callbacks через default_args или per-task:
on_success_callbackon_failure_callbackon_retry_callbackon_execute_callback(before execute)on_skipped_callback(2.10+)
Context в callbacks
Callbacks получают context dict с:
dag: DAG objecttask/task_instance: текущий taskdag_run: DagRun objectlogical_date/execution_date: timestampdata_interval_start/data_interval_endparams,var,conn: helper accessors
render_template_as_native_obj
By default Jinja templates рендерятся в строки. Для passing complex Python objects (lists, dicts) между tasks — нужен render_template_as_native_obj=True:
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
render_template_as_native_obj=True, # ← Magic
)
def my_dag():
@task
def get_list() -> list[int]:
return [1, 2, 3, 4, 5]
@task
def process(numbers: list[int]):
# Без native — приходит '[1, 2, 3, 4, 5]' (str)
# С native — приходит [1, 2, 3, 4, 5] (list)
return sum(numbers)
process(get_list())
Это критично для Dynamic Task Mapping (Module 07) — .expand() принимает iterable, который должен быть Python object, не string.
user_defined_macros и user_defined_filters
Если DAG использует Jinja templating с custom logic:
def fiscal_year(execution_date):
"""Возвращает fiscal year для date."""
return execution_date.year if execution_date.month >= 4 else execution_date.year - 1
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
user_defined_macros={"fiscal_year": fiscal_year},
user_defined_filters={"upper_case": str.upper},
)
def my_dag():
PostgresOperator(
task_id="query"
sql="SELECT * FROM sales WHERE fy = {{ fiscal_year(execution_date) }} "
"AND region = '{{ params.region | upper_case }}'"
params={"region": "europe"},
)
{{ fiscal_year(execution_date) }} вызовет наш Python function. {{ ... | upper_case }} использует filter.
template_searchpath — внешние SQL/Jinja файлы
Если SQL queries в отдельных файлах:
# dags/queries/orders.sql
SELECT * FROM orders WHERE created_at::date = '{{ ds }}';
# DAG:
@dag(
template_searchpath=["/opt/airflow/dags/queries"],
)
def my_dag():
PostgresOperator(
task_id="extract"
sql="orders.sql", # ← путь относительно searchpath
)
Полезно для держать SQL отдельно от Python код, для версионирования и code review.
Tags и UI organization
@dag(tags=["etl", "finance", "daily"])
def my_dag(): ...
В UI можно фильтровать DAGs по tags. Best practices:
team:finance,team:ml,team:platformfreq:daily,freq:hourly,freq:weeklypriority:critical,priority:lowenv:production,env:staging
Часто упускаемые поля
params
DAG-level параметры, configurable через UI при manual trigger:
from airflow.models.param import Param
@dag(
params={
"source_db": Param("production", type="string", enum=["production", "staging"]),
"date": Param(default="2026-05-13", type="string", format="date"),
"batch_size": Param(1000, type="integer", minimum=1, maximum=10000),
},
)
def my_dag():
@task
def use_params(**context):
source = context["params"]["source_db"]
date = context["params"]["date"]
# ...
При manual trigger UI покажет форму для override этих параметров.
is_paused_upon_creation
@dag(is_paused_upon_creation=True) # DAG создаётся paused — пока вручную не unpause
def critical_dag(): ...
Для critical DAGs, которые не должны запускаться сразу при deploy.
access_control
Per-DAG FAB permissions:
@dag(
access_control={
"FinanceTeam": {"can_read", "can_edit"},
"Op": {"can_read"},
},
)
def finance_dag(): ...
Module 01 lesson 03 (Webserver) детально про FAB RBAC.
doc_md
Markdown documentation, показывается в UI:
@dag(
doc_md="""
# Orders ETL
Daily extraction of orders from Postgres into Iceberg lakehouse.
## Owners
- Data team (`[email protected]`)
## SLAs
- Must complete by 06:00 UTC
""",
)
def orders_etl(): ...
Или:
my_dag.doc_md = __doc__ # Использовать module docstring
Edge cases parsing
datetime.now() в DAG body
# ❌ КАТАСТРОФА
@dag(start_date=datetime.now() - timedelta(days=7))
def my_dag(): ...
datetime.now() evaluates на каждом parse — каждые 30 секунд. start_date сдвигается → scheduler не знает что считать “первым run”. Backfill теряет смысл.
Fix:
@dag(start_date=datetime(2024, 1, 1)) # hardcoded
def my_dag(): ...
Variable.get() / Connection.get() в top-level
# ❌ Hit DB / Vault на каждом parse (каждые 30s по default)
db_password = Variable.get("db_password")
@dag(...)
def my_dag():
@task
def use_password():
return db_password # Closure captures
Fix: move внутрь task:
@dag(...)
def my_dag():
@task
def use_password():
return Variable.get("db_password") # Evaluated при run, не parse
Heavy imports в top-level
# ❌ Parsing slow
import pandas as pd
import tensorflow as tf # 2-3 seconds import time
@dag(...)
def my_dag():
@task
def task1():
return pd.DataFrame()
Fix: lazy imports внутри tasks:
@dag(...)
def my_dag():
@task
def task1():
import pandas as pd # Imported при run, не parse
return pd.DataFrame()
С 2.7+ есть parsing_pre_import_modules (default True) для pre-import heavy modules в parent process — share между parsing processes через fork.
DAG validation
Airflow валидирует DAG при parse. Errors попадают в import_error table и доступны через:
airflow dags list-import-errors
Что валидируется:
dag_id— unique, не пустой, valid charactersstart_date— must be timezone-aware- Граф — нет cycles, валидные dependencies
- Task IDs — unique внутри DAG
schedule— valid cron/timedelta/Timetable