Learning Platform
Глоссарий Troubleshooting
Урок 18.04 · 22 мин
Продвинутый
TaskGroupReusabilitySetupTeardownEphemeral Compute

Reusable TaskGroups + Setup/Teardown patterns

TaskGroup — это намного больше чем UI grouping. Это первоклассная абстракция для reuse: один параметризованный TaskGroup может быть использован 10 раз в одном DAG с разными параметрами. В сочетании с Setup/Teardown (2.7+, stable в 2.10.5+) это даёт мощный pattern для wrapping ephemeral compute — create cluster, run jobs, terminate cluster.

Этот урок — три практических pattern: (1) TaskGroup factory для параметризованного reuse, (2) Setup/Teardown для ephemeral resources, (3) Nested TaskGroups для иерархических pipelines.


Context managers — основа паттерна with TaskGroup

TaskGroup — recap

Из модуля 02 — TaskGroup группирует tasks логически. В UI они collapsible, имеют общий namespace для task_ids. В отличие от SubDAG, TaskGroup tasks остаются в parent DAG — нет двойного DagRun, нет recursive slot consumption.

from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id="validation") as validation:
    check1 = check_row_count()
    check2 = check_schema()
    check1 >> check2

extract() >> validation >> load()

group_id создаёт namespace: внутри validation task check_row_count имеет full id validation.check_row_count.

TaskGroup factory + Setup/Teardown wrap
make_validation_group(table, min_rows, group_id)Factory function returns TaskGroup. Параметризован table_name, expected_min_rows, schema_check flag, group_id. Внутри определяет @task функции с captured params. Reusable — можно вызвать N раз в одном DAG с разными targets. group_id обязательно unique per call.
instantiate × N для разных tables
validate_orders (group)Один instance factory call с table_name='staging.orders', expected_min_rows=1000, group_id='validate_orders'. Внутри: check_row_count → check_freshness → check_schema. Все task_ids имеют prefix 'validate_orders.'. В UI — collapsible block.
validate_customers (group)Идентичная структура, другие params: table='staging.customers', min_rows=100. group_id='validate_customers' — unique чтобы не conflict с другими instances. Reuses factory code — нет copy-paste.
validate_products (group)schema_check=False — opt-out от schema validation для этой table. Factory имеет условную логику включения check_schema task. group_id='validate_products'. Демонстрирует flexibility factory pattern.
композиция с Setup/Teardown
create_cluster.as_setup()Setup task — создаёт EMR cluster с deterministic name (airflow-{{ ds }}). XCom return — cluster_id для downstream. Setup/Teardown семантически: если setup failed, downstream skipped, teardown тоже skipped (не пытается terminate non-existent). С 2.10.5+ — production ready.
TaskGroup: spark_jobs (main work)Внутри TaskGroup — три parallel Spark jobs: job_orders, job_customers, job_products. Все depend на cluster.output (XCom). TaskGroup group_id='spark_jobs' — UI группировка. Teardown ждёт пока все три complete (success или failure).
terminate_cluster.as_teardown(setups=cluster, on_failure_fail_dagrun=True)Teardown — terminate EMR cluster. on_failure_fail_dagrun=True (default) — если teardown failed (cluster не terminated), DagRun становится failed → alert triggers → SRE видит проблему day-of. Без Setup/Teardown это решалось через trigger_rule='all_done' — но dagRun success при teardown failure (cost leak).

Pattern 1: TaskGroup factory function

from airflow.utils.task_group import TaskGroup
from airflow.decorators import task

