Learning Platform
Глоссарий Troubleshooting
Урок 03.08 · 18 мин
Продвинутый
Trigger RulesDependenciesBranchingError Handling

Trigger rules — управление выполнением при partial failures

По default task запускается только когда все upstream tasks success. Это разумно для большинства cases, но иногда нужно другое поведение: «запустить notification даже если что-то failed», «запустить cleanup в любом случае», «запустить если хотя бы один upstream succeeded», «запустить только при partial failure». Это управляется через trigger_rule.

Этот урок — все 9 опций trigger_rule с use cases и semantics.

Trigger rule evaluation — upstream states → downstream decision
upstream_A: successTask завершился успешно (state=success). Для всех trigger_rules кроме all_failed/one_failed — это позитивный сигнал. XCom available для downstream pull.
upstream_B: failedTask failed после exhausted retries (state=failed). Влияет на all_success → upstream_failed, но позволяет all_done и one_failed.
upstream_C: skippedTask пропущен (state=skipped) — обычно после branching или short-circuit. Особенный case: для all_success строго это failure, для none_failed — допустимо.
downstream TI scheduling check
trigger_rule evaluatorПри попытке перевода TI scheduled → queued scheduler проверяет trigger_rule downstream task против states upstream tasks. Если condition не met — TI остаётся None или становится upstream_failed/skipped.
all_success → upstream_failedDefault rule. Требует ВСЕ upstream success. Если хотя бы один failed/skipped — downstream marked upstream_failed (не запускается). Самый strict вариант.
all_done → scheduledЗапускается когда все upstream достигли любого terminal state (success/failed/skipped). Используется для cleanup tasks которые должны run regardless of outcomes. Альтернатива as_teardown() в 2.7+.
one_success → scheduledЗапускается если хотя бы один upstream success. Критично для branching patterns: после @task.branch один путь success, другой skipped → one_success позволяет merge task запуститься.
one_failed → scheduledЗапускается если хотя бы один upstream failed. Используется для alerting/notification только на failures — если все success, alert task просто skipped.
none_failed_min_one_successГибрид: никаких failures + минимум один success. После branching где нужно гарантировать что хотя бы одна ветка succeeded (не все skipped). Самый robust pattern для branching merge.
downstream TI gets final state
scheduled → queued → runningЕсли trigger_rule satisfied — TI попадает в normal scheduling flow. Critical section перевод scheduled → queued, executor подхватывает, worker исполняет.
upstream_failed / skippedЕсли condition не satisfied — terminal state без execution. upstream_failed для failures upstream, skipped для skipped propagation. Downstream tasks этих TI пропагируют дальше по rules.

Когда срабатывает task

Каждый TaskInstance проходит проверку trigger_rule перед переходом None → scheduled:

def can_be_scheduled(ti, upstream_tis):
    """Check if all upstream conditions are met based on trigger_rule."""
    upstream_states = [up.state for up in upstream_tis]
    return ti.task.trigger_rule.evaluate(upstream_states)

Если evaluate returns False → TI остаётся None. Если все upstream tasks завершились (terminal states), но условия не met → TI становится upstream_failed или skipped.


Все trigger_rule значения

RuleКогда срабатывает
all_success (default)Все upstream success/skipped
all_failedВсе upstream failed/upstream_failed
all_doneВсе upstream в terminal state (success/failed/skipped/upstream_failed)
all_skippedВсе upstream skipped
one_successХотя бы один upstream success
one_failedХотя бы один upstream failed
none_failedНикто не failed (но допустимы skipped)
none_failed_min_one_successНикто не failed + хотя бы один success
none_skippedНикто не skipped
alwaysВсегда — игнорируем upstream

Бонус: always_failed (2.7+)

В 2.7+ добавлен always_failed — для патернов вроде “alert если что-то upstream вообще failed”:

@task(trigger_rule="always_failed")
def alert_on_any_failure():
    send_pagerduty_alert()

