Learning Platform
Глоссарий Troubleshooting
Урок 02.06 · 25 мин
Продвинутый
LifecycleDagRunTaskInstanceExecutor

Job submission lifecycle — путь task от trigger до completion

Чтобы интернализовать архитектуру Airflow, полезно пройти полный жизненный цикл одной task — от момента её создания (manual trigger или schedule fire) до записи результата в БД. Этот урок — детальный walkthrough с timestamps, SQL queries и process events на каждом шаге.

Будем использовать пример: DAG daily_etl с одной task extract_orders, scheduled @daily, на CeleryExecutor.


Высокоуровневая последовательность

Lifecycle одной task в Airflow 2.x
Шаг 1: DagRun createdScheduler в Phase 1 main loop читает dag.next_dagrun_create_after. Когда время пришло — создаёт DagRun(state=queued).
schedule trigger
Шаг 2: TI createdScheduler в Phase 2 создаёт TaskInstance для каждой task в DAG. State = None.
upstream deps satisfied (нет upstream)
Шаг 3: TI → scheduledВсе upstream deps met. Scheduler меняет state = scheduled.
critical section
Шаг 4: TI → queuedВ Phase 3 (critical section) scheduler берёт row-level lock на slot_pool, проверяет concurrency, переводит scheduled → queued, кладёт command в Celery broker.
Celery broker (Redis)
Шаг 5: Worker picks upCelery worker делает BRPOP из Redis queue, получает command. Создаёт LocalTaskJob, запускает task subprocess.
Шаг 6: TI → runningLocalTaskJob обновляет state в БД на running, запускает execute() метод operator.
execute()
Шаг 7: Task executesexecute() метод выполняется. XCom push (если есть), logs writes (S3 или файл). Heartbeat каждые scheduler_heartbeat_sec.
success / failure
Шаг 8: TI → success / failedПосле execute() — UPDATE state. Callbacks (on_success_callback / on_failure_callback). Celery ACK message.
Шаг 9: DagRun → successScheduler в следующем tick видит — все TI в DagRun достигли terminal state. Если все success/skipped — DagRun → success.

Подробный walkthrough — шаг за шагом

Допустим, время — 2026-05-13 00:00:01 UTC. DAG daily_etl configured как schedule="@daily", start_date=datetime(2026, 1, 1). Последний DagRun был за 2026-05-12.

Шаг 0: Setup state в DB (на 2026-05-13 00:00:00)

-- dag table:
dag_id          | next_dagrun                 | next_dagrun_create_after
daily_etl       | 2026-05-13 00:00:00         | 2026-05-13 00:00:00

Шаг 1: Scheduler main loop tick — Phase 1 (Create DagRun)

2026-05-13 00:00:05 — scheduler-1 tick:

# В _create_dagruns_for_dags
dags = session.query(DagModel).filter(
    DagModel.is_paused == False,
    DagModel.next_dagrun_create_after <= now()
).all()
# returns [daily_etl]

dag_run = DagRun(
    dag_id='daily_etl',
    run_id='scheduled__2026-05-13T00:00:00+00:00',
    logical_date=datetime(2026, 5, 13),
    state='queued',
    run_type='scheduled',
)
session.add(dag_run)

# Advance next_dagrun
dag.next_dagrun = datetime(2026, 5, 14)
dag.next_dagrun_create_after = datetime(2026, 5, 14)
session.commit()

В БД появилось:

-- dag_run:
dag_id     | run_id                                       | state  | start_date
daily_etl  | scheduled__2026-05-13T00:00:00+00:00         | queued | NULL

Шаг 2: Scheduler Phase 2 — Create TI

В том же tick scheduler видит новый DagRun:

def _schedule_dag_run(self, dag_run, session):
    dag = serialized_dag_to_dag(dag_run.dag_id)

    for task in dag.tasks:  # просто extract_orders
        ti = TaskInstance(
            task=task,
            run_id=dag_run.run_id,
            state=None,  # не scheduled пока deps не satisfied
        )
        session.add(ti)
-- task_instance:
dag_id     | task_id        | run_id                                | state
daily_etl  | extract_orders | scheduled__2026-05-13T00:00:00+00:00  | NULL

Шаг 3: TI → scheduled

