Learning Platform
Глоссарий Troubleshooting
Урок 05.04 · 38 мин
Продвинутый
TaskInstanceState machineRetryLifecycleDeferrableSensor

TaskInstance state machine — все 13 состояний

Если DagRun имеет 4 state-а, то TaskInstance — 13. Это самая сложная state machine в Airflow, и понимание всех её переходов — что отличает того, кто “использует Airflow”, от того, кто “знает, что происходит внутри”. Каждое состояние имеет конкретную семантику, конкретный код, который его устанавливает, и конкретные следующие состояния, в которые TI может перейти.

Состояния определены в airflow.utils.state.TaskInstanceState. В коде их часто называют просто State.SCHEDULED, State.QUEUED и т.д. — потому что class State имеет class attributes для удобства.


Все 13 состояний

StateСемантикаКто переводит
none (NULL)TI создан в DB, но scheduler ещё не оценивалСоздаётся при создании DagRun или DAG re-serialization
scheduledВсе upstream deps satisfied, TI готов к enqueueScheduler фаза 2 (schedule_dag_runs)
queuedTI отправлен executor-у, ждёт workerScheduler фаза 3 (critical section)
runningWorker подобрал, task code исполняетсяLocalTaskJob worker process
successTask callable вернулся без exceptionLocalTaskJob после успешного return
failedTask callable raised exception (после exhaustion retries)LocalTaskJob exception handler
up_for_retryFailed, но remaining retries > 0LocalTaskJob после exception
up_for_rescheduleSensor (mode=‘reschedule’) не satisfied, отложен до следующей проверкиSensor poke logic
deferredDeferrable operator передал control триггеру, ждёт eventDeferTask exception handler
removedTask больше нет в DAG (удалён из кода, но DagRun уже существует)Scheduler при verify_integrity
restartingManual restart через UI / APIDagRun.set_task_instance_state
shutdownExternal signal SIGTERM (через UI ‘Mark Failed’ / airflow tasks kill)LocalTaskJob signal handler
upstream_failedХотя бы один upstream failed, default trigger_ruleScheduler фаза 2
skippedBranch не выбрал эту task / ShortCircuit returned False / trigger_rule skippedBranchPythonOperator / ShortCircuitOperator

Всего 13 + null = 14 возможных значений колонки task_instance.state. Из них:

  • Non-terminal: none, scheduled, queued, running, up_for_retry, up_for_reschedule, deferred, restarting, shutdown (9 states)
  • Terminal: success, failed, upstream_failed, skipped, removed (5 states)

State categories — bitmasks из исходников

В airflow/utils/state.py определены groups:

class State:
    # ... individual states ...

    finished = frozenset([
        SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED, REMOVED,
    ])
    """Terminal states — TI не будет менять state без manual intervention."""

    unfinished = frozenset([
        NONE, SCHEDULED, QUEUED, RUNNING, UP_FOR_RETRY,
        UP_FOR_RESCHEDULE, DEFERRED, RESTARTING, SHUTDOWN,
    ])
    """Non-terminal — TI ещё может прогрессировать."""

    failed_states = frozenset([FAILED, UPSTREAM_FAILED])
    """Считается ли failure для DagRun verdict."""

    success_states = frozenset([SUCCESS, SKIPPED])
    """Считается ли success для DagRun verdict."""

    adoptable_states = frozenset([QUEUED, RUNNING, DEFERRED])
    """TI, которые могут быть adopted scheduler-ом при failover."""

    running = frozenset([RUNNING])
    """Узкое определение — только active execution."""

Зачем эти group-ы:

  • finished — фильтр в DagRun.update_state() для определения “все TI terminal?”
  • failed_states / success_states — для verdict
  • adoptable_states — для adopt_or_reset_orphaned_tasks (см. lesson 05). DEFERRED adopt-ed, но logic особая — recreate Trigger row, не reset state.

Главная цепочка успешного выполнения

