Learning Platform
Глоссарий Troubleshooting
Урок 04.05 · 22 мин
Продвинутый
BranchingShortCircuittrigger_ruleConditionalSkip

Branching и conditional skip

Любой реальный pipeline рано или поздно встречает развилку: «если суббота — пропусти load в OLTP», «если A/B test — отправь 10% в variant», «если dev environment — не звони в Slack». В Airflow есть три механизма для управления потоком — BranchPythonOperator (выбор одной из веток), ShortCircuitOperator (skip всего downstream), и trigger_rule (политика запуска при смешанных состояниях upstream).

Retry и backoff: как переживать временные сбои

Этот урок разбирает все три и показывает, как они комбинируются, плюс распространённые паттерны и подводные камни trigger_rules после branching.


Branching — выбор ветки

BranchPythonOperator (и его TaskFlow аналог @task.branch) — особый PythonOperator, который возвращает task_id (или список task_id) тех downstream tasks, которые должны выполниться. Все остальные downstream tasks получают state skipped.

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def branching_demo():
    @task.branch
    def pick_path(**context) -> str:
        date = context["data_interval_start"]
        if date.weekday() < 5:
            return "weekday_load"
        return "weekend_skip"

    @task
    def weekday_load():
        return "loaded weekday batch"

    weekend_skip = EmptyOperator(task_id="weekend_skip")

    end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")

    branch = pick_path()
    branch >> [weekday_load(), weekend_skip] >> end

branching_demo()

Что произойдёт в субботу:

  1. pick_path запускается, возвращает "weekend_skip".
  2. Scheduler видит return value, помечает все downstream tasks branch’а, кроме weekend_skip, как skipped (включая weekday_load).
  3. weekend_skip (EmptyOperator) запускается тривиально, state success.
  4. end имеет два upstream: weekday_load (skipped) и weekend_skip (success). Default trigger_rule all_success сделал бы его тоже skipped. Поэтому ставим none_failed_min_one_success.
Branching state propagation
pick_path → SUCCESSBranch task возвращает task_id выбранной ветки. На основе return value scheduler помечает остальные downstream как skipped.
weekday_load → SKIPPEDНе выбран branch, скипается до execute. Даже если trigger_rule=all_success у downstream, skip propagates.
weekend_skip → SUCCESSВыбранная ветка, запускается нормально.
end (trigger_rule=none_failed_min_one_success)С default all_success end был бы skipped (потому что один upstream skipped). С none_failed_min_one_success — запускается, потому что есть как минимум один success и нет failed upstream.

Классический BranchPythonOperator

Без TaskFlow:

from airflow.operators.python import BranchPythonOperator

def choose(**context) -> str | list[str]:
    if some_condition(context):
        return ["task_a", "task_b"]   # запустить обе
    return "task_c"                    # запустить только одну

BranchPythonOperator(
    task_id="branch"
    python_callable=choose,
)

Return value:

  • str — task_id одной задачи (или task_group_id всей группы).
  • list[str] — несколько задач/групп выполнить.
  • None или пустой [] — skip всех downstream (эквивалент ShortCircuit).

@task.branch — тот же BranchPythonOperator под капотом, просто с function-style API.


ShortCircuitOperator — skip всего downstream

ShortCircuitOperator (или @task.short_circuit) — упрощённая форма branching. Возвращает bool:

  • True — pipeline продолжается.
  • False — все downstream tasks получают skipped.
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

@task.short_circuit
def is_business_day(**context) -> bool:
    date = context["data_interval_start"]
    return date.weekday() < 5

@task
def extract(): ...

@task
def transform(): ...

@task
def load(): ...

check = is_business_day()
check >> extract() >> transform() >> load()

В выходные is_business_day возвращает False, scheduler помечает extract, transform, load как skipped. В будни — все запускаются нормально.

ShortCircuit vs Branch

ПараметрShortCircuitBranch
Return typeboolstr или list[str]
Что делает при False/skipSkip всех downstreamSkip только не-выбранных downstream
Use caseConditional whole-pipeline gatingВыбор между ветками
Default ignore_downstream_trigger_rulesTrue (2.3+)n/a

ignore_downstream_trigger_rules=True — важный параметр ShortCircuit. С ним skip пропагируется через все downstream, игнорируя их trigger_rule. Без него (False), downstream tasks с trigger_rule all_done или all_failed могли бы запуститься.


Trigger_rule — поведение task при разных upstream states