def make_validation_group(
    table_name: str,
    expected_min_rows: int,
    schema_check: bool = True,
    group_id: str = "validate",
):
    """Returns TaskGroup для validation given table."""
    with TaskGroup(group_id=group_id) as group:

        @task(task_id=f"check_row_count")
        def check_row_count():
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            hook = PostgresHook(postgres_conn_id="warehouse")
            count = hook.get_first(f"SELECT COUNT(*) FROM {table_name}")[0]
            assert count >= expected_min_rows, \
                f"{table_name} has {count} rows, expected >= {expected_min_rows}"
            return count

        @task(task_id=f"check_freshness")
        def check_freshness():
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            hook = PostgresHook(postgres_conn_id="warehouse")
            max_date = hook.get_first(f"SELECT MAX(updated_at) FROM {table_name}")[0]
            from datetime import datetime, timedelta
            assert max_date >= datetime.now() - timedelta(hours=24), \
                f"{table_name} max date is {max_date} — older than 24h"
            return max_date

        if schema_check:
            @task(task_id=f"check_schema")
            def check_schema():
                expected_cols = {"id", "created_at", "updated_at"}
                from airflow.providers.postgres.hooks.postgres import PostgresHook
                hook = PostgresHook(postgres_conn_id="warehouse")
                rows = hook.get_records(
                    f"SELECT column_name FROM information_schema.columns "
                    f"WHERE table_name = '{table_name}'"
                )
                actual = {r[0] for r in rows}
                missing = expected_cols - actual
                assert not missing, f"{table_name} missing columns: {missing}"

            check_row_count() >> check_freshness() >> check_schema()
        else:
            check_row_count() >> check_freshness()

    return group

Использование в DAG:

@dag(...)
def multi_table_pipeline():
    extract_orders = extract("orders")
    extract_customers = extract("customers")
    extract_products = extract("products")

    # Один параметризованный TaskGroup × 3 раза
    validate_orders = make_validation_group(
        table_name="staging.orders", expected_min_rows=1000, group_id="validate_orders"
    )
    validate_customers = make_validation_group(
        table_name="staging.customers", expected_min_rows=100, group_id="validate_customers"
    )
    validate_products = make_validation_group(
        table_name="staging.products", expected_min_rows=50, schema_check=False,
        group_id="validate_products"
    )

    extract_orders >> validate_orders
    extract_customers >> validate_customers
    extract_products >> validate_products

В UI это три separate TaskGroups, каждый с одинаковой структурой, но разными targets.


Pattern 2: Setup/Teardown для ephemeral compute

Setup/Teardown (введён в AIP-52, доступен с 2.7+, stable в 2.10.5+) — для resources которые создаются перед main work и удаляются после. Идеальный pattern для:

  • Spark/EMR clusters
  • Compute pods
  • Database snapshots для testing
  • Temporary cloud storage buckets
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=..., catchup=False)
def spark_etl_with_ephemeral_cluster():
    @task
    def create_emr_cluster() -> str:
        """Setup — create EMR cluster."""
        import boto3
        emr = boto3.client("emr")
        response = emr.run_job_flow(
            Name=f"airflow-spark-{{{{ ds }}}}"
            ReleaseLabel="emr-6.15.0"
            Instances={
                "InstanceGroups": [
                    {"Name": "master", "InstanceRole": "MASTER",
                     "InstanceType": "m5.xlarge", "InstanceCount": 1},
                    {"Name": "workers", "InstanceRole": "CORE",
                     "InstanceType": "m5.2xlarge", "InstanceCount": 4},
                ],
                "KeepJobFlowAliveWhenNoSteps": True,
            },
            JobFlowRole="EMR_EC2_DefaultRole"
            ServiceRole="EMR_DefaultRole",
        )
        return response["JobFlowId"]

    @task
    def run_spark_job(cluster_id: str):
        """Main work — submit Spark job."""
        import boto3
        emr = boto3.client("emr")
        emr.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                "Name": "Transform orders",
                "ActionOnFailure": "TERMINATE_CLUSTER",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", "s3://bucket/jobs/transform_orders.py"]
                }
            }]
        )
        # Wait for step completion (omitted)

    @task
    def terminate_emr_cluster(cluster_id: str):
        """Teardown — delete EMR cluster."""
        import boto3
        boto3.client("emr").terminate_job_flows(JobFlowIds=[cluster_id])

    # Setup/Teardown wiring
    cluster = create_emr_cluster()
    job = run_spark_job(cluster)
    terminate = terminate_emr_cluster(cluster)

    # Critical — mark as setup/teardown
    cluster.as_setup() >> job >> terminate.as_teardown(setups=cluster)

spark_etl_with_ephemeral_cluster()

