Reusable TaskGroups + Setup/Teardown patterns
A TaskGroup is much more than UI grouping. It is a first-class abstraction for reuse: a single parameterized TaskGroup can be used 10 times in one DAG with different parameters. Combined with Setup/Teardown (2.7+, stable in 2.10.5+), it gives a powerful pattern for wrapping ephemeral compute: create cluster, run jobs, terminate cluster.
This lesson covers three practical patterns: (1) a TaskGroup factory for parameterized reuse, (2) Setup/Teardown for ephemeral resources, (3) nested TaskGroups for hierarchical pipelines.
Context managers are the foundation of the with TaskGroup pattern
TaskGroup recap
From module 02: a TaskGroup groups tasks logically. In the UI the groups are collapsible and provide a shared namespace for task_ids. Unlike a SubDAG, TaskGroup tasks stay in the parent DAG: there is no second DagRun and no recursive slot consumption.
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id="validation") as validation:
    check1 = check_row_count()
    check2 = check_schema()
    check1 >> check2

extract() >> validation >> load()
group_id creates a namespace: inside validation, the task check_row_count has the full id validation.check_row_count.
Pattern 1: TaskGroup factory function
from datetime import datetime, timedelta

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup


def make_validation_group(
    table_name: str,
    expected_min_rows: int,
    schema_check: bool = True,
    group_id: str = "validate",
):
    """Return a TaskGroup that validates the given table."""
    with TaskGroup(group_id=group_id) as group:

        @task(task_id="check_row_count")
        def check_row_count():
            # Import inside the task so DAG parsing stays cheap.
            from airflow.providers.postgres.hooks.postgres import PostgresHook

            hook = PostgresHook(postgres_conn_id="warehouse")
            # table_name comes from DAG code; never interpolate untrusted input into SQL.
            count = hook.get_first(f"SELECT COUNT(*) FROM {table_name}")[0]
            assert count >= expected_min_rows, (
                f"{table_name} has {count} rows, expected >= {expected_min_rows}"
            )
            return count

        @task(task_id="check_freshness")
        def check_freshness():
            from airflow.providers.postgres.hooks.postgres import PostgresHook

            hook = PostgresHook(postgres_conn_id="warehouse")
            max_date = hook.get_first(f"SELECT MAX(updated_at) FROM {table_name}")[0]
            # Assumes updated_at is a naive timestamp in the worker's timezone.
            assert max_date >= datetime.now() - timedelta(hours=24), (
                f"{table_name} max date is {max_date}, older than 24h"
            )
            return max_date

        if schema_check:

            @task(task_id="check_schema")
            def check_schema():
                from airflow.providers.postgres.hooks.postgres import PostgresHook

                expected_cols = {"id", "created_at", "updated_at"}
                # information_schema stores schema and table separately,
                # so "staging.orders" is split before filtering.
                schema, _, table = table_name.rpartition(".")
                sql = "SELECT column_name FROM information_schema.columns WHERE table_name = %s"
                params = [table]
                if schema:
                    sql += " AND table_schema = %s"
                    params.append(schema)
                hook = PostgresHook(postgres_conn_id="warehouse")
                rows = hook.get_records(sql, parameters=params)
                actual = {r[0] for r in rows}
                missing = expected_cols - actual
                assert not missing, f"{table_name} missing columns: {missing}"

            check_row_count() >> check_freshness() >> check_schema()
        else:
            check_row_count() >> check_freshness()

    return group
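Because the factory interpolates table_name directly into SQL, it can help to validate the name once, at parse time, before any task is built. The sketch below is a hypothetical helper (split_table_name is not part of Airflow or of this lesson's code) that assumes names are either "table" or "schema.table" made of plain identifiers.

```python
import re
from typing import Optional, Tuple

# Hypothetical helper: accepts "table" or "schema.table" built from plain
# identifiers and rejects everything else (quotes, spaces, semicolons, ...).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def split_table_name(table_name: str) -> Tuple[Optional[str], str]:
    """Return (schema, table); schema is None for an unqualified name."""
    parts = table_name.split(".")
    if len(parts) > 2 or not all(_IDENTIFIER.fullmatch(p) for p in parts):
        raise ValueError(f"unsafe or malformed table name: {table_name!r}")
    if len(parts) == 1:
        return None, parts[0]
    return parts[0], parts[1]
```

Calling split_table_name(table_name) as the first line of the factory would make a bad name fail when the DAG is parsed rather than when a task runs.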
Usage in a DAG:
from airflow.decorators import dag


@dag(...)
def multi_table_pipeline():
    # extract is assumed to be a @task defined elsewhere.
    extract_orders = extract("orders")
    extract_customers = extract("customers")
    extract_products = extract("products")

    # One parameterized TaskGroup, used three times
    validate_orders = make_validation_group(
        table_name="staging.orders", expected_min_rows=1000, group_id="validate_orders"
    )
    validate_customers = make_validation_group(
        table_name="staging.customers", expected_min_rows=100, group_id="validate_customers"
    )
    validate_products = make_validation_group(
        table_name="staging.products", expected_min_rows=50, schema_check=False,
        group_id="validate_products"
    )

    extract_orders >> validate_orders
    extract_customers >> validate_customers
    extract_products >> validate_products


# Calling the decorated function registers the DAG.
multi_table_pipeline()
In the UI this shows up as three separate TaskGroups, each with the same structure but a different target.
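When the list of tables grows, the three hand-written calls can be driven from data instead. The following is a minimal sketch, assuming a hypothetical TableCheck record whose fields mirror the factory's parameters; it only builds the keyword arguments and does not import Airflow.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableCheck:
    """Hypothetical config record; fields mirror make_validation_group's parameters."""

    table_name: str
    expected_min_rows: int
    schema_check: bool = True

    def factory_kwargs(self) -> dict:
        # Derive a unique group_id from the table part of "schema.table".
        short_name = self.table_name.rsplit(".", 1)[-1]
        return {
            "table_name": self.table_name,
            "expected_min_rows": self.expected_min_rows,
            "schema_check": self.schema_check,
            "group_id": f"validate_{short_name}",
        }


CHECKS = [
    TableCheck("staging.orders", 1000),
    TableCheck("staging.customers", 100),
    TableCheck("staging.products", 50, schema_check=False),
]

# Inside the DAG body this would become:
#     for check in CHECKS:
#         make_validation_group(**check.factory_kwargs())
all_kwargs = [check.factory_kwargs() for check in CHECKS]
```

Deriving group_id from the table name keeps the ids unique as long as no two schemas share a table name; if they can, the schema would need to be part of the id as well.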
Pattern 2: Setup/Teardown for ephemeral compute
Setup/Teardown (introduced by AIP-52, available since 2.7, stable in 2.10.5+) is meant for resources that are created before the main work and deleted after it. It is an ideal pattern for:
- Spark/EMR clusters
- Compute pods
- Database snapshots for testing
- Temporary cloud storage buckets
from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=..., catchup=False)
def spark_etl_with_ephemeral_cluster():
    @task
    def create_emr_cluster(ds=None) -> str:
        """Setup: create the EMR cluster."""
        import boto3

        emr = boto3.client("emr")
        response = emr.run_job_flow(
            # ds is injected from the task context; Jinja templates are not
            # rendered inside the body of a Python task function.
            Name=f"airflow-spark-{ds}",
            ReleaseLabel="emr-6.15.0",
            Instances={
                "InstanceGroups": [
                    {"Name": "master", "InstanceRole": "MASTER",
                     "InstanceType": "m5.xlarge", "InstanceCount": 1},
                    {"Name": "workers", "InstanceRole": "CORE",
                     "InstanceType": "m5.2xlarge", "InstanceCount": 4},
                ],
                "KeepJobFlowAliveWhenNoSteps": True,
            },
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
        )
        return response["JobFlowId"]

    @task
    def run_spark_job(cluster_id: str):
        """Main work: submit the Spark job."""
        import boto3

        emr = boto3.client("emr")
        emr.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                "Name": "Transform orders",
                "ActionOnFailure": "TERMINATE_CLUSTER",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", "s3://bucket/jobs/transform_orders.py"],
                },
            }],
        )
        # Wait for step completion (omitted)

    @task
    def terminate_emr_cluster(cluster_id: str):
        """Teardown: delete the EMR cluster."""
        import boto3

        boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])

    # Setup/Teardown wiring
    cluster = create_emr_cluster()
    job = run_spark_job(cluster)
    terminate = terminate_emr_cluster(cluster)

    # Critical: mark the tasks as setup and teardown
    cluster.as_setup() >> job >> terminate.as_teardown(setups=cluster)


spark_etl_with_ephemeral_cluster()
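The "wait for step completion" part omitted in run_spark_job matters: without it the work task finishes as soon as the step is submitted, and the teardown terminates the cluster while the job is still running. One possible shape for that wait is a small polling loop. The sketch below is an illustration, not the lesson's implementation: wait_for_step and its parameters are hypothetical, and the state lookup is injected as a callable so the loop itself does not depend on boto3.

```python
import time
from typing import Callable

# Terminal EMR step states.
SUCCESS_STATES = {"COMPLETED"}
FAILURE_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(
    get_state: Callable[[], str],
    poll_seconds: float = 30.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until the step succeeds, fails, or the budget runs out."""
    for _ in range(max_polls):
        state = get_state()
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            # Raising fails the Airflow task, which still lets the teardown run.
            raise RuntimeError(f"EMR step ended in state {state}")
        sleep(poll_seconds)
    raise TimeoutError(f"EMR step still not finished after {max_polls} polls")
```

Inside run_spark_job, get_state could be a lambda around emr.describe_step(ClusterId=cluster_id, StepId=step_id)["Step"]["Status"]["State"], with step_id taken from the "StepIds" list returned by add_job_flow_steps. boto3 also ships a step_complete waiter for the EMR client, which may be simpler if its default polling budget fits the job.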
What Setup/Teardown gives you
| Behavior | Without Setup/Teardown | With Setup/Teardown |
|---|---|---|
| Cluster termination after job success | ✅ via trigger_rule='all_done' | ✅ automatic |
| Cluster termination after job failure | ✅ via trigger_rule='all_done' | ✅ automatic |
| DagRun state after a failed job and a successful cleanup | ❌ DagRun can end as success, because the cleanup leaf task succeeded | ✅ DagRun failed: teardowns are ignored for DagRun state by default (set on_failure_fail_dagrun=True to also fail the DagRun on a teardown failure) |
| Teardown when setup did not succeed | ❌ cleanup still runs and tries to delete a cluster that does not exist | ✅ teardown does not run |
| Semantics | Implicit (trigger_rule magic) | Explicit (as_setup / as_teardown) |
Use Airflow 2.10.5+ in production. Versions 2.7 through 2.10.4 have several Setup/Teardown bugs: the teardown is sometimes skipped when a task in the middle fails after exhausting its retries.
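The "runs after job failure" and "does not run without a successful setup" rows of the table can be captured in a few lines of plain Python. This is a simplified model that loosely follows the all_done_setup_success trigger rule Airflow assigns to teardown tasks; it is not the scheduler's code, and the function name and state strings are chosen here for illustration.

```python
from typing import Iterable

# States in which a task instance is considered finished (simplified).
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def teardown_should_run(setup_states: Iterable[str], work_states: Iterable[str]) -> bool:
    """Simplified model: run once every upstream task has finished,
    provided at least one setup succeeded."""
    setups = list(setup_states)
    work = list(work_states)
    all_done = all(state in FINISHED for state in setups + work)
    any_setup_ok = any(state == "success" for state in setups)
    return all_done and any_setup_ok
```

For example, teardown_should_run(["success"], ["failed"]) is True (the cluster exists and the job failed, so clean up), while teardown_should_run(["failed"], ["upstream_failed"]) is False (there is nothing to clean up).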
Pattern 3: Setup/Teardown wrapping TaskGroup
Combine: setup → TaskGroup of main work → teardown.
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


@dag(...)
def complex_spark_pipeline():
    @task
    def create_cluster() -> str: ...

    @task
    def terminate_cluster(cluster_id: str): ...

    cluster = create_cluster()

    # TaskGroup holding all of the main work
    with TaskGroup(group_id="spark_jobs") as spark_jobs:

        @task
        def job_orders(cluster_id: str): ...

        @task
        def job_customers(cluster_id: str): ...

        @task
        def job_products(cluster_id: str): ...

        # All three jobs run in parallel and depend on the cluster.
        # cluster is already an XComArg, so it is passed directly.
        job_orders(cluster)
        job_customers(cluster)
        job_products(cluster)

    terminate = terminate_cluster(cluster)

    # Wire setup/teardown
    cluster.as_setup() >> spark_jobs >> terminate.as_teardown(setups=cluster)


complex_spark_pipeline()
The teardown runs after all jobs in the spark_jobs TaskGroup have finished, whether they succeeded or failed.
Pattern 4: Nested TaskGroups
For hierarchical pipelines:
@dag(...)
def hierarchical_etl():
    with TaskGroup(group_id="ingestion") as ingestion:
        with TaskGroup(group_id="postgres_sources"):
            ingest_orders()
            ingest_customers()
        with TaskGroup(group_id="kafka_sources"):
            ingest_events()
            ingest_clicks()

    with TaskGroup(group_id="transformation") as transformation:
        with TaskGroup(group_id="staging"):
            stage_orders()
            stage_events()
        with TaskGroup(group_id="warehouse"):
            load_orders_dim()
            load_events_fact()

    with TaskGroup(group_id="quality_checks") as qa:
        check_orders()
        check_events()

    ingestion >> transformation >> qa
In the UI this renders as a collapsible hierarchy:
[+] ingestion
    [+] postgres_sources
        ingest_orders
        ingest_customers
    [+] kafka_sources
        ingest_events
        ingest_clicks
[+] transformation
    [+] staging
        ...
    [+] warehouse
        ...
[+] quality_checks
    ...
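Nesting also compounds the task id namespace: with the default prefixing, each level of group_id is prepended to the task_id, separated by dots. The sketch below models that with a plain nested dict. The dict layout and the full_task_ids helper are hypothetical stand-ins for illustration, not an Airflow API.

```python
def full_task_ids(tree: dict, prefix: str = "") -> list:
    """Flatten nested groups into dotted task ids.

    Dict keys are group_ids; a dict value is a nested group,
    a list value holds the task_ids directly inside that group.
    """
    ids = []
    for group_id, children in tree.items():
        path = f"{prefix}{group_id}."
        if isinstance(children, dict):
            ids.extend(full_task_ids(children, path))
        else:
            ids.extend(f"{path}{task_id}" for task_id in children)
    return ids


hierarchy = {
    "ingestion": {
        "postgres_sources": ["ingest_orders", "ingest_customers"],
        "kafka_sources": ["ingest_events", "ingest_clicks"],
    },
    "quality_checks": ["check_orders", "check_events"],
}

task_ids = full_task_ids(hierarchy)
```

Here task_ids starts with "ingestion.postgres_sources.ingest_orders", which is the id to use when pulling XComs from, or clearing, a task inside a nested group.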
Pattern 5: Reusable Setup/Teardown as a context manager
Setup and teardown can be encapsulated in a context manager:
from contextlib import contextmanager

from airflow.decorators import dag, task


@contextmanager
def ephemeral_emr_cluster(cluster_config: dict):
    """Yield the cluster_id XComArg and ensure termination."""

    @task
    def create() -> str: ...

    @task
    def terminate(cluster_id: str): ...

    cluster = create()
    teardown_task = terminate(cluster)
    # A teardown can itself be used as a context manager (Airflow 2.7+):
    # tasks declared in the caller's with-block are placed between the
    # setup and the teardown, so the teardown waits for them.
    with teardown_task.as_teardown(setups=cluster):
        yield cluster


# Usage
@dag(...)
def my_etl():
    with ephemeral_emr_cluster({"size": "large"}) as cluster:

        @task
        def transform_orders(cluster_id: str): ...

        @task
        def transform_customers(cluster_id: str): ...

        transform_orders(cluster)
        transform_customers(cluster)


my_etl()
This is syntactic sugar over Setup/Teardown: visually cleaner, semantically the same.
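The wiring this pattern has to produce is easy to get wrong: every task declared in the with-block must sit between the setup and the teardown, otherwise the teardown can run while the work is still in progress. The following plain-Python model shows the intended edges. It uses strings instead of Airflow tasks, and ephemeral_scope and edges are hypothetical names used only for this illustration.

```python
from contextlib import contextmanager
from typing import Iterator, List, Tuple

edges: List[Tuple[str, str]] = []  # (upstream, downstream) pairs


@contextmanager
def ephemeral_scope(setup: str, teardown: str) -> Iterator[List[str]]:
    """Collect task names declared in the body, then wire them on exit."""
    body_tasks: List[str] = []
    yield body_tasks
    # Runs when the with-block exits, after the body has declared its tasks.
    edges.append((setup, teardown))
    for name in body_tasks:
        edges.append((setup, name))
        edges.append((name, teardown))


with ephemeral_scope("create", "terminate") as scope:
    scope.append("transform_orders")
    scope.append("transform_customers")
```

After the block exits, edges contains create -> terminate plus a create -> task -> terminate pair for each body task, which is the dependency shape a DAG using this pattern should show in the graph view.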
Pattern 6: Per-Dataset TaskGroup
For a DAG that processes several datasets:
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


def make_dataset_processing_group(dataset_name: str, dataset_path: str):
    """Return a TaskGroup that processes a single dataset."""
    with TaskGroup(group_id=f"process_{dataset_name}") as group:

        @task(task_id="download")
        def download():
            ...

        @task(task_id="validate")
        def validate():
            ...

        @task(task_id="transform")
        def transform():
            ...

        @task(task_id="upload")
        def upload():
            ...

        download() >> validate() >> transform() >> upload()

    return group


@dag(...)
def multi_dataset_pipeline():
    process_orders = make_dataset_processing_group("orders", "s3://...")
    process_customers = make_dataset_processing_group("customers", "s3://...")
    process_products = make_dataset_processing_group("products", "s3://...")

    @task
    def final_aggregation(): ...

    [process_orders, process_customers, process_products] >> final_aggregation()


multi_dataset_pipeline()
Production gotchas
group_id must be unique within a DAG. If you use a factory, make sure each call passes a unique group_id. A reused group_id causes a conflict.
Tasks inside a TaskGroup use the parent DAG's executor and pools. A TaskGroup does not isolate execution; it only affects the UI and the task id namespace. If you need isolation, use a separate DAG.
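The setups= argument only links a teardown to its setup; it does not pass data. The setup task's XCom output reaches the teardown as an ordinary function argument, as in terminate_cluster(cluster). Any additional data the teardown needs can be pulled from the task context.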
By default a teardown failure does not fail the DagRun: teardown tasks are ignored when the DagRun state is computed. For ephemeral compute that default hides a cost problem: if the cluster was not terminated, the operator needs to know that money keeps burning. To make a failed teardown fail the DagRun, set on_failure_fail_dagrun=True:
terminate_cluster(cluster).as_teardown(setups=cluster, on_failure_fail_dagrun=True)
Keep nested TaskGroups to roughly 4-5 levels. This is a practical limit rather than an enforced one: UI rendering becomes slow with deep nesting. If you need more, split the work into a separate DAG.
TaskGroup factory + Dynamic Task Mapping. The two can be combined: a TaskGroup that contains .expand() calls. This gets complex, so use it only when the behavior is clearly understood (module 07).
TaskGroup prefix_group_id=True (default). A task inside the validate group has the full id validate.check_row_count. For the old behavior (flat task_ids), pass prefix_group_id=False, but this is not recommended.
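The unique group_id requirement from the first gotcha can be checked before any TaskGroup is built, for example on the list of ids a factory loop is about to use. A minimal sketch, with duplicate_group_ids as a hypothetical helper name:

```python
from collections import Counter
from typing import Iterable, List


def duplicate_group_ids(group_ids: Iterable[str]) -> List[str]:
    """Return the group_ids that occur more than once, in first-seen order."""
    counts = Counter(group_ids)
    return [group_id for group_id, n in counts.items() if n > 1]
```

A factory that falls back to a default such as group_id="validate" is the usual source of duplicates, so running this check at the top of the DAG body gives a clearer error than a conflict raised in the middle of DAG parsing.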