Learning Platform
Глоссарий Troubleshooting
Урок 07.01 · 22 мин
Средний
schedulernode-schedulerdata-localitysplits

Node scheduler: политики uniform и topology

В предыдущем модуле мы разобрали таксономию исполнения: stage делится на task’и, task обрабатывает split’ы, внутри task’а работают driver’ы. Но осталась дыра в картине. Когда коннектор сгенерировал, скажем, 4000 сплитов на чтение Parquet-файлов, кто-то должен решить: этот сплит уедет на воркер A, тот — на воркер B. От этого решения зависит, упрётся ли запрос в сеть, будет ли один воркер простаивать пока другой завален работой, и прочитает ли Trino данные с локального диска или потащит их по сети.

Этим решением занимается node scheduler — компонент координатора, который для каждого сплита source-стадии выбирает воркер-исполнитель. Это не про «равномерно раскидать» в наивном смысле: node scheduler балансирует сразу три вещи — равномерность нагрузки, data locality (близость данных) и ограничение размера очереди на воркере. Этот урок — про его внутренний механизм и про две политики выбора, uniform и topology.


Зачем вообще что-то решать

Наивный ответ — «раздать по кругу». Он плохо работает по двум причинам.

Parquet: row groups как физическая единица локальности данных

Первая — data locality. В классическом HDFS-развёртывании блок Parquet-файла физически лежит на дисках конкретных трёх дата-нод. Если Trino-воркер живёт на той же машине, что и дата-нода, чтение сплита из этого блока идёт через локальный диск — это десятки гигабайт в секунду. Если воркер на другой машине, чтение идёт через сеть HDFS — и сеть становится потолком. Раздача по кругу игнорирует, где лежат данные, и осознанно теряет производительность.

Вторая — неравномерность во времени. Сплиты не одинаковы: один Parquet-файл — 5 МБ, другой — 500 МБ. Если просто раздать поровну по числу сплитов, воркер, которому достались крупные сплиты, будет работать в разы дольше. А ещё сплиты приходят не все сразу — коннектор генерирует их батчами (об этом следующий урок). Решение нужно принимать инкрементально, по мере поступления, не зная всей картины заранее.

Что делает node scheduler
SplitSourceИсточник сплитов от коннектора. Отдаёт сплиты батчами, лениво, по мере запроса координатором
батч сплитов
Node schedulerКомпонент координатора. Для каждого сплита выбирает воркер по политике uniform или topology, учитывая загрузку очередей
назначение
ВоркерыНа выбранном воркере сплит попадает в очередь pending splits соответствующего task'а и ждёт свободного driver'а

Поэтому node scheduler — это не разовая раздача, а постоянно работающий цикл: пришёл батч сплитов — для каждого выбрали кандидатов-воркеров — среди кандидатов выбрали наименее загруженного — назначили. Повторяем, пока SplitSource не исчерпан.


Политика выбирается свойством node-scheduler.policy

Политика задаётся в config.properties координатора:

# etc/config.properties на координаторе
node-scheduler.policy=uniform

Допустимых значений два: uniform (по умолчанию) и topology. Политика определяет, как scheduler ранжирует воркеров-кандидатов для конкретного сплита. Всё остальное — ограничение размера очереди, выбор наименее загруженного — работает одинаково для обеих.

Важно понимать общий каркас. Scheduler не перебирает все воркеры кластера для каждого сплита — это дорого при сотнях нод. Вместо этого он берёт случайное подмножество кандидатов, размер которого задаётся node-scheduler.min-candidates (по умолчанию 10). Из этого подмножества выбирается воркер с наименьшим числом уже назначенных сплитов. Политика влияет на то, как формируется и упорядочивается множество кандидатов с точки зрения локальности.


Политика uniform

uniform — политика по умолчанию и в большинстве современных развёртываний единственная разумная. Её логика: распределять сплиты по воркерам равномерно, при этом, если у сплита есть предпочтительные адреса (network addresses), стараться попасть на воркер с одним из этих адресов.