Успешный happy path TI
noneTI создан scheduler-ом при verify_integrity DagRun. Колонка task_instance.state = NULL. Состояние ожидания первой оценки.
all upstreams done
scheduledScheduler фаза 2 проверил deps: все upstream TI в success/skipped/passed-trigger-rule. Установил state=scheduled, scheduled_dttm=now(). Готов к enqueue.
critical section + executor.queue_command
queuedScheduler в фазе 3 (critical section) увидел state=scheduled, проверил concurrency и pool limits, передал executor-у. Executor (Celery/K8s/Local) положил task в свою queue. queued_dttm=now().
worker picks up
runningWorker (Celery worker process / K8s pod / Local subprocess) запустил LocalTaskJob, который установил state=running, start_date=now(), запустил heartbeat thread.
task.execute() returned
successTask callable вернулся без exception. LocalTaskJob установил state=success, end_date=now(), записал XCom return_value (если задан), запустил on_success_callback.

Каждый переход здесь — отдельный UPDATE в DB, написанный в разных файлах кодовой базы. none → scheduled пишет scheduler. scheduled → queued — тоже scheduler. queued → running и running → success — worker (LocalTaskJob).


Цикл retry — failed + remaining retries > 0

Retry cycle
runningTask code исполняется в worker. LocalTaskJob heartbeat активен.
task.execute() raised exception
up_for_retryLocalTaskJob поймал exception. retries=3, try_number=1 < retries. Установил state=up_for_retry, увеличил try_number=2, записал next_retry_datetime = now() + retry_delay (с exponential_backoff если задано). Записал TI log с tracebackом.
retry_delay elapsed
scheduledScheduler фаза 2 увидел up_for_retry TI, проверил now() >= next_retry_datetime. Установил scheduled. TI попадёт обратно в фазу 3 enqueue.
… queued → running → ?
successRetry прошёл успешно. Terminal success — несмотря на initial failure.
failed (final)Retry exhausted: try_number > retries. state=failed (terminal). DagRun verdict зависит от leaf state.
Retry и backoff — как переживать временные сбои

Retry mechanics

В airflow/models/taskinstance.py:

# Псевдокод handle_failure
def handle_failure(self, error, session):
    self.end_date = timezone.utcnow()
    self.duration = (self.end_date - self.start_date).total_seconds()
    self.log.error("Task failed", exc_info=error)

    # try_number инкрементится BEFORE state assignment в 2.x
    if self.is_eligible_to_retry():
        self.state = State.UP_FOR_RETRY

        # Compute next retry time
        retry_delay = self.task.retry_delay  # timedelta
        if self.task.retry_exponential_backoff:
            # Exponential: retry_delay * 2^(try_number - 1)
            min_backoff = retry_delay.total_seconds()
            delay = min_backoff * (2 ** (self.try_number - 1))
            delay = min(delay, self.task.max_retry_delay.total_seconds())
        else:
            delay = retry_delay.total_seconds()

        self.next_method = None
        # next_retry_datetime — не отдельная колонка, scheduler вычисляет из end_date + retry_delay
    else:
        self.state = State.FAILED

    if self.task.on_failure_callback:
        callback_queue.enqueue(self.task.on_failure_callback, context=self.get_template_context())

    session.merge(self)
    session.commit()

def is_eligible_to_retry(self):
    """try_number 1-indexed после increment, retries — max allowed retries."""
    return (
        self.task.retries
        and self.try_number <= self.task.retries
        and not self.task.is_smart_sensor  # legacy 1.x
    )

Важные тонкости:

  • try_number в 2.x семантически 1-indexed после increment. Первая попытка: try_number=1 на start, на failure → try_number=2. retries=3 значит “3 retry attempts after initial”, total = 4 attempts.
  • В 3.x семантика try_number была фиксирована (AIP-65) — теперь 0-indexed before increment, что более интуитивно.
  • retry_delay default 5 минут. retry_exponential_backoff=True (default False) делает delays = 5m, 10m, 20m, 40m, …
  • max_retry_delay capping для exponential — иначе на retry=10 ждёте 85 часов.

Sensor reschedule cycle

Sensors с mode='reschedule' не держат worker slot между poke-ами. Они отпускают slot и ждут.

