Learning Platform
Глоссарий Troubleshooting
Урок 03.07 · 18 мин
Продвинутый
TaskGroupSubDAGAnti-patternDAG Organization

TaskGroup vs SubDAG

В Airflow 1.x для группировки tasks использовался SubDagOperator — отдельный DAG, который запускался как одна task родительского DAG. Это был mainstream pattern, но имел серьёзные performance и debugging issues. В Airflow 2.0 появился TaskGroup — UI-only группировка без отдельного DAG. К 2.7 SubDAG считается deprecated и anti-pattern.

Этот урок объясняет почему SubDAG плох и как использовать TaskGroup правильно.

TaskGroup vs SubDAG — fundamental difference
SubDAG (legacy, deprecated)SubDagOperator создаёт ОТДЕЛЬНЫЙ DAG с dag_id='parent.child'. У него собственная dag_run row, собственные scheduling decisions, собственный entry в serialized_dag. Scheduler обрабатывает его independently от parent.
TaskGroup (modern)Просто visual grouping — tasks физически остаются в parent DAG. Нет отдельного dag_run, нет отдельного entry в serialized_dag. group_id используется как prefix для task_id (group_id.task_id).
parent DagRun triggered
SubDAG: 2x dag_run rowsПри каждом parent run scheduler создаёт ДВА dag_run: один для parent, один для subdag. SubDagOperator как task держит slot в parent's pool, ждёт пока subdag's DagRun завершится. Recursive consumption — основа deadlock проблемы.
TaskGroup: 1x dag_run rowОдин DagRun, все tasks принадлежат parent DAG. Inner tasks scheduled через обычный flow, используют parent's executor с full parallelism. Нет recursive slot consumption.
SubDAG: deadlock riskПри parallelism=32, 5 subdag operators по 10 inner tasks = нужно 55 slots, есть только 32. SubDag operators ждут свои inner tasks, inner tasks не могут получить slot. До 2.0 default SequentialExecutor усугублял (no parallelism внутри subdag).
TaskGroup: native parallelismInner tasks используют common pool через standard scheduler logic. Никаких double slot usage. Можно nest groups (max 2-3 уровня recommended). Clear/retry semantics consistent с обычными tasks.
Removed in Airflow 3.0SubDagOperator deprecated с 2.2, полностью removed в 3.0. Migration path: convert в TaskGroup + cleanup metadata DB (DELETE FROM dag WHERE dag_id LIKE 'parent.%'). Никогда не используйте в новом коде.

SubDagOperator — что было

Концепция: создаём DAG, оборачиваем в SubDagOperator, используем как task:

# subdag.py
def subdag(parent_dag_id, child_dag_id, args):
    subdag = DAG(
        dag_id=f"{parent_dag_id}.{child_dag_id}"
        default_args=args,
    )

    with subdag:
        task1 = BashOperator(task_id="task1", bash_command="echo 1")
        task2 = BashOperator(task_id="task2", bash_command="echo 2")
        task1 >> task2

    return subdag

# main_dag.py
from subdag import subdag
from airflow.operators.subdag import SubDagOperator

with DAG("main_dag", default_args=args) as dag:
    subdag_op = SubDagOperator(
        task_id="my_subdag"
        subdag=subdag("main_dag", "my_subdag", args),
    )

Это создавало:

  • Отдельный dag_id = "main_dag.my_subdag" в metadata DB
  • Отдельный DagRun для subdag при каждом parent run
  • Все subdag tasks parsed/scheduled независимо

Почему SubDAG — anti-pattern

Проблема 1: Deadlock с executor concurrency

SubDagOperator сам task в parent DAG → занимает slot в parent’s pool. Внутренние tasks subdag тоже занимают slots. Это создаёт recursive consumption:

Допустим, parallelism = 32 глобально. Parent DAG имеет 5 subdag operators. Каждый subdag имеет 10 tasks. Запуск всех parallel:

  • 5 slots на subdag operators (parents)
  • 50 slots на inner tasks (5 × 10)
  • Total: 55 slots needed, only 32 available

Deadlock: subdag operators ждут что их inner tasks завершатся, но inner tasks не могут получить slot, потому что subdag operators их держат.

