Learning Platform
Глоссарий Troubleshooting
Урок 03.06 · 20 мин
Продвинутый
SetupTeardownEphemeral ResourcesCleanup

Setup / Teardown tasks (2.7+)

Распространённый pattern в data engineering: поднять ephemeral compute resource (Spark cluster, Snowflake warehouse, K8s namespace), выполнить tasks над ним, потом обязательно удалить — даже если что-то пошло не так. До Airflow 2.7 это решалось через trigger_rule=ALL_DONE на teardown task, но семантика была subtle и часто ломалась. В 2.7 появилась first-class абстракция Setup/Teardown (AIP уже забыли номер, но это часть 2.7 release).

Этот урок — как правильно использовать Setup/Teardown в production.


Проблема которую решает

Классический случай — provisioning Spark на EMR:

@dag(...)
def spark_etl_OLD_WAY():
    create_cluster = EmrCreateJobFlowOperator(task_id="create_cluster", ...)

    run_spark_step1 = EmrAddStepsOperator(task_id="step1", ...)
    run_spark_step2 = EmrAddStepsOperator(task_id="step2", ...)
    run_spark_step3 = EmrAddStepsOperator(task_id="step3", ...)

    # Teardown — manually с trigger_rule
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster"
        trigger_rule="all_done",  # ← запустить даже если что-то fail
    )

    create_cluster >> [run_spark_step1, run_spark_step2, run_spark_step3] >> terminate_cluster

Проблемы старого подхода:

  1. Teardown failure не reflected в DagRun state — DAG marked success даже если cluster не удалился (cost leak!)
  2. trigger_rule="all_done" не работает если есть upstream skip — teardown пропускается
  3. Cleanup при clear — если очистить run и retry, cluster может остаться running, новый run создаст ещё один
  4. Scope не явен — teardown wrap всех tasks неявно

Setup/Teardown — first-class abstraction

С 2.7+:

@dag(...)
def spark_etl_NEW_WAY():
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster", ...
    ).as_setup()                          # ← Mark as setup

    run_spark_step1 = EmrAddStepsOperator(task_id="step1", ...)
    run_spark_step2 = EmrAddStepsOperator(task_id="step2", ...)
    run_spark_step3 = EmrAddStepsOperator(task_id="step3", ...)

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster", ...
    ).as_teardown(setups=create_cluster)  # ← Link to setup

    create_cluster >> [run_spark_step1, run_spark_step2, run_spark_step3]
    [run_spark_step1, run_spark_step2, run_spark_step3] >> terminate_cluster

Что изменилось:

  • as_setup() помечает task как setup для downstream
  • as_teardown(setups=...) помечает task как teardown, связывает с setup
  • Teardown запускается всегда — независимо от состояния middle tasks
  • DAG state корректно отражает teardown failure

Семантика Setup/Teardown

Setup/Teardown lifecycle — happy path
create_cluster.as_setup()Setup task запускается первым. Provisions ephemeral resource (EMR cluster, K8s namespace, Snowflake warehouse). Возвращает identifier (cluster_id) через XCom для downstream использования.
setup success → middle tasks scheduled
step1 (work)Middle tasks — actual business logic. Получают cluster_id через XCom от setup. Выполняются как обычно с дефолтным trigger_rule=all_success между ними.
step2 (work)Все middle tasks в scope teardown. Их failures не блокируют teardown — он всё равно запустится. Но влияют на final DagRun state (failed если хотя бы один упал).
step3 (work)После завершения (success или failure) всех middle tasks scheduler triggers teardown — независимо от их состояния, благодаря implicit trigger_rule на teardown.
all middle reach terminal state
terminate_cluster.as_teardown(setups=create_cluster)Teardown запускается ВСЕГДА — даже если middle tasks failed, даже если setup partially failed. Должен быть idempotent (handle 'уже удалено' gracefully). setups= параметр явно линкует к setup для proper scope.
DagRun state = successDagRun success только если все middle tasks AND teardown success. Если teardown failed — DagRun=failed (это критично vs старый trigger_rule=all_done где teardown failure не отражался).
Setup/Teardown — failure scenarios
Scenario A: middle task failsstep2 raises exception, retries exhausted. step3 получает upstream_failed (default trigger_rule). Несмотря на это teardown всё равно запускается.
middle done (any terminal state)
terminate_cluster runsTeardown триггерится даже при failures — это его основная семантика. Cluster удаляется. Cost leak предотвращён. Это главное преимущество над manual trigger_rule=all_done.
==========================
Scenario B: setup failscreate_cluster raises (AWS quota exceeded, network error). Middle tasks все skipped (upstream_failed). Но teardown всё равно запускается — для cleanup partial state (например EMR JobFlow created но в TERMINATED_WITH_ERRORS).
teardown runs anywayЭто нужно для cleanup partial setup state. Teardown должен gracefully handle случай 'ресурс не существует' (try/except на cluster_not_found). Без этого orphan resources накапливаются.
==========================
Scenario C: teardown failsCluster terminate API call failed (e.g., network glitch, EMR throttling). Это серьёзно — может быть real cost leak. DagRun помечается failed, alerting должен сработать.
DagRun = failedГлавное отличие от старого trigger_rule=all_done: teardown failure ПРОПАГИРУЕТСЯ в DagRun state. Это позволяет alerting корректно сработать. Operations team видит failure, manual cleanup runbook запускается.

