Node scheduler: политики uniform и topology
В предыдущем модуле мы разобрали таксономию исполнения: stage делится на task’и, task обрабатывает split’ы, внутри task’а работают driver’ы. Но осталась дыра в картине. Когда коннектор сгенерировал, скажем, 4000 сплитов на чтение Parquet-файлов, кто-то должен решить: этот сплит уедет на воркер A, тот — на воркер B. От этого решения зависит, упрётся ли запрос в сеть, будет ли один воркер простаивать пока другой завален работой, и прочитает ли Trino данные с локального диска или потащит их по сети.
Этим решением занимается node scheduler — компонент координатора, который для каждого сплита source-стадии выбирает воркер-исполнитель. Это не про «равномерно раскидать» в наивном смысле: node scheduler балансирует сразу три вещи — равномерность нагрузки, data locality (близость данных) и ограничение размера очереди на воркере. Этот урок — про его внутренний механизм и про две политики выбора, uniform и topology.
Зачем вообще что-то решать
Наивный ответ — «раздать по кругу». Он плохо работает по двум причинам.
Parquet: row groups как физическая единица локальности данныхПервая — data locality. В классическом HDFS-развёртывании блок Parquet-файла физически лежит на дисках конкретных трёх дата-нод. Если Trino-воркер живёт на той же машине, что и дата-нода, чтение сплита из этого блока идёт через локальный диск — это десятки гигабайт в секунду. Если воркер на другой машине, чтение идёт через сеть HDFS — и сеть становится потолком. Раздача по кругу игнорирует, где лежат данные, и осознанно теряет производительность.
Вторая — неравномерность во времени. Сплиты не одинаковы: один Parquet-файл — 5 МБ, другой — 500 МБ. Если просто раздать поровну по числу сплитов, воркер, которому достались крупные сплиты, будет работать в разы дольше. А ещё сплиты приходят не все сразу — коннектор генерирует их батчами (об этом следующий урок). Решение нужно принимать инкрементально, по мере поступления, не зная всей картины заранее.
Поэтому node scheduler — это не разовая раздача, а постоянно работающий цикл: пришёл батч сплитов — для каждого выбрали кандидатов-воркеров — среди кандидатов выбрали наименее загруженного — назначили. Повторяем, пока SplitSource не исчерпан.
Политика выбирается свойством node-scheduler.policy
Политика задаётся в config.properties координатора:
# etc/config.properties на координаторе
node-scheduler.policy=uniform
Допустимых значений два: uniform (по умолчанию) и topology. Политика определяет, как scheduler ранжирует воркеров-кандидатов для конкретного сплита. Всё остальное — ограничение размера очереди, выбор наименее загруженного — работает одинаково для обеих.
Важно понимать общий каркас. Scheduler не перебирает все воркеры кластера для каждого сплита — это дорого при сотнях нод. Вместо этого он берёт случайное подмножество кандидатов, размер которого задаётся node-scheduler.min-candidates (по умолчанию 10). Из этого подмножества выбирается воркер с наименьшим числом уже назначенных сплитов. Политика влияет на то, как формируется и упорядочивается множество кандидатов с точки зрения локальности.
Политика uniform
uniform — политика по умолчанию и в большинстве современных развёртываний единственная разумная. Её логика: распределять сплиты по воркерам равномерно, при этом, если у сплита есть предпочтительные адреса (network addresses), стараться попасть на воркер с одним из этих адресов.
Что такое предпочтительные адреса сплита. Коннектор, генерируя сплит, может приложить к нему список хостов, на которых данные сплита локальны. Hive-коннектор поверх HDFS делает именно это: для сплита-блока он знает три дата-ноды, где блок реплицирован. Коннектор поверх объектного хранилища (S3, GCS) обычно отдаёт пустой список — у объекта в S3 нет «локального» воркера, любой воркер тянет его через одинаковую по латентности сеть.
Алгоритм uniform для одного сплита:
- Если у сплита есть предпочтительные адреса — собрать воркеров, живущих на этих адресах, в множество кандидатов.
- Среди этих кандидатов выбрать наименее загруженного (минимум назначенных сплитов).
- Если предпочтительных адресов нет, или все локальные кандидаты уже забиты под завязку, или сплит вообще не привязан к адресам — взять случайное подмножество всех воркеров и выбрать наименее загруженного оттуда.
Ключевая идея: uniform рассматривает кластер как плоское множество воркеров. Нет понятия «стойка», «зона доступности» — есть только «воркер локален для данных» либо «не локален». Для облачных развёртываний поверх объектного хранилища это ровно то, что нужно: локальности всё равно нет, поэтому политика честно раздаёт по кругу, выравнивая загрузку.
Политика topology
topology нужна, когда кластер физически не плоский и сеть между группами нод неоднородна. Классический пример — большой on-premise кластер из нескольких серверных стоек (racks): сеть внутри стойки быстрая, между стойками — общий аплинк, который легко становится бутылочным горлышком. В облаке аналог — несколько availability zones: трафик внутри зоны бесплатен и быстр, между зонами — платный и с большей латентностью.
topology вводит понятие сетевого расстояния. Каждому воркеру и каждому адресу данных приписывается позиция в сетевой топологии — например region/zone/host. Расстояние между воркером и сплитом считается по тому, на каком уровне топологии они расходятся: тот же host — расстояние 0, та же zone, но другой host — 1, тот же region, но другая zone — 2, и так далее. Scheduler предпочитает воркеров, минимально удалённых от данных сплита.
Топология описывается через провайдер, задаваемый node-scheduler.network-topology.type. Базовые варианты:
| Тип | Откуда берётся топология |
|---|---|
flat | Все ноды на расстоянии 0 — по сути отключает учёт топологии |
file | Топология читается из файла соответствия адрес -> сегменты сети |
subnet | Сегменты топологии выводятся из IP-подсетей нод |
Кроме того, topology ограничивает «жадность» локального выбора через настройку node-scheduler.network-topology.segments и связанные пороги: чтобы не было ситуации, когда все сплиты слетелись на одну удачно расположенную ноду и она захлебнулась, пока соседи простаивают. Локальность хороша до тех пор, пока не ломает баланс.
topology имеет смысл только при двух условиях одновременно: (1) данные действительно локальны для части нод — то есть это co-located развёртывание Trino и хранилища, типично HDFS; (2) сеть между группами нод заметно медленнее, чем внутри группы. Для типичного облачного Trino поверх S3 ни одно из условий не выполняется: данные в S3 не локальны ни для кого, и topology не даст ничего, кроме лишней сложности. Оставляйте uniform.
Загрузка очередей: scheduler не льёт бесконечно
Независимо от политики, node scheduler не назначает воркеру сплитов больше, чем тот способен переварить. У каждого task’а есть очередь pending splits — назначенные, но ещё не начатые сплиты. Если бы scheduler лил в неё без ограничений, он бы материализовал весь SplitSource в память координатора и воркеров, потеряв смысл ленивой генерации.
Поэтому действует потолок на число pending splits на task. Когда очереди заполнены, scheduler приостанавливается («blocked») и ждёт, пока воркеры разгребут текущее, прежде чем запросить и назначить следующий батч. Это backpressure: темп генерации сплитов подстраивается под темп их обработки. Детали очередей, anti-starvation и динамической подстройки потолка — тема третьего урока модуля; здесь важно зафиксировать, что выбор воркера всегда идёт с оглядкой на «а влезет ли ещё».
Как увидеть результат назначения
Прямой настройки «покажи мне раскладку сплитов по воркерам» в SQL нет, но косвенно картину видно в EXPLAIN ANALYZE и в Web UI. В выводе EXPLAIN ANALYZE у source-стадии видно число входных сплитов и распределение строк по task’ам:
EXPLAIN ANALYZE
SELECT count(*) FROM tpch.sf10.lineitem WHERE shipdate > DATE '1995-01-01';
Fragment 1 [SOURCE]
CPU: 4.21s, Scheduled: 9.88s, Input: 59986052 rows (0B); per task: avg.: 14996513.00 std.dev.: 51277.34
ScanFilterProject[table = tpch:lineitem:sf10, ...]
Input avg.: 14996513.00 rows, Input std.dev.: 0.34%
Строка per task с std.dev. — это и есть индикатор равномерности. Низкое стандартное отклонение (здесь доли процента) означает, что node scheduler разложил сплиты по task’ам ровно. Большое отклонение — сигнал перекоса: либо сплиты сильно разного размера, либо у вас включена topology и часть нод перетянула на себя локальные данные.
Попробуй сам
На песочнице курса (Trino 481 в Docker) выполните три шага и сравните.
-
Запустите
EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf10.orders;и найдите в выводеFragmentсо словомSOURCE. Запишите значениеInput std.dev.для скана. -
Посмотрите текущую политику:
SHOW SESSION;не покажет её (это server-config, не session), поэтому проверьте напрямую через системную таблицу:SELECT * FROM system.runtime.nodes;— убедитесь, сколько воркеров в кластере и активны ли они. -
Рассуждение без запуска: ваш кластер — это Trino поверх MinIO (S3-совместимое хранилище), три воркера в одном Docker-network. Какую политику node scheduler вы оставите и почему? Что изменилось бы, будь это co-located кластер из трёх стоек с HDFS? Сформулируйте ответ в двух абзацах: первый — про локальность данных, второй — про однородность сети.