Sensor reschedule cycle
runningSensor.poke() executed inside worker. Returned False — condition not met. LocalTaskJob ловит специальный AirflowRescheduleException.
up_for_rescheduleLocalTaskJob установил state=up_for_reschedule. Записал в task_reschedule table: try_number, start_date, end_date, reschedule_date = now() + poke_interval. Освободил worker slot — другие tasks могут использовать.
poke_interval elapsed
scheduledScheduler фаза 2 проверил task_reschedule table, нашёл entries с reschedule_date ≤ now(). Установил state=scheduled — обратно в очередь.
… queued → running → next poke
successSensor.poke() finally returned True. LocalTaskJob → terminal success.
failedTimeout exceeded (timeout default 7 days, или soft_fail после X tries). State=failed.

Отличие от up_for_retry: retry — после exception. Reschedule — sensor poke вернул False, это штатное поведение. Reschedule не считается failure для retry counter.

В table task_reschedule хранятся все poke attempts с timestamps — это позволяет UI показывать историю sensor checking.


Deferrable operators — переход через deferred

Deferrable (AIP-40, 2.2+) — самый интересный state. Operator может на середине execution передать control в Triggerer process:

Deferrable operator cycle
runningDeferrableOperator.execute() запустил async logic, например HTTP poll. Когда нужно ждать долго — raise self.defer(trigger=..., method_name='resume').
raise TaskDeferred
deferredLocalTaskJob поймал TaskDeferred exception. Установил state=deferred, записал Trigger row в DB с serialized trigger object (HttpTrigger / TimeSensorAsyncTrigger / etc). Освободил worker slot. Worker завершился.
triggerer picks up
(в Triggerer)Triggerer process подобрал trigger row, deserialize, запустил async coroutine. Coroutine ждёт event (HTTP response / file appear / time pass) без блокировки thread — 1 triggerer может handle 1000+ deferred tasks.
event fired
scheduledTriggerer удалил Trigger row, установил TI state=scheduled, чтобы scheduler пере-enqueue его. Возобновление execution вызовет resume_from_deferred — operator.method_name(context, event).
queued → running → success/failedTI идёт обычный путь, но execute() вместо начала вызывает next_method (resume callback). Operator может снова defer, или завершить как success/failed.

Killer feature: деферный sensor не занимает worker slot. Это позволяет иметь тысячи “waiting” sensors на одном worker, тогда как reschedule mode требует scheduler tick + queue cycle для каждого poke.

State.DEFERRED — adoptable, но logic особая (см. lesson 05). Если triggerer умер, scheduler не reset state в scheduled (это сломает context), а recreate trigger row у живого triggerer-а.


Skipped — три источника

State.SKIPPED устанавливается тремя разными механизмами:

  1. BranchPythonOperator / @task.branch — возвращает list[task_id], all not-selected children получают skipped.
  2. ShortCircuitOperator — возвращает False → все downstream получают skipped (если ignore_downstream_trigger_rules=True, default).
  3. trigger_rule=‘one_success’ и upstream skipped — TI с trigger_rule='all_success' где upstream skipped → также skipped (cascading).
# Псевдокод BranchPythonOperator
class BranchPythonOperator(PythonOperator):
    def execute(self, context):
        chosen_task_ids = self.python_callable(**context)
        # XCom push для downstream resolution
        context['ti'].xcom_push(key='skipmixin_key', value={'followed': chosen_task_ids})
        # SkipMixin делает остальное
        return chosen_task_ids

# В scheduler фаза 2 при оценке downstream
def _schedule_dag_run(...):
    for ti in task_instances:
        if upstream_branched_to_other(ti):
            ti.state = State.SKIPPED  # ← skip downstream not chosen

Skipped — terminal в success_states. Это означает: SKIPPED не failure, и не блокирует downstream если у downstream trigger_rule allows skipped propagation.


Upstream_failed — cascade