Что даёт Setup/Teardown

ПоведениеБез Setup/TeardownС Setup/Teardown
Cluster termination после job success✅ через trigger_rule=‘all_done’✅ автоматически
Cluster termination после job failure✅ через trigger_rule=‘all_done’✅ автоматически
DagRun reflects teardown failure (cost control)❌ DagRun success даже если teardown failed✅ DagRun failed если teardown failed
Skip teardown если setup не запускался❌ teardown пытается удалить cluster which doesn’t exist✅ skipped автоматически
Clear semanticImplicit (trigger_rule magic)Explicit (as_setup / as_teardown)
WARNING

Используйте Airflow 2.10.5+ для production. В 2.7-2.10.4 есть несколько bug с Setup/Teardown — teardown иногда skipped при middle failures + exhausted retries.


Pattern 3: Setup/Teardown wrapping TaskGroup

Combine: setup → TaskGroup of main work → teardown.

@dag(...)
def complex_spark_pipeline():
    @task
    def create_cluster() -> str: ...

    @task
    def terminate_cluster(cluster_id: str): ...

    cluster = create_cluster()

    # TaskGroup со всей основной работой
    with TaskGroup(group_id="spark_jobs") as spark_jobs:
        @task
        def job_orders(cluster_id: str): ...

        @task
        def job_customers(cluster_id: str): ...

        @task
        def job_products(cluster_id: str): ...

        # Все три jobs parallel, depend на cluster
        [job_orders(cluster.output), job_customers(cluster.output), job_products(cluster.output)]

    terminate = terminate_cluster(cluster)

    # Wire setup/teardown
    cluster.as_setup() >> spark_jobs >> terminate.as_teardown(setups=cluster)

complex_spark_pipeline()

Teardown произойдёт после всех jobs в spark_jobs TaskGroup — success или failure.


Pattern 4: Nested TaskGroups

Для иерархических pipelines:

@dag(...)
def hierarchical_etl():
    with TaskGroup(group_id="ingestion") as ingestion:
        with TaskGroup(group_id="postgres_sources"):
            ingest_orders()
            ingest_customers()

        with TaskGroup(group_id="kafka_sources"):
            ingest_events()
            ingest_clicks()

    with TaskGroup(group_id="transformation") as transformation:
        with TaskGroup(group_id="staging"):
            stage_orders()
            stage_events()

        with TaskGroup(group_id="warehouse"):
            load_orders_dim()
            load_events_fact()

    with TaskGroup(group_id="quality_checks") as qa:
        check_orders()
        check_events()

    ingestion >> transformation >> qa

В UI — collapsible hierarchy:

[+] ingestion
    [+] postgres_sources
        ingest_orders
        ingest_customers
    [+] kafka_sources
        ingest_events
        ingest_clicks
[+] transformation
    [+] staging
        ...
    [+] warehouse
        ...
[+] quality_checks
    ...

Pattern 5: Reusable Setup/Teardown — context manager

Можно encapsulate setup/teardown в context manager pattern:

from contextlib import contextmanager

@contextmanager
def ephemeral_emr_cluster(cluster_config: dict):
    """Yields cluster_id, ensures termination."""
    @task
    def create() -> str: ...

    @task
    def terminate(cluster_id: str): ...

    cluster = create()
    yield cluster
    teardown_task = terminate(cluster)
    cluster.as_setup() >> teardown_task
    teardown_task.as_teardown(setups=cluster)

# Usage
@dag(...)
def my_etl():
    with ephemeral_emr_cluster({"size": "large"}) as cluster:
        @task
        def transform_orders(cluster_id: str): ...

        @task
        def transform_customers(cluster_id: str): ...

        transform_orders(cluster)
        transform_customers(cluster)

Это синтаксический сахар поверх Setup/Teardown — visual cleaner, semantically тот же.


Pattern 6: Per-Dataset TaskGroup

Для DAG который processes несколько datasets:

def make_dataset_processing_group(dataset_name: str, dataset_path: str):
    """TaskGroup для processing одного dataset."""
    with TaskGroup(group_id=f"process_{dataset_name}") as group:
        @task(task_id="download")
        def download():
            ...

        @task(task_id="validate")
        def validate():
            ...

        @task(task_id="transform")
        def transform():
            ...

        @task(task_id="upload")
        def upload():
            ...

        download() >> validate() >> transform() >> upload()

    return group

@dag(...)
def multi_dataset_pipeline():
    process_orders = make_dataset_processing_group("orders", "s3://...")
    process_customers = make_dataset_processing_group("customers", "s3://...")
    process_products = make_dataset_processing_group("products", "s3://...")

    @task
    def final_aggregation(): ...

    [process_orders, process_customers, process_products] >> final_aggregation()

Production gotchas

group_id должен быть unique в DAG. Если используете factory, ensure factory passes unique group_id. Reused group_id → conflict.

Tasks внутри TaskGroup используют parent’s executor / pool. TaskGroup не изолирует execution, только UI/namespace. Если нужна isolation — отдельный DAG.

Setup task’s XCom output автоматически передаётся в teardown через setups= argument. Если хотите pass дополнительные данные через teardown — через context.

Teardown failure делает DagRun failed. Это cost-control feature: если cluster не terminated, оператор знает что money продолжают сжигаться. Если хотите ignore teardown failure → set on_failure_fail_dagrun=False:

terminate_cluster(cluster).as_teardown(setups=cluster, on_failure_fail_dagrun=False)

Nested TaskGroup имеет limit depth ~4-5 уровней. UI rendering становится slow на deep nesting. Если нужно — отдельный DAG.

TaskGroup factory + Dynamic Task Mapping. Можно сочетать — TaskGroup внутри которого .expand(). Но это complex — используйте только когда clearly understood (модуль 07).

TaskGroup prefix_group_id=True (default). Task внутри validate group имеет full id validate.check_row_count. Если хотите старое поведение (flat task_ids) — prefix_group_id=False, но это deprecated.


Проверка знанийKnowledge check
Production team использует Spark на EMR с trigger_rule='all_done' для termination cluster. Случилось: один Spark job failed, but DagRun state success (потому что terminate ran). На AWS billing спустя несколько дней замечают $5000 charges за один уничтоженный кластер. Что произошло и как Setup/Teardown решает это?
ОтветAnswer
Сценарий — classic 'all_done' anti-pattern. trigger_rule='all_done' на teardown означает: запусти когда upstream done (success ИЛИ failure ИЛИ skipped). Поведение: (1) Spark job failed → teardown triggered → terminate_cluster ran; (2) НО — terminate_cluster operator имеет bug или transient AWS error → teardown сам failed; (3) trigger_rule='all_done' на _следующих_ tasks doesn't apply здесь — terminate был last task; (4) DagRun state computed from tasks — если все non-teardown tasks success, dagRun=success even though terminate failed. Result: dagRun success, BUT cluster still running (terminate failed silently). Через дни накапливается $5000 cost. Setup/Teardown (2.7+, stable в 2.10.5+) решает: (1) **on_failure_fail_dagrun=True** (default) — если teardown failed, DagRun становится failed. Alert triggers. SRE видит проблему в day-of, не неделю спустя; (2) **explicit semantic** as_setup() / as_teardown() — нет dependency на trigger_rule magic; (3) **skip teardown if setup didn't run** — если create_cluster был skipped (например DAG started но setup failed validation), teardown тоже skipped — не пытается terminate non-existent cluster (which would also fail); (4) **Independent retry control** для teardown — может иметь свои retries отдельно от main job; (5) **UI shows setup/teardown семантически** — operators знают что это lifecycle resources, не business logic. Code migration: replace `cluster >> job >> terminate.set_trigger_rule('all_done')` на `cluster.as_setup() >> job >> terminate.as_teardown(setups=cluster)`. Plus add monitoring alert на 'EMR cluster age > 24h' — defense in depth для catch any leak. Lesson: lifecycle resources should use lifecycle primitives. trigger_rule — magic, Setup/Teardown — explicit. С 2.10.5 — production ready (early versions имели bugs).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. TaskGroup factory function (make_validation_group(table)) returning TaskGroup — что даёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8