Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 28 мин
Продвинутый
PriorityWeight RuleSchedulerPoolSorting

Priority weight и weight_rule

Когда scheduler внутри critical section видит сотни scheduled TI и понимает, что в pool 8 свободных slots — какие 8 TI взять? Ответ — отсортировать по effective priority_weight и взять первые. Priority — это не «строгий priority queue», это tie-breaker в условиях ограниченных ресурсов. Но именно от него зависит, какие tasks production проигнорирует, а какие протиснутся первыми.

Этот урок разбирает как считается effective priority (это не то что вы задали в priority_weight=), три алгоритма weight_rule (downstream, upstream, absolute), и реальные ловушки default downstream.


Базовая модель: priority_weight на task

@task(priority_weight=10)
def important():
    ...

@task(priority_weight=1)
def normal():
    ...

Числовое значение, default 1. На уровне scheduler хранится в task_instance.priority_weight (вычисленное), на уровне TaskDefinition — priority_weight (raw config).

Однако при scheduling используется effective priority — вычисленное значение в зависимости от weight_rule. Default rule — downstream.


SLA и приоритизация задач в production pipeline

Три алгоритма weight_rule

RuleФормула effective priorityКогда выбирать
downstream (default)own priority + sum of all downstream tasks’ priorities (recursive)“Если эта task важна для большого хвоста — приоритизировать её”
upstreamown priority + sum of all upstream tasks’ priorities”Если эта task — финал длинной цепи — её приоритизировать, чтобы освободить ресурсы upstream”
absoluteonly own priority”Я знаю что делаю — берите ровно это число”

Звучит абстрактно — разберём на конкретном графе.


Пример: DAG из 5 tasks

DAG для расчёта priority
extract (own=1)Корневая task, всех вытаскивает данные. priority_weight=1 (default). Имеет 4 downstream — transform_a, transform_b, load, notify.
transform_a (own=1)Обычный transform. priority=1. Downstream: load.
transform_b (own=5)Дороже трансформация — explicitly priority=5. Downstream: load.
load (own=10)Финальная загрузка. priority=10 — потому что критично завершить runtime. Downstream: notify.
notify (own=1)Финал, отправка Slack. priority=1. Без downstream.

Граф: extract → {transform_a, transform_b} → load → notify.

Rule = downstream (default)

Считаем effective priority с конца:

  • notify: own=1, нет downstream → effective = 1
  • load: own=10 + sum(downstream=notify=1) → effective = 11
  • transform_a: own=1 + sum(load=11) → effective = 12
  • transform_b: own=5 + sum(load=11) → effective = 16
  • extract: own=1 + sum(transform_a=12 + transform_b=16) → effective = 29

Сортировка scheduler: extract (29) > transform_b (16) > transform_a (12) > load (11) > notify (1).

Логика: корни графа всегда имеют самый высокий effective priority, потому что их downstream хвост накапливается. Это правильно для большинства production cases — если корень не запустить, весь DAG стоит.

Rule = upstream

То же снизу вверх:

  • extract: own=1, нет upstream → effective = 1
  • transform_a: own=1 + sum(extract=1) → effective = 2
  • transform_b: own=5 + sum(extract=1) → effective = 6
  • load: own=10 + sum(transform_a=2 + transform_b=6) → effective = 18
  • notify: own=1 + sum(load=18) → effective = 19

Сортировка: notify (19) > load (18) > transform_b (6) > transform_a (2) > extract (1).

Логика обратная: листья самые приоритетные, чтобы скорее «закрыть» хвосты и освободить slots в pool.

Rule = absolute

Никакой recursion — только own:

  • extract: 1
  • transform_a: 1
  • transform_b: 5
  • load: 10
  • notify: 1

Сортировка: load (10) > transform_b (5) > extract = transform_a = notify (1).

Логика: «доверьтесь моим числам, не суммируйте». Используется когда вы explicitly задаёте priorities как ranking без учёта структуры графа.


Где применяется priority — внутри pool

Главный нюанс: priority применяется внутри pool, не глобально. Scheduler внутри critical section:

# Псевдокод scheduling
for pool in slot_pools:
    open_slots = pool.slots - pool.used_slots
    if open_slots <= 0:
        continue

    # Все scheduled TI этого pool, отсортированные по effective priority DESC
    candidates = (
        session.query(TaskInstance)
        .filter(TaskInstance.pool == pool.pool)
        .filter(TaskInstance.state == State.SCHEDULED)
        .order_by(TaskInstance.priority_weight.desc())  # ← effective, precomputed
        .limit(open_slots)
        .all()
    )

    for ti in candidates:
        if can_enqueue(ti):
            ti.state = State.QUEUED

Это значит:

  • Priority weight НЕ глобальная очередь — это локальная сортировка в пределах одного pool.
  • Если у вас два pool warehouse и api_calls — task с priority=100 в warehouse не имеет приоритета над task с priority=1 в api_calls. Они в разных queue.

Поэтому разделение по pools — primary mechanism, priority — secondary tie-breaker.


Где хранится effective priority

В Airflow 2.x task_instance.priority_weight хранит effective значение (вычисленное по weight_rule), пересчитываемое при serialization DAG. Это значит:

  • Изменили priority_weight task — нужен DAG re-parse, чтобы эффективный пересчитался
  • Изменили weight_rule — то же самое
  • Backfill за прошлые runs использует priority которая была при serialization — не пересчитывается

SQL для проверки:

-- Effective priorities по DAG
SELECT
    dag_id,
    task_id,
    priority_weight,  -- ← это effective (после weight_rule)
    state
FROM task_instance
WHERE state = 'scheduled'
  AND dag_id = 'my_dag'
ORDER BY priority_weight DESC;

Почему default downstream иногда контр-интуитивен

Пример: у вас две DAG обе пишут в Snowflake через pool warehouse:

  • DAG A — короткий, 3 tasks: query → process → load_a, все default priority=1.
  • DAG B — длинный, 30 tasks fan-out: query → [30 mapped] → load_b, все default priority=1.

С downstream (default):

  • DAG A query: effective = 1 + 1 + 1 = 3
  • DAG B query: effective = 1 + 30*1 + 1 = 32

DAG B всегда выигрывает первым в pool — потому что у него «больше хвост». Хотя по бизнес-логике DAG A может быть критичнее (например, SLA-critical короткий ETL).

Mitigation: либо явно поднять priority_weight на корень DAG A до большого значения, либо переключить weight_rule="absolute" и задать absolute priorities.


Production pattern: per-DAG priority pinning

Чтобы избежать downstream-bias, иногда полезно:

@dag(
    dag_id="sla_critical_etl"
    default_args={"weight_rule": "absolute"},  # ← все tasks в DAG используют absolute
)
def sla_critical():
    @task(priority_weight=100)  # ← high
    def query(): ...

    @task(priority_weight=100)
    def transform(): ...

    @task(priority_weight=100)
    def load(): ...

    load(transform(query()))

Или global pin через airflow.cfg:

[core]
default_task_weight_rule = absolute

Это меняет default для всех новых DAG.


Comparison table — когда какое rule

СценарийЛучшее ruleПочему
Standard ETL pipelines, корни критичныdownstream (default)Прокачивает корни — без них DAG не двинется
ML inference DAG, финальные predictions важныupstreamФинал получит приоритет, освободит pool
SLA-tight, явный manual rankingabsoluteНикаких сюрпризов от структуры графа
Mixed pool с разными DAGabsolute + per-DAG offsetКонтролируемая cross-DAG приоритизация
Fan-out с одинаковыми parallel tasksлюбое (они все equal effective)Не имеет значения — все одинаковы

SQL для visualization priority

Чтобы понять кто выиграет в pool:

SELECT
    ti.dag_id,
    ti.task_id,
    ti.priority_weight AS effective_priority,
    ti.state,
    ti.pool,
    ti.queued_dttm
FROM task_instance ti
JOIN slot_pool sp ON ti.pool = sp.pool
WHERE ti.state = 'scheduled'
  AND ti.pool = 'warehouse'  -- интересный pool
ORDER BY ti.priority_weight DESC, ti.queued_dttm ASC
LIMIT 50;

Topmost 50 — это next batch который заберёт scheduler в following tick (при freed slots).


Production gotchas

1. weight_rule="downstream" + fan-out → корни всегда блокируют. При 100 mapped tasks корневая task имеет effective ~100, остальные ~1. Если pool узкий — корни новых DagRun запустятся раньше, чем хвосты предыдущих закончатся. Может создавать «накопление» queued TI.

2. Изменение priority не действует на уже scheduled TI. Effective priority precomputed при DAG serialization. Если меняете priority_weight — старые scheduled TI имеют старое значение. Только новые DagRun-ы получат новые числа.

3. Negative priority — для deprecation. Можно priority_weight=-10 для «надо запустить, но в последнюю очередь». Используется для cleanup tasks, которые должны идти когда всё свободно.

4. Cross-DAG приоритеты — только через pool разделение. Один pool = одна priority queue. Не делайте все production tasks в default_pool ожидая что priority поможет — лучше per-resource pools + priority внутри.

5. default_task_weight_rule в airflow.cfg — global default. Если ваш team привык к absolute, поставьте в config. Это глобально и нет сюрпризов при чтении кода.


Проверка знанийKnowledge check
DAG имеет fan-out из 30 mapped tasks. Pool 'warehouse' с 8 slots. Все priority_weight=1 default, weight_rule=downstream default. Будут ли все 30 mapped tasks равны по priority при scheduling?
ОтветAnswer
Да, при weight_rule=downstream все 30 mapped tasks имеют equal effective priority (1 + сумма downstream). Если mapped task — лист графа без downstream, у каждой effective=1. Если у них один общий downstream — у каждой effective=1+1=2, тоже equal. Scheduler возьмёт первые 8 в order priority DESC, tiebreaker — queued_dttm ASC (старее first). С точки зрения user — будут запускаться по порядку map_index 0,1,2..., но это side effect ordering DB query, не priority. Главный takeaway: priority_weight различает tasks только если у них РАЗНАЯ структура downstream subgraph, иначе они equal. Для mapped tasks с homogeneous workload priority бесполезен — нужно либо разные effective через absolute + per-index priority (редко), либо принять FIFO ordering. Если хотите явный per-index priority — нужно строить DAG динамически с explicit absolute priorities на каждую task, что defeats суть mapping.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. weight_rule='downstream' (default) для task X = own_priority + sum(downstream_priorities) recursive. Если граф extract → transform → load, все priority=1, какое effective priority для extract?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6