Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 26 мин
Продвинутый
DagRunState machineLifecycledata_intervalrun_type

DagRun lifecycle и state machine

DagRun — это central object Airflow scheduling. Каждый запуск DAG (по schedule, manual trigger, dataset event, backfill CLI) рождает новую строку в таблице dag_run. Этот объект проходит через простую с виду state machine из четырёх состояний, но что именно вызывает каждый переход, кто его делает и в какой момент main loop — детали, которые определяют поведение всего scheduler-а.

Главный класс — airflow.models.dagrun.DagRun. В 2.x он живёт в airflow/models/dagrun.py. Переходы между state-ами инициируются не самим DagRun-ом, а scheduler-ом в фазах 1-2 main loop (см. предыдущие уроки).


Четыре state DagRun-а

В 2.x DagRunState (из airflow.utils.state) имеет ровно четыре терминала жизни:

StateЧто означаетКто переводит
queuedDagRun создан, scheduler знает что запускать, но scheduling ещё не началсяScheduler фаза 1 (create_dagruns_for_dags)
runningХотя бы один TI переведён в state ≠ none/removed/upstream_failedScheduler фаза 2 (schedule_dag_runs)
successВсе TI в terminal state, ни один не failed/upstream_failedScheduler фаза 2 (update_state)
failedВсе TI в terminal state, хотя бы один failed/upstream_failedScheduler фаза 2 (update_state)

Дополнительно существует pseudo-state none для случая, когда DagRun ещё не создан (используется только как input для timetable.next_dagrun_info).

Состояние restarting есть у TaskInstance, но не у DagRun — DagRun не имеет concept “restart middle of life”, только terminal verdict.


State machine — visualization

DagRun state machine — главная цепочка
(nothing)DagRun ещё не существует. Scheduler фаза 1 готов создать его, когда next_dagrun_create_after ≤ now() или есть dataset_dag_run_queue entry с готовыми assets.
scheduler creates row
queuedDagRun row создана в dag_run table. Scheduler планирует обработать её на следующей итерации фазы 2. Tasks ещё не scheduled — они в state none.
первый TI → scheduled
runningХотя бы один TI прошёл фазу 2 и стал scheduled. Scheduler обновляет dag_run.state=running и устанавливает start_date=now(). На этом этапе DagRun считается активным.
все TI terminal
successВсе TI в success/skipped (или success/skipped с trigger_rule allowing failures). Scheduler устанавливает end_date=now(), эмитит DagRunStateChanged event, удаляет связанные queue entries.
failedВсе TI terminal, хотя бы один failed или upstream_failed без mitigation через trigger_rule. Scheduler логирует cause, эмитит metric dagrun.dependency-check.{dag_id}, посылает callback on_failure_callback если задан.

Обратите внимание: между queued и running scheduler может пропустить несколько тиков. Если в DagRun только sensors с rescheduled mode, или все TI заблокированы wait_for_downstream — DagRun будет висеть в queued, пока хотя бы один TI не получит scheduled.


Из официальной документации

“A DAG run is an instance of a DAG running at a specific point in time. DAG runs progress through states: queued → running → success/failed. The scheduler is responsible for transitioning DAG runs between states based on the state of their task instances.”

И из исходников DagRun.update_state():

“Determines the overall state of the DAG Run based on the state of its TaskInstances. Returns ready-to-execute TIs and a flag indicating callback should be fired. The state of the DAG Run is only updated when all TIs are in a terminal state.”

Это ключевая фраза: DagRun state не меняется на финальный verdict, пока есть хотя бы один non-terminal TI. Если у вас есть зависший TI в state running (например, scheduler не успевает adopt orphan) — DagRun так и будет висеть в running, даже если все остальные TI давно success.


Pseudocode: как scheduler обновляет state

