Branching и conditional skip
Любой реальный pipeline рано или поздно встречает развилку: «если суббота — пропусти load в OLTP», «если A/B test — отправь 10% в variant», «если dev environment — не звони в Slack». В Airflow есть три механизма для управления потоком — BranchPythonOperator (выбор одной из веток), ShortCircuitOperator (skip всего downstream), и trigger_rule (политика запуска при смешанных состояниях upstream).
Этот урок разбирает все три и показывает, как они комбинируются, плюс распространённые паттерны и подводные камни trigger_rules после branching.
Branching — выбор ветки
BranchPythonOperator (и его TaskFlow аналог @task.branch) — особый PythonOperator, который возвращает task_id (или список task_id) тех downstream tasks, которые должны выполниться. Все остальные downstream tasks получают state skipped.
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def branching_demo():
@task.branch
def pick_path(**context) -> str:
date = context["data_interval_start"]
if date.weekday() < 5:
return "weekday_load"
return "weekend_skip"
@task
def weekday_load():
return "loaded weekday batch"
weekend_skip = EmptyOperator(task_id="weekend_skip")
end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
branch = pick_path()
branch >> [weekday_load(), weekend_skip] >> end
branching_demo()
Что произойдёт в субботу:
pick_pathзапускается, возвращает"weekend_skip".- Scheduler видит return value, помечает все downstream tasks branch’а, кроме
weekend_skip, какskipped(включаяweekday_load). weekend_skip(EmptyOperator) запускается тривиально, statesuccess.endимеет два upstream:weekday_load (skipped)иweekend_skip (success). Default trigger_ruleall_successсделал бы его тожеskipped. Поэтому ставимnone_failed_min_one_success.
Классический BranchPythonOperator
Без TaskFlow:
from airflow.operators.python import BranchPythonOperator
def choose(**context) -> str | list[str]:
if some_condition(context):
return ["task_a", "task_b"] # запустить обе
return "task_c" # запустить только одну
BranchPythonOperator(
task_id="branch"
python_callable=choose,
)
Return value:
str— task_id одной задачи (или task_group_id всей группы).list[str]— несколько задач/групп выполнить.Noneили пустой[]— skip всех downstream (эквивалент ShortCircuit).
@task.branch — тот же BranchPythonOperator под капотом, просто с function-style API.
ShortCircuitOperator — skip всего downstream
ShortCircuitOperator (или @task.short_circuit) — упрощённая форма branching. Возвращает bool:
True— pipeline продолжается.False— все downstream tasks получаютskipped.
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
@task.short_circuit
def is_business_day(**context) -> bool:
date = context["data_interval_start"]
return date.weekday() < 5
@task
def extract(): ...
@task
def transform(): ...
@task
def load(): ...
check = is_business_day()
check >> extract() >> transform() >> load()
В выходные is_business_day возвращает False, scheduler помечает extract, transform, load как skipped. В будни — все запускаются нормально.
ShortCircuit vs Branch
| Параметр | ShortCircuit | Branch |
|---|---|---|
| Return type | bool | str или list[str] |
| Что делает при False/skip | Skip всех downstream | Skip только не-выбранных downstream |
| Use case | Conditional whole-pipeline gating | Выбор между ветками |
Default ignore_downstream_trigger_rules | True (2.3+) | n/a |
ignore_downstream_trigger_rules=True — важный параметр ShortCircuit. С ним skip пропагируется через все downstream, игнорируя их trigger_rule. Без него (False), downstream tasks с trigger_rule all_done или all_failed могли бы запуститься.
Trigger_rule — поведение task при разных upstream states
trigger_rule (атрибут operator) определяет, когда task запускается на основе состояний upstream tasks. Значения:
| trigger_rule | Когда запускается |
|---|---|
all_success (default) | Все upstream → success |
all_failed | Все upstream → failed (или upstream_failed) |
all_done | Все upstream → терминальное состояние (success, failed, skipped) |
one_success | Хотя бы один upstream → success |
one_failed | Хотя бы один upstream → failed |
one_done | Хотя бы один upstream → терминальное |
none_failed | Все upstream success или skipped (нет failed) |
none_failed_min_one_success | Все success/skipped + хотя бы один success |
none_skipped | Все upstream success или failed (нет skipped) |
always | Всегда, независимо от upstream |
Default all_success означает: «если хотя бы один upstream failed или skipped — я тоже skip/fail». Это безопасный default для линейных pipeline.
После branching default не подходит
@task.branch
def pick_env() -> str:
return "prod_load" if is_prod() else "dev_load"
@task
def prod_load(): ...
@task
def dev_load(): ...
@task
def notify(): ... # ← default all_success → если prod выбран, dev_load=skipped, notify тоже skipped!
branch = pick_env()
branch >> [prod_load(), dev_load()] >> notify()
notify имеет два upstream: один skipped, один success — all_success его пропустит. Решение — trigger_rule="none_failed_min_one_success" для notify:
@task(trigger_rule="none_failed_min_one_success")
def notify(): ...
Теперь:
- Оба upstream не failed → ок.
- Хотя бы один success → ок.
- Notify запустится.
Реальные паттерны
Pattern 1: holiday skip
Пропустить pipeline в праздничные дни:
from airflow.decorators import dag, task
from datetime import datetime
HOLIDAYS_2026 = {"2026-01-01", "2026-01-07", "2026-05-09"}
@dag(schedule="0 8 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def daily_etl_with_holidays():
@task.short_circuit
def not_a_holiday(**context) -> bool:
ds = context["ds"]
return ds not in HOLIDAYS_2026
@task
def extract(): ...
@task
def load(): ...
not_a_holiday() >> extract() >> load()
daily_etl_with_holidays()
Pattern 2: A/B test routing
Отправить 10% запусков в variant pipeline:
import hashlib
@dag(...)
def ab_test_etl():
@task.branch
def route(**context) -> str:
run_id = context["run_id"]
bucket = int(hashlib.md5(run_id.encode()).hexdigest(), 16) % 100
return "variant_pipeline" if bucket < 10 else "control_pipeline"
@task
def control_pipeline(): ...
@task
def variant_pipeline(): ...
@task(trigger_rule="none_failed_min_one_success")
def collect_metrics(): ...
route() >> [control_pipeline(), variant_pipeline()] >> collect_metrics()
ab_test_etl()
Pattern 3: environment routing
Разная логика для dev/staging/prod, в одном DAG:
@dag(...)
def env_aware_etl():
@task.branch
def env_route() -> list[str]:
env = Variable.get("airflow_env", default_var="dev")
if env == "prod":
return ["prod_extract", "prod_validate"]
if env == "staging":
return ["staging_extract"]
return ["dev_extract"]
@task
def prod_extract(): ...
@task
def prod_validate(): ...
@task
def staging_extract(): ...
@task
def dev_extract(): ...
@task(trigger_rule="none_failed_min_one_success")
def common_load(): ...
branch = env_route()
branch >> [prod_extract(), prod_validate(), staging_extract(), dev_extract()] >> common_load()
env_aware_etl()
Pattern 4: retry-aware branch
Sensor sometimes flaky — если он failed после всех retries, выполнить альтернативный путь, а не упасть весь DAG:
sensor = MyDataSensor(
task_id="wait_data"
retries=3,
retry_delay=timedelta(minutes=10),
)
main_load = MainLoadTask(task_id="main_load")
fallback_load = FallbackLoadTask(
task_id="fallback_load"
trigger_rule="all_failed", # ← запустится ТОЛЬКО если sensor failed после всех retries
)
end = EmptyOperator(
task_id="end"
trigger_rule="none_failed_min_one_success", # хотя бы один success
)
sensor >> main_load >> end
sensor >> fallback_load >> end
Trigger_rule таблица — что выбирать после branching
| Сценарий | Trigger_rule |
|---|---|
| Линейный pipeline, без branching | all_success (default) |
| После branch — общий downstream | none_failed_min_one_success |
| После branch — нужен любой success | one_success |
| Cleanup task (teardown) | all_done |
| Notify только при failure | one_failed |
| Fallback при полном failure | all_failed |
| Гарантированный запуск | always |
В 2.7+ есть Setup/Teardown abstraction (см. Module 02 lesson 06), которая делает явный teardown с правильной семантикой failure handling. Для resource cleanup (close cluster, drop temp table) предпочитайте .as_teardown() вместо trigger_rule='all_done' — намерения выражены чище, и Airflow lifecycle учитывает teardown в DagRun состоянии.
Production gotchas
-
@task.branchвозвращает task_id, а вы передаёте функцию.return weekday_load(без вызова) — это передача callable, scheduler ищет task с task_id'<function weekday_load>'и не находит. Возвращайте строку имени task:return "weekday_load". Либо через.task_id:return weekday_load_op.task_id. -
Default trigger_rule после branch → silent skip.
branch >> [a, b] >> commonс defaultall_successприведёт к тому, чтоcommonвсегдаskipped(потому что один изa/bвсегдаskipped). DAG runs «успешно», но downstream данных нет. Всегда явноtrigger_rule="none_failed_min_one_success"для merge-точки. -
ShortCircuit с
ignore_downstream_trigger_rules=False. По умолчанию (True) skip propagates через все downstream. Если ставитеFalse, downstream сtrigger_rule='all_done'или'always'могут неожиданно запуститься, обходя ваш skip-condition. Думайте дважды перед явнымFalse. -
Branch task должен возвращать существующий task_id. Опечатка
return "weekday_loa"(безd) — scheduler не находит задачу, branch получаетfailed, downstreamupstream_failed. В CI добавьте linter —airflow dags test ...распознает невалидные branch returns. -
Branching с
task_group. Если возвращаете task_group_id, в нём skipped попадают все tasks. Полезно для крупных мега-веток. Но если внутри group есть tasks сtrigger_rule='all_done'— они запустятся несмотря на skip (all_doneвключает skipped). Это типичный bug при использовании teardown внутри branch-веток. -
AirflowSkipExceptionвнутри normal task. Не путать с branching: внутри обычного@taskможно сделатьraise AirflowSkipException— task получит stateskipped. Это эффектно как замена ShortCircuit для conditional skip без отдельной task. Но при этомtrigger_ruledownstream все равно сработают: defaultall_success→ downstream skipped. -
Branching и
depends_on_past=True. Если branch выбиралtask_aвчера, а сегодня выбралtask_b, уtask_bdepends_on_past=True, то её вчерашний state —skipped. По умолчаниюdepends_on_pastждётsuccessилиskippedвчера, так что работает. Но если используетеwait_for_downstream=Trueили строгийPool-based gating — могут быть сюрпризы. Тестируйте edge cases.