Правила выполнения

СценарийSetupMiddle tasksTeardownDagRun
Все successsuccess
Middle task failfailed
Setup failskipped✅ (cleanup partial setup)failed
Teardown failfailed
Setup succeed, teardown succeed, middle failfailed

Ключевые отличия от старого trigger_rule=all_done:

  1. Teardown executed even if middle failed — это default
  2. Teardown failure reflected — DagRun = failed
  3. При setup failure teardown всё равно запускается — для cleanup partial setup state

2.10.5 fix — teardown executed even when DAG failed

В версиях 2.7-2.10.4 был bug: если middle tasks failed и retry exhausted, иногда teardown skipped. Полностью исправлено в 2.10.5.

Production recommendation: обязательно используйте 2.10.5+ или 2.11.x LTS для production Setup/Teardown.


Scope teardown

Teardown может быть scoped к подграфу:

@dag(...)
def scoped_teardown():
    create_db = task_create_db().as_setup()

    use_db_1 = task_use_db()
    use_db_2 = task_use_db()

    delete_db = task_delete_db().as_teardown(setups=create_db)

    # Scope: create_db → [use_db_1, use_db_2] → delete_db
    create_db >> [use_db_1, use_db_2] >> delete_db

    # Дополнительные tasks НЕ в scope
    other_task_1()
    other_task_2()

delete_db запускается только после tasks в его scope (use_db_1, use_db_2), не дожидаясь other_task_1/2.


TaskFlow style

С @task декораторами:

@dag(...)
def taskflow_setup_teardown():
    @task
    def create_resource() -> str:
        cluster_id = boto3.client("emr").run_job_flow(...)
        return cluster_id

    @task
    def use_resource(cluster_id: str): ...

    @task
    def cleanup_resource(cluster_id: str):
        boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])

    cluster = create_resource().as_setup()
    use_resource(cluster)
    cleanup_resource(cluster).as_teardown(setups=cluster)

Multiple setups и teardowns

Можно несколько setup + teardown в одном DAG:

@dag(...)
def multiple_resources():
    # Resource 1: Spark cluster
    spark_create = EmrCreateJobFlowOperator(...).as_setup()
    spark_step = EmrAddStepsOperator(...)
    spark_delete = EmrTerminateJobFlowOperator(...).as_teardown(setups=spark_create)

    # Resource 2: Snowflake warehouse
    sf_resume = SnowflakeOperator(sql="ALTER WAREHOUSE wh RESUME").as_setup()
    sf_query = SnowflakeOperator(sql="...")
    sf_suspend = SnowflakeOperator(sql="ALTER WAREHOUSE wh SUSPEND").as_teardown(setups=sf_resume)

    # Scopes
    spark_create >> spark_step >> spark_delete
    sf_resume >> sf_query >> sf_suspend

Каждый teardown управляет только своим scope.


Edge cases и pitfalls

Pitfall 1: Setup без teardown

create = task1().as_setup()  # ❌ No teardown
process = task2()
create >> process

Это lint warning — setup без teardown не имеет смысла (зачем mark если cleanup не выполняется?). Используйте обычные tasks без .as_setup().

Pitfall 2: Teardown слишком scoped

@dag(...)
def too_scoped():
    create = task_create().as_setup()
    work_1 = task_work()
    work_2 = task_work()  # ← не в scope
    teardown = task_teardown().as_teardown(setups=create)

    create >> work_1 >> teardown
    work_2  # ← independent, тратит ресурсы parallel с teardown

Лучше явно include work_2 в scope:

create >> [work_1, work_2] >> teardown

Pitfall 3: Idempotency teardown

Teardown может запуститься несколько раз (retry, clear, manual rerun). Должен быть idempotent:

