Setup / Teardown tasks (2.7+)
Распространённый pattern в data engineering: поднять ephemeral compute resource (Spark cluster, Snowflake warehouse, K8s namespace), выполнить tasks над ним, потом обязательно удалить — даже если что-то пошло не так. До Airflow 2.7 это решалось через trigger_rule=ALL_DONE на teardown task, но семантика была subtle и часто ломалась. В 2.7 появилась first-class абстракция Setup/Teardown (AIP уже забыли номер, но это часть 2.7 release).
Этот урок — как правильно использовать Setup/Teardown в production.
Проблема которую решает
Классический случай — provisioning Spark на EMR:
@dag(...)
def spark_etl_OLD_WAY():
create_cluster = EmrCreateJobFlowOperator(task_id="create_cluster", ...)
run_spark_step1 = EmrAddStepsOperator(task_id="step1", ...)
run_spark_step2 = EmrAddStepsOperator(task_id="step2", ...)
run_spark_step3 = EmrAddStepsOperator(task_id="step3", ...)
# Teardown — manually с trigger_rule
terminate_cluster = EmrTerminateJobFlowOperator(
task_id="terminate_cluster"
trigger_rule="all_done", # ← запустить даже если что-то fail
)
create_cluster >> [run_spark_step1, run_spark_step2, run_spark_step3] >> terminate_cluster
Проблемы старого подхода:
- Teardown failure не reflected в DagRun state — DAG marked success даже если cluster не удалился (cost leak!)
trigger_rule="all_done"не работает если есть upstream skip — teardown пропускается- Cleanup при clear — если очистить run и retry, cluster может остаться running, новый run создаст ещё один
- Scope не явен — teardown wrap всех tasks неявно
Setup/Teardown — first-class abstraction
С 2.7+:
@dag(...)
def spark_etl_NEW_WAY():
create_cluster = EmrCreateJobFlowOperator(
task_id="create_cluster", ...
).as_setup() # ← Mark as setup
run_spark_step1 = EmrAddStepsOperator(task_id="step1", ...)
run_spark_step2 = EmrAddStepsOperator(task_id="step2", ...)
run_spark_step3 = EmrAddStepsOperator(task_id="step3", ...)
terminate_cluster = EmrTerminateJobFlowOperator(
task_id="terminate_cluster", ...
).as_teardown(setups=create_cluster) # ← Link to setup
create_cluster >> [run_spark_step1, run_spark_step2, run_spark_step3]
[run_spark_step1, run_spark_step2, run_spark_step3] >> terminate_cluster
Что изменилось:
as_setup()помечает task как setup для downstreamas_teardown(setups=...)помечает task как teardown, связывает с setup- Teardown запускается всегда — независимо от состояния middle tasks
- DAG state корректно отражает teardown failure
Семантика Setup/Teardown
Правила выполнения
| Сценарий | Setup | Middle tasks | Teardown | DagRun |
|---|---|---|---|---|
| Все success | ✅ | ✅ | ✅ | success |
| Middle task fail | ✅ | ❌ | ✅ | failed |
| Setup fail | ❌ | skipped | ✅ (cleanup partial setup) | failed |
| Teardown fail | ✅ | ✅ | ❌ | failed |
| Setup succeed, teardown succeed, middle fail | ✅ | ❌ | ✅ | failed |
Ключевые отличия от старого trigger_rule=all_done:
- Teardown executed even if middle failed — это default
- Teardown failure reflected — DagRun = failed
- При setup failure teardown всё равно запускается — для cleanup partial setup state
2.10.5 fix — teardown executed even when DAG failed
В версиях 2.7-2.10.4 был bug: если middle tasks failed и retry exhausted, иногда teardown skipped. Полностью исправлено в 2.10.5.
Production recommendation: обязательно используйте 2.10.5+ или 2.11.x LTS для production Setup/Teardown.
Scope teardown
Teardown может быть scoped к подграфу:
@dag(...)
def scoped_teardown():
create_db = task_create_db().as_setup()
use_db_1 = task_use_db()
use_db_2 = task_use_db()
delete_db = task_delete_db().as_teardown(setups=create_db)
# Scope: create_db → [use_db_1, use_db_2] → delete_db
create_db >> [use_db_1, use_db_2] >> delete_db
# Дополнительные tasks НЕ в scope
other_task_1()
other_task_2()
delete_db запускается только после tasks в его scope (use_db_1, use_db_2), не дожидаясь other_task_1/2.
TaskFlow style
С @task декораторами:
@dag(...)
def taskflow_setup_teardown():
@task
def create_resource() -> str:
cluster_id = boto3.client("emr").run_job_flow(...)
return cluster_id
@task
def use_resource(cluster_id: str): ...
@task
def cleanup_resource(cluster_id: str):
boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])
cluster = create_resource().as_setup()
use_resource(cluster)
cleanup_resource(cluster).as_teardown(setups=cluster)
Multiple setups и teardowns
Можно несколько setup + teardown в одном DAG:
@dag(...)
def multiple_resources():
# Resource 1: Spark cluster
spark_create = EmrCreateJobFlowOperator(...).as_setup()
spark_step = EmrAddStepsOperator(...)
spark_delete = EmrTerminateJobFlowOperator(...).as_teardown(setups=spark_create)
# Resource 2: Snowflake warehouse
sf_resume = SnowflakeOperator(sql="ALTER WAREHOUSE wh RESUME").as_setup()
sf_query = SnowflakeOperator(sql="...")
sf_suspend = SnowflakeOperator(sql="ALTER WAREHOUSE wh SUSPEND").as_teardown(setups=sf_resume)
# Scopes
spark_create >> spark_step >> spark_delete
sf_resume >> sf_query >> sf_suspend
Каждый teardown управляет только своим scope.
Edge cases и pitfalls
Pitfall 1: Setup без teardown
create = task1().as_setup() # ❌ No teardown
process = task2()
create >> process
Это lint warning — setup без teardown не имеет смысла (зачем mark если cleanup не выполняется?). Используйте обычные tasks без .as_setup().
Pitfall 2: Teardown слишком scoped
@dag(...)
def too_scoped():
create = task_create().as_setup()
work_1 = task_work()
work_2 = task_work() # ← не в scope
teardown = task_teardown().as_teardown(setups=create)
create >> work_1 >> teardown
work_2 # ← independent, тратит ресурсы parallel с teardown
Лучше явно include work_2 в scope:
create >> [work_1, work_2] >> teardown
Pitfall 3: Idempotency teardown
Teardown может запуститься несколько раз (retry, clear, manual rerun). Должен быть idempotent:
@task
def cleanup_resource(cluster_id: str):
try:
boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])
except ClientError as e:
if "InvalidRequestException" in str(e):
# Cluster уже terminated — это норм
return
raise
Pitfall 4: Teardown не запускается если DAG paused
Если DAG paused во время выполнения, scheduler не будет запускать new tasks — включая teardown. Ресурс leak!
Mitigation: external monitoring + manual cleanup runbook.
Best practices
-
ВСЕГДА
as_teardown(setups=...)— без явного link teardown ведёт себя как обычная task сtrigger_rule=all_done. -
Idempotent teardown — graceful handling «уже удалено».
-
Teardown быстрый — он не должен иметь long-running tasks. Иначе DAG зависает.
-
Никаких
retriesна teardown безretry_delay— иначе цикл retries блокирует освобождение pool slot. -
Use TaskFlow style для simple cases — meaningfully чище.
-
2.10.5+ для production — все edge cases фиксы.
Production пример: full ephemeral pipeline
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={"owner": "ml", "retries": 1, "retry_delay": timedelta(minutes=5)},
tags=["ml", "ephemeral"],
)
def ml_training_pipeline():
@task
def create_gpu_cluster() -> str:
"""Provision GPU cluster on EMR."""
client = boto3.client("emr")
response = client.run_job_flow(
Name="ml-training"
Instances={
"InstanceGroups": [
{"Name": "Master", "InstanceType": "m5.xlarge", "InstanceCount": 1},
{"Name": "Workers", "InstanceType": "g4dn.4xlarge", "InstanceCount": 4},
],
"Ec2KeyName": "...",
"TerminationProtected": False,
},
JobFlowRole="..."
ServiceRole="...",
)
return response["JobFlowId"]
@task
def preprocess_data(cluster_id: str) -> str:
"""Spark preprocessing step."""
submit_spark_step(cluster_id, "preprocess.py")
return "s3://bucket/preprocessed/"
@task
def train_model(cluster_id: str, data_path: str) -> str:
"""Training step."""
submit_spark_step(cluster_id, "train.py", args=[data_path])
return "s3://bucket/model.pkl"
@task
def evaluate(cluster_id: str, model_path: str):
submit_spark_step(cluster_id, "evaluate.py", args=[model_path])
@task(retries=3, retry_delay=timedelta(seconds=10))
def terminate_cluster(cluster_id: str):
"""Idempotent termination."""
try:
boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])
except ClientError as e:
if "InvalidRequestException" in str(e) and "already terminated" in str(e):
return
raise
# Wiring
cluster = create_gpu_cluster().as_setup()
data = preprocess_data(cluster)
model = train_model(cluster, data)
evaluate_op = evaluate(cluster, model)
cleanup = terminate_cluster(cluster).as_teardown(setups=cluster)
# Ensure all middle tasks in cleanup scope
[data, model, evaluate_op] >> cleanup
При успешном run:
- Cluster создаётся
- Data preprocessed → model trained → evaluated
- Cluster удаляется
- DagRun success
При сбое evaluate:
- Data preprocessed → model trained → evaluate FAIL
- Cluster всё равно удаляется (no leak!)
- DagRun marked failed
При сбое create_gpu_cluster:
- All downstream skipped
- terminate_cluster всё равно runs (handles partial setup gracefully)