DagRun lifecycle и state machine
DagRun — это central object Airflow scheduling. Каждый запуск DAG (по schedule, manual trigger, dataset event, backfill CLI) рождает новую строку в таблице dag_run. Этот объект проходит через простую с виду state machine из четырёх состояний, но что именно вызывает каждый переход, кто его делает и в какой момент main loop — детали, которые определяют поведение всего scheduler-а.
Главный класс — airflow.models.dagrun.DagRun. В 2.x он живёт в airflow/models/dagrun.py. Переходы между state-ами инициируются не самим DagRun-ом, а scheduler-ом в фазах 1-2 main loop (см. предыдущие уроки).
Четыре state DagRun-а
В 2.x DagRunState (из airflow.utils.state) имеет ровно четыре терминала жизни:
| State | Что означает | Кто переводит |
|---|---|---|
queued | DagRun создан, scheduler знает что запускать, но scheduling ещё не начался | Scheduler фаза 1 (create_dagruns_for_dags) |
running | Хотя бы один TI переведён в state ≠ none/removed/upstream_failed | Scheduler фаза 2 (schedule_dag_runs) |
success | Все TI в terminal state, ни один не failed/upstream_failed | Scheduler фаза 2 (update_state) |
failed | Все TI в terminal state, хотя бы один failed/upstream_failed | Scheduler фаза 2 (update_state) |
Дополнительно существует pseudo-state none для случая, когда DagRun ещё не создан (используется только как input для timetable.next_dagrun_info).
Состояние restarting есть у TaskInstance, но не у DagRun — DagRun не имеет concept “restart middle of life”, только terminal verdict.
State machine — visualization
Обратите внимание: между queued и running scheduler может пропустить несколько тиков. Если в DagRun только sensors с rescheduled mode, или все TI заблокированы wait_for_downstream — DagRun будет висеть в queued, пока хотя бы один TI не получит scheduled.
Из официальной документации
“A DAG run is an instance of a DAG running at a specific point in time. DAG runs progress through states: queued → running → success/failed. The scheduler is responsible for transitioning DAG runs between states based on the state of their task instances.”
И из исходников DagRun.update_state():
“Determines the overall state of the DAG Run based on the state of its TaskInstances. Returns ready-to-execute TIs and a flag indicating callback should be fired. The state of the DAG Run is only updated when all TIs are in a terminal state.”
Это ключевая фраза: DagRun state не меняется на финальный verdict, пока есть хотя бы один non-terminal TI. Если у вас есть зависший TI в state running (например, scheduler не успевает adopt orphan) — DagRun так и будет висеть в running, даже если все остальные TI давно success.
Pseudocode: как scheduler обновляет state
# Псевдокод DagRun.update_state (упрощённый)
def update_state(self, session):
"""Called каждый tick для каждого active DagRun."""
task_instances = self.get_task_instances(session)
# Schedule готовые TI (none → scheduled)
schedulable_tis = self._get_ready_tis(task_instances)
for ti in schedulable_tis:
ti.state = State.SCHEDULED
# Determine DagRun state
unfinished = [ti for ti in task_instances
if ti.state not in State.finished]
if unfinished:
# Хотя бы один не terminal — DagRun не финализируется
if self.state == DagRunState.QUEUED and any(
ti.state == State.SCHEDULED for ti in task_instances
):
self.state = DagRunState.RUNNING
self.start_date = timezone.utcnow()
return # ещё не finalize
# Все TI terminal — определяем DagRun verdict
leaves = [ti for ti in task_instances if self.dag.task_dict[ti.task_id].is_leaf]
if all(ti.state in (State.SUCCESS, State.SKIPPED) for ti in leaves):
self.state = DagRunState.SUCCESS
else:
# Хотя бы один leaf failed/upstream_failed
self.state = DagRunState.FAILED
self.end_date = timezone.utcnow()
# Эмитим callback
if self.state == DagRunState.SUCCESS:
callback = self.dag.on_success_callback
else:
callback = self.dag.on_failure_callback
if callback:
callback_queue.enqueue(callback, dag_run=self)
Ключевые моменты:
- Verdict определяется по leaf tasks (узлы без downstream), а не по всем tasks. Это позволяет middle-of-DAG failures с trigger_rule=‘all_done’ на leaves не делать DagRun failed.
- DagRun.end_date устанавливается только при finalize.
- Callbacks (on_success/on_failure) не выполняются inline — они кладутся в callback_queue, который обрабатывается отдельным executor-ом или внутри DagFileProcessor.
run_type — пять типов DagRun
В таблице dag_run есть колонка run_type (enum DagRunType):
| run_type | Источник | Особенности |
|---|---|---|
scheduled | Timetable / cron | run_id формата scheduled__<logical_date>, идёт через next_dagrun_create_after |
manual | UI ‘Trigger DAG’ / airflow dags trigger / Rest API | run_id формата manual__<timestamp>, conf может содержать override |
backfill | airflow dags backfill CLI | Управляется отдельным BackfillJob (см. lesson 06), не scheduler |
dataset_triggered | Asset/Dataset update fired | run_id формата dataset_triggered__<timestamp>, читает dataset_dag_run_queue |
asset_triggered | Synonym для dataset_triggered (2.10+ alias) | В 3.x только asset_triggered |
run_type влияет на:
- Concurrency:
max_active_runsсчитает все типы вместе - Backfill semantics: depends_on_past и wait_for_downstream имеют специальное поведение (см. lesson 06)
- Catchup: только scheduled runs создаются catchup-ом
- UI display: иконки отличаются в Grid view
data_interval vs execution_date — что было и что стало
Это историческая боль Airflow. До 2.2 использовался execution_date, который семантически был “start of data interval”, но всех путал. В 2.2 добавили data_interval_start / data_interval_end, а execution_date стал synonym для data_interval_start.
Mnemonics: для daily DAG за 1 января run происходит 2 января утром, обрабатывая данные интервала [1, 2). logical_date / execution_date = 1 января.
Это часто confuses новичков: “я триггернул @daily DAG 2 января, почему execution_date 1 января?”. Ответ: data_interval_start = 1 января, run обрабатывает данные of 1-го января.
В 3.x execution_date deprecated и заменён на logical_date повсюду (Jinja, Python context, REST API).
Asset-triggered DagRuns: dataset_dag_run_queue
Datasets (2.4+, AIP-48) дают новый способ trigger-ить DagRun: по событиям обновления других DAGs. Mechanism через таблицу dataset_dag_run_queue:
-- Когда producer DAG обновляет dataset
INSERT INTO dataset_event (dataset_id, source_task_id, source_dag_id, timestamp, ...)
VALUES (42, 'export_users', 'producer_dag', now(), ...);
-- Trigger entry для каждого consumer DAG
INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id, created_at)
VALUES (42, 'consumer_dag', now());
Scheduler в фазе 1 main loop:
# Псевдокод _create_dagruns_for_datasets
queue_entries = session.query(DatasetDagRunQueue).group_by(
DatasetDagRunQueue.target_dag_id
).all()
for dag_id, entries in queue_entries:
dag = serialized_dag_to_dag(dag_id)
required_datasets = dag.dataset_triggers
# Все ли required datasets имеют свежие events?
if all_required_datasets_ready(required_datasets, entries):
create_dagrun(
dag_id=dag_id,
run_id=f"dataset_triggered__{now()}"
run_type=DagRunType.DATASET_TRIGGERED,
state=DagRunState.QUEUED,
# data_interval = (max(event.timestamp), now())
)
# Удаляем queue entries — обработаны
session.delete_all(entries)
Gotcha: dataset_dag_run_queue накапливается, если consumer DAG paused или у него нет dataset_triggers (несоответствие conditions). Это распространённая причина инцидентов “DB разрослась на 100GB из ниоткуда” — в постоянно проксящихся production deployments проверяйте размер таблицы.
В 3.x dataset переименован в asset (AIP-74), API изменён: Asset() вместо Dataset(), но semantics та же.
SQL для inspection
В live system из psql:
-- Все active DagRuns
SELECT dag_id, run_id, state, run_type,
data_interval_start, data_interval_end,
queued_at, start_date,
now() - start_date AS duration
FROM dag_run
WHERE state IN ('queued', 'running')
ORDER BY queued_at DESC;
-- "Stuck" DagRuns в queued > 5 минут
SELECT dag_id, run_id, queued_at, now() - queued_at AS waiting
FROM dag_run
WHERE state = 'queued' AND queued_at < now() - interval '5 minutes'
ORDER BY queued_at;
-- Asset queue size (если разрослась — проблема)
SELECT target_dag_id, count(*) as queued_entries
FROM dataset_dag_run_queue
GROUP BY target_dag_id
HAVING count(*) > 100
ORDER BY count(*) DESC;
-- DagRuns без leaf success (не финализированы при наличии stuck TI)
SELECT dr.dag_id, dr.run_id, dr.state,
count(ti.task_id) FILTER (WHERE ti.state NOT IN ('success', 'failed', 'skipped', 'upstream_failed', 'removed')) AS non_terminal_tis
FROM dag_run dr
JOIN task_instance ti ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id
WHERE dr.state = 'running'
GROUP BY dr.dag_id, dr.run_id, dr.state
HAVING count(ti.task_id) FILTER (WHERE ti.state NOT IN ('success', 'failed', 'skipped', 'upstream_failed', 'removed')) > 0
ORDER BY dr.start_date;
Что вызывает каждый переход — таблица
| From → To | Trigger | Where in code |
|---|---|---|
| (none) → queued | Timetable next_dagrun ≤ now() ИЛИ asset event ready | _create_dagruns_for_dags / _create_dagruns_for_datasets |
| queued → running | Первый TI стал scheduled (хотя бы один) | DagRun.update_state фаза 2 |
| running → success | Все leaf TI в success/skipped | DagRun.update_state |
| running → failed | Хотя бы один leaf TI failed/upstream_failed | DagRun.update_state |
| (любой) → failed | Manual mark failed через UI / API | DagRun.set_state |
| (любой) → queued | ”Clear” → “Reset DagRun” в UI | DagRun.clear_number_of_dag_runs |
Manual override через UI или CLI обходит scheduler — пользователь напрямую UPDATE-ит dag_run.state. Поэтому переходы не всегда идут “по правилам”.
Production gotchas
1. DagRun navis в queued forever
Cause: все TI в state none, ни один не получает scheduled. Например, у первого TI есть unmet dep (зависит от TI прошлого run, который failed, и depends_on_past=True).
Diagnose:
SELECT task_id, state, FROM task_instance
WHERE dag_id='X' AND run_id='Y';
Если все none — проверить deps, sensor conditions, depends_on_past, pool availability.
2. DagRun running, но visually “ничего не происходит”
Cause: есть orphan TI в state running, чей heartbeat прокис, но scheduler не успел adopt (см. lesson 05). Или sensor в reschedule mode — он “спит” между tick-ами.
Diagnose:
SELECT task_id, state, latest_heartbeat,
now() - latest_heartbeat AS gap
FROM task_instance
WHERE dag_id='X' AND run_id='Y' AND state='running';
Heartbeat прокис > 5 минут — zombie. См. lesson 05.
3. on_failure_callback не выполняется при manual mark failed
Cause: callback вызывается только из DagRun.update_state() при автоматическом transition. Manual mark через UI не идёт через update_state — это direct SQL UPDATE.
Fix: для guaranteed callbacks — используйте teardown tasks вместо on_failure_callback (см. Module 02 lesson о Setup/Teardown).
4. Catchup создаёт сотни DagRuns при unpause
Cause: DAG с catchup=True и start_date=2020-01-01, unpaused в 2026 → scheduler пытается создать 6 лет daily runs. Throttle через max_active_runs помогает только частично.
Fix:
catchup=False— default since 2.3+- Если catchup нужен — поставьте корректный start_date перед unpause
airflow dags backfill --reset-dagrunsдля controlled backfill (см. lesson 06)
Comparison с 3.x
| Aspect | 2.x | 3.x |
|---|---|---|
| Class name | airflow.models.dagrun.DagRun | то же, но dag_version_id FK |
| Versioning | DagRun не привязан к версии DAG-кода | dag_version_id обязателен (AIP-63) |
| Asset semantics | dataset_dag_run_queue | asset_dag_run_queue (rename) |
| Backfill | Отдельный BackfillJob | Managed scheduler-ом (AIP-78) |
| logical_date | Synonym для execution_date | execution_date deprecated |
Большая часть кода DagRun.update_state идентична — state machine не менялась с 2.2 (когда добавили data_interval).