trigger_rule (атрибут operator) определяет, когда task запускается на основе состояний upstream tasks. Значения:

trigger_ruleКогда запускается
all_success (default)Все upstream → success
all_failedВсе upstream → failed (или upstream_failed)
all_doneВсе upstream → терминальное состояние (success, failed, skipped)
one_successХотя бы один upstream → success
one_failedХотя бы один upstream → failed
one_doneХотя бы один upstream → терминальное
none_failedВсе upstream success или skipped (нет failed)
none_failed_min_one_successВсе success/skipped + хотя бы один success
none_skippedВсе upstream success или failed (нет skipped)
alwaysВсегда, независимо от upstream

Default all_success означает: «если хотя бы один upstream failed или skipped — я тоже skip/fail». Это безопасный default для линейных pipeline.

После branching default не подходит

@task.branch
def pick_env() -> str:
    return "prod_load" if is_prod() else "dev_load"

@task
def prod_load(): ...

@task
def dev_load(): ...

@task
def notify(): ...  # ← default all_success → если prod выбран, dev_load=skipped, notify тоже skipped!

branch = pick_env()
branch >> [prod_load(), dev_load()] >> notify()

notify имеет два upstream: один skipped, один success — all_success его пропустит. Решение — trigger_rule="none_failed_min_one_success" для notify:

@task(trigger_rule="none_failed_min_one_success")
def notify(): ...

Теперь:

  • Оба upstream не failed → ок.
  • Хотя бы один success → ок.
  • Notify запустится.

Реальные паттерны

Pattern 1: holiday skip

Пропустить pipeline в праздничные дни:

from airflow.decorators import dag, task
from datetime import datetime

HOLIDAYS_2026 = {"2026-01-01", "2026-01-07", "2026-05-09"}

