Learning Platform
Глоссарий Troubleshooting
Урок 04.01 · 18 мин
Средний
DAGSchedulerStageJobShuffle Boundary

DAG Scheduler: От действия к стадиям

Когда вы вызываете df.count() или df.write.parquet(...), Spark не просто “выполняет код”. Он строит DAG (Directed Acyclic Graph) — ориентированный ациклический граф зависимостей между операциями, а затем разбивает его на stages для параллельного выполнения.

Что запускает Job?

В Spark есть два типа операций:

  • Transformations (ленивые): filter(), select(), groupBy(), join() — не запускают вычисления, а только добавляют шаги в DAG
  • Actions (запускающие): collect(), count(), save(), show(), take() — триггерят выполнение всего DAG
# Всё это -- transformations, ничего не выполняется
df = spark.read.parquet("/data/orders/")
filtered = df.filter(df.amount > 100)
grouped = filtered.groupBy("customer_id").agg({"amount": "sum"})

# А вот это -- action, запускающий Job
grouped.show()  # <-- Здесь Spark строит и выполняет весь DAG

Каждый action создаёт один Job. Job — это полный план вычислений от чтения данных до получения результата.

Stages: разбиение на shuffle boundaries

DAG Scheduler разбивает Job на stages по границам shuffle (wide dependencies). Stage — это набор задач, которые могут выполняться без перемешивания данных между executors.

Narrow vs Wide Dependencies

ТипПримерыShuffle?Стадия
Narrowfilter(), map(), select(), union()НетОстаётся в текущей stage
WidegroupBy(), join(), repartition(), distinct(), sort()ДаСоздаёт новую stage

Narrow dependency: каждая выходная партиция зависит от одной входной партиции. Данные не нужно перемещать между executors.

Wide dependency: выходная партиция зависит от нескольких входных партиций. Данные необходимо перемешать (shuffle) по сети.

Пример: JOIN создаёт параллельные stages

Рассмотрим реальный запрос с JOIN:

# Два источника данных
orders = spark.read.parquet("/data/orders/")
customers = spark.read.parquet("/data/customers/")

# Фильтрация + JOIN
result = (
    orders
    .filter(orders.amount > 100)        # narrow (Stage 0)
    .join(
        customers,                       # Stage 1: scan customers
        orders.customer_id == customers.id
    )                                    # wide: shuffle + join (Stage 2)
)

result.show()

DAG Scheduler создаёт 3 стадии:

Stage 0: Scan orders -> Filter(amount > 100)     ──┐
                                                     ├── shuffle ──> Stage 2: SortMergeJoin -> Result
Stage 1: Scan customers                           ──┘

Stage 0 и Stage 1 выполняются параллельно — они читают данные из разных источников и не зависят друг от друга. Stage 2 начинается только когда обе предыдущие стадии завершены, потому что для JOIN нужны данные с обеих сторон.

DAG Execution Flow

SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.amount > 100

Stage 0
Scan orders + Filter
shuffle
Stage 1
Scan customers
shuffle
Stage 2
SortMergeJoin + Result
Stages 0 и 1 выполняются параллельноStage 2 ждёт завершения обеихНажмите на stage для деталей

Типы стадий

Spark различает два типа stages:

  • ShuffleMapStage — промежуточная стадия, результат которой используется следующей стадией через shuffle. Stage 0 и Stage 1 в нашем примере — ShuffleMapStages.
  • ResultStage — финальная стадия, результат которой отправляется driver (для show(), collect()) или записывается в хранилище (для write()). Stage 2 — ResultStage.

Просмотр стадий через explain()

result.explain(True)

Вывод explain() показывает физический план с указанием стадий:

== Physical Plan ==
*(3) SortMergeJoin [customer_id#0], [id#5], Inner
:- *(1) Filter (amount#2 > 100)
:  +- *(1) ColumnarToRow
:     +- FileScan parquet [customer_id#0,amount#2]
+- *(2) ColumnarToRow
   +- FileScan parquet [id#5,name#6]

Числа в *() — это stage IDs, созданные whole-stage codegen. *(1) — Stage 0 (scan + filter orders), *(2) — Stage 1 (scan customers), *(3) — Stage 2 (SortMergeJoin).

Tasks: единица работы

Каждая stage разбивается на tasks — по одному task на каждую партицию данных. Если у вас 200 партиций в Stage 0, DAG Scheduler создаст 200 tasks.

TaskSet — это набор всех tasks одной стадии, передаваемый в TaskScheduler для распределения по executors.

Stage 0 (4 partitions) -> TaskSet [Task 0, Task 1, Task 2, Task 3]
Stage 1 (2 partitions) -> TaskSet [Task 0, Task 1]
Stage 2 (200 partitions) -> TaskSet [Task 0, Task 1, ..., Task 199]

Количество tasks в Stage 2 определяется параметром spark.sql.shuffle.partitions (по умолчанию 200). Это часто избыточно для небольших данных и недостаточно для огромных datasets.

Проверка знанийKnowledge check
Что произойдёт, если в вашем запросе 3 JOIN операции? Сколько stages будет создано?
ОтветAnswer
Минимум 7 stages. Каждый JOIN создаёт shuffle boundary. Для 3 JOINов: 4 stage для чтения каждой таблицы (scan stages), и 3 stage для каждого JOIN (shuffle + merge). Но реальное число зависит от типа JOIN -- если Spark выберет BroadcastHashJoin для маленькой таблицы, то shuffle boundary не создаётся и stages будет меньше. Используйте explain() для просмотра реального плана.

Что дальше?

В следующем уроке мы разберём Task Scheduler — компонент, который принимает TaskSet от DAG Scheduler и распределяет tasks по executors с учётом data locality и scheduling policy (FIFO vs FAIR).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Запрос содержит чтение двух таблиц, filter на каждой, JOIN между ними и финальную агрегацию (groupBy + sum). Сколько stages создаст DAG Scheduler?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6