В Phase 2 scheduler проверяет upstream deps. extract_orders не имеет upstream → deps satisfied → переходит в scheduled:

if all_upstreams_done(ti):
    ti.state = 'scheduled'
    ti.scheduled_dttm = now()
-- task_instance:
state='scheduled', scheduled_dttm='2026-05-13 00:00:05.123'

Также DagRun переходит queued → running (потому что хотя бы одна TI scheduled).

Шаг 4: Phase 3 — Critical Section → queued

Scheduler берёт row-level lock на slot_pool:

pools = session.query(Pool).with_for_update(nowait=True).all()
# SELECT * FROM slot_pool FOR UPDATE NOWAIT

# Проверяем pool limits
default_pool = pools['default_pool']
if default_pool.available_slots() >= 1:
    ti.state = 'queued'
    ti.queued_dttm = now()
    default_pool.used_slots += 1
    session.commit()  # ← lock released

    # Push command в Celery broker
    executor.queue_command(ti.command())

ti.command() возвращает что-то типа:

airflow tasks run daily_etl extract_orders scheduled__2026-05-13T00:00:00+00:00
  --local --pool default_pool

Celery broker (Redis) получает message в queue default:

LPUSH celery {
  "id": "abc-123",
  "task": "airflow.executors.celery_executor.execute_command",
  "args": [["airflow", "tasks", "run", "daily_etl", "extract_orders", ...]]
}

Шаг 5: Celery worker picks up

2026-05-13 00:00:06 — один из celery workers делает BRPOP:

# Celery worker
task = redis.brpop('celery', timeout=10)
# Returns message
command = task['args'][0]
# ['airflow', 'tasks', 'run', 'daily_etl', 'extract_orders', ...]

# Создаёт subprocess для task
subprocess.run(command)

Subprocess запускает airflow tasks run — это создаёт LocalTaskJob:

class LocalTaskJob(BaseJob):
    def _execute(self):
        # Запись heartbeat в job table
        self.heartbeat_thread = HeartbeatThread()

        # Запуск actual task
        self.task_runner = StandardTaskRunner(self)
        self.task_runner.start()

Шаг 6: TI → running

LocalTaskJob обновляет state:

ti.state = 'running'
ti.start_date = now()
ti.hostname = socket.gethostname()
ti.pid = os.getpid()
ti.job_id = self.job_id  # для adoption если worker умрёт
session.commit()
state='running', start_date='2026-05-13 00:00:07.456', pid=12345

Шаг 7: execute() runs

# Если это TaskFlow @task:
result = extract_orders()  # ваш Python код

# XCom push (если return value)
ti.xcom_push(key='return_value', value=result)

Что происходит параллельно:

  • Heartbeat thread пишет ti.last_heartbeat = now() каждые 60s — для zombie detection
  • Logs пишутся в файл и/или S3 (через log_handler provider)
  • OTel metrics emit-ятся

Если task делает Variable.get():

val = Variable.get('my_var')
# → SQL: SELECT * FROM variable WHERE key='my_var'
# (в 2.x — прямой SQL из worker)

Шаг 8: TI → success

Allows say task завершился за 30 секунд:

# В StandardTaskRunner после execute() success
ti.state = 'success'
ti.end_date = now()
ti.duration = (ti.end_date - ti.start_date).total_seconds()

# on_success_callback
if ti.task.on_success_callback:
    ti.task.on_success_callback(context)

# XCom уже pushed на Шаге 7

session.commit()
state='success', end_date='2026-05-13 00:00:37', duration=30

Celery worker делает ACK message в Redis (по celery_acks_late=True). Pool slot освобождается:

UPDATE slot_pool SET used_slots = used_slots - 1 WHERE pool='default_pool';

Шаг 9: DagRun → success

В следующем scheduler tick:

# В _schedule_dag_run
all_terminal = all(ti.state in TERMINAL_STATES for ti in dag_run.tis)
if all_terminal:
    if any(ti.state == 'failed' for ti in dag_run.tis):
        dag_run.state = 'failed'
    else:
        dag_run.state = 'success'
    dag_run.end_date = now()
-- dag_run:
state='success', end_date='2026-05-13 00:00:38'

DAG run callback (если есть):

if dag.on_success_callback:
    dag.on_success_callback(context)

Что происходит при failure?