# Псевдокод DagRun.update_state (упрощённый)
def update_state(self, session):
    """Called каждый tick для каждого active DagRun."""
    task_instances = self.get_task_instances(session)

    # Schedule готовые TI (none → scheduled)
    schedulable_tis = self._get_ready_tis(task_instances)
    for ti in schedulable_tis:
        ti.state = State.SCHEDULED

    # Determine DagRun state
    unfinished = [ti for ti in task_instances
                  if ti.state not in State.finished]

    if unfinished:
        # Хотя бы один не terminal — DagRun не финализируется
        if self.state == DagRunState.QUEUED and any(
            ti.state == State.SCHEDULED for ti in task_instances
        ):
            self.state = DagRunState.RUNNING
            self.start_date = timezone.utcnow()
        return  # ещё не finalize

    # Все TI terminal — определяем DagRun verdict
    leaves = [ti for ti in task_instances if self.dag.task_dict[ti.task_id].is_leaf]

    if all(ti.state in (State.SUCCESS, State.SKIPPED) for ti in leaves):
        self.state = DagRunState.SUCCESS
    else:
        # Хотя бы один leaf failed/upstream_failed
        self.state = DagRunState.FAILED

    self.end_date = timezone.utcnow()

    # Эмитим callback
    if self.state == DagRunState.SUCCESS:
        callback = self.dag.on_success_callback
    else:
        callback = self.dag.on_failure_callback
    if callback:
        callback_queue.enqueue(callback, dag_run=self)

Ключевые моменты:

  • Verdict определяется по leaf tasks (узлы без downstream), а не по всем tasks. Это позволяет middle-of-DAG failures с trigger_rule=‘all_done’ на leaves не делать DagRun failed.
  • DagRun.end_date устанавливается только при finalize.
  • Callbacks (on_success/on_failure) не выполняются inline — они кладутся в callback_queue, который обрабатывается отдельным executor-ом или внутри DagFileProcessor.

run_type — пять типов DagRun

В таблице dag_run есть колонка run_type (enum DagRunType):

run_typeИсточникОсобенности
scheduledTimetable / cronrun_id формата scheduled__<logical_date>, идёт через next_dagrun_create_after
manualUI ‘Trigger DAG’ / airflow dags trigger / Rest APIrun_id формата manual__<timestamp>, conf может содержать override
backfillairflow dags backfill CLIУправляется отдельным BackfillJob (см. lesson 06), не scheduler
dataset_triggeredAsset/Dataset update firedrun_id формата dataset_triggered__<timestamp>, читает dataset_dag_run_queue
asset_triggeredSynonym для dataset_triggered (2.10+ alias)В 3.x только asset_triggered

run_type влияет на:

  • Concurrency: max_active_runs считает все типы вместе
  • Backfill semantics: depends_on_past и wait_for_downstream имеют специальное поведение (см. lesson 06)
  • Catchup: только scheduled runs создаются catchup-ом
  • UI display: иконки отличаются в Grid view

data_interval vs execution_date — что было и что стало

Это историческая боль Airflow. До 2.2 использовался execution_date, который семантически был “start of data interval”, но всех путал. В 2.2 добавили data_interval_start / data_interval_end, а execution_date стал synonym для data_interval_start.

Семантика data_interval для @daily DAG со start_date=2026-01-01
logical_date / execution_dateSynonyms. Это начало data interval. Для @daily DAG с start_date=2026-01-01, первый run будет иметь logical_date=2026-01-01.
data_interval_startBeginning of data window the run is processing. Для daily DAG = logical_date. Для custom timetable может отличаться.
data_interval_endEnd of data window. Для @daily = logical_date + 1 day. Run представляет данные интервала [start, end).
run_after / scheduled_dttmКогда run будет физически запущен. Для @daily = data_interval_end (после закрытия интервала, чтобы все данные были доступны). DAG запускается 2026-01-02 00:00 за интервал [01-01, 01-02).
start_date (DagRun)Физический момент старта DagRun. Записывается в момент перехода queued → running. Может отличаться от run_after на минуты (scheduler tick + critical section).

