Distributed planning: фрагментация на stages
Оптимизированный логический план — это дерево PlanNode, описывающее операции над данными. Но Trino — распределённый движок: запрос исполняют много воркеров параллельно. Логический план об этом ничего не говорит — он единый и абстрактный. Чтобы запрос стал распределённым, план нужно разрезать на части, которые поедут на разные воркеры. Это и есть distributed planning.
Этот урок — про фрагментацию плана на stages: где проходят линии разреза, что такое фрагмент и stage, и какие бывают типы фрагментов по способу распределения данных.
Почему план нужно разрезать
Логический план — одно дерево. Но воркеров много, и они физически разные машины: данные между ними передаются по сети. Между некоторыми операциями данные текут локально, в пределах одного воркера. А между некоторыми — обязаны перейти с воркера на воркер по сети.
Возьмём GROUP BY country. Чтобы посчитать агрегат по каждой стране корректно, все строки одной страны должны оказаться на одном воркере. Но прочитаны они были на разных воркерах — таблица-то распределена. Значит, между «чтением» и «финальной агрегацией» данные обязаны перераспределиться по сети: строки каждой страны собираются на свой воркер. Это перераспределение и есть точка, где план надо разрезать.
Distributed planning разрезает дерево PlanNode именно по таким линиям — там, где между операциями необходим обмен данными по сети (он называется exchange). Получившиеся куски и есть фрагменты.
Фрагмент и stage
Фрагмент (plan fragment) — это часть распределённого плана: поддерево PlanNode, ограниченное линиями разреза. Внутри фрагмента данные текут локально, без сетевого обмена. Между фрагментами — наоборот, обмен по сети.
Stage (стадия) — это фаза распределённого плана, соответствующая фрагменту. Стадии образуют дерево, повторяющее структуру разрезанного плана: корневая стадия наверху, дочерние под ней. Корневая стадия агрегирует вывод дочерних и отдаёт результат клиенту.
И вот критически важное уточнение, которое надо зафиксировать раз и навсегда.
Stage — это КОНЦЕПТУАЛЬНАЯ единица распределённого плана. Stage сам по себе НЕ исполняется на воркере. Stage — это описание фазы: “вот такой фрагмент плана надо выполнить, распределив его по кластеру”. Тем, что реально запускается и работает на конкретном воркере, является task — runtime-исполнение стадии. Одна stage распараллеливается на множество task на разных воркерах. Не путайте описание (stage) и исполнение (task) — это разные уровни.
То есть иерархия такая: distributed planning производит дерево stages. Каждая stage — это план фазы, концепция. Когда запрос запускается на исполнение, каждая stage разворачивается в набор task — и вот task уже физически живут и работают на воркерах. Distributed planning заканчивается на уровне stages; task — это уже следующий модуль про распределённое исполнение.
Типы фрагментов: как распределяются данные
Каждый фрагмент имеет тип распределения — он определяет, как данные попадают в этот фрагмент: на скольких нодах он выполняется и как входные данные раскладываются по этим нодам. Тип фрагмента виден в выводе EXPLAIN и указан рядом с номером фрагмента.
| Тип фрагмента | Как распределяются данные |
|---|---|
| SINGLE | Фрагмент выполняется на одной ноде. Обычно финальная агрегация и отдача результата |
| HASH | Выполняется на фиксированном числе нод; входные строки распределяются по хэш-функции от ключа |
| ROUND_ROBIN | Строки раздаются нодам по кругу, поочерёдно — для равномерной загрузки |
| BROADCAST | Входные данные копируются (транслируются) целиком на все ноды фрагмента |
| SOURCE | Выполняется на нодах, где доступны входные splits — ради data locality |
Разберём логику каждого.
SOURCE — это фрагменты, которые читают данные из коннектора. Они исполняются там, где splits, чтобы по возможности читать данные локально. Это нижние фрагменты дерева — листья.
HASH — распределение по хэшу от ключа. Применяется, когда строки с одинаковым ключом обязаны попасть на одну ноду: для GROUP BY (одна группа — одна нода) и для partitioned join (одинаковый join-ключ с обеих сторон — на одной ноде). Хэш-функция от ключа детерминированно сопоставляет строку с нодой.
BROADCAST — копирование данных на все ноды. Применяется в broadcast join: маленькая таблица целиком рассылается каждой ноде, и каждая нода соединяет её со своей частью большой таблицы локально. Дёшево, когда копируемая таблица мала.
ROUND_ROBIN — поочерёдная раздача строк нодам без привязки к ключу. Применяется, когда нужно просто равномерно распределить нагрузку, а группировка по ключу не требуется.
SINGLE — фрагмент на одной ноде. Применяется там, где данные обязаны быть собраны в одном месте: финальная агрегация без GROUP BY, финальная сортировка, отдача результата. Это верхний, корневой фрагмент.
Типы фрагментов напрямую связаны с производительностью. Выбор «broadcast или hash-распределение для join» — одно из главных решений оптимизатора, и от него сильно зависит, сколько данных уйдёт по сети. Этому посвящён отдельный модуль про cost-based optimizer; здесь важно понять саму палитру типов и то, что каждый фрагмент имеет один из них.
Spark: DAG и stages как аналог фрагментации плана ClickHouse: распределённый план и pipeline операцийЧтение распределённого плана
Распределённый план показывает EXPLAIN (TYPE DISTRIBUTED). Его вывод — это список фрагментов, у каждого номер и тип:
EXPLAIN (TYPE DISTRIBUTED)
SELECT nationkey, count(*)
FROM tpch.sf1.customer
GROUP BY nationkey;
Fragment 0 [SINGLE]
Output[nationkey, _col1]
RemoteSource[1]
Fragment 1 [HASH]
Aggregate(FINAL)[nationkey]
RemoteSource[2]
Fragment 2 [SOURCE]
Aggregate(PARTIAL)[nationkey]
TableScan[tpch:customer]
Читается так. Fragment 2 [SOURCE] — нижний: читает customer и считает частичный агрегат (PARTIAL) прямо там, где прочитал, чтобы уменьшить объём данных до сетевого обмена. Fragment 1 [HASH] — данные перераспределены по хэшу nationkey, и считается финальный агрегат (FINAL) по каждой группе. Fragment 0 [SINGLE] — собирает результат на одной ноде и отдаёт клиенту. RemoteSource[N] — это и есть линия разреза: точка, где фрагмент получает данные от фрагмента N по сети.
В выводе EXPLAIN ANALYZE есть удобное правило нумерации: меньший номер фрагмента — это последний шаг исполнения (ближе к результату), больший номер — первый шаг (ближе к источнику). Поэтому Fragment 0 это всегда корень-финал, а фрагмент с наибольшим номером — самое начало, чтение таблиц.
Место distributed planning в цикле
Distributed planning — последний этап планирования. Он превращает единый логический план в дерево stages — распределённый план. Дальше начинается уже исполнение: каждая stage разворачивается в task, task работают на воркерах. Граница чёткая: планирование заканчивается на дереве stages, исполнение начинается с task — и это тема следующего модуля.
Попробуй сам
Распределённый план изучается через EXPLAIN (TYPE DISTRIBUTED):
- Выполните
EXPLAIN (TYPE DISTRIBUTED) SELECT nationkey, count(*) FROM tpch.sf1.customer GROUP BY nationkey. Выпишите все фрагменты и их типы. НайдитеSOURCE,HASH,SINGLE. - В выводе найдите
RemoteSource— это линии разреза. Сколько их? Каждая означает один сетевой обмен между фрагментами. - Выполните
EXPLAIN (TYPE DISTRIBUTED)для запроса с join двух таблиц tpch. Посмотрите, какой тип у фрагментов join —BROADCASTилиHASH. Попробуйте join маленькой и большой таблицы и наоборот. - Сформулируйте письменно, почему
Fragment 0— это всегда финал, а фрагмент с наибольшим номером — всегда чтение таблиц.