Stages и фрагменты: типы распределения
Из прошлого урока: stage — концептуальная фаза распределённого плана, которая разворачивается во множество task. Но стадии не висят в пустоте — они связаны в дерево, и у каждой есть тип распределения, определяющий, как данные в неё попадают. Этот урок разбирает структуру дерева стадий и пять типов фрагментов: SOURCE, HASH, ROUND_ROBIN, BROADCAST, SINGLE.
Понять типы фрагментов — значит понять, как именно данные движутся по кластеру и почему один и тот же запрос можно исполнить с разным объёмом сетевого обмена.
Дерево стадий
Distributed planning разрезал план на фрагменты, каждому фрагменту — стадия. Стадии образуют дерево. Корневая стадия наверху; под ней дочерние; у дочерних — свои дочерние. Лист дерева — стадия, читающая данные из источника; корень — стадия, отдающая результат клиенту.
Данные текут по дереву снизу вверх: листовые стадии читают таблицы, передают результат родителям, те — своим родителям, корневая собирает всё и отдаёт клиенту. Между соседними стадиями данные идут по сети — это exchange. Конкретное дерево стадий мы увидим ниже, на примере запроса с GROUP BY.
Каждая стадия (точнее, её фрагмент) имеет тип распределения. Тип отвечает на вопрос: как входные данные раскладываются по task этой стадии — на скольких воркерах она исполняется и по какому принципу строки попадают на конкретный воркер. Типов пять.
SOURCE: где данные
SOURCE — тип листовых фрагментов, которые читают данные из коннектора. Их главная задача — прочитать таблицу, и распределение строится вокруг data locality: фрагмент исполняется на тех нодах, где доступны входные splits.
Логика проста: если split — это файл на конкретной ноде или достижимый с конкретной ноды, то и читать его выгоднее на этой ноде, не гоняя данные по сети без нужды. SOURCE-фрагмент не получает данные от других стадий — он начинает их, читая источник. Поэтому SOURCE всегда внизу дерева.
-- В EXPLAIN листовой фрагмент, читающий tpch, имеет тип SOURCE
EXPLAIN (TYPE DISTRIBUTED) SELECT * FROM tpch.sf1.nation;
Fragment 1 [SOURCE]
TableScan[table = tpch:nation:sf1]
HASH: распределение по ключу
HASH (партиционированное распределение) — тип, при котором входные строки распределяются по нодам с помощью хэш-функции от ключа. Фрагмент исполняется на фиксированном числе нод, и для каждой строки хэш от значения ключа детерминированно определяет, на какую ноду она попадёт.
Зачем это нужно. HASH применяется там, где строки с одинаковым значением ключа обязаны оказаться на одной ноде. Два классических случая:
- GROUP BY: чтобы посчитать агрегат по группе, все строки группы должны быть вместе. Хэш от ключа группировки собирает каждую группу на свою ноду.
- Partitioned join: чтобы соединить строки по ключу, совпадающие по ключу строки обеих таблиц должны встретиться на одной ноде. Хэш от join-ключа применяется к обеим сторонам — и совпадающие строки гарантированно сходятся.
Свойство хэш-функции — детерминированность: одно и то же значение ключа всегда даёт один и тот же номер ноды. Именно это обеспечивает корректность: значение country = 'DE' со всех воркеров-источников придёт на один и тот же воркер-приёмник.
ROUND_ROBIN: равномерно по кругу
ROUND_ROBIN — тип, при котором строки раздаются нодам по кругу, поочерёдно: первая строка — ноде 1, вторая — ноде 2, и так далее, затем снова с ноды 1.
В отличие от HASH, ROUND_ROBIN не смотрит на значения строк. Ему всё равно, какой в строке ключ, — он просто равномерно раскидывает строки. Поэтому ROUND_ROBIN применяется там, где нужна именно равномерная загрузка, а группировка по ключу не требуется: например, чтобы ровно распределить строки между task следующей стадии, когда той стадии не важно, какие строки куда.
Главное отличие в одной фразе: HASH распределяет по смыслу (одинаковый ключ — на одну ноду), ROUND_ROBIN распределяет по равномерности (просто поровну, не глядя на содержимое).
BROADCAST: копия на каждую ноду
BROADCAST — тип, при котором входные данные копируются целиком на все ноды фрагмента. Не распределяются (каждой ноде своя часть), а именно дублируются: каждая нода получает полную копию.
Зачем дублировать данные. Главный случай — broadcast join. Когда соединяются большая и маленькая таблицы, маленькую можно целиком разослать каждой ноде. Тогда каждая нода держит у себя полную маленькую таблицу как хэш-таблицу и соединяет её со своей частью большой таблицы — локально, без перераспределения большой таблицы по сети.
Цена и выгода BROADCAST прямо противоположны HASH:
| HASH (partitioned join) | BROADCAST join | |
|---|---|---|
| Что движется по сети | Перераспределяются обе таблицы | Копируется только маленькая таблица |
| Память на ноде | Каждая нода держит часть build-side | Каждая нода держит ВСЮ маленькую таблицу |
| Когда выгодно | Обе таблицы большие | Одна таблица мала и влезает в память каждой ноды |
BROADCAST дёшев по сети, когда копируемая таблица мала. Но если разослать большую таблицу — каждая нода получит её целиком, и память кончится. Поэтому выбор между BROADCAST и HASH для join — одно из ключевых решений оптимизатора, и оно зависит от размеров таблиц.
Spark: shuffle и shuffle exchange между стадиями ClickHouse: GLOBAL IN и GLOBAL JOIN как аналог BROADCASTSINGLE: всё на одной ноде
SINGLE — тип, при котором фрагмент исполняется на одной-единственной ноде. Никакого распределения: вся работа этой стадии — на одном воркере.
SINGLE применяется там, где данные обязаны быть собраны в одном месте и параллелить нечего:
- Финальная агрегация без GROUP BY:
SELECT count(*) FROM ...даёт одно число — собрать частичные счётчики и сложить надо в одной точке. - Финальная сортировка всего результата (
ORDER BYбез LIMIT по партициям). - Отдача результата клиенту — корневая стадия.
SINGLE-фрагмент — это почти всегда корень дерева стадий. Он принимает данные от дочерних стадий и выдаёт финальный результат. Поскольку он на одной ноде, через него не должны проходить большие объёмы — иначе одна нода станет бутылочным горлышком. Поэтому движок старается свернуть данные (частичной агрегацией, LIMIT) ниже по дереву, до того как они дойдут до SINGLE-стадии.
Пять типов рядом
Свяжем типы с деревом стадий целиком. Внизу — SOURCE-фрагменты, читающие таблицы. Выше — промежуточные, чаще всего HASH (для агрегаций и join) или BROADCAST (для рассылки маленькой таблицы), иногда ROUND_ROBIN. Наверху — SINGLE-фрагмент, собирающий финал. Тип каждой стадии — это и есть ответ на вопрос, как данные перешли в неё от дочерних стадий.
Когда читаете распределённый план в EXPLAIN, тип в квадратных скобках рядом с номером фрагмента (Fragment 1 [SOURCE], Fragment 2 [HASH]) сразу говорит о характере сетевого обмена. SOURCE — чтение без обмена. HASH — перераспределение по ключу, обмен есть. BROADCAST — рассылка копий, обмен есть и зависит от размера таблицы. SINGLE — сужение к одной ноде. По одним типам фрагментов можно прикинуть, где запрос гоняет данные по сети.
Попробуй сам
Типы фрагментов видны в EXPLAIN (TYPE DISTRIBUTED):
- Выполните
EXPLAIN (TYPE DISTRIBUTED) SELECT count(*) FROM tpch.sf1.orders. Найдите SOURCE-фрагмент (чтение) и SINGLE-фрагмент (финальный count). Их должно быть видно по типам в скобках. - Выполните то же с
GROUP BYпо столбцу — например,GROUP BY orderstatus. Найдите HASH-фрагмент: данные перераспределены по ключу группировки. - Сделайте join маленькой таблицы (
tpch.sf1.nation, 25 строк) и большой (tpch.sf1.orders). Посмотрите тип фрагмента join — ожидайте BROADCAST для маленькой таблицы. - Сделайте join двух больших таблиц (
tpch.sf1.ordersиtpch.sf1.lineitem). Сравните тип — здесь вероятнее HASH. - Сформулируйте письменно разницу между HASH и ROUND_ROBIN и разницу между BROADCAST и HASH-join по объёму сетевого обмена.