Learning Platform
Глоссарий Troubleshooting
Урок 03.02 · 24 мин
Продвинутый
DAGDecoratordefault_argsLifecycle

DAG object anatomy

Когда вы пишете @dag(schedule="@daily", ...), что происходит на самом деле? Это не просто синтаксический сахар. Decorator создаёт factory function, которая инстанцирует airflow.models.dag.DAG object с specific конфигурацией, регистрирует его в module-level globals, и при dag.cli() запускает DAG processor logic для парсинга. Понимание этой механики — ключ к написанию maintainable DAG-ов.

Этот урок препарирует DAG object изнутри: какие поля, как они инициализируются, какие callbacks доступны, какие edge cases возникают при parsing.

Lifecycle DAG object — от .py файла до scheduling
dag.pyPython файл в dags/ folder. Содержит @dag(...) decorator + invocation my_dag(). Файл лежит в filesystem, обычно gitSync sidecar в production K8s.
importlib.import_module()
DagFileProcessor (child process)Multiprocessing child процесс scheduler-а. Importит .py файл, ловит exceptions в import_error table, находит DAG objects в module globals. Запускается каждые min_file_process_interval секунд (default 30s).
DAG.__init__() invoked
DAG object (in memory)airflow.models.dag.DAG instance. Содержит task_dict, schedule resolved в Timetable, default_args applied к tasks, validated structure (no cycles, unique task_ids).
DagBagContainer который держит все DAG objects найденные в processed файлах. Используется для validation и временного in-memory caching перед serialization.
DAG.to_json() + hash
serialized_dag tablePostgreSQL таблица где хранится JSON-представление DAG. После этой записи scheduler, webserver, triggerer работают только с serialized данными, не парсят .py повторно.
scheduler picks up
Scheduler — schedules DagRunsSchedulerJob main loop читает serialized_dag, использует timetable.next_dagrun_info() для определения когда создать следующий DagRun, переводит TaskInstances scheduled → queued.

Anatomy DAG object

airflow.models.dag.DAG — основной класс. Его инстанс хранит:

class DAG:
    # Identity
    dag_id: str                            # Unique ID
    description: str | None
    tags: list[str]

    # Scheduling
    schedule: str | Timetable | None       # cron, timedelta, "@daily", None, Dataset, list[Dataset]
    start_date: datetime
    end_date: datetime | None
    catchup: bool = True                   # Внимание: True by default
    max_active_runs: int = 16              # Concurrency

    # Tasks
    task_dict: dict[str, BaseOperator]     # Все tasks в DAG
    task_group: TaskGroup                  # Root TaskGroup

    # Defaults
    default_args: dict                     # Inheritance к tasks

    # Hooks
    on_success_callback: Callable | None
    on_failure_callback: Callable | None
    sla_miss_callback: Callable | None

    # Behavior
    render_template_as_native_obj: bool = False  # Jinja → Python objects
    user_defined_macros: dict | None
    user_defined_filters: dict | None
    template_searchpath: str | list[str] | None

    # Internals
    fileloc: str                           # Путь к .py файлу
    last_loaded: datetime                  # Когда parsed последний раз
    timetable: Timetable                   # Resolved из schedule

@dag decorator vs прямой DAG()

Современный TaskFlow стиль:

from airflow.decorators import dag, task

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl"],
)
def my_dag():
    @task
    def hello():
        return "world"

    hello()

my_dag()  # ← invocation создаёт DAG object

Под капотом — это эквивалентно классическому стилю:

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="my_dag"
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl"],
) as dag:
    PythonOperator(
        task_id="hello"
        python_callable=lambda: "world",
    )

Два важных нюанса:

  1. Function name становится dag_id (можно override через dag_id="custom_id" в decorator).
  2. Invocation my_dag() обязателен — без него DAG не зарегистрируется в module globals и DagFileProcessor его не найдёт.

default_args — наследование к tasks

default_args — словарь, который применяется ко всем tasks в DAG (если task не override):

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
        "email": ["[email protected]"],
        "execution_timeout": timedelta(hours=1),
        "sla": timedelta(hours=2),
    },
)
def my_dag():
    @task  # Inherits all default_args
    def light_task(): pass

    @task(retries=0)  # Override только retries
    def heavy_task(): pass

my_dag()

Inheritance применяется во время инстанцирования BaseOperator. Конкретные task args переопределяют default.

