Learning Platform
Глоссарий Troubleshooting
Урок 07.03 · 22 мин
Средний
schedulerpending-splitsanti-starvationqueue

Назначение сплитов на задачи: очереди и anti-starvation

Два предыдущих урока собрали половину картины: SplitSource лениво порождает сплиты батчами, node scheduler выбирает для каждого сплита воркер. Но «выбрал воркер» — это ещё не «driver начал читать данные». Между этими двумя событиями есть очередь. Сплит, назначенный на task, сначала попадает в список pending splits этого task’а и ждёт там, пока освободится driver, готовый его взять.

Эта очередь — не деталь реализации, а механизм управления, от которого зависят три вещи: не захлебнётся ли координатор, не будет ли один воркер простаивать пока другой завален, и не «застрянет» ли отдельная стадия запроса без сплитов, пока другая стадия забрала всю пропускную способность. Этот урок — про устройство очереди pending splits, про потолок её размера и про anti-starvation: защиту от голодания.


Жизненный путь сплита: от scheduler до driver

Зафиксируем последовательность состояний одного сплита внутри одного task’а.

Состояния сплита внутри task
НазначенNode scheduler выбрал для сплита этот воркер и положил сплит в очередь pending splits задачи
driver свободен
В работеСвободный driver взял сплит из очереди, коннектор открыл PageSource, идёт чтение Page за Page
данные прочитаны
ЗавершёнВсе строки сплита прочитаны и переданы дальше по конвейеру операторов, driver свободен для следующего

Пока сплит pending, он не потребляет ни CPU, ни память на чтение данных — он лишь занимает место в очереди (несколько килобайт описания). Реальная работа начинается в состоянии running, когда сплит подхватывает driver. Число driver’ов на task ограничено (это тема следующего урока — task.concurrency), поэтому pending splits — это всегда «больше сплитов, чем сейчас могут быть в работе». Очередь нужна именно для того, чтобы у driver’ов всегда было что взять следующим, без похода к координатору за каждым сплитом по отдельности.


Зачем вообще ограничивать очередь

Соблазнительно сказать: пусть node scheduler сразу засунет в очередь воркера все сплиты, которые ему предназначены. Тогда воркер автономен, к координатору ходить не надо. Но это ломает половину того, что мы строили в прошлых уроках.

Во-первых, это убивает ленивость SplitSource. Чтобы назначить все сплиты сразу, координатор обязан их все сгенерировать — то есть полностью обойти SplitSource заранее. Возвращается eager-перечисление со всеми его минусами: память, время до первого результата.

Во-вторых, это убивает динамическую балансировку. Сплиты разного размера; коннектор отдаёт их батчами; нагрузка на воркеры выясняется только в ходе работы. Если все сплиты розданы в первую же секунду, scheduler уже ничего не может перебалансировать — раскладка зафиксирована до того, как стало понятно, кто из воркеров быстрее.

В-третьих, это ломает dynamic filtering. Динамический фильтр приходит в середине запроса; если сплиты fact-таблицы уже все назначены, отбросить ненужные на этапе enumeration невозможно.

Поэтому действует потолок на число pending splits на task. Node scheduler доливает в очередь до этого потолка и останавливается. Воркер обрабатывает, очередь пустеет, scheduler доливает снова. Очередь работает как буфер ограниченного размера между ленивым производителем (SplitSource) и потребителями (driver’ы).

NOTE

Потолок очереди — это backpressure в чистом виде. Когда очереди всех task’ов source-стадии заполнены, node scheduler переходит в состояние blocked: он не запрашивает у SplitSource новый батч, пока место не освободится. Темп перечисления сплитов автоматически подстраивается под темп их обработки. Никакой ручной синхронизации не требуется — её роль играет ограниченный размер очереди.


Потолок не статичен: max-adjusted-pending-splits-per-task

