Trigger rules — управление выполнением при partial failures
По default task запускается только когда все upstream tasks success. Это разумно для большинства cases, но иногда нужно другое поведение: «запустить notification даже если что-то failed», «запустить cleanup в любом случае», «запустить если хотя бы один upstream succeeded», «запустить только при partial failure». Это управляется через trigger_rule.
Этот урок — все 9 опций trigger_rule с use cases и semantics.
Когда срабатывает task
Каждый TaskInstance проходит проверку trigger_rule перед переходом None → scheduled:
def can_be_scheduled(ti, upstream_tis):
"""Check if all upstream conditions are met based on trigger_rule."""
upstream_states = [up.state for up in upstream_tis]
return ti.task.trigger_rule.evaluate(upstream_states)
Если evaluate returns False → TI остаётся None. Если все upstream tasks завершились (terminal states), но условия не met → TI становится upstream_failed или skipped.
Все trigger_rule значения
| Rule | Когда срабатывает |
|---|---|
all_success (default) | Все upstream success/skipped |
all_failed | Все upstream failed/upstream_failed |
all_done | Все upstream в terminal state (success/failed/skipped/upstream_failed) |
all_skipped | Все upstream skipped |
one_success | Хотя бы один upstream success |
one_failed | Хотя бы один upstream failed |
none_failed | Никто не failed (но допустимы skipped) |
none_failed_min_one_success | Никто не failed + хотя бы один success |
none_skipped | Никто не skipped |
always | Всегда — игнорируем upstream |
Бонус: always_failed (2.7+)
В 2.7+ добавлен always_failed — для патернов вроде “alert если что-то upstream вообще failed”:
@task(trigger_rule="always_failed")
def alert_on_any_failure():
send_pagerduty_alert()
Use case 1: Cleanup в любом случае (all_done)
Classic pattern — освободить resource независимо от outcome upstream:
@dag(...)
def cleanup_pattern():
create_temp = create_temp_table()
process_a = process_a_task()
process_b = process_b_task()
drop_temp = drop_temp_table.override(trigger_rule="all_done")()
create_temp >> [process_a, process_b] >> drop_temp
Даже если process_a или process_b failed — drop_temp запускается.
Альтернатива в 2.7+: используйте Setup/Teardown (Lesson 06) — более явная семантика и лучше UI.
Use case 2: Notification на любой исход (always)
Send Slack message всегда — success или fail:
@dag(...)
def notify_pattern():
@task
def main_work(): ...
@task(trigger_rule="always")
def send_summary(**context):
dag_run = context["dag_run"]
send_slack(
f"DAG {dag_run.dag_id} finished with state {dag_run.state}"
)
main_work() >> send_summary()
always runs неотменно — даже если upstream failed, skipped, или upstream_failed.
Use case 3: Alert только на failure (one_failed)
@dag(...)
def alert_pattern():
@task
def step1(): pass
@task
def step2(): pass
@task
def step3(): pass
@task(trigger_rule="one_failed")
def alert():
send_pagerduty()
[step1(), step2(), step3()] >> alert()
alert runs только когда хотя бы один из step1/2/3 failed. Если все success — alert skipped.
Use case 4: Branching через one_success
После BranchPythonOperator только один из путей выбран:
from airflow.operators.python import BranchPythonOperator
@dag(...)
def branching_pattern():
@task.branch
def choose_path(**context):
if context["dag_run"].run_type == "manual":
return "manual_task"
else:
return "scheduled_task"
@task
def manual_task(): print("manual")
@task
def scheduled_task(): print("scheduled")
@task(trigger_rule="one_success") # ← Важно!
def merge():
print("done")
branch = choose_path()
branch >> [manual_task(), scheduled_task()] >> merge()
Без trigger_rule="one_success", merge имел бы default all_success. Поскольку один из manual/scheduled будет skipped (другой выбран branch), merge тоже skipped → нежелательное поведение.
one_success triggers когда хотя бы один upstream success → корректно работает с branching.
Use case 5: Wait for all upstream (all_done) + check (Python check)
Когда нужно процессить результаты только если все upstream завершились:
@dag(...)
def aggregate_pattern():
@task
def task_a(): return "a"
@task
def task_b(): return "b"
@task
def task_c(): return "c"
@task(trigger_rule="all_done")
def aggregate(**context):
ti = context["ti"]
results = []
for task_id in ["task_a", "task_b", "task_c"]:
try:
value = ti.xcom_pull(task_ids=task_id)
if value is not None:
results.append(value)
except Exception:
pass # Task failed — skip
if not results:
raise AirflowFailException("All upstream tasks failed")
return results
[task_a(), task_b(), task_c()] >> aggregate()
aggregate runs after all upstream finished (success/fail), сам решает что делать с failures (best-effort aggregation).
Detailed semantics
all_success (default)
| upstream states | Result |
|---|---|
| all success | scheduled |
| any failed | upstream_failed |
| any skipped | upstream_failed (в 2.x — может быть skipped) |
| mix success + skipped | upstream_failed |
| mix success + running | wait |
none_failed
Allows skipped, но not failed:
| upstream states | Result |
|---|---|
| all success | scheduled |
| mix success + skipped | scheduled |
| any failed | upstream_failed |
| any upstream_failed | upstream_failed |
Полезно когда optional upstream — skipped is OK, failed не OK.
none_failed_min_one_success
Like none_failed, но also требует хотя бы один success:
| upstream states | Result |
|---|---|
| all skipped | upstream_failed (нужен хотя бы один success) |
| all success | scheduled |
| 1 success + N skipped | scheduled |
| any failed | upstream_failed |
Полезно после branching где нужен success от хотя бы одной ветви.
all_done
Triggers when all upstream reach terminal state (любое):
| upstream states | Result |
|---|---|
| any running | wait |
| any queued | wait |
| all terminal (any combination) | scheduled |
Используется для cleanup tasks которые должны run regardless of upstream outcomes.
always
Не смотрит на upstream вообще:
| upstream states | Result |
|---|---|
| anything | scheduled (когда DagRun created) |
Даже если upstream ещё running — always task может запуститься. Но dependency chain всё равно используется для ordering: always task runs after upstream queued (но не ждёт их completion).
Practically — schedule запускает always tasks сразу же после upstream submitted, что часто соответствует “после upstream finished” из-за scheduling order.
Trigger rule + retries
Tricky case: если upstream task имеет retries, может ли downstream task запуститься между retries?
Нет — up_for_retry не считается terminal state. Downstream tasks ждут пока upstream не достигнет final terminal state (success/failed после exhausted retries).
@task(retries=3)
def flaky_task(): ...
@task(trigger_rule="all_done")
def downstream(): ...
flaky_task() >> downstream()
downstream ждёт пока flaky_task либо succeed, либо failed после всех 3 retries.
Trigger rule + Setup/Teardown
В 2.7+ teardown task имеет implicit trigger_rule like all_done для своего scope. Но также setup failure handling:
| Setup | Middle tasks | Default teardown behavior |
|---|---|---|
| success | success | runs |
| success | failed | runs |
| failed | skipped | runs (cleanup partial setup) |
| skipped (depends_on_past=True и past failed) | skipped | runs |
Teardown почти always в практике, но с правильной semantics для setup failure.
Anti-patterns
Anti-pattern 1: trigger_rule=“always” для actual logic
@task(trigger_rule="always")
def core_processing(): # ❌ Должно зависеть от upstream
...
Если task actually depends on upstream results — не используйте always. Это для notification/cleanup ONLY.
Anti-pattern 2: Branching без learning_rule=“one_success”
@task.branch
def choose(): return "path_a"
@task
def path_a(): pass
@task
def path_b(): pass
@task
def merge(): # ❌ Default all_success → upstream_failed когда path_b skipped
pass
После branching merge task должен иметь trigger_rule="one_success" или "none_failed_min_one_success".
Anti-pattern 3: Mixing semantics
# Confusing — что значит one_success здесь?
@task
def parallel_a(): ...
@task
def parallel_b(): ...
@task(trigger_rule="one_success")
def aggregate(): ...
[parallel_a(), parallel_b()] >> aggregate()
one_success означает “запустить если ОДИН из parallel_a/b succeeded”. Но aggregate skip-нет другой task — это intentional? Если parallel_a/b independent и оба должны run — используй all_done или none_failed.
Production checklist
- Cleanup tasks →
all_doneилиas_teardown()(2.7+) - Notifications (success ИЛИ fail) →
always - Alerts только на fail →
one_failed - После branching →
one_successилиnone_failed_min_one_success - Aggregations →
all_done+ Python logic для handling failures