default_args inheritance — DAG → Task
DAG.default_argsDict переданный в @dag(default_args=...). Хранится как attribute DAG object. При инстанцировании каждого task используется как baseline.
BaseOperator.__init__ merges
Task A — no overrideTask инстанцируется без custom kwargs для retries/retry_delay. BaseOperator берёт значения из dag.default_args. Финальное состояние: retries=2, retry_delay=5min.
Task B — @task(retries=0)Task явно override retries=0 при декорировании. BaseOperator видит kwarg, использует его вместо default_args значения. Финальное retries=0 — task не будет retry при failure.
Task C — pool='gpu'Task добавляет новый attribute (pool), который не был в default_args. Merge: получает retries=2 от default_args + pool='gpu' от per-task kwarg.
Final TaskInstance attrsПри scheduling эти attributes используются для retry logic, timeout, pool reservation. Сериализуется в serialized_dag JSON для consistency между scheduler и workers.

Common default_args keys

KeyTypeОписание
ownerstrOwner для UI и audit
retriesintСколько повторов при failure
retry_delaytimedeltaЗадержка между retries
retry_exponential_backoffboolЭкспоненциально растущая задержка
max_retry_delaytimedeltaCap для exponential backoff
start_datedatetimeЕсли не в DAG, можно в default_args
email_on_failureboolSMTP notification
email_on_retryboolНа каждый retry
emaillist[str]Recipients
execution_timeouttimedeltaHard timeout для task
slatimedeltaSLA для sla_miss_callback
priority_weightintDefault 1, для priority sorting
poolstrDefault pool
pool_slotsintDefault slots в pool
depends_on_pastboolЗапускать только если previous run success
wait_for_downstreamboolЖдать что downstream предыдущего run завершился
trigger_rulestrall_success / one_success / all_done / …
Python function decorators — что внутри @decorator

Pitfall: mutable default_args

# ❌ ПЛОХО — все DAGs шарят один dict
default_args = {"retries": 3}

@dag(default_args=default_args)
def my_dag(): ...

# Если другой DAG модифицирует default_args.update({"retries": 5}) —
# это аффектит все DAGs (Python mutable default).
# ✅ ХОРОШО — каждый DAG свой dict
@dag(default_args={"retries": 3, "retry_delay": timedelta(minutes=5)})
def my_dag(): ...

Lifecycle hooks

DAG может определить callbacks для key lifecycle events:

def notify_success(context):
    """Called when DagRun succeeds."""
    send_to_slack(f"DAG {context['dag'].dag_id} succeeded")

def notify_failure(context):
    """Called when DagRun fails."""
    send_to_pagerduty(f"DAG {context['dag'].dag_id} failed!")