Простой фиксированный потолок плох в обе стороны. Слишком маленький — driver’ы периодически простаивают, дожидаясь, пока scheduler сходит за следующей порцией; накладные расходы на раунды назначения растут. Слишком большой — теряются динамическая балансировка и эффект dynamic filtering, плюс растёт память под описания сплитов.

Поэтому в Trino потолок адаптивный. Базовое целевое число pending splits на воркер задаётся, но фактический потолок очереди каждого task’а подстраивается в зависимости от того, насколько быстро этот task разгребает свою очередь. Верхняя граница этой подстройки — свойство координатора:

# etc/config.properties на координаторе
node-scheduler.max-adjusted-pending-splits-per-task=2000

Логика подстройки: task, который быстро опустошает очередь (значит, у него быстрые сплиты или много driver’ов), получает право держать очередь длиннее — чтобы driver’ы не простаивали в ожидании дозалива. Task, который медленно разгребает, держит очередь короче — чтобы scheduler не закидывал лишнего туда, где это всё равно будет долго лежать, и сохранял свободу перебалансировать сплиты в пользу более быстрых воркеров.

Так потолок становится инструментом балансировки, а не просто предохранителем: scheduler естественным образом смещает работу на воркеров, которые справляются быстрее.


Anti-starvation: гарантированный минимум сплитов

Теперь сама тонкая часть. Запрос — это не одна source-стадия. Их может быть несколько: join двух больших таблиц — это два скана, две source-стадии, конкурирующие за одни и те же driver’ы одних и тех же воркеров. Возникает риск голодания (starvation): одна стадия успела назначить свои сплиты первой, забила очереди и потоки, и вторая стадия не получает ничего. А пока вторая стадия стоит без данных, join не может начать строить хэш-таблицу — весь запрос буксует.

Чтобы этого не случилось, node scheduler гарантирует каждой source-стадии минимальное число pending splits на воркере, даже когда общие очереди заполнены под завязку. Это «бронь»: сколько бы сплитов ни хотела залить доминирующая стадия, для каждой другой стадии резервируется неснижаемый минимум мест в очереди.

Anti-starvation: бронь очереди для каждой стадии
Очередь pending splits на воркереОбщая ёмкость очереди task'ов воркера ограничена адаптивным потолком
делится между стадиями
Стадия A: скан большой таблицыДоминирующая стадия — много сплитов, готова занять всю очередь
Стадия B: скан второй таблицыAnti-starvation резервирует для неё гарантированный минимум pending splits, даже если очередь забита стадией A

Эта же бронь служит защитой от взаимной блокировки (deadlock). Представьте: стадия-потребитель не может прогрессировать, пока не получит данные от стадии-производителя, а стадия-производитель не может получить сплиты, потому что очереди заняты стадией-потребителем. Гарантированный минимум разрывает такой цикл: производителю всегда зарезервировано место, чтобы начать читать и сдвинуть запрос с мёртвой точки.

Стоит уточнить, почему deadlock здесь — реальная угроза, а не теоретическая. Stage’и запроса связаны отношением «производитель -> потребитель»: одна стадия читает данные, другая их обрабатывает дальше. Потребитель не завершится, пока производитель не отдаст ему все данные. Если потребитель успел захватить все очереди и потоки воркеров, а производителю не осталось ни одного места под сплит, — система зависает: потребитель ждёт того, что физически некому произвести. Это не «медленно», а «навсегда». Anti-starvation именно поэтому резервирует место заранее, до распределения основной массы, а не пытается разрулить затор по факту его возникновения — разрулить заклинивший deadlock уже нечем.

Минимум сплитов на стадию — это намеренно небольшое число. Цель брони — не дать стадии много, а гарантировать, что она не получит ноль. Основной объём по-прежнему распределяется динамически, по фактической скорости воркеров. Тут видна общая инженерная идея: системе нужны и оптимизация в среднем (динамическая балансировка под скорость воркеров), и защита худшего случая (бронь против голодания). Одна динамика без брони эффективна, пока всем хватает, и ломается на краю; одна бронь без динамики безопасна, но не использует разницу в скорости нод. Node scheduler совмещает оба: бронь снимает катастрофические сценарии, динамика выжимает производительность из остального.