# Псевдокод в DepStatesDep
def _evaluate(self, ti, session):
    upstream_tis = ti.task.upstream_tasks
    failed_upstreams = [ut for ut in upstream_tis if ut.state in State.failed_states]

    if failed_upstreams and ti.task.trigger_rule == TriggerRule.ALL_SUCCESS:
        # Default trigger rule — cascade failure
        ti.state = State.UPSTREAM_FAILED
        return False

    if all(ut.state == State.SKIPPED for ut in upstream_tis) and ti.task.trigger_rule == TriggerRule.ALL_SUCCESS:
        ti.state = State.SKIPPED
        return False

    # ... другие trigger_rule logic

Cascading depth: если task A failed → B (downstream A) upstream_failed → C (downstream B) тоже upstream_failed. Это иногда сюрприз для пользователей: “у меня сотня tasks упала, хотя только одна реально failed”.

С trigger_rule=‘all_done’, all_failed, one_failed, none_failed, none_skipped — можно избежать cascade. См. Module 02 lesson о trigger_rule.


Specialty states

removed

Task был в DagRun (DAG version A), но в новой версии DAG (B) этого task больше нет. Scheduler при verify_integrity обнаруживает orphan TI и помечает removed.

# Псевдокод DagRun.verify_integrity
def verify_integrity(self, session):
    current_task_ids = {t.task_id for t in self.dag.task_dict.values()}
    db_tis = session.query(TI).filter(TI.dag_id == self.dag_id, TI.run_id == self.run_id).all()

    for ti in db_tis:
        if ti.task_id not in current_task_ids:
            ti.state = State.REMOVED  # ← removed terminal

restarting

Manual UI action ‘Restart’ (отличается от Clear). Устанавливается direct UPDATE. Worker увидит сигнал и завершит текущее execution, затем перейдёт в scheduled.

shutdown

External SIGTERM — обычно из UI ‘Mark Failed’ / airflow tasks kill / executor sending kill signal. LocalTaskJob signal handler устанавливает shutdown, attempts graceful cleanup, потом state=failed.

В 3.x restarting и shutdown объединены в более чистый mechanism через signals — старые states могут быть удалены.


Полная таблица переходов

FromToTriggerГде
nonescheduledAll upstream deps satisfiedscheduler phase 2
noneupstream_failedUpstream failed, trigger_rule=all_successscheduler phase 2
noneskippedUpstream branched away / cascading skipscheduler phase 2
noneremovedTask убрана из DAGverify_integrity
scheduledqueuedCritical section + concurrency OKscheduler phase 3
queuedrunningWorker picked up taskLocalTaskJob start
queuedscheduledAdoption by new scheduler (orphan reset)adopt_or_reset_orphaned_tasks
runningsuccessexecute() returnedLocalTaskJob end
runningfailedexecute() raised, no retries leftLocalTaskJob handle_failure
runningup_for_retryexecute() raised, retries > 0LocalTaskJob handle_failure
runningup_for_rescheduleSensor poke returned FalseLocalTaskJob reschedule handler
runningdeferredTaskDeferred raisedLocalTaskJob defer handler
runningshutdownSIGTERM / ‘Mark Failed’LocalTaskJob signal handler
up_for_retryscheduledretry_delay elapsedscheduler phase 2
up_for_reschedulescheduledreschedule_date ≤ now()scheduler phase 2
deferredscheduledTrigger fired eventtriggerer process
deferredfailedTrigger timeout / raisedtriggerer process
shutdownfailedCleanup completeLocalTaskJob signal handler
(любой non-terminal)failedManual ‘Mark Failed’ UIDagRun.set_task_instance_state
(любой)scheduled’Clear task instance’ UITI.clear_instance
restartingqueuedRestart cycleManual or job manager

Это полная таблица. Любой переход, не указанный здесь — bug или undefined behavior.


SQL для inspection state distribution

-- State distribution per DAG (sanity check)
SELECT dag_id, state, count(*) AS cnt
FROM task_instance
WHERE run_id LIKE 'scheduled%'
GROUP BY dag_id, state
ORDER BY dag_id, cnt DESC;

-- TIs stuck в scheduled > 5 минут (scheduler backlog problem)
SELECT dag_id, task_id, run_id, scheduled_dttm,
       now() - scheduled_dttm AS waiting
FROM task_instance
WHERE state = 'scheduled' AND scheduled_dttm < now() - interval '5 minutes'
ORDER BY scheduled_dttm;

