Job submission lifecycle — путь task от trigger до completion
Чтобы интернализовать архитектуру Airflow, полезно пройти полный жизненный цикл одной task — от момента её создания (manual trigger или schedule fire) до записи результата в БД. Этот урок — детальный walkthrough с timestamps, SQL queries и process events на каждом шаге.
Будем использовать пример: DAG daily_etl с одной task extract_orders, scheduled @daily, на CeleryExecutor.
Высокоуровневая последовательность
Подробный walkthrough — шаг за шагом
Допустим, время — 2026-05-13 00:00:01 UTC. DAG daily_etl configured как schedule="@daily", start_date=datetime(2026, 1, 1). Последний DagRun был за 2026-05-12.
Шаг 0: Setup state в DB (на 2026-05-13 00:00:00)
-- dag table:
dag_id | next_dagrun | next_dagrun_create_after
daily_etl | 2026-05-13 00:00:00 | 2026-05-13 00:00:00
Шаг 1: Scheduler main loop tick — Phase 1 (Create DagRun)
2026-05-13 00:00:05 — scheduler-1 tick:
# В _create_dagruns_for_dags
dags = session.query(DagModel).filter(
DagModel.is_paused == False,
DagModel.next_dagrun_create_after <= now()
).all()
# returns [daily_etl]
dag_run = DagRun(
dag_id='daily_etl',
run_id='scheduled__2026-05-13T00:00:00+00:00',
logical_date=datetime(2026, 5, 13),
state='queued',
run_type='scheduled',
)
session.add(dag_run)
# Advance next_dagrun
dag.next_dagrun = datetime(2026, 5, 14)
dag.next_dagrun_create_after = datetime(2026, 5, 14)
session.commit()
В БД появилось:
-- dag_run:
dag_id | run_id | state | start_date
daily_etl | scheduled__2026-05-13T00:00:00+00:00 | queued | NULL
Шаг 2: Scheduler Phase 2 — Create TI
В том же tick scheduler видит новый DagRun:
def _schedule_dag_run(self, dag_run, session):
dag = serialized_dag_to_dag(dag_run.dag_id)
for task in dag.tasks: # просто extract_orders
ti = TaskInstance(
task=task,
run_id=dag_run.run_id,
state=None, # не scheduled пока deps не satisfied
)
session.add(ti)
-- task_instance:
dag_id | task_id | run_id | state
daily_etl | extract_orders | scheduled__2026-05-13T00:00:00+00:00 | NULL
Шаг 3: TI → scheduled
В Phase 2 scheduler проверяет upstream deps. extract_orders не имеет upstream → deps satisfied → переходит в scheduled:
if all_upstreams_done(ti):
ti.state = 'scheduled'
ti.scheduled_dttm = now()
-- task_instance:
state='scheduled', scheduled_dttm='2026-05-13 00:00:05.123'
Также DagRun переходит queued → running (потому что хотя бы одна TI scheduled).
Шаг 4: Phase 3 — Critical Section → queued
Scheduler берёт row-level lock на slot_pool:
pools = session.query(Pool).with_for_update(nowait=True).all()
# SELECT * FROM slot_pool FOR UPDATE NOWAIT
# Проверяем pool limits
default_pool = pools['default_pool']
if default_pool.available_slots() >= 1:
ti.state = 'queued'
ti.queued_dttm = now()
default_pool.used_slots += 1
session.commit() # ← lock released
# Push command в Celery broker
executor.queue_command(ti.command())
ti.command() возвращает что-то типа:
airflow tasks run daily_etl extract_orders scheduled__2026-05-13T00:00:00+00:00
--local --pool default_pool
Celery broker (Redis) получает message в queue default:
LPUSH celery {
"id": "abc-123",
"task": "airflow.executors.celery_executor.execute_command",
"args": [["airflow", "tasks", "run", "daily_etl", "extract_orders", ...]]
}
Шаг 5: Celery worker picks up
2026-05-13 00:00:06 — один из celery workers делает BRPOP:
# Celery worker
task = redis.brpop('celery', timeout=10)
# Returns message
command = task['args'][0]
# ['airflow', 'tasks', 'run', 'daily_etl', 'extract_orders', ...]
# Создаёт subprocess для task
subprocess.run(command)
Subprocess запускает airflow tasks run — это создаёт LocalTaskJob:
class LocalTaskJob(BaseJob):
def _execute(self):
# Запись heartbeat в job table
self.heartbeat_thread = HeartbeatThread()
# Запуск actual task
self.task_runner = StandardTaskRunner(self)
self.task_runner.start()
Шаг 6: TI → running
LocalTaskJob обновляет state:
ti.state = 'running'
ti.start_date = now()
ti.hostname = socket.gethostname()
ti.pid = os.getpid()
ti.job_id = self.job_id # для adoption если worker умрёт
session.commit()
state='running', start_date='2026-05-13 00:00:07.456', pid=12345
Шаг 7: execute() runs
# Если это TaskFlow @task:
result = extract_orders() # ваш Python код
# XCom push (если return value)
ti.xcom_push(key='return_value', value=result)
Что происходит параллельно:
- Heartbeat thread пишет
ti.last_heartbeat = now()каждые 60s — для zombie detection - Logs пишутся в файл и/или S3 (через
log_handlerprovider) - OTel metrics emit-ятся
Если task делает Variable.get():
val = Variable.get('my_var')
# → SQL: SELECT * FROM variable WHERE key='my_var'
# (в 2.x — прямой SQL из worker)
Шаг 8: TI → success
Allows say task завершился за 30 секунд:
# В StandardTaskRunner после execute() success
ti.state = 'success'
ti.end_date = now()
ti.duration = (ti.end_date - ti.start_date).total_seconds()
# on_success_callback
if ti.task.on_success_callback:
ti.task.on_success_callback(context)
# XCom уже pushed на Шаге 7
session.commit()
state='success', end_date='2026-05-13 00:00:37', duration=30
Celery worker делает ACK message в Redis (по celery_acks_late=True). Pool slot освобождается:
UPDATE slot_pool SET used_slots = used_slots - 1 WHERE pool='default_pool';
Шаг 9: DagRun → success
В следующем scheduler tick:
# В _schedule_dag_run
all_terminal = all(ti.state in TERMINAL_STATES for ti in dag_run.tis)
if all_terminal:
if any(ti.state == 'failed' for ti in dag_run.tis):
dag_run.state = 'failed'
else:
dag_run.state = 'success'
dag_run.end_date = now()
-- dag_run:
state='success', end_date='2026-05-13 00:00:38'
DAG run callback (если есть):
if dag.on_success_callback:
dag.on_success_callback(context)
Что происходит при failure?
Если в Шаге 7 task поднял exception:
try:
result = extract_orders()
except Exception:
ti.state = 'failed'
ti.end_date = now()
ti.try_number += 1
if ti.try_number < ti.task.retries + 1:
# Has retries — schedule retry
ti.state = 'up_for_retry'
ti.next_retry_datetime = now() + ti.task.retry_delay
else:
# No retries left — failed permanently
if ti.task.on_failure_callback:
ti.task.on_failure_callback(context)
session.commit()
up_for_retry → next tick scheduler видит, что next_retry_datetime <= now() и переводит обратно в scheduled (Шаг 3 повторяется).
Что происходит при worker death?
Если Celery worker умирает посередине Шага 7:
- Heartbeat прокис (
last_heartbeatне обновляется). - Через
scheduler_zombie_task_threshold(default 300s) scheduler в Phase 4 housekeeping detects:SELECT * FROM task_instance WHERE state = 'running' AND last_heartbeat < now() - interval '300 seconds'; - Помечает как failed:
ti.state = 'failed' # Применяет retry logic
В модуле 04 lesson 05 детально про zombies/orphans.
Timeline визуализация
00:00:00.000 scheduler tick begins (Phase 1)
00:00:05.123 DagRun created (state=queued)
00:00:05.124 TI created (state=None)
00:00:05.125 TI → scheduled (deps met)
00:00:05.130 TI → queued (critical section)
00:00:05.131 Command pushed to Celery broker
00:00:06.000 Celery worker BRPOP message
00:00:07.456 LocalTaskJob starts, TI → running
00:00:07.500 execute() begins
00:00:37.000 execute() success
00:00:37.100 TI → success, XCom written
00:00:37.150 Celery ACK
00:00:40.000 Next scheduler tick — DagRun → success
End-to-end 40 секунд для 30-секундной user code. Overhead ≈ 10s — это типично для CeleryExecutor.
KubernetesExecutor: тот же путь, но Шаги 5-6 включают:
kubectl apply -f pod.yaml(5-10s)- Pull image (если не cached) (5-30s)
- Init container (Airflow setup) (2-5s)
- Python startup (1-3s)
Итого 60-90s overhead на короткие tasks. Поэтому Celery для коротких, K8s для долгих.
Kafka Consumer API — как workers получают задачи из очереди
Что наблюдать в production
Smart queries для lifecycle insight:
-- Latency от scheduling до start (queueing time)
SELECT dag_id, task_id,
AVG(EXTRACT(epoch FROM (start_date - queued_dttm))) AS avg_queue_time_sec
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY dag_id, task_id
ORDER BY avg_queue_time_sec DESC;
Если queue time > 30s — executor underprovisioned.
-- Run time variance — detecting flaky tasks
SELECT dag_id, task_id,
AVG(duration) AS avg,
STDDEV(duration) AS stddev,
MAX(duration) AS max
FROM task_instance
WHERE state = 'success' AND end_date > now() - interval '24 hours'
GROUP BY dag_id, task_id
HAVING STDDEV(duration) > AVG(duration);