Use case 1: Cleanup в любом случае (all_done)

Classic pattern — освободить resource независимо от outcome upstream:

@dag(...)
def cleanup_pattern():
    create_temp = create_temp_table()

    process_a = process_a_task()
    process_b = process_b_task()

    drop_temp = drop_temp_table.override(trigger_rule="all_done")()

    create_temp >> [process_a, process_b] >> drop_temp

Даже если process_a или process_b failed — drop_temp запускается.

Альтернатива в 2.7+: используйте Setup/Teardown (Lesson 06) — более явная семантика и лучше UI.


Use case 2: Notification на любой исход (always)

Send Slack message всегда — success или fail:

@dag(...)
def notify_pattern():
    @task
    def main_work(): ...

    @task(trigger_rule="always")
    def send_summary(**context):
        dag_run = context["dag_run"]
        send_slack(
            f"DAG {dag_run.dag_id} finished with state {dag_run.state}"
        )

    main_work() >> send_summary()

always runs неотменно — даже если upstream failed, skipped, или upstream_failed.


Use case 3: Alert только на failure (one_failed)

@dag(...)
def alert_pattern():
    @task
    def step1(): pass
    @task
    def step2(): pass
    @task
    def step3(): pass

    @task(trigger_rule="one_failed")
    def alert():
        send_pagerduty()

    [step1(), step2(), step3()] >> alert()

alert runs только когда хотя бы один из step1/2/3 failed. Если все success — alert skipped.


Use case 4: Branching через one_success

После BranchPythonOperator только один из путей выбран:

from airflow.operators.python import BranchPythonOperator

@dag(...)
def branching_pattern():
    @task.branch
    def choose_path(**context):
        if context["dag_run"].run_type == "manual":
            return "manual_task"
        else:
            return "scheduled_task"

    @task
    def manual_task(): print("manual")

    @task
    def scheduled_task(): print("scheduled")

    @task(trigger_rule="one_success")  # ← Важно!
    def merge():
        print("done")

    branch = choose_path()
    branch >> [manual_task(), scheduled_task()] >> merge()

Без trigger_rule="one_success", merge имел бы default all_success. Поскольку один из manual/scheduled будет skipped (другой выбран branch), merge тоже skipped → нежелательное поведение.

one_success triggers когда хотя бы один upstream success → корректно работает с branching.


Use case 5: Wait for all upstream (all_done) + check (Python check)

Когда нужно процессить результаты только если все upstream завершились:

@dag(...)
def aggregate_pattern():
    @task
    def task_a(): return "a"
    @task
    def task_b(): return "b"
    @task
    def task_c(): return "c"

    @task(trigger_rule="all_done")
    def aggregate(**context):
        ti = context["ti"]
        results = []
        for task_id in ["task_a", "task_b", "task_c"]:
            try:
                value = ti.xcom_pull(task_ids=task_id)
                if value is not None:
                    results.append(value)
            except Exception:
                pass  # Task failed — skip

        if not results:
            raise AirflowFailException("All upstream tasks failed")
        return results

    [task_a(), task_b(), task_c()] >> aggregate()

aggregate runs after all upstream finished (success/fail), сам решает что делать с failures (best-effort aggregation).


Detailed semantics

all_success (default)

upstream statesResult
all successscheduled
any failedupstream_failed
any skippedupstream_failed (в 2.x — может быть skipped)
mix success + skippedupstream_failed
mix success + runningwait

none_failed

Allows skipped, но not failed:

upstream statesResult
all successscheduled
mix success + skippedscheduled
any failedupstream_failed
any upstream_failedupstream_failed

Полезно когда optional upstream — skipped is OK, failed не OK.

none_failed_min_one_success

Like none_failed, но also требует хотя бы один success:

upstream statesResult
all skippedupstream_failed (нужен хотя бы один success)
all successscheduled
1 success + N skippedscheduled
any failedupstream_failed

Полезно после branching где нужен success от хотя бы одной ветви.

