TaskGroup vs SubDAG
В Airflow 1.x для группировки tasks использовался SubDagOperator — отдельный DAG, который запускался как одна task родительского DAG. Это был mainstream pattern, но имел серьёзные performance и debugging issues. В Airflow 2.0 появился TaskGroup — UI-only группировка без отдельного DAG. К 2.7 SubDAG считается deprecated и anti-pattern.
Этот урок объясняет почему SubDAG плох и как использовать TaskGroup правильно.
SubDagOperator — что было
Концепция: создаём DAG, оборачиваем в SubDagOperator, используем как task:
# subdag.py
def subdag(parent_dag_id, child_dag_id, args):
subdag = DAG(
dag_id=f"{parent_dag_id}.{child_dag_id}"
default_args=args,
)
with subdag:
task1 = BashOperator(task_id="task1", bash_command="echo 1")
task2 = BashOperator(task_id="task2", bash_command="echo 2")
task1 >> task2
return subdag
# main_dag.py
from subdag import subdag
from airflow.operators.subdag import SubDagOperator
with DAG("main_dag", default_args=args) as dag:
subdag_op = SubDagOperator(
task_id="my_subdag"
subdag=subdag("main_dag", "my_subdag", args),
)
Это создавало:
- Отдельный
dag_id = "main_dag.my_subdag"в metadata DB - Отдельный DagRun для subdag при каждом parent run
- Все subdag tasks parsed/scheduled независимо
Почему SubDAG — anti-pattern
Проблема 1: Deadlock с executor concurrency
SubDagOperator сам task в parent DAG → занимает slot в parent’s pool. Внутренние tasks subdag тоже занимают slots. Это создаёт recursive consumption:
Допустим, parallelism = 32 глобально. Parent DAG имеет 5 subdag operators. Каждый subdag имеет 10 tasks. Запуск всех parallel:
- 5 slots на subdag operators (parents)
- 50 slots на inner tasks (5 × 10)
- Total: 55 slots needed, only 32 available
Deadlock: subdag operators ждут что их inner tasks завершатся, но inner tasks не могут получить slot, потому что subdag operators их держат.
Проблема 2: SequentialExecutor для subdag
До 2.0 default SubDagOperator.executor = SequentialExecutor (potencial для concurrent execution). Это означало что inner tasks subdag runs sequentially, даже если parent DAG имеет CeleryExecutor с массой workers.
Проблема 3: Performance scheduler
Каждый subdag — отдельный DAG → scheduler обрабатывает их в Phase 1-2 main loop. С 100 subdags = 100 дополнительных DagRuns/cycle. Scheduler loop замедляется.
Проблема 4: UI confusion
Subdag показывается как task в parent graph view. Чтобы увидеть inner tasks — нужно zoom-in, открыть отдельный view. Cross-page navigation. Метрики “DAG success rate” путают main DAG и subdag.
Проблема 5: Clear/retry semantics
Если clear subdag operator, нужно ли clear inner tasks? Поведение было inconsistent.
Итог
SubDagOperator deprecated с 2.2, удалён в 3.0. Никогда не используйте в новом коде.
TaskGroup — современный подход
TaskGroup — это UI-only группировка. Tasks остаются в parent DAG, но в graph view они показываются как collapsible group. Никакого отдельного DAG, никаких отдельных DagRun-ов.
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
from datetime import datetime
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
)
def main_dag():
@task
def start_task(): pass
with TaskGroup("etl_group") as etl:
@task
def extract(): return "data"
@task
def transform(d: str): return d.upper()
@task
def load(d: str): print(d)
load(transform(extract()))
@task
def end_task(): pass
start_task() >> etl >> end_task()
В UI вы видите:
start_task → [etl_group] → end_task
При click на etl_group — expand, показывает extract → transform → load.
TaskFlow @task_group decorator
С 2.5+ есть decorator-based syntax:
from airflow.decorators import dag, task, task_group
@dag(...)
def main_dag():
@task_group(group_id="etl_group")
def etl():
@task
def extract(): return "data"
@task
def transform(d): return d.upper()
@task
def load(d): print(d)
load(transform(extract()))
etl() # ← invocation в parent dag
Function-based syntax, более expressive и более readable.
Nested TaskGroups
@dag(...)
def hierarchical():
with TaskGroup("data_processing") as data_group:
with TaskGroup("extraction") as extract:
extract_a = extract_a_task()
extract_b = extract_b_task()
with TaskGroup("transformation") as transform:
transform_a = transform_a_task()
transform_b = transform_b_task()
with TaskGroup("loading") as load:
load_a = load_task()
extract >> transform >> load
В UI:
data_processing
├── extraction
│ ├── extract_a
│ └── extract_b
├── transformation
│ ├── transform_a
│ └── transform_b
└── loading
└── load_a
group_id taskmaster делает namespace: actual task_id внутри = data_processing.extraction.extract_a.
TaskGroup для reusable patterns
Common usage: factory function для reusable sub-graphs.
def create_etl_group(group_id: str, source: str, target: str) -> TaskGroup:
"""Reusable ETL group."""
with TaskGroup(group_id) as group:
@task
def extract(source: str):
return f"data_from_{source}"
@task
def transform(data: str, target: str):
return f"transformed_for_{target}: {data}"
@task
def load(data: str, target: str):
print(f"Loading to {target}: {data}")
load(transform(extract(source), target), target)
return group
@dag(...)
def multi_target_etl():
@task
def start(): pass
@task
def end(): pass
s = start()
e = end()
for target in ["s3", "gcs", "azure"]:
group = create_etl_group(f"etl_{target}", source="postgres", target=target)
s >> group >> e
Создаст три параллельных group: etl_s3, etl_gcs, etl_azure, все после start, все перед end.
TaskGroup vs Dynamic Task Mapping
Иногда люди confused — когда TaskGroup, когда .expand()?
| Use case | TaskGroup | Dynamic Mapping |
|---|---|---|
| Static N задач, известных при parse | ✅ | Излишне |
| Dynamic N задач, известных при runtime | ❌ (нельзя menggunakan loop в TaskGroup) | ✅ |
| Логическая группировка related tasks | ✅ | ❌ |
| Parallel runs одной операции над разными inputs | Возможно но избыточно | ✅ |
# Static — TaskGroup
with TaskGroup("etl_per_table") as group:
for table in ["users", "orders", "products"]:
@task(task_id=f"process_{table}")
def process_table():
pass
# Dynamic — expand
@task
def get_tables() -> list[str]:
return ["users", "orders", "products"] # Может быть динамически из API
@task
def process_table(table_name: str):
pass
process_table.expand(table_name=get_tables())
Edge cases
Pitfall 1: Cross-group dependencies
@dag(...)
def cross_group():
with TaskGroup("group_a") as a:
a_task1 = task_a1()
a_task2 = task_a2()
with TaskGroup("group_b") as b:
b_task1 = task_b1()
# ❌ Trying cross-group dependency
a_task2 >> b_task1
Это работает, но в UI выглядит сложно — связь идёт между groups, не на уровне groups. Если хотите clean visual:
a >> b # Lock-step: all of group_a → all of group_b
Использовать individual tasks dependency только если нужно для logic correctness, не для visualization.
Pitfall 2: group_id collision
# ❌ Same group_id в TaskGroup и task — undefined behavior
with TaskGroup("processing") as group: ...
@task(task_id="processing") def processing_task(): ...
task_id внутри group имеет prefix group_id.task_id. Не collisionируйте с group_id.
Pitfall 3: Глубокая nesting
# UI становится messy с 4+ уровней nesting
with TaskGroup("level1"):
with TaskGroup("level2"):
with TaskGroup("level3"):
with TaskGroup("level4"):
with TaskGroup("level5"):
task_x()
Best practice: max 2-3 уровня nesting. Если больше — это smell для refactor.
Migration от SubDAG к TaskGroup
Если у вас legacy SubDAG в production:
Шаг 1: Найти все SubDAG
grep -r "SubDagOperator" dags/
Шаг 2: Convert
Было:
from airflow.operators.subdag import SubDagOperator
def make_subdag(parent_id, child_id, args):
subdag = DAG(dag_id=f"{parent_id}.{child_id}", ...)
with subdag:
t1 = BashOperator(...)
t2 = BashOperator(...)
t1 >> t2
return subdag
with DAG("main", ...) as dag:
SubDagOperator(task_id="my_subdag", subdag=make_subdag(...))
Стало:
def make_task_group(group_id):
with TaskGroup(group_id) as group:
t1 = BashOperator(task_id="t1", ...)
t2 = BashOperator(task_id="t2", ...)
t1 >> t2
return group
with DAG("main", ...) as dag:
make_task_group("my_subdag")
Шаг 3: Cleanup metadata DB
После migration старые subdag dag_ids остаются в БД. Чистим:
DELETE FROM serialized_dag WHERE dag_id LIKE 'main.%';
DELETE FROM dag WHERE dag_id LIKE 'main.%';
-- Также cleanup dag_run, task_instance, log для этих dag_ids
Или через CLI:
airflow dags delete -y main.my_subdag