Learning Platform
Глоссарий Troubleshooting
Урок 05.05 · 22 мин
Средний
query-lifecycledistributed-planstagesfragments

Distributed planning: фрагментация на stages

Оптимизированный логический план — это дерево PlanNode, описывающее операции над данными. Но Trino — распределённый движок: запрос исполняют много воркеров параллельно. Логический план об этом ничего не говорит — он единый и абстрактный. Чтобы запрос стал распределённым, план нужно разрезать на части, которые поедут на разные воркеры. Это и есть distributed planning.

Этот урок — про фрагментацию плана на stages: где проходят линии разреза, что такое фрагмент и stage, и какие бывают типы фрагментов по способу распределения данных.


Почему план нужно разрезать

Логический план — одно дерево. Но воркеров много, и они физически разные машины: данные между ними передаются по сети. Между некоторыми операциями данные текут локально, в пределах одного воркера. А между некоторыми — обязаны перейти с воркера на воркер по сети.

Возьмём GROUP BY country. Чтобы посчитать агрегат по каждой стране корректно, все строки одной страны должны оказаться на одном воркере. Но прочитаны они были на разных воркерах — таблица-то распределена. Значит, между «чтением» и «финальной агрегацией» данные обязаны перераспределиться по сети: строки каждой страны собираются на свой воркер. Это перераспределение и есть точка, где план надо разрезать.

Граница разреза — там, где данные идут по сети
Чтение и частичная агрегацияКаждый воркер читает свою часть таблицы и считает частичный агрегат локально.
данные идут по сети — здесь разрез
Финальная агрегацияСтроки перераспределены: все данные одной группы собраны на одном воркере, считается финальный агрегат.

Distributed planning разрезает дерево PlanNode именно по таким линиям — там, где между операциями необходим обмен данными по сети (он называется exchange). Получившиеся куски и есть фрагменты.


Фрагмент и stage

Фрагмент (plan fragment) — это часть распределённого плана: поддерево PlanNode, ограниченное линиями разреза. Внутри фрагмента данные текут локально, без сетевого обмена. Между фрагментами — наоборот, обмен по сети.

Stage (стадия) — это фаза распределённого плана, соответствующая фрагменту. Стадии образуют дерево, повторяющее структуру разрезанного плана: корневая стадия наверху, дочерние под ней. Корневая стадия агрегирует вывод дочерних и отдаёт результат клиенту.

И вот критически важное уточнение, которое надо зафиксировать раз и навсегда.

WARNING

Stage — это КОНЦЕПТУАЛЬНАЯ единица распределённого плана. Stage сам по себе НЕ исполняется на воркере. Stage — это описание фазы: “вот такой фрагмент плана надо выполнить, распределив его по кластеру”. Тем, что реально запускается и работает на конкретном воркере, является task — runtime-исполнение стадии. Одна stage распараллеливается на множество task на разных воркерах. Не путайте описание (stage) и исполнение (task) — это разные уровни.

То есть иерархия такая: distributed planning производит дерево stages. Каждая stage — это план фазы, концепция. Когда запрос запускается на исполнение, каждая stage разворачивается в набор task — и вот task уже физически живут и работают на воркерах. Distributed planning заканчивается на уровне stages; task — это уже следующий модуль про распределённое исполнение.

Логический план — фрагменты — stages
Логический планЕдиное дерево PlanNode — оптимизированное, но не разрезанное.
разрез по линиям обмена
ФрагментыКуски плана между линиями разреза. Внутри — локальный поток данных.
каждому фрагменту — стадия
Дерево stagesКонцептуальные фазы распределённого плана. Stage — описание, не исполнение.

Типы фрагментов: как распределяются данные

Каждый фрагмент имеет тип распределения — он определяет, как данные попадают в этот фрагмент: на скольких нодах он выполняется и как входные данные раскладываются по этим нодам. Тип фрагмента виден в выводе EXPLAIN и указан рядом с номером фрагмента.

Тип фрагментаКак распределяются данные
SINGLEФрагмент выполняется на одной ноде. Обычно финальная агрегация и отдача результата
HASHВыполняется на фиксированном числе нод; входные строки распределяются по хэш-функции от ключа
ROUND_ROBINСтроки раздаются нодам по кругу, поочерёдно — для равномерной загрузки
BROADCASTВходные данные копируются (транслируются) целиком на все ноды фрагмента
SOURCEВыполняется на нодах, где доступны входные splits — ради data locality

Разберём логику каждого.

SOURCE — это фрагменты, которые читают данные из коннектора. Они исполняются там, где splits, чтобы по возможности читать данные локально. Это нижние фрагменты дерева — листья.

HASH — распределение по хэшу от ключа. Применяется, когда строки с одинаковым ключом обязаны попасть на одну ноду: для GROUP BY (одна группа — одна нода) и для partitioned join (одинаковый join-ключ с обеих сторон — на одной ноде). Хэш-функция от ключа детерминированно сопоставляет строку с нодой.

BROADCAST — копирование данных на все ноды. Применяется в broadcast join: маленькая таблица целиком рассылается каждой ноде, и каждая нода соединяет её со своей частью большой таблицы локально. Дёшево, когда копируемая таблица мала.

ROUND_ROBIN — поочерёдная раздача строк нодам без привязки к ключу. Применяется, когда нужно просто равномерно распределить нагрузку, а группировка по ключу не требуется.