all_done

Triggers when all upstream reach terminal state (любое):

upstream statesResult
any runningwait
any queuedwait
all terminal (any combination)scheduled

Используется для cleanup tasks которые должны run regardless of upstream outcomes.

always

Не смотрит на upstream вообще:

upstream statesResult
anythingscheduled (когда DagRun created)

Даже если upstream ещё running — always task может запуститься. Но dependency chain всё равно используется для ordering: always task runs after upstream queued (но не ждёт их completion).

Practically — schedule запускает always tasks сразу же после upstream submitted, что часто соответствует “после upstream finished” из-за scheduling order.


Trigger rule + retries

Tricky case: если upstream task имеет retries, может ли downstream task запуститься между retries?

Нетup_for_retry не считается terminal state. Downstream tasks ждут пока upstream не достигнет final terminal state (success/failed после exhausted retries).

@task(retries=3)
def flaky_task(): ...

@task(trigger_rule="all_done")
def downstream(): ...

flaky_task() >> downstream()

downstream ждёт пока flaky_task либо succeed, либо failed после всех 3 retries.


Trigger rule + Setup/Teardown

В 2.7+ teardown task имеет implicit trigger_rule like all_done для своего scope. Но также setup failure handling:

SetupMiddle tasksDefault teardown behavior
successsuccessruns
successfailedruns
failedskippedruns (cleanup partial setup)
skipped (depends_on_past=True и past failed)skippedruns

Teardown почти always в практике, но с правильной semantics для setup failure.


Anti-patterns

Anti-pattern 1: trigger_rule=“always” для actual logic

@task(trigger_rule="always")
def core_processing():  # ❌ Должно зависеть от upstream
    ...

Если task actually depends on upstream results — не используйте always. Это для notification/cleanup ONLY.

Anti-pattern 2: Branching без learning_rule=“one_success”

@task.branch
def choose(): return "path_a"

@task
def path_a(): pass

@task
def path_b(): pass

@task
def merge():  # ❌ Default all_success → upstream_failed когда path_b skipped
    pass

После branching merge task должен иметь trigger_rule="one_success" или "none_failed_min_one_success".

Anti-pattern 3: Mixing semantics

# Confusing — что значит one_success здесь?
@task
def parallel_a(): ...
@task
def parallel_b(): ...

@task(trigger_rule="one_success")
def aggregate(): ...

[parallel_a(), parallel_b()] >> aggregate()

one_success означает “запустить если ОДИН из parallel_a/b succeeded”. Но aggregate skip-нет другой task — это intentional? Если parallel_a/b independent и оба должны run — используй all_done или none_failed.


Production checklist

  1. Cleanup tasksall_done или as_teardown() (2.7+)
  2. Notifications (success ИЛИ fail)always
  3. Alerts только на failone_failed
  4. После branchingone_success или none_failed_min_one_success
  5. Aggregationsall_done + Python logic для handling failures

Проверка знанийKnowledge check
DAG: `start → [check_a, check_b, check_c] → notify`. notify должен запускаться независимо от outcomes a/b/c, отсылать summary всем (что прошло, что упало). Какой trigger_rule выбрать и что внутри notify?
ОтветAnswer
**Trigger rule: `all_done`** — запустится когда все check_a/b/c достигнут terminal state (любого). Альтернативно `always`, но `all_done` чище — гарантирует что upstream actually finished (для XCom availability). **Внутри notify**: использовать `ti.xcom_pull(task_ids='check_a')` + handle case когда task failed (XCom None). Цикл по task_ids, собрать results, format summary, отправить. Pattern: `try: result = ti.xcom_pull(...) except: result = 'FAILED'`. Альтернатива в 2.7+: Listener API hook on dag_run_success/failed — централизованное уведомление для всех DAGs, без необходимости notify task в каждом.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. После BranchPythonOperator (@task.branch) какой trigger_rule нужен у downstream merge task?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8