@task
def cleanup_resource(cluster_id: str):
    try:
        boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])
    except ClientError as e:
        if "InvalidRequestException" in str(e):
            # Cluster уже terminated — это норм
            return
        raise

Pitfall 4: Teardown не запускается если DAG paused

Если DAG paused во время выполнения, scheduler не будет запускать new tasks — включая teardown. Ресурс leak!

Mitigation: external monitoring + manual cleanup runbook.


Best practices

  1. ВСЕГДА as_teardown(setups=...) — без явного link teardown ведёт себя как обычная task с trigger_rule=all_done.

  2. Idempotent teardown — graceful handling «уже удалено».

  3. Teardown быстрый — он не должен иметь long-running tasks. Иначе DAG зависает.

  4. Никаких retries на teardown без retry_delay — иначе цикл retries блокирует освобождение pool slot.

  5. Use TaskFlow style для simple cases — meaningfully чище.

  6. 2.10.5+ для production — все edge cases фиксы.


Production пример: full ephemeral pipeline

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"owner": "ml", "retries": 1, "retry_delay": timedelta(minutes=5)},
    tags=["ml", "ephemeral"],
)
def ml_training_pipeline():
    @task
    def create_gpu_cluster() -> str:
        """Provision GPU cluster on EMR."""
        client = boto3.client("emr")
        response = client.run_job_flow(
            Name="ml-training"
            Instances={
                "InstanceGroups": [
                    {"Name": "Master", "InstanceType": "m5.xlarge", "InstanceCount": 1},
                    {"Name": "Workers", "InstanceType": "g4dn.4xlarge", "InstanceCount": 4},
                ],
                "Ec2KeyName": "...",
                "TerminationProtected": False,
            },
            JobFlowRole="..."
            ServiceRole="...",
        )
        return response["JobFlowId"]

    @task
    def preprocess_data(cluster_id: str) -> str:
        """Spark preprocessing step."""
        submit_spark_step(cluster_id, "preprocess.py")
        return "s3://bucket/preprocessed/"

    @task
    def train_model(cluster_id: str, data_path: str) -> str:
        """Training step."""
        submit_spark_step(cluster_id, "train.py", args=[data_path])
        return "s3://bucket/model.pkl"

    @task
    def evaluate(cluster_id: str, model_path: str):
        submit_spark_step(cluster_id, "evaluate.py", args=[model_path])

    @task(retries=3, retry_delay=timedelta(seconds=10))
    def terminate_cluster(cluster_id: str):
        """Idempotent termination."""
        try:
            boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])
        except ClientError as e:
            if "InvalidRequestException" in str(e) and "already terminated" in str(e):
                return
            raise

    # Wiring
    cluster = create_gpu_cluster().as_setup()
    data = preprocess_data(cluster)
    model = train_model(cluster, data)
    evaluate_op = evaluate(cluster, model)

    cleanup = terminate_cluster(cluster).as_teardown(setups=cluster)

    # Ensure all middle tasks in cleanup scope
    [data, model, evaluate_op] >> cleanup

При успешном run:

  • Cluster создаётся
  • Data preprocessed → model trained → evaluated
  • Cluster удаляется
  • DagRun success

При сбое evaluate:

  • Data preprocessed → model trained → evaluate FAIL
  • Cluster всё равно удаляется (no leak!)
  • DagRun marked failed

При сбое create_gpu_cluster:

  • All downstream skipped
  • terminate_cluster всё равно runs (handles partial setup gracefully)

Проверка знанийKnowledge check
Чем `as_teardown(setups=create_cluster)` отличается от обычного `trigger_rule='all_done'` в 2.7+?
ОтветAnswer
Несколько критичных отличий: (1) **Teardown failure reflected в DagRun state** — если cleanup упал, DAG помечен failed. С обычным `all_done` — DAG success даже если teardown failed (cost leak неконтролируемо). (2) **Scope явный** — `setups=` параметр указывает что teardown относится к этому setup. Cleared run корректно re-runs scope. (3) **Setup failure handling** — даже если setup упал, teardown запускается для cleanup partial state. С `all_done` — зависит от skip propagation. (4) **First-class в UI** — teardown tasks помечены отдельно, видно scope. (5) **Не учитывается в default DagRun success criteria** — teardown success не required для DagRun success (но teardown failure делает DagRun failed). Это unified semantics ephemeral resource management — критично для production cost control.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В чём главное преимущество as_teardown(setups=...) по сравнению с trigger_rule='all_done'?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8