Проблема 2: SequentialExecutor для subdag

До 2.0 default SubDagOperator.executor = SequentialExecutor (potencial для concurrent execution). Это означало что inner tasks subdag runs sequentially, даже если parent DAG имеет CeleryExecutor с массой workers.

Проблема 3: Performance scheduler

Каждый subdag — отдельный DAG → scheduler обрабатывает их в Phase 1-2 main loop. С 100 subdags = 100 дополнительных DagRuns/cycle. Scheduler loop замедляется.

Проблема 4: UI confusion

Subdag показывается как task в parent graph view. Чтобы увидеть inner tasks — нужно zoom-in, открыть отдельный view. Cross-page navigation. Метрики “DAG success rate” путают main DAG и subdag.

Проблема 5: Clear/retry semantics

Если clear subdag operator, нужно ли clear inner tasks? Поведение было inconsistent.

Итог

SubDagOperator deprecated с 2.2, удалён в 3.0. Никогда не используйте в новом коде.


TaskGroup — современный подход

TaskGroup — это UI-only группировка. Tasks остаются в parent DAG, но в graph view они показываются как collapsible group. Никакого отдельного DAG, никаких отдельных DagRun-ов.

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
from datetime import datetime

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def main_dag():
    @task
    def start_task(): pass

    with TaskGroup("etl_group") as etl:
        @task
        def extract(): return "data"

        @task
        def transform(d: str): return d.upper()

        @task
        def load(d: str): print(d)

        load(transform(extract()))

    @task
    def end_task(): pass

    start_task() >> etl >> end_task()

В UI вы видите:

start_task → [etl_group] → end_task

При click на etl_group — expand, показывает extract → transform → load.


TaskFlow @task_group decorator

С 2.5+ есть decorator-based syntax:

from airflow.decorators import dag, task, task_group

@dag(...)
def main_dag():
    @task_group(group_id="etl_group")
    def etl():
        @task
        def extract(): return "data"
        @task
        def transform(d): return d.upper()
        @task
        def load(d): print(d)

        load(transform(extract()))

    etl()  # ← invocation в parent dag

Function-based syntax, более expressive и более readable.


Nested TaskGroups

@dag(...)
def hierarchical():
    with TaskGroup("data_processing") as data_group:
        with TaskGroup("extraction") as extract:
            extract_a = extract_a_task()
            extract_b = extract_b_task()

        with TaskGroup("transformation") as transform:
            transform_a = transform_a_task()
            transform_b = transform_b_task()

        with TaskGroup("loading") as load:
            load_a = load_task()

        extract >> transform >> load

В UI:

data_processing
├── extraction
│   ├── extract_a
│   └── extract_b
├── transformation
│   ├── transform_a
│   └── transform_b
└── loading
    └── load_a

group_id taskmaster делает namespace: actual task_id внутри = data_processing.extraction.extract_a.


TaskGroup для reusable patterns

Common usage: factory function для reusable sub-graphs.

def create_etl_group(group_id: str, source: str, target: str) -> TaskGroup:
    """Reusable ETL group."""
    with TaskGroup(group_id) as group:
        @task
        def extract(source: str):
            return f"data_from_{source}"

        @task
        def transform(data: str, target: str):
            return f"transformed_for_{target}: {data}"

        @task
        def load(data: str, target: str):
            print(f"Loading to {target}: {data}")

        load(transform(extract(source), target), target)

    return group

@dag(...)
def multi_target_etl():
    @task
    def start(): pass

    @task
    def end(): pass

    s = start()
    e = end()

    for target in ["s3", "gcs", "azure"]:
        group = create_etl_group(f"etl_{target}", source="postgres", target=target)
        s >> group >> e

Создаст три параллельных group: etl_s3, etl_gcs, etl_azure, все после start, все перед end.


TaskGroup vs Dynamic Task Mapping

Иногда люди confused — когда TaskGroup, когда .expand()?

Use caseTaskGroupDynamic Mapping
Static N задач, известных при parseИзлишне
Dynamic N задач, известных при runtime❌ (нельзя menggunakan loop в TaskGroup)
Логическая группировка related tasks
Parallel runs одной операции над разными inputsВозможно но избыточно
# Static — TaskGroup
with TaskGroup("etl_per_table") as group:
    for table in ["users", "orders", "products"]:
        @task(task_id=f"process_{table}")
        def process_table():
            pass