Mnemonics: для daily DAG за 1 января run происходит 2 января утром, обрабатывая данные интервала [1, 2). logical_date / execution_date = 1 января.

Это часто confuses новичков: “я триггернул @daily DAG 2 января, почему execution_date 1 января?”. Ответ: data_interval_start = 1 января, run обрабатывает данные of 1-го января.

В 3.x execution_date deprecated и заменён на logical_date повсюду (Jinja, Python context, REST API).


Asset-triggered DagRuns: dataset_dag_run_queue

Datasets (2.4+, AIP-48) дают новый способ trigger-ить DagRun: по событиям обновления других DAGs. Mechanism через таблицу dataset_dag_run_queue:

-- Когда producer DAG обновляет dataset
INSERT INTO dataset_event (dataset_id, source_task_id, source_dag_id, timestamp, ...)
VALUES (42, 'export_users', 'producer_dag', now(), ...);

-- Trigger entry для каждого consumer DAG
INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id, created_at)
VALUES (42, 'consumer_dag', now());

Scheduler в фазе 1 main loop:

# Псевдокод _create_dagruns_for_datasets
queue_entries = session.query(DatasetDagRunQueue).group_by(
    DatasetDagRunQueue.target_dag_id
).all()

for dag_id, entries in queue_entries:
    dag = serialized_dag_to_dag(dag_id)
    required_datasets = dag.dataset_triggers

    # Все ли required datasets имеют свежие events?
    if all_required_datasets_ready(required_datasets, entries):
        create_dagrun(
            dag_id=dag_id,
            run_id=f"dataset_triggered__{now()}"
            run_type=DagRunType.DATASET_TRIGGERED,
            state=DagRunState.QUEUED,
            # data_interval = (max(event.timestamp), now())
        )
        # Удаляем queue entries — обработаны
        session.delete_all(entries)

Gotcha: dataset_dag_run_queue накапливается, если consumer DAG paused или у него нет dataset_triggers (несоответствие conditions). Это распространённая причина инцидентов “DB разрослась на 100GB из ниоткуда” — в постоянно проксящихся production deployments проверяйте размер таблицы.

В 3.x dataset переименован в asset (AIP-74), API изменён: Asset() вместо Dataset(), но semantics та же.


SQL для inspection

В live system из psql:

-- Все active DagRuns
SELECT dag_id, run_id, state, run_type,
       data_interval_start, data_interval_end,
       queued_at, start_date,
       now() - start_date AS duration
FROM dag_run
WHERE state IN ('queued', 'running')
ORDER BY queued_at DESC;

-- "Stuck" DagRuns в queued > 5 минут
SELECT dag_id, run_id, queued_at, now() - queued_at AS waiting
FROM dag_run
WHERE state = 'queued' AND queued_at < now() - interval '5 minutes'
ORDER BY queued_at;

-- Asset queue size (если разрослась — проблема)
SELECT target_dag_id, count(*) as queued_entries
FROM dataset_dag_run_queue
GROUP BY target_dag_id
HAVING count(*) > 100
ORDER BY count(*) DESC;

-- DagRuns без leaf success (не финализированы при наличии stuck TI)
SELECT dr.dag_id, dr.run_id, dr.state,
       count(ti.task_id) FILTER (WHERE ti.state NOT IN ('success', 'failed', 'skipped', 'upstream_failed', 'removed')) AS non_terminal_tis
FROM dag_run dr
JOIN task_instance ti ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id
WHERE dr.state = 'running'
GROUP BY dr.dag_id, dr.run_id, dr.state
HAVING count(ti.task_id) FILTER (WHERE ti.state NOT IN ('success', 'failed', 'skipped', 'upstream_failed', 'removed')) > 0
ORDER BY dr.start_date;

Что вызывает каждый переход — таблица

