Priority weight и weight_rule
Когда scheduler внутри critical section видит сотни scheduled TI и понимает, что в pool 8 свободных slots — какие 8 TI взять? Ответ — отсортировать по effective priority_weight и взять первые. Priority — это не «строгий priority queue», это tie-breaker в условиях ограниченных ресурсов. Но именно от него зависит, какие tasks production проигнорирует, а какие протиснутся первыми.
Этот урок разбирает как считается effective priority (это не то что вы задали в priority_weight=), три алгоритма weight_rule (downstream, upstream, absolute), и реальные ловушки default downstream.
Базовая модель: priority_weight на task
@task(priority_weight=10)
def important():
...
@task(priority_weight=1)
def normal():
...
Числовое значение, default 1. На уровне scheduler хранится в task_instance.priority_weight (вычисленное), на уровне TaskDefinition — priority_weight (raw config).
Однако при scheduling используется effective priority — вычисленное значение в зависимости от weight_rule. Default rule — downstream.
SLA и приоритизация задач в production pipeline
Три алгоритма weight_rule
| Rule | Формула effective priority | Когда выбирать |
|---|---|---|
downstream (default) | own priority + sum of all downstream tasks’ priorities (recursive) | “Если эта task важна для большого хвоста — приоритизировать её” |
upstream | own priority + sum of all upstream tasks’ priorities | ”Если эта task — финал длинной цепи — её приоритизировать, чтобы освободить ресурсы upstream” |
absolute | only own priority | ”Я знаю что делаю — берите ровно это число” |
Звучит абстрактно — разберём на конкретном графе.
Пример: DAG из 5 tasks
Граф: extract → {transform_a, transform_b} → load → notify.
Rule = downstream (default)
Считаем effective priority с конца:
notify: own=1, нет downstream → effective = 1load: own=10 + sum(downstream=notify=1) → effective = 11transform_a: own=1 + sum(load=11) → effective = 12transform_b: own=5 + sum(load=11) → effective = 16extract: own=1 + sum(transform_a=12 + transform_b=16) → effective = 29
Сортировка scheduler: extract (29) > transform_b (16) > transform_a (12) > load (11) > notify (1).
Логика: корни графа всегда имеют самый высокий effective priority, потому что их downstream хвост накапливается. Это правильно для большинства production cases — если корень не запустить, весь DAG стоит.
Rule = upstream
То же снизу вверх:
extract: own=1, нет upstream → effective = 1transform_a: own=1 + sum(extract=1) → effective = 2transform_b: own=5 + sum(extract=1) → effective = 6load: own=10 + sum(transform_a=2 + transform_b=6) → effective = 18notify: own=1 + sum(load=18) → effective = 19
Сортировка: notify (19) > load (18) > transform_b (6) > transform_a (2) > extract (1).
Логика обратная: листья самые приоритетные, чтобы скорее «закрыть» хвосты и освободить slots в pool.
Rule = absolute
Никакой recursion — только own:
extract: 1transform_a: 1transform_b: 5load: 10notify: 1
Сортировка: load (10) > transform_b (5) > extract = transform_a = notify (1).
Логика: «доверьтесь моим числам, не суммируйте». Используется когда вы explicitly задаёте priorities как ranking без учёта структуры графа.
Где применяется priority — внутри pool
Главный нюанс: priority применяется внутри pool, не глобально. Scheduler внутри critical section:
# Псевдокод scheduling
for pool in slot_pools:
open_slots = pool.slots - pool.used_slots
if open_slots <= 0:
continue
# Все scheduled TI этого pool, отсортированные по effective priority DESC
candidates = (
session.query(TaskInstance)
.filter(TaskInstance.pool == pool.pool)
.filter(TaskInstance.state == State.SCHEDULED)
.order_by(TaskInstance.priority_weight.desc()) # ← effective, precomputed
.limit(open_slots)
.all()
)
for ti in candidates:
if can_enqueue(ti):
ti.state = State.QUEUED
Это значит:
- Priority weight НЕ глобальная очередь — это локальная сортировка в пределах одного pool.
- Если у вас два pool
warehouseиapi_calls— task сpriority=100вwarehouseне имеет приоритета над task сpriority=1вapi_calls. Они в разных queue.
Поэтому разделение по pools — primary mechanism, priority — secondary tie-breaker.
Где хранится effective priority
В Airflow 2.x task_instance.priority_weight хранит effective значение (вычисленное по weight_rule), пересчитываемое при serialization DAG. Это значит:
- Изменили
priority_weighttask — нужен DAG re-parse, чтобы эффективный пересчитался - Изменили
weight_rule— то же самое - Backfill за прошлые runs использует priority которая была при serialization — не пересчитывается
SQL для проверки:
-- Effective priorities по DAG
SELECT
dag_id,
task_id,
priority_weight, -- ← это effective (после weight_rule)
state
FROM task_instance
WHERE state = 'scheduled'
AND dag_id = 'my_dag'
ORDER BY priority_weight DESC;
Почему default downstream иногда контр-интуитивен
Пример: у вас две DAG обе пишут в Snowflake через pool warehouse:
- DAG A — короткий, 3 tasks:
query → process → load_a, все default priority=1. - DAG B — длинный, 30 tasks fan-out:
query → [30 mapped] → load_b, все default priority=1.
С downstream (default):
- DAG A
query: effective = 1 + 1 + 1 = 3 - DAG B
query: effective = 1 + 30*1 + 1 = 32
DAG B всегда выигрывает первым в pool — потому что у него «больше хвост». Хотя по бизнес-логике DAG A может быть критичнее (например, SLA-critical короткий ETL).
Mitigation: либо явно поднять priority_weight на корень DAG A до большого значения, либо переключить weight_rule="absolute" и задать absolute priorities.
Production pattern: per-DAG priority pinning
Чтобы избежать downstream-bias, иногда полезно:
@dag(
dag_id="sla_critical_etl"
default_args={"weight_rule": "absolute"}, # ← все tasks в DAG используют absolute
)
def sla_critical():
@task(priority_weight=100) # ← high
def query(): ...
@task(priority_weight=100)
def transform(): ...
@task(priority_weight=100)
def load(): ...
load(transform(query()))
Или global pin через airflow.cfg:
[core]
default_task_weight_rule = absolute
Это меняет default для всех новых DAG.
Comparison table — когда какое rule
| Сценарий | Лучшее rule | Почему |
|---|---|---|
| Standard ETL pipelines, корни критичны | downstream (default) | Прокачивает корни — без них DAG не двинется |
| ML inference DAG, финальные predictions важны | upstream | Финал получит приоритет, освободит pool |
| SLA-tight, явный manual ranking | absolute | Никаких сюрпризов от структуры графа |
| Mixed pool с разными DAG | absolute + per-DAG offset | Контролируемая cross-DAG приоритизация |
| Fan-out с одинаковыми parallel tasks | любое (они все equal effective) | Не имеет значения — все одинаковы |
SQL для visualization priority
Чтобы понять кто выиграет в pool:
SELECT
ti.dag_id,
ti.task_id,
ti.priority_weight AS effective_priority,
ti.state,
ti.pool,
ti.queued_dttm
FROM task_instance ti
JOIN slot_pool sp ON ti.pool = sp.pool
WHERE ti.state = 'scheduled'
AND ti.pool = 'warehouse' -- интересный pool
ORDER BY ti.priority_weight DESC, ti.queued_dttm ASC
LIMIT 50;
Topmost 50 — это next batch который заберёт scheduler в following tick (при freed slots).
Production gotchas
1. weight_rule="downstream" + fan-out → корни всегда блокируют. При 100 mapped tasks корневая task имеет effective ~100, остальные ~1. Если pool узкий — корни новых DagRun запустятся раньше, чем хвосты предыдущих закончатся. Может создавать «накопление» queued TI.
2. Изменение priority не действует на уже scheduled TI. Effective priority precomputed при DAG serialization. Если меняете priority_weight — старые scheduled TI имеют старое значение. Только новые DagRun-ы получат новые числа.
3. Negative priority — для deprecation. Можно priority_weight=-10 для «надо запустить, но в последнюю очередь». Используется для cleanup tasks, которые должны идти когда всё свободно.
4. Cross-DAG приоритеты — только через pool разделение. Один pool = одна priority queue. Не делайте все production tasks в default_pool ожидая что priority поможет — лучше per-resource pools + priority внутри.
5. default_task_weight_rule в airflow.cfg — global default. Если ваш team привык к absolute, поставьте в config. Это глобально и нет сюрпризов при чтении кода.