# Dynamic — expand
@task
def get_tables() -> list[str]:
    return ["users", "orders", "products"]  # Может быть динамически из API

@task
def process_table(table_name: str):
    pass

process_table.expand(table_name=get_tables())

Edge cases

Pitfall 1: Cross-group dependencies

@dag(...)
def cross_group():
    with TaskGroup("group_a") as a:
        a_task1 = task_a1()
        a_task2 = task_a2()

    with TaskGroup("group_b") as b:
        b_task1 = task_b1()

    # ❌ Trying cross-group dependency
    a_task2 >> b_task1

Это работает, но в UI выглядит сложно — связь идёт между groups, не на уровне groups. Если хотите clean visual:

a >> b  # Lock-step: all of group_a → all of group_b

Использовать individual tasks dependency только если нужно для logic correctness, не для visualization.

Pitfall 2: group_id collision

# ❌ Same group_id в TaskGroup и task — undefined behavior
with TaskGroup("processing") as group: ...
@task(task_id="processing") def processing_task(): ...

task_id внутри group имеет prefix group_id.task_id. Не collisionируйте с group_id.

Pitfall 3: Глубокая nesting

# UI становится messy с 4+ уровней nesting
with TaskGroup("level1"):
    with TaskGroup("level2"):
        with TaskGroup("level3"):
            with TaskGroup("level4"):
                with TaskGroup("level5"):
                    task_x()

Best practice: max 2-3 уровня nesting. Если больше — это smell для refactor.


Migration от SubDAG к TaskGroup

Если у вас legacy SubDAG в production:

Шаг 1: Найти все SubDAG

grep -r "SubDagOperator" dags/

Шаг 2: Convert

Было:

from airflow.operators.subdag import SubDagOperator

def make_subdag(parent_id, child_id, args):
    subdag = DAG(dag_id=f"{parent_id}.{child_id}", ...)
    with subdag:
        t1 = BashOperator(...)
        t2 = BashOperator(...)
        t1 >> t2
    return subdag

with DAG("main", ...) as dag:
    SubDagOperator(task_id="my_subdag", subdag=make_subdag(...))

Стало:

def make_task_group(group_id):
    with TaskGroup(group_id) as group:
        t1 = BashOperator(task_id="t1", ...)
        t2 = BashOperator(task_id="t2", ...)
        t1 >> t2
    return group

with DAG("main", ...) as dag:
    make_task_group("my_subdag")

Шаг 3: Cleanup metadata DB

После migration старые subdag dag_ids остаются в БД. Чистим:

DELETE FROM serialized_dag WHERE dag_id LIKE 'main.%';
DELETE FROM dag WHERE dag_id LIKE 'main.%';
-- Также cleanup dag_run, task_instance, log для этих dag_ids

Или через CLI:

airflow dags delete -y main.my_subdag

Проверка знанийKnowledge check
Почему SubDAG считается anti-pattern в 2.x, и как TaskGroup решает эти проблемы?
ОтветAnswer
**SubDAG проблемы**: (1) Deadlock с concurrency — SubDagOperator сам занимает slot в parent's pool + inner tasks тоже занимают slots → recursive consumption может вызвать deadlock; (2) Default SequentialExecutor для inner tasks (до фикса) — не использует параллелизм; (3) Performance scheduler — каждый subdag = отдельный DAG, scheduler processes их independently в main loop; (4) UI confusion — отдельный graph view для subdag, cross-page navigation; (5) Clear/retry semantics inconsistent. **TaskGroup решение**: (1) UI-only группировка — tasks остаются в parent DAG, нет отдельного DAG/DagRun; (2) Inner tasks используют parent's executor — full parallelism; (3) Нет recursive slot consumption; (4) Native UI — collapsible group в parent graph view; (5) Consistent clear/retry. SubDAG deprecated в 2.2, удалён в 3.0. Всегда используйте TaskGroup в новом коде.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Почему SubDagOperator считается anti-pattern в 2.x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8