From → ToTriggerWhere in code
(none) → queuedTimetable next_dagrun ≤ now() ИЛИ asset event ready_create_dagruns_for_dags / _create_dagruns_for_datasets
queued → runningПервый TI стал scheduled (хотя бы один)DagRun.update_state фаза 2
running → successВсе leaf TI в success/skippedDagRun.update_state
running → failedХотя бы один leaf TI failed/upstream_failedDagRun.update_state
(любой) → failedManual mark failed через UI / APIDagRun.set_state
(любой) → queued”Clear” → “Reset DagRun” в UIDagRun.clear_number_of_dag_runs

Manual override через UI или CLI обходит scheduler — пользователь напрямую UPDATE-ит dag_run.state. Поэтому переходы не всегда идут “по правилам”.


Production gotchas

1. DagRun navis в queued forever

Cause: все TI в state none, ни один не получает scheduled. Например, у первого TI есть unmet dep (зависит от TI прошлого run, который failed, и depends_on_past=True).

Diagnose:

SELECT task_id, state, FROM task_instance
WHERE dag_id='X' AND run_id='Y';

Если все none — проверить deps, sensor conditions, depends_on_past, pool availability.

2. DagRun running, но visually “ничего не происходит”

Cause: есть orphan TI в state running, чей heartbeat прокис, но scheduler не успел adopt (см. lesson 05). Или sensor в reschedule mode — он “спит” между tick-ами.

Diagnose:

SELECT task_id, state, latest_heartbeat,
       now() - latest_heartbeat AS gap
FROM task_instance
WHERE dag_id='X' AND run_id='Y' AND state='running';

Heartbeat прокис > 5 минут — zombie. См. lesson 05.

3. on_failure_callback не выполняется при manual mark failed

Cause: callback вызывается только из DagRun.update_state() при автоматическом transition. Manual mark через UI не идёт через update_state — это direct SQL UPDATE.

Fix: для guaranteed callbacks — используйте teardown tasks вместо on_failure_callback (см. Module 02 lesson о Setup/Teardown).

4. Catchup создаёт сотни DagRuns при unpause

Cause: DAG с catchup=True и start_date=2020-01-01, unpaused в 2026 → scheduler пытается создать 6 лет daily runs. Throttle через max_active_runs помогает только частично.

Fix:

  • catchup=False — default since 2.3+
  • Если catchup нужен — поставьте корректный start_date перед unpause
  • airflow dags backfill --reset-dagruns для controlled backfill (см. lesson 06)

Comparison с 3.x

Aspect2.x3.x
Class nameairflow.models.dagrun.DagRunто же, но dag_version_id FK
VersioningDagRun не привязан к версии DAG-кодаdag_version_id обязателен (AIP-63)
Asset semanticsdataset_dag_run_queueasset_dag_run_queue (rename)
BackfillОтдельный BackfillJobManaged scheduler-ом (AIP-78)
logical_dateSynonym для execution_dateexecution_date deprecated

Большая часть кода DagRun.update_state идентична — state machine не менялась с 2.2 (когда добавили data_interval).


Проверка знанийKnowledge check
DAG имеет 5 tasks: A → B → [C, D] → E. Task C failed, остальные success. В каком state будет DagRun, и почему?
ОтветAnswer
DagRun будет failed. Verdict определяется по leaf tasks (узлам без downstream). Leaf здесь — task E. Если C failed, то D success, но E получит upstream_failed (потому что D succeed, но C failed, и E зависит от обоих с default trigger_rule='all_success'). Leaf E в state upstream_failed → DagRun failed. Если бы trigger_rule у E был 'all_done' или 'one_success', E мог бы success-ить, и DagRun стал бы success. Это распространённая интуиция: 'middle failure' не всегда даёт failed DagRun — главное leaf verdict. Поэтому Setup/Teardown с `on_failure_fail_dagrun=True` важен для cost-control сценариев — гарантирует failed DagRun, даже если leaves succeed.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В каком состоянии создаётся DagRun сразу после scheduler фазы 1?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6