Если в Шаге 7 task поднял exception:

try:
    result = extract_orders()
except Exception:
    ti.state = 'failed'
    ti.end_date = now()
    ti.try_number += 1

    if ti.try_number < ti.task.retries + 1:
        # Has retries — schedule retry
        ti.state = 'up_for_retry'
        ti.next_retry_datetime = now() + ti.task.retry_delay
    else:
        # No retries left — failed permanently
        if ti.task.on_failure_callback:
            ti.task.on_failure_callback(context)

session.commit()

up_for_retry → next tick scheduler видит, что next_retry_datetime &lt;= now() и переводит обратно в scheduled (Шаг 3 повторяется).


Что происходит при worker death?

Если Celery worker умирает посередине Шага 7:

  1. Heartbeat прокис (last_heartbeat не обновляется).
  2. Через scheduler_zombie_task_threshold (default 300s) scheduler в Phase 4 housekeeping detects:
    SELECT * FROM task_instance
    WHERE state = 'running'
      AND last_heartbeat < now() - interval '300 seconds';
  3. Помечает как failed:
    ti.state = 'failed'
    # Применяет retry logic

В модуле 04 lesson 05 детально про zombies/orphans.


Timeline визуализация

00:00:00.000  scheduler tick begins (Phase 1)
00:00:05.123  DagRun created (state=queued)
00:00:05.124  TI created (state=None)
00:00:05.125  TI → scheduled (deps met)
00:00:05.130  TI → queued (critical section)
00:00:05.131  Command pushed to Celery broker
00:00:06.000  Celery worker BRPOP message
00:00:07.456  LocalTaskJob starts, TI → running
00:00:07.500  execute() begins
00:00:37.000  execute() success
00:00:37.100  TI → success, XCom written
00:00:37.150  Celery ACK
00:00:40.000  Next scheduler tick — DagRun → success

End-to-end 40 секунд для 30-секундной user code. Overhead ≈ 10s — это типично для CeleryExecutor.

KubernetesExecutor: тот же путь, но Шаги 5-6 включают:

  • kubectl apply -f pod.yaml (5-10s)
  • Pull image (если не cached) (5-30s)
  • Init container (Airflow setup) (2-5s)
  • Python startup (1-3s)

Итого 60-90s overhead на короткие tasks. Поэтому Celery для коротких, K8s для долгих.


Kafka Consumer API — как workers получают задачи из очереди

Что наблюдать в production

Smart queries для lifecycle insight:

-- Latency от scheduling до start (queueing time)
SELECT dag_id, task_id,
       AVG(EXTRACT(epoch FROM (start_date - queued_dttm))) AS avg_queue_time_sec
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY dag_id, task_id
ORDER BY avg_queue_time_sec DESC;

Если queue time > 30s — executor underprovisioned.

-- Run time variance — detecting flaky tasks
SELECT dag_id, task_id,
       AVG(duration) AS avg,
       STDDEV(duration) AS stddev,
       MAX(duration) AS max
FROM task_instance
WHERE state = 'success' AND end_date > now() - interval '24 hours'
GROUP BY dag_id, task_id
HAVING STDDEV(duration) > AVG(duration);

Проверка знанийKnowledge check
DAG ваш scheduled @daily, и вы видите что task начала выполнение в 00:00:37 вместо 00:00:00. Откуда взялись 37 секунд delay? Из чего они состоят?
ОтветAnswer
Lifecycle overhead — нормальное явление. Расклад типичных задержек на CeleryExecutor: (1) scheduler tick: до 5s ждать следующего main loop (scheduler_heartbeat_sec=5); (2) Phase 1-3 main loop: ~50-200ms (быстро при нормальной DB); (3) Critical section + push to broker: ~100-300ms; (4) Celery worker BRPOP: 0-1s (зависит от prefetch); (5) Worker subprocess startup: 1-3s (Python init, imports); (6) LocalTaskJob start, heartbeat thread, DB update state=running: 100-200ms. Итого 1-10 секунд overhead для типичного task. 37 секунд могут означать: scheduler loop overloaded (>5s tick), broker connection issues, worker startup slow (heavy imports). KubernetesExecutor дополнительно добавляет 15-30s на pod creation, image pull, init container.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какая последовательность state transitions проходит TaskInstance от создания до success?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6