Что увидеть и на что смотреть

Состояние очередей видно в Web UI на странице запроса: у каждой стадии показаны queued splits (то же, что pending) и running splits. Полезные наблюдения:

  • Стабильно большое число queued splits при малом running — driver’ов мало относительно потока сплитов; стоит посмотреть task.concurrency (следующий урок).
  • Running splits, проседающие до нуля у одной стадии при заполненных очередях другой, — потенциальная картина голодания; здесь и работает anti-starvation, не давая просесть в ноль навсегда.
  • Сильный перекос queued splits между воркерами — либо сплиты очень разного размера, либо включена topology и часть нод перетянула локальные данные.

В EXPLAIN ANALYZE прямого счётчика очереди нет, но разница между Scheduled time (полное время, включая ожидание) и CPU time у стадии косвенно говорит о простое: большой разрыв — стадия много ждала, в том числе в очередях.

Fragment 2 [SOURCE]
    CPU: 12.4s, Scheduled: 41.7s, Input: 150000000 rows

Здесь Scheduled втрое больше CPU — стадия значительную часть времени не считала, а ждала: данных по конвейеру, освобождения driver’ов, дозалива очереди.


Попробуй сам

На песочнице курса (Trino 481):

  1. Запустите join двух таблиц TPC-H: EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf10.orders o JOIN tpch.sf10.lineitem l ON o.orderkey = l.orderkey;. Найдите два source-фрагмента (два скана). Сравните у них Scheduled и CPU time. У какого больше разрыв и почему — свяжите с тем, что одна таблица крупнее другой.

  2. Откройте Web UI, повторите тот же запрос и во время его выполнения посмотрите на стадии: видны ли queued и running splits, как они меняются. Зафиксируйте, у какой стадии running splits держится выше.

  3. Рассуждение: объясните своими словами, почему без anti-starvation join двух больших таблиц мог бы зависнуть навсегда, а не просто исполняться медленно. В ответе используйте слова «бронь», «производитель», «потребитель».


Проверка знанийKnowledge check
Зачем у каждого task ограничен размер очереди pending splits, почему этот потолок сделан адаптивным, и какую проблему решает anti-starvation при запросе с несколькими source-стадиями?
ОтветAnswer
Очередь pending splits ограничена потолком, потому что заливать в неё сразу все сплиты воркера означало бы отказаться от ленивости SplitSource (пришлось бы перечислить все сплиты заранее), от динамической балансировки (раскладка зафиксировалась бы до того, как стало ясно, кто из воркеров быстрее) и от dynamic filtering (отбросить ненужные сплиты в середине запроса было бы уже нельзя). Потолок работает как backpressure: когда очереди заполнены, scheduler не запрашивает у SplitSource новый батч. Адаптивным потолок сделан, чтобы балансировать две крайности: слишком короткая очередь оставляет driver'ы простаивать в ожидании дозалива, слишком длинная убивает балансировку и раздувает память. Task, быстро опустошающий очередь, получает право держать её длиннее; медленный — короче, и scheduler смещает работу на быстрых воркеров. Anti-starvation решает проблему голодания: когда у запроса несколько source-стадий, конкурирующих за одни driver'ы, одна стадия может забить все очереди и оставить другую без сплитов. Scheduler резервирует каждой стадии гарантированный минимум pending splits, даже при заполненных очередях. Это же не даёт запросу зайти в deadlock, когда стадия-потребитель ждёт данные от стадии-производителя, а та не может получить сплиты — броня всегда оставляет производителю место, чтобы начать и сдвинуть запрос с мёртвой точки.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое pending splits в контексте одного task'а?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5