SINGLE — фрагмент на одной ноде. Применяется там, где данные обязаны быть собраны в одном месте: финальная агрегация без GROUP BY, финальная сортировка, отдача результата. Это верхний, корневой фрагмент.

Дерево фрагментов запроса с GROUP BY
Fragment 0 — SINGLEКорневой фрагмент: финальная агрегация и отдача результата клиенту, на одной ноде.
данные по сети
Fragment 1 — HASHПромежуточный фрагмент: строки распределены по хэшу ключа группировки, частичные агрегаты собираются по группам.
данные по сети
Fragment 2 — SOURCEНижний фрагмент: чтение таблицы из коннектора там, где доступны splits.

Типы фрагментов напрямую связаны с производительностью. Выбор «broadcast или hash-распределение для join» — одно из главных решений оптимизатора, и от него сильно зависит, сколько данных уйдёт по сети. Этому посвящён отдельный модуль про cost-based optimizer; здесь важно понять саму палитру типов и то, что каждый фрагмент имеет один из них.

Spark: DAG и stages как аналог фрагментации плана ClickHouse: распределённый план и pipeline операций

Чтение распределённого плана

Распределённый план показывает EXPLAIN (TYPE DISTRIBUTED). Его вывод — это список фрагментов, у каждого номер и тип:

EXPLAIN (TYPE DISTRIBUTED)
SELECT nationkey, count(*)
FROM tpch.sf1.customer
GROUP BY nationkey;
Fragment 0 [SINGLE]
    Output[nationkey, _col1]
        RemoteSource[1]

Fragment 1 [HASH]
    Aggregate(FINAL)[nationkey]
        RemoteSource[2]

Fragment 2 [SOURCE]
    Aggregate(PARTIAL)[nationkey]
        TableScan[tpch:customer]

Читается так. Fragment 2 [SOURCE] — нижний: читает customer и считает частичный агрегат (PARTIAL) прямо там, где прочитал, чтобы уменьшить объём данных до сетевого обмена. Fragment 1 [HASH] — данные перераспределены по хэшу nationkey, и считается финальный агрегат (FINAL) по каждой группе. Fragment 0 [SINGLE] — собирает результат на одной ноде и отдаёт клиенту. RemoteSource[N] — это и есть линия разреза: точка, где фрагмент получает данные от фрагмента N по сети.

NOTE

В выводе EXPLAIN ANALYZE есть удобное правило нумерации: меньший номер фрагмента — это последний шаг исполнения (ближе к результату), больший номер — первый шаг (ближе к источнику). Поэтому Fragment 0 это всегда корень-финал, а фрагмент с наибольшим номером — самое начало, чтение таблиц.


Место distributed planning в цикле

Distributed planning — последний этап планирования. Он превращает единый логический план в дерево stages — распределённый план. Дальше начинается уже исполнение: каждая stage разворачивается в task, task работают на воркерах. Граница чёткая: планирование заканчивается на дереве stages, исполнение начинается с task — и это тема следующего модуля.


Попробуй сам

Распределённый план изучается через EXPLAIN (TYPE DISTRIBUTED):

  1. Выполните EXPLAIN (TYPE DISTRIBUTED) SELECT nationkey, count(*) FROM tpch.sf1.customer GROUP BY nationkey. Выпишите все фрагменты и их типы. Найдите SOURCE, HASH, SINGLE.
  2. В выводе найдите RemoteSource — это линии разреза. Сколько их? Каждая означает один сетевой обмен между фрагментами.
  3. Выполните EXPLAIN (TYPE DISTRIBUTED) для запроса с join двух таблиц tpch. Посмотрите, какой тип у фрагментов join — BROADCAST или HASH. Попробуйте join маленькой и большой таблицы и наоборот.
  4. Сформулируйте письменно, почему Fragment 0 — это всегда финал, а фрагмент с наибольшим номером — всегда чтение таблиц.

Проверка знанийKnowledge check
Чем stage отличается от task, по каким линиям distributed planning разрезает план на фрагменты, и что означают типы фрагментов SOURCE, HASH, BROADCAST и SINGLE?
ОтветAnswer
Stage и task — это разные уровни. Stage — концептуальная единица распределённого плана, фаза, соответствующая фрагменту; stage сам по себе НЕ исполняется на воркере, это описание фазы. Task — runtime-исполнение стадии: одна stage распараллеливается на множество task на разных воркерах, и именно task физически работают на воркерах. Distributed planning заканчивается на дереве stages; task — это уже этап исполнения. Distributed planning разрезает единое дерево PlanNode по линиям, где между операциями необходим обмен данными по сети (exchange) — например, перед GROUP BY строки одной группы обязаны собраться на одном воркере, и это перераспределение и есть линия разреза. Получившиеся куски плана — фрагменты, внутри фрагмента данные текут локально. Каждый фрагмент имеет тип распределения: SOURCE выполняется на нодах, где доступны splits, ради data locality (нижние фрагменты, чтение); HASH распределяет входные строки по хэш-функции от ключа, чтобы строки с одинаковым ключом попали на одну ноду (для GROUP BY и partitioned join); BROADCAST копирует входные данные целиком на все ноды (для broadcast join маленькой таблицы); SINGLE выполняется на одной ноде, где данные обязаны быть собраны вместе (финальная агрегация, сортировка, отдача результата — корневой фрагмент).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. По каким линиям distributed planning разрезает логический план на фрагменты?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6