def check_sla(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when SLA missed."""
    log_sla_miss(dag.dag_id, [t.task_id for t in task_list])

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    on_success_callback=notify_success,
    on_failure_callback=notify_failure,
    sla_miss_callback=check_sla,
)
def my_dag(): ...

Также task-level callbacks через default_args или per-task:

  • on_success_callback
  • on_failure_callback
  • on_retry_callback
  • on_execute_callback (before execute)
  • on_skipped_callback (2.10+)

Context в callbacks

Callbacks получают context dict с:

  • dag: DAG object
  • task / task_instance: текущий task
  • dag_run: DagRun object
  • logical_date / execution_date: timestamp
  • data_interval_start / data_interval_end
  • params, var, conn: helper accessors

render_template_as_native_obj

By default Jinja templates рендерятся в строки. Для passing complex Python objects (lists, dicts) между tasks — нужен render_template_as_native_obj=True:

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,  # ← Magic
)
def my_dag():
    @task
    def get_list() -> list[int]:
        return [1, 2, 3, 4, 5]

    @task
    def process(numbers: list[int]):
        # Без native — приходит '[1, 2, 3, 4, 5]' (str)
        # С native — приходит [1, 2, 3, 4, 5] (list)
        return sum(numbers)

    process(get_list())

Это критично для Dynamic Task Mapping (Module 07) — .expand() принимает iterable, который должен быть Python object, не string.


user_defined_macros и user_defined_filters

Если DAG использует Jinja templating с custom logic:

def fiscal_year(execution_date):
    """Возвращает fiscal year для date."""
    return execution_date.year if execution_date.month >= 4 else execution_date.year - 1

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    user_defined_macros={"fiscal_year": fiscal_year},
    user_defined_filters={"upper_case": str.upper},
)
def my_dag():
    PostgresOperator(
        task_id="query"
        sql="SELECT * FROM sales WHERE fy = {{ fiscal_year(execution_date) }} "
            "AND region = '{{ params.region | upper_case }}'"
        params={"region": "europe"},
    )

{{ fiscal_year(execution_date) }} вызовет наш Python function. {{ ... | upper_case }} использует filter.


template_searchpath — внешние SQL/Jinja файлы

Если SQL queries в отдельных файлах:

# dags/queries/orders.sql
SELECT * FROM orders WHERE created_at::date = '{{ ds }}';

# DAG:
@dag(
    template_searchpath=["/opt/airflow/dags/queries"],
)
def my_dag():
    PostgresOperator(
        task_id="extract"
        sql="orders.sql",  # ← путь относительно searchpath
    )

Полезно для держать SQL отдельно от Python код, для версионирования и code review.


Tags и UI organization

@dag(tags=["etl", "finance", "daily"])
def my_dag(): ...

В UI можно фильтровать DAGs по tags. Best practices:

  • team:finance, team:ml, team:platform
  • freq:daily, freq:hourly, freq:weekly
  • priority:critical, priority:low
  • env:production, env:staging

Часто упускаемые поля

params

DAG-level параметры, configurable через UI при manual trigger:

from airflow.models.param import Param

@dag(
    params={
        "source_db": Param("production", type="string", enum=["production", "staging"]),
        "date": Param(default="2026-05-13", type="string", format="date"),
        "batch_size": Param(1000, type="integer", minimum=1, maximum=10000),
    },
)
def my_dag():
    @task
    def use_params(**context):
        source = context["params"]["source_db"]
        date = context["params"]["date"]
        # ...

При manual trigger UI покажет форму для override этих параметров.

is_paused_upon_creation

@dag(is_paused_upon_creation=True)  # DAG создаётся paused — пока вручную не unpause
def critical_dag(): ...

Для critical DAGs, которые не должны запускаться сразу при deploy.

access_control

Per-DAG FAB permissions:

@dag(
    access_control={
        "FinanceTeam": {"can_read", "can_edit"},
        "Op": {"can_read"},
    },
)
def finance_dag(): ...

Module 01 lesson 03 (Webserver) детально про FAB RBAC.

doc_md

Markdown documentation, показывается в UI:

@dag(
    doc_md="""
    # Orders ETL

    Daily extraction of orders from Postgres into Iceberg lakehouse.

    ## Owners
    - Data team (`[email protected]`)

    ## SLAs
    - Must complete by 06:00 UTC
    """,
)
def orders_etl(): ...

Или:

my_dag.doc_md = __doc__  # Использовать module docstring

Edge cases parsing

datetime.now() в DAG body

# ❌ КАТАСТРОФА
@dag(start_date=datetime.now() - timedelta(days=7))
def my_dag(): ...

datetime.now() evaluates на каждом parse — каждые 30 секунд. start_date сдвигается → scheduler не знает что считать “первым run”. Backfill теряет смысл.

Fix:

@dag(start_date=datetime(2024, 1, 1))  # hardcoded
def my_dag(): ...

Variable.get() / Connection.get() в top-level

# ❌ Hit DB / Vault на каждом parse (каждые 30s по default)
db_password = Variable.get("db_password")

@dag(...)
def my_dag():
    @task
    def use_password():
        return db_password  # Closure captures

Fix: move внутрь task:

@dag(...)
def my_dag():
    @task
    def use_password():
        return Variable.get("db_password")  # Evaluated при run, не parse

Heavy imports в top-level

# ❌ Parsing slow
import pandas as pd
import tensorflow as tf  # 2-3 seconds import time

@dag(...)
def my_dag():
    @task
    def task1():
        return pd.DataFrame()

Fix: lazy imports внутри tasks:

@dag(...)
def my_dag():
    @task
    def task1():
        import pandas as pd  # Imported при run, не parse
        return pd.DataFrame()

С 2.7+ есть parsing_pre_import_modules (default True) для pre-import heavy modules в parent process — share между parsing processes через fork.


DAG validation

Airflow валидирует DAG при parse. Errors попадают в import_error table и доступны через:

airflow dags list-import-errors

Что валидируется:

  • dag_id — unique, не пустой, valid characters
  • start_date — must be timezone-aware
  • Граф — нет cycles, валидные dependencies
  • Task IDs — unique внутри DAG
  • schedule — valid cron/timedelta/Timetable

Проверка знанийKnowledge check
DAG имеет default_args = {'retries': 3} и task с @task(retries=0). Что произойдёт при failure этой task?
ОтветAnswer
Task НЕ будет retry. Per-task `retries=0` override-ит default_args. Inheritance работает так: BaseOperator при инстанцировании сначала применяет default_args, потом per-task arguments которые override default. Это design: default_args — это default, per-task — explicit override. Логика этой иерархии важна для design patterns: если DAG-wide retry policy, но один critical task должен fail fast (без retries чтобы alert сработал быстрее) — устанавливаешь retries=0 явно.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. DAG имеет `default_args = {'retries': 3}` и task с `@task(retries=0)`. Что произойдёт при failure этой task?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8