-- Retry distribution — какие tasks retries-ятся часто (flaky)
SELECT dag_id, task_id, max(try_number) AS max_tries,
       count(*) FILTER (WHERE state = 'failed') AS total_failed,
       count(*) FILTER (WHERE state = 'success' AND try_number > 1) AS recovered_after_retry
FROM task_instance
GROUP BY dag_id, task_id
HAVING max(try_number) > 1
ORDER BY recovered_after_retry DESC;

-- Deferred tasks (если их много — проверить triggerer health)
SELECT t.trigger_id, t.classpath, t.created_date,
       now() - t.created_date AS age,
       ti.dag_id, ti.task_id, ti.state
FROM trigger t
JOIN task_instance ti ON ti.trigger_id = t.id
ORDER BY t.created_date DESC;

Production gotchas

1. try_number путаница 2.x vs 3.x

2.x: try_number incremented BEFORE state change на failure. Если retries=3 и task имела 2 failures + 1 success на 3rd — try_number в success row = 3.

3.x: try_number incremented AFTER, и теперь semantically “number of completed tries” — start at 0.

Impact: queries WHERE try_number = retries имеют разный смысл в 2.x vs 3.x.

2. Stuck в queued — most common issue

TI в queued state forever — обычно executor не подобрал task. Причины:

  • Celery worker died with task in queue (CELERY_TASK_REVOKED не дошло)
  • K8s executor — pod create failed silently
  • Pool slots wrong calculation

Diagnose: см. lesson 05 (zombies/orphans).

3. depends_on_past + manual run = forever in none

depends_on_past=True требует, чтобы previous run этого task был success. Если history пустая или previous failed — TI висит в none forever, не получает scheduled.

Fix: mark previous TI success вручную, или ignore_depends_on_past=True при trigger.

4. Deferrable + triggerer crashed

Если triggerer pod crash без graceful shutdown:

  • Trigger rows остаются в DB
  • TI navis в deferred forever
  • Scheduler с triggerer_health_check_threshold exceeded → reset state=scheduled, recreate trigger у другого triggerer

В реальности recovery работает плохо, если task передал large state через kwargs к trigger — deserialization может fail.

5. Скрытые state changes via SQL UPDATE

Некоторые operators делают direct DB UPDATE мимо state machine (например, custom HA logic). Это ломает audit trail и может конфликтовать со scheduler. Никогда не пишите code, который UPDATE task_instance.state напрямую — используйте TaskInstance.set_state API.


Comparison с 3.x state machine

State2.x3.x
All 13 statesВсе естьshutdown объединён в signal-based
try_number1-indexed после increment0-indexed before increment (AIP-65)
Task SDK (AIP-72)Не существуетOtherwise — те же state names
dag_version_id linkНет (verify_integrity по task_dict)Каждый TI знает свою dag_version (clean removed)

Core state machine не изменилась — это стабильный API с 2.0.


Проверка знанийKnowledge check
TI с retries=3, retry_exponential_backoff=True, retry_delay=60s, max_retry_delay=600s. После пяти fail attempts какой будет финальный state, и какие были delays между попытками?
ОтветAnswer
После 4 attempts (1 initial + 3 retries) TI будет state=failed (terminal). retries=3 значит 3 retry попыток после initial, всего 4. Пятой попытки не будет — после 3-его retry try_number=4, is_eligible_to_retry() возвращает False, state=failed final. Delays между попытками с exponential backoff: 60s (between try 1 и 2), 120s (between 2 и 3), 240s (between 3 и 4). 4-я delay (480s) была бы для 5-й попытки, но max_retry_delay=600s cap её бы не достиг — однако этой попытки уже не будет. Формула: delay_n = min(retry_delay * 2^(n-1), max_retry_delay). С max_retry_delay=600 cap activate на try 4-5 (480→600 noop, then 960→600 capped). Production tip: всегда устанавливайте max_retry_delay, иначе exponential с 10 retries даст 85 часов между последними попытками.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Сколько всего terminal states у TaskInstance и какие они?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6