@dag(schedule="0 8 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def daily_etl_with_holidays():
    @task.short_circuit
    def not_a_holiday(**context) -> bool:
        ds = context["ds"]
        return ds not in HOLIDAYS_2026

    @task
    def extract(): ...

    @task
    def load(): ...

    not_a_holiday() >> extract() >> load()

daily_etl_with_holidays()

Pattern 2: A/B test routing

Отправить 10% запусков в variant pipeline:

import hashlib

@dag(...)
def ab_test_etl():
    @task.branch
    def route(**context) -> str:
        run_id = context["run_id"]
        bucket = int(hashlib.md5(run_id.encode()).hexdigest(), 16) % 100
        return "variant_pipeline" if bucket < 10 else "control_pipeline"

    @task
    def control_pipeline(): ...

    @task
    def variant_pipeline(): ...

    @task(trigger_rule="none_failed_min_one_success")
    def collect_metrics(): ...

    route() >> [control_pipeline(), variant_pipeline()] >> collect_metrics()

ab_test_etl()

Pattern 3: environment routing

Разная логика для dev/staging/prod, в одном DAG:

@dag(...)
def env_aware_etl():
    @task.branch
    def env_route() -> list[str]:
        env = Variable.get("airflow_env", default_var="dev")
        if env == "prod":
            return ["prod_extract", "prod_validate"]
        if env == "staging":
            return ["staging_extract"]
        return ["dev_extract"]

    @task
    def prod_extract(): ...
    @task
    def prod_validate(): ...
    @task
    def staging_extract(): ...
    @task
    def dev_extract(): ...

    @task(trigger_rule="none_failed_min_one_success")
    def common_load(): ...

    branch = env_route()
    branch >> [prod_extract(), prod_validate(), staging_extract(), dev_extract()] >> common_load()

env_aware_etl()

Pattern 4: retry-aware branch

Sensor sometimes flaky — если он failed после всех retries, выполнить альтернативный путь, а не упасть весь DAG:

sensor = MyDataSensor(
    task_id="wait_data"
    retries=3,
    retry_delay=timedelta(minutes=10),
)

main_load = MainLoadTask(task_id="main_load")

fallback_load = FallbackLoadTask(
    task_id="fallback_load"
    trigger_rule="all_failed",  # ← запустится ТОЛЬКО если sensor failed после всех retries
)

end = EmptyOperator(
    task_id="end"
    trigger_rule="none_failed_min_one_success",  # хотя бы один success
)

sensor >> main_load >> end
sensor >> fallback_load >> end

Trigger_rule таблица — что выбирать после branching

СценарийTrigger_rule
Линейный pipeline, без branchingall_success (default)
После branch — общий downstreamnone_failed_min_one_success
После branch — нужен любой successone_success
Cleanup task (teardown)all_done
Notify только при failureone_failed
Fallback при полном failureall_failed
Гарантированный запускalways
TIP

В 2.7+ есть Setup/Teardown abstraction (см. Module 02 lesson 06), которая делает явный teardown с правильной семантикой failure handling. Для resource cleanup (close cluster, drop temp table) предпочитайте .as_teardown() вместо trigger_rule='all_done' — намерения выражены чище, и Airflow lifecycle учитывает teardown в DagRun состоянии.


Production gotchas

  1. @task.branch возвращает task_id, а вы передаёте функцию. return weekday_load (без вызова) — это передача callable, scheduler ищет task с task_id '<function weekday_load>' и не находит. Возвращайте строку имени task: return "weekday_load". Либо через .task_id: return weekday_load_op.task_id.

  2. Default trigger_rule после branch → silent skip. branch >> [a, b] >> common с default all_success приведёт к тому, что common всегда skipped (потому что один из a/b всегда skipped). DAG runs «успешно», но downstream данных нет. Всегда явно trigger_rule="none_failed_min_one_success" для merge-точки.

  3. ShortCircuit с ignore_downstream_trigger_rules=False. По умолчанию (True) skip propagates через все downstream. Если ставите False, downstream с trigger_rule='all_done' или 'always' могут неожиданно запуститься, обходя ваш skip-condition. Думайте дважды перед явным False.

  4. Branch task должен возвращать существующий task_id. Опечатка return "weekday_loa" (без d) — scheduler не находит задачу, branch получает failed, downstream upstream_failed. В CI добавьте linter — airflow dags test ... распознает невалидные branch returns.

  5. Branching с task_group. Если возвращаете task_group_id, в нём skipped попадают все tasks. Полезно для крупных мега-веток. Но если внутри group есть tasks с trigger_rule='all_done' — они запустятся несмотря на skip (all_done включает skipped). Это типичный bug при использовании teardown внутри branch-веток.

  6. AirflowSkipException внутри normal task. Не путать с branching: внутри обычного @task можно сделать raise AirflowSkipException — task получит state skipped. Это эффектно как замена ShortCircuit для conditional skip без отдельной task. Но при этом trigger_rule downstream все равно сработают: default all_success → downstream skipped.

  7. Branching и depends_on_past=True. Если branch выбирал task_a вчера, а сегодня выбрал task_b, у task_b depends_on_past=True, то её вчерашний state — skipped. По умолчанию depends_on_past ждёт success или skipped вчера, так что работает. Но если используете wait_for_downstream=True или строгий Pool-based gating — могут быть сюрпризы. Тестируйте edge cases.


Проверка знанийKnowledge check
У вас DAG: `branch_env` (выбирает 'prod_load' или 'dev_load') >> [prod_load, dev_load] >> notify. На проде вы видите, что notify всегда состоянии 'skipped', хотя один из ветвей всегда success. В чём проблема и как починить? Дополнительно: почему ShortCircuit не подойдёт здесь?
ОтветAnswer
Проблема в **trigger_rule** на notify. Default — `all_success`. После branching один из upstream (тот, что не выбран) всегда `skipped`. С `all_success` это автоматически делает notify тоже `skipped`. Это **silent failure** — DAG run в UI показывает success, но notification не уходит. **Fix**: поставить `trigger_rule='none_failed_min_one_success'` на notify. Эта rule означает: ни один upstream не failed + хотя бы один success → запустись. После branching это правильный default — игнорирует skipped, требует хотя бы один success. **Почему не ShortCircuit**: ShortCircuit возвращает bool и при False скипает **всех** downstream безусловно (с `ignore_downstream_trigger_rules=True`). Это совершенно другая семантика — здесь нам нужно выбрать одну из веток, не отменить весь pipeline. ShortCircuit подходит для gating (holiday check → skip whole day), а Branch — для routing (env-specific path). Дополнительно стоит знать другие правильные rules для merge-точек: `one_success` (если хотя бы один success, не важно про failed), `none_failed` (все success или skipped, нет failed), `all_done` для cleanup tasks. Полностью избежать вопроса можно через Setup/Teardown abstraction (2.7+) для cleanup — она явнее выражает intent чем trigger_rule trickery.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. DAG: `branch_env >> [prod_load, dev_load] >> notify`. branch_env возвращает 'prod_load' или 'dev_load'. На проде заметили: notify всегда state='skipped', хотя один из ветвей success. Что неправильно?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6