DAG Scheduler: От действия к стадиям
Когда вы вызываете df.count() или df.write.parquet(...), Spark не просто “выполняет код”. Он строит DAG (Directed Acyclic Graph) — ориентированный ациклический граф зависимостей между операциями, а затем разбивает его на stages для параллельного выполнения.
Что запускает Job?
В Spark есть два типа операций:
- Transformations (ленивые):
filter(),select(),groupBy(),join()— не запускают вычисления, а только добавляют шаги в DAG - Actions (запускающие):
collect(),count(),save(),show(),take()— триггерят выполнение всего DAG
# Всё это -- transformations, ничего не выполняется
df = spark.read.parquet("/data/orders/")
filtered = df.filter(df.amount > 100)
grouped = filtered.groupBy("customer_id").agg({"amount": "sum"})
# А вот это -- action, запускающий Job
grouped.show() # <-- Здесь Spark строит и выполняет весь DAG
Каждый action создаёт один Job. Job — это полный план вычислений от чтения данных до получения результата.
Stages: разбиение на shuffle boundaries
DAG Scheduler разбивает Job на stages по границам shuffle (wide dependencies). Stage — это набор задач, которые могут выполняться без перемешивания данных между executors.
Narrow vs Wide Dependencies
| Тип | Примеры | Shuffle? | Стадия |
|---|---|---|---|
| Narrow | filter(), map(), select(), union() | Нет | Остаётся в текущей stage |
| Wide | groupBy(), join(), repartition(), distinct(), sort() | Да | Создаёт новую stage |
Narrow dependency: каждая выходная партиция зависит от одной входной партиции. Данные не нужно перемещать между executors.
Wide dependency: выходная партиция зависит от нескольких входных партиций. Данные необходимо перемешать (shuffle) по сети.
Пример: JOIN создаёт параллельные stages
Рассмотрим реальный запрос с JOIN:
# Два источника данных
orders = spark.read.parquet("/data/orders/")
customers = spark.read.parquet("/data/customers/")
# Фильтрация + JOIN
result = (
orders
.filter(orders.amount > 100) # narrow (Stage 0)
.join(
customers, # Stage 1: scan customers
orders.customer_id == customers.id
) # wide: shuffle + join (Stage 2)
)
result.show()
DAG Scheduler создаёт 3 стадии:
Stage 0: Scan orders -> Filter(amount > 100) ──┐
├── shuffle ──> Stage 2: SortMergeJoin -> Result
Stage 1: Scan customers ──┘
Stage 0 и Stage 1 выполняются параллельно — они читают данные из разных источников и не зависят друг от друга. Stage 2 начинается только когда обе предыдущие стадии завершены, потому что для JOIN нужны данные с обеих сторон.
SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.amount > 100
Scan orders + Filter
Scan customers
SortMergeJoin + Result
Типы стадий
Spark различает два типа stages:
- ShuffleMapStage — промежуточная стадия, результат которой используется следующей стадией через shuffle. Stage 0 и Stage 1 в нашем примере — ShuffleMapStages.
- ResultStage — финальная стадия, результат которой отправляется driver (для
show(),collect()) или записывается в хранилище (дляwrite()). Stage 2 — ResultStage.
Просмотр стадий через explain()
result.explain(True)
Вывод explain() показывает физический план с указанием стадий:
== Physical Plan ==
*(3) SortMergeJoin [customer_id#0], [id#5], Inner
:- *(1) Filter (amount#2 > 100)
: +- *(1) ColumnarToRow
: +- FileScan parquet [customer_id#0,amount#2]
+- *(2) ColumnarToRow
+- FileScan parquet [id#5,name#6]
Числа в *() — это stage IDs, созданные whole-stage codegen. *(1) — Stage 0 (scan + filter orders), *(2) — Stage 1 (scan customers), *(3) — Stage 2 (SortMergeJoin).
Tasks: единица работы
Каждая stage разбивается на tasks — по одному task на каждую партицию данных. Если у вас 200 партиций в Stage 0, DAG Scheduler создаст 200 tasks.
TaskSet — это набор всех tasks одной стадии, передаваемый в TaskScheduler для распределения по executors.
Stage 0 (4 partitions) -> TaskSet [Task 0, Task 1, Task 2, Task 3]
Stage 1 (2 partitions) -> TaskSet [Task 0, Task 1]
Stage 2 (200 partitions) -> TaskSet [Task 0, Task 1, ..., Task 199]
Количество tasks в Stage 2 определяется параметром spark.sql.shuffle.partitions (по умолчанию 200). Это часто избыточно для небольших данных и недостаточно для огромных datasets.
Что дальше?
В следующем уроке мы разберём Task Scheduler — компонент, который принимает TaskSet от DAG Scheduler и распределяет tasks по executors с учётом data locality и scheduling policy (FIFO vs FAIR).