TaskInstance state machine — все 13 состояний
Если DagRun имеет 4 state-а, то TaskInstance — 13. Это самая сложная state machine в Airflow, и понимание всех её переходов — что отличает того, кто “использует Airflow”, от того, кто “знает, что происходит внутри”. Каждое состояние имеет конкретную семантику, конкретный код, который его устанавливает, и конкретные следующие состояния, в которые TI может перейти.
Состояния определены в airflow.utils.state.TaskInstanceState. В коде их часто называют просто State.SCHEDULED, State.QUEUED и т.д. — потому что class State имеет class attributes для удобства.
Все 13 состояний
| State | Семантика | Кто переводит |
|---|---|---|
none (NULL) | TI создан в DB, но scheduler ещё не оценивал | Создаётся при создании DagRun или DAG re-serialization |
scheduled | Все upstream deps satisfied, TI готов к enqueue | Scheduler фаза 2 (schedule_dag_runs) |
queued | TI отправлен executor-у, ждёт worker | Scheduler фаза 3 (critical section) |
running | Worker подобрал, task code исполняется | LocalTaskJob worker process |
success | Task callable вернулся без exception | LocalTaskJob после успешного return |
failed | Task callable raised exception (после exhaustion retries) | LocalTaskJob exception handler |
up_for_retry | Failed, но remaining retries > 0 | LocalTaskJob после exception |
up_for_reschedule | Sensor (mode=‘reschedule’) не satisfied, отложен до следующей проверки | Sensor poke logic |
deferred | Deferrable operator передал control триггеру, ждёт event | DeferTask exception handler |
removed | Task больше нет в DAG (удалён из кода, но DagRun уже существует) | Scheduler при verify_integrity |
restarting | Manual restart через UI / API | DagRun.set_task_instance_state |
shutdown | External signal SIGTERM (через UI ‘Mark Failed’ / airflow tasks kill) | LocalTaskJob signal handler |
upstream_failed | Хотя бы один upstream failed, default trigger_rule | Scheduler фаза 2 |
skipped | Branch не выбрал эту task / ShortCircuit returned False / trigger_rule skipped | BranchPythonOperator / ShortCircuitOperator |
Всего 13 + null = 14 возможных значений колонки task_instance.state. Из них:
- Non-terminal: none, scheduled, queued, running, up_for_retry, up_for_reschedule, deferred, restarting, shutdown (9 states)
- Terminal: success, failed, upstream_failed, skipped, removed (5 states)
State categories — bitmasks из исходников
В airflow/utils/state.py определены groups:
class State:
# ... individual states ...
finished = frozenset([
SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED, REMOVED,
])
"""Terminal states — TI не будет менять state без manual intervention."""
unfinished = frozenset([
NONE, SCHEDULED, QUEUED, RUNNING, UP_FOR_RETRY,
UP_FOR_RESCHEDULE, DEFERRED, RESTARTING, SHUTDOWN,
])
"""Non-terminal — TI ещё может прогрессировать."""
failed_states = frozenset([FAILED, UPSTREAM_FAILED])
"""Считается ли failure для DagRun verdict."""
success_states = frozenset([SUCCESS, SKIPPED])
"""Считается ли success для DagRun verdict."""
adoptable_states = frozenset([QUEUED, RUNNING, DEFERRED])
"""TI, которые могут быть adopted scheduler-ом при failover."""
running = frozenset([RUNNING])
"""Узкое определение — только active execution."""
Зачем эти group-ы:
finished— фильтр вDagRun.update_state()для определения “все TI terminal?”failed_states/success_states— для verdictadoptable_states— дляadopt_or_reset_orphaned_tasks(см. lesson 05). DEFERRED adopt-ed, но logic особая — recreate Trigger row, не reset state.
Главная цепочка успешного выполнения
Каждый переход здесь — отдельный UPDATE в DB, написанный в разных файлах кодовой базы. none → scheduled пишет scheduler. scheduled → queued — тоже scheduler. queued → running и running → success — worker (LocalTaskJob).
Цикл retry — failed + remaining retries > 0
Retry mechanics
В airflow/models/taskinstance.py:
# Псевдокод handle_failure
def handle_failure(self, error, session):
self.end_date = timezone.utcnow()
self.duration = (self.end_date - self.start_date).total_seconds()
self.log.error("Task failed", exc_info=error)
# try_number инкрементится BEFORE state assignment в 2.x
if self.is_eligible_to_retry():
self.state = State.UP_FOR_RETRY
# Compute next retry time
retry_delay = self.task.retry_delay # timedelta
if self.task.retry_exponential_backoff:
# Exponential: retry_delay * 2^(try_number - 1)
min_backoff = retry_delay.total_seconds()
delay = min_backoff * (2 ** (self.try_number - 1))
delay = min(delay, self.task.max_retry_delay.total_seconds())
else:
delay = retry_delay.total_seconds()
self.next_method = None
# next_retry_datetime — не отдельная колонка, scheduler вычисляет из end_date + retry_delay
else:
self.state = State.FAILED
if self.task.on_failure_callback:
callback_queue.enqueue(self.task.on_failure_callback, context=self.get_template_context())
session.merge(self)
session.commit()
def is_eligible_to_retry(self):
"""try_number 1-indexed после increment, retries — max allowed retries."""
return (
self.task.retries
and self.try_number <= self.task.retries
and not self.task.is_smart_sensor # legacy 1.x
)
Важные тонкости:
try_numberв 2.x семантически 1-indexed после increment. Первая попытка: try_number=1 на start, на failure → try_number=2. retries=3 значит “3 retry attempts after initial”, total = 4 attempts.- В 3.x семантика try_number была фиксирована (AIP-65) — теперь 0-indexed before increment, что более интуитивно.
retry_delaydefault 5 минут.retry_exponential_backoff=True(default False) делает delays = 5m, 10m, 20m, 40m, …max_retry_delaycapping для exponential — иначе на retry=10 ждёте 85 часов.
Sensor reschedule cycle
Sensors с mode='reschedule' не держат worker slot между poke-ами. Они отпускают slot и ждут.
Отличие от up_for_retry: retry — после exception. Reschedule — sensor poke вернул False, это штатное поведение. Reschedule не считается failure для retry counter.
В table task_reschedule хранятся все poke attempts с timestamps — это позволяет UI показывать историю sensor checking.
Deferrable operators — переход через deferred
Deferrable (AIP-40, 2.2+) — самый интересный state. Operator может на середине execution передать control в Triggerer process:
Killer feature: деферный sensor не занимает worker slot. Это позволяет иметь тысячи “waiting” sensors на одном worker, тогда как reschedule mode требует scheduler tick + queue cycle для каждого poke.
State.DEFERRED — adoptable, но logic особая (см. lesson 05). Если triggerer умер, scheduler не reset state в scheduled (это сломает context), а recreate trigger row у живого triggerer-а.
Skipped — три источника
State.SKIPPED устанавливается тремя разными механизмами:
- BranchPythonOperator / @task.branch — возвращает list[task_id], all not-selected children получают skipped.
- ShortCircuitOperator — возвращает False → все downstream получают skipped (если
ignore_downstream_trigger_rules=True, default). - trigger_rule=‘one_success’ и upstream skipped — TI с
trigger_rule='all_success'где upstream skipped → также skipped (cascading).
# Псевдокод BranchPythonOperator
class BranchPythonOperator(PythonOperator):
def execute(self, context):
chosen_task_ids = self.python_callable(**context)
# XCom push для downstream resolution
context['ti'].xcom_push(key='skipmixin_key', value={'followed': chosen_task_ids})
# SkipMixin делает остальное
return chosen_task_ids
# В scheduler фаза 2 при оценке downstream
def _schedule_dag_run(...):
for ti in task_instances:
if upstream_branched_to_other(ti):
ti.state = State.SKIPPED # ← skip downstream not chosen
Skipped — terminal в success_states. Это означает: SKIPPED не failure, и не блокирует downstream если у downstream trigger_rule allows skipped propagation.
Upstream_failed — cascade
# Псевдокод в DepStatesDep
def _evaluate(self, ti, session):
upstream_tis = ti.task.upstream_tasks
failed_upstreams = [ut for ut in upstream_tis if ut.state in State.failed_states]
if failed_upstreams and ti.task.trigger_rule == TriggerRule.ALL_SUCCESS:
# Default trigger rule — cascade failure
ti.state = State.UPSTREAM_FAILED
return False
if all(ut.state == State.SKIPPED for ut in upstream_tis) and ti.task.trigger_rule == TriggerRule.ALL_SUCCESS:
ti.state = State.SKIPPED
return False
# ... другие trigger_rule logic
Cascading depth: если task A failed → B (downstream A) upstream_failed → C (downstream B) тоже upstream_failed. Это иногда сюрприз для пользователей: “у меня сотня tasks упала, хотя только одна реально failed”.
С trigger_rule=‘all_done’, all_failed, one_failed, none_failed, none_skipped — можно избежать cascade. См. Module 02 lesson о trigger_rule.
Specialty states
removed
Task был в DagRun (DAG version A), но в новой версии DAG (B) этого task больше нет. Scheduler при verify_integrity обнаруживает orphan TI и помечает removed.
# Псевдокод DagRun.verify_integrity
def verify_integrity(self, session):
current_task_ids = {t.task_id for t in self.dag.task_dict.values()}
db_tis = session.query(TI).filter(TI.dag_id == self.dag_id, TI.run_id == self.run_id).all()
for ti in db_tis:
if ti.task_id not in current_task_ids:
ti.state = State.REMOVED # ← removed terminal
restarting
Manual UI action ‘Restart’ (отличается от Clear). Устанавливается direct UPDATE. Worker увидит сигнал и завершит текущее execution, затем перейдёт в scheduled.
shutdown
External SIGTERM — обычно из UI ‘Mark Failed’ / airflow tasks kill / executor sending kill signal. LocalTaskJob signal handler устанавливает shutdown, attempts graceful cleanup, потом state=failed.
В 3.x restarting и shutdown объединены в более чистый mechanism через signals — старые states могут быть удалены.
Полная таблица переходов
| From | To | Trigger | Где |
|---|---|---|---|
| none | scheduled | All upstream deps satisfied | scheduler phase 2 |
| none | upstream_failed | Upstream failed, trigger_rule=all_success | scheduler phase 2 |
| none | skipped | Upstream branched away / cascading skip | scheduler phase 2 |
| none | removed | Task убрана из DAG | verify_integrity |
| scheduled | queued | Critical section + concurrency OK | scheduler phase 3 |
| queued | running | Worker picked up task | LocalTaskJob start |
| queued | scheduled | Adoption by new scheduler (orphan reset) | adopt_or_reset_orphaned_tasks |
| running | success | execute() returned | LocalTaskJob end |
| running | failed | execute() raised, no retries left | LocalTaskJob handle_failure |
| running | up_for_retry | execute() raised, retries > 0 | LocalTaskJob handle_failure |
| running | up_for_reschedule | Sensor poke returned False | LocalTaskJob reschedule handler |
| running | deferred | TaskDeferred raised | LocalTaskJob defer handler |
| running | shutdown | SIGTERM / ‘Mark Failed’ | LocalTaskJob signal handler |
| up_for_retry | scheduled | retry_delay elapsed | scheduler phase 2 |
| up_for_reschedule | scheduled | reschedule_date ≤ now() | scheduler phase 2 |
| deferred | scheduled | Trigger fired event | triggerer process |
| deferred | failed | Trigger timeout / raised | triggerer process |
| shutdown | failed | Cleanup complete | LocalTaskJob signal handler |
| (любой non-terminal) | failed | Manual ‘Mark Failed’ UI | DagRun.set_task_instance_state |
| (любой) | scheduled | ’Clear task instance’ UI | TI.clear_instance |
| restarting | queued | Restart cycle | Manual or job manager |
Это полная таблица. Любой переход, не указанный здесь — bug или undefined behavior.
SQL для inspection state distribution
-- State distribution per DAG (sanity check)
SELECT dag_id, state, count(*) AS cnt
FROM task_instance
WHERE run_id LIKE 'scheduled%'
GROUP BY dag_id, state
ORDER BY dag_id, cnt DESC;
-- TIs stuck в scheduled > 5 минут (scheduler backlog problem)
SELECT dag_id, task_id, run_id, scheduled_dttm,
now() - scheduled_dttm AS waiting
FROM task_instance
WHERE state = 'scheduled' AND scheduled_dttm < now() - interval '5 minutes'
ORDER BY scheduled_dttm;
-- Retry distribution — какие tasks retries-ятся часто (flaky)
SELECT dag_id, task_id, max(try_number) AS max_tries,
count(*) FILTER (WHERE state = 'failed') AS total_failed,
count(*) FILTER (WHERE state = 'success' AND try_number > 1) AS recovered_after_retry
FROM task_instance
GROUP BY dag_id, task_id
HAVING max(try_number) > 1
ORDER BY recovered_after_retry DESC;
-- Deferred tasks (если их много — проверить triggerer health)
SELECT t.trigger_id, t.classpath, t.created_date,
now() - t.created_date AS age,
ti.dag_id, ti.task_id, ti.state
FROM trigger t
JOIN task_instance ti ON ti.trigger_id = t.id
ORDER BY t.created_date DESC;
Production gotchas
1. try_number путаница 2.x vs 3.x
2.x: try_number incremented BEFORE state change на failure. Если retries=3 и task имела 2 failures + 1 success на 3rd — try_number в success row = 3.
3.x: try_number incremented AFTER, и теперь semantically “number of completed tries” — start at 0.
Impact: queries WHERE try_number = retries имеют разный смысл в 2.x vs 3.x.
2. Stuck в queued — most common issue
TI в queued state forever — обычно executor не подобрал task. Причины:
- Celery worker died with task in queue (CELERY_TASK_REVOKED не дошло)
- K8s executor — pod create failed silently
- Pool slots wrong calculation
Diagnose: см. lesson 05 (zombies/orphans).
3. depends_on_past + manual run = forever in none
depends_on_past=True требует, чтобы previous run этого task был success. Если history пустая или previous failed — TI висит в none forever, не получает scheduled.
Fix: mark previous TI success вручную, или ignore_depends_on_past=True при trigger.
4. Deferrable + triggerer crashed
Если triggerer pod crash без graceful shutdown:
- Trigger rows остаются в DB
- TI navis в deferred forever
- Scheduler с
triggerer_health_check_thresholdexceeded → reset state=scheduled, recreate trigger у другого triggerer
В реальности recovery работает плохо, если task передал large state через kwargs к trigger — deserialization может fail.
5. Скрытые state changes via SQL UPDATE
Некоторые operators делают direct DB UPDATE мимо state machine (например, custom HA logic). Это ломает audit trail и может конфликтовать со scheduler. Никогда не пишите code, который UPDATE task_instance.state напрямую — используйте TaskInstance.set_state API.
Comparison с 3.x state machine
| State | 2.x | 3.x |
|---|---|---|
| All 13 states | Все есть | shutdown объединён в signal-based |
| try_number | 1-indexed после increment | 0-indexed before increment (AIP-65) |
| Task SDK (AIP-72) | Не существует | Otherwise — те же state names |
| dag_version_id link | Нет (verify_integrity по task_dict) | Каждый TI знает свою dag_version (clean removed) |
Core state machine не изменилась — это стабильный API с 2.0.