Что такое предпочтительные адреса сплита. Коннектор, генерируя сплит, может приложить к нему список хостов, на которых данные сплита локальны. Hive-коннектор поверх HDFS делает именно это: для сплита-блока он знает три дата-ноды, где блок реплицирован. Коннектор поверх объектного хранилища (S3, GCS) обычно отдаёт пустой список — у объекта в S3 нет «локального» воркера, любой воркер тянет его через одинаковую по латентности сеть.

Алгоритм uniform для одного сплита:

  1. Если у сплита есть предпочтительные адреса — собрать воркеров, живущих на этих адресах, в множество кандидатов.
  2. Среди этих кандидатов выбрать наименее загруженного (минимум назначенных сплитов).
  3. Если предпочтительных адресов нет, или все локальные кандидаты уже забиты под завязку, или сплит вообще не привязан к адресам — взять случайное подмножество всех воркеров и выбрать наименее загруженного оттуда.

Ключевая идея: uniform рассматривает кластер как плоское множество воркеров. Нет понятия «стойка», «зона доступности» — есть только «воркер локален для данных» либо «не локален». Для облачных развёртываний поверх объектного хранилища это ровно то, что нужно: локальности всё равно нет, поэтому политика честно раздаёт по кругу, выравнивая загрузку.

Политика uniform: плоский кластер
Сплит с адресами [w1, w3]Коннектор приложил к сплиту список хостов, где данные локальны — например блок HDFS на дата-нодах
есть локальные кандидаты?
Да: w1 или w3Из локальных воркеров выбирается тот, у кого меньше назначенных сплитов — это и data locality, и баланс
Нет: любой воркерСлучайное подмножество всех воркеров, выбор наименее загруженного. Так работает облако поверх S3, где локальности нет

Политика topology

topology нужна, когда кластер физически не плоский и сеть между группами нод неоднородна. Классический пример — большой on-premise кластер из нескольких серверных стоек (racks): сеть внутри стойки быстрая, между стойками — общий аплинк, который легко становится бутылочным горлышком. В облаке аналог — несколько availability zones: трафик внутри зоны бесплатен и быстр, между зонами — платный и с большей латентностью.

topology вводит понятие сетевого расстояния. Каждому воркеру и каждому адресу данных приписывается позиция в сетевой топологии — например region/zone/host. Расстояние между воркером и сплитом считается по тому, на каком уровне топологии они расходятся: тот же host — расстояние 0, та же zone, но другой host — 1, тот же region, но другая zone — 2, и так далее. Scheduler предпочитает воркеров, минимально удалённых от данных сплита.

Топология описывается через провайдер, задаваемый node-scheduler.network-topology.type. Базовые варианты:

ТипОткуда берётся топология
flatВсе ноды на расстоянии 0 — по сути отключает учёт топологии
fileТопология читается из файла соответствия адрес -> сегменты сети
subnetСегменты топологии выводятся из IP-подсетей нод

Кроме того, topology ограничивает «жадность» локального выбора через настройку node-scheduler.network-topology.segments и связанные пороги: чтобы не было ситуации, когда все сплиты слетелись на одну удачно расположенную ноду и она захлебнулась, пока соседи простаивают. Локальность хороша до тех пор, пока не ломает баланс.

WARNING

topology имеет смысл только при двух условиях одновременно: (1) данные действительно локальны для части нод — то есть это co-located развёртывание Trino и хранилища, типично HDFS; (2) сеть между группами нод заметно медленнее, чем внутри группы. Для типичного облачного Trino поверх S3 ни одно из условий не выполняется: данные в S3 не локальны ни для кого, и topology не даст ничего, кроме лишней сложности. Оставляйте uniform.


Загрузка очередей: scheduler не льёт бесконечно

Независимо от политики, node scheduler не назначает воркеру сплитов больше, чем тот способен переварить. У каждого task’а есть очередь pending splits — назначенные, но ещё не начатые сплиты. Если бы scheduler лил в неё без ограничений, он бы материализовал весь SplitSource в память координатора и воркеров, потеряв смысл ленивой генерации.

Поэтому действует потолок на число pending splits на task. Когда очереди заполнены, scheduler приостанавливается («blocked») и ждёт, пока воркеры разгребут текущее, прежде чем запросить и назначить следующий батч. Это backpressure: темп генерации сплитов подстраивается под темп их обработки. Детали очередей, anti-starvation и динамической подстройки потолка — тема третьего урока модуля; здесь важно зафиксировать, что выбор воркера всегда идёт с оглядкой на «а влезет ли ещё».


Как увидеть результат назначения

Прямой настройки «покажи мне раскладку сплитов по воркерам» в SQL нет, но косвенно картину видно в EXPLAIN ANALYZE и в Web UI. В выводе EXPLAIN ANALYZE у source-стадии видно число входных сплитов и распределение строк по task’ам:

EXPLAIN ANALYZE
SELECT count(*) FROM tpch.sf10.lineitem WHERE shipdate > DATE '1995-01-01';
Fragment 1 [SOURCE]
    CPU: 4.21s, Scheduled: 9.88s, Input: 59986052 rows (0B); per task: avg.: 14996513.00 std.dev.: 51277.34
    ScanFilterProject[table = tpch:lineitem:sf10, ...]
        Input avg.: 14996513.00 rows, Input std.dev.: 0.34%

Строка per task с std.dev. — это и есть индикатор равномерности. Низкое стандартное отклонение (здесь доли процента) означает, что node scheduler разложил сплиты по task’ам ровно. Большое отклонение — сигнал перекоса: либо сплиты сильно разного размера, либо у вас включена topology и часть нод перетянула на себя локальные данные.


Попробуй сам

На песочнице курса (Trino 481 в Docker) выполните три шага и сравните.

  1. Запустите EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf10.orders; и найдите в выводе Fragment со словом SOURCE. Запишите значение Input std.dev. для скана.

  2. Посмотрите текущую политику: SHOW SESSION; не покажет её (это server-config, не session), поэтому проверьте напрямую через системную таблицу: SELECT * FROM system.runtime.nodes; — убедитесь, сколько воркеров в кластере и активны ли они.

  3. Рассуждение без запуска: ваш кластер — это Trino поверх MinIO (S3-совместимое хранилище), три воркера в одном Docker-network. Какую политику node scheduler вы оставите и почему? Что изменилось бы, будь это co-located кластер из трёх стоек с HDFS? Сформулируйте ответ в двух абзацах: первый — про локальность данных, второй — про однородность сети.


Проверка знанийKnowledge check
Кластер Trino развёрнут в облаке: воркеры читают данные из объектного хранилища S3, вся сеть внутри одного региона. Какую политику node scheduler стоит выбрать и почему смена на topology тут ничего не даст?
ОтветAnswer
Стоит оставить uniform — политику по умолчанию. Topology имеет смысл только когда выполнены два условия: данные физически локальны для части воркеров и сеть между группами нод заметно медленнее, чем внутри группы. В развёртывании поверх S3 не выполнено первое условие: объект в S3 не локален ни для одного воркера, любой воркер тянет данные через сеть с примерно одинаковой латентностью — коннектор отдаёт сплиты с пустым списком предпочтительных адресов. Не выполнено и второе: сеть внутри одного региона достаточно однородна. Поэтому topology не получит данных о локальности, по которым могла бы оптимизировать, и лишь добавит конфигурационной сложности и риск перекоса нагрузки. Uniform в этом случае честно раздаёт сплиты по кругу, выравнивая загрузку очередей воркеров, — это ровно то поведение, которое нужно.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какую задачу решает node scheduler в Trino?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5