Назначение сплитов на задачи: очереди и anti-starvation
Два предыдущих урока собрали половину картины: SplitSource лениво порождает сплиты батчами, node scheduler выбирает для каждого сплита воркер. Но «выбрал воркер» — это ещё не «driver начал читать данные». Между этими двумя событиями есть очередь. Сплит, назначенный на task, сначала попадает в список pending splits этого task’а и ждёт там, пока освободится driver, готовый его взять.
Эта очередь — не деталь реализации, а механизм управления, от которого зависят три вещи: не захлебнётся ли координатор, не будет ли один воркер простаивать пока другой завален, и не «застрянет» ли отдельная стадия запроса без сплитов, пока другая стадия забрала всю пропускную способность. Этот урок — про устройство очереди pending splits, про потолок её размера и про anti-starvation: защиту от голодания.
Жизненный путь сплита: от scheduler до driver
Зафиксируем последовательность состояний одного сплита внутри одного task’а.
Пока сплит pending, он не потребляет ни CPU, ни память на чтение данных — он лишь занимает место в очереди (несколько килобайт описания). Реальная работа начинается в состоянии running, когда сплит подхватывает driver. Число driver’ов на task ограничено (это тема следующего урока — task.concurrency), поэтому pending splits — это всегда «больше сплитов, чем сейчас могут быть в работе». Очередь нужна именно для того, чтобы у driver’ов всегда было что взять следующим, без похода к координатору за каждым сплитом по отдельности.
Зачем вообще ограничивать очередь
Соблазнительно сказать: пусть node scheduler сразу засунет в очередь воркера все сплиты, которые ему предназначены. Тогда воркер автономен, к координатору ходить не надо. Но это ломает половину того, что мы строили в прошлых уроках.
Во-первых, это убивает ленивость SplitSource. Чтобы назначить все сплиты сразу, координатор обязан их все сгенерировать — то есть полностью обойти SplitSource заранее. Возвращается eager-перечисление со всеми его минусами: память, время до первого результата.
Во-вторых, это убивает динамическую балансировку. Сплиты разного размера; коннектор отдаёт их батчами; нагрузка на воркеры выясняется только в ходе работы. Если все сплиты розданы в первую же секунду, scheduler уже ничего не может перебалансировать — раскладка зафиксирована до того, как стало понятно, кто из воркеров быстрее.
В-третьих, это ломает dynamic filtering. Динамический фильтр приходит в середине запроса; если сплиты fact-таблицы уже все назначены, отбросить ненужные на этапе enumeration невозможно.
Поэтому действует потолок на число pending splits на task. Node scheduler доливает в очередь до этого потолка и останавливается. Воркер обрабатывает, очередь пустеет, scheduler доливает снова. Очередь работает как буфер ограниченного размера между ленивым производителем (SplitSource) и потребителями (driver’ы).
Потолок очереди — это backpressure в чистом виде. Когда очереди всех task’ов source-стадии заполнены, node scheduler переходит в состояние blocked: он не запрашивает у SplitSource новый батч, пока место не освободится. Темп перечисления сплитов автоматически подстраивается под темп их обработки. Никакой ручной синхронизации не требуется — её роль играет ограниченный размер очереди.
Потолок не статичен: max-adjusted-pending-splits-per-task
Простой фиксированный потолок плох в обе стороны. Слишком маленький — driver’ы периодически простаивают, дожидаясь, пока scheduler сходит за следующей порцией; накладные расходы на раунды назначения растут. Слишком большой — теряются динамическая балансировка и эффект dynamic filtering, плюс растёт память под описания сплитов.
Поэтому в Trino потолок адаптивный. Базовое целевое число pending splits на воркер задаётся, но фактический потолок очереди каждого task’а подстраивается в зависимости от того, насколько быстро этот task разгребает свою очередь. Верхняя граница этой подстройки — свойство координатора:
# etc/config.properties на координаторе
node-scheduler.max-adjusted-pending-splits-per-task=2000
Логика подстройки: task, который быстро опустошает очередь (значит, у него быстрые сплиты или много driver’ов), получает право держать очередь длиннее — чтобы driver’ы не простаивали в ожидании дозалива. Task, который медленно разгребает, держит очередь короче — чтобы scheduler не закидывал лишнего туда, где это всё равно будет долго лежать, и сохранял свободу перебалансировать сплиты в пользу более быстрых воркеров.
Так потолок становится инструментом балансировки, а не просто предохранителем: scheduler естественным образом смещает работу на воркеров, которые справляются быстрее.
Anti-starvation: гарантированный минимум сплитов
Теперь сама тонкая часть. Запрос — это не одна source-стадия. Их может быть несколько: join двух больших таблиц — это два скана, две source-стадии, конкурирующие за одни и те же driver’ы одних и тех же воркеров. Возникает риск голодания (starvation): одна стадия успела назначить свои сплиты первой, забила очереди и потоки, и вторая стадия не получает ничего. А пока вторая стадия стоит без данных, join не может начать строить хэш-таблицу — весь запрос буксует.
Чтобы этого не случилось, node scheduler гарантирует каждой source-стадии минимальное число pending splits на воркере, даже когда общие очереди заполнены под завязку. Это «бронь»: сколько бы сплитов ни хотела залить доминирующая стадия, для каждой другой стадии резервируется неснижаемый минимум мест в очереди.
Эта же бронь служит защитой от взаимной блокировки (deadlock). Представьте: стадия-потребитель не может прогрессировать, пока не получит данные от стадии-производителя, а стадия-производитель не может получить сплиты, потому что очереди заняты стадией-потребителем. Гарантированный минимум разрывает такой цикл: производителю всегда зарезервировано место, чтобы начать читать и сдвинуть запрос с мёртвой точки.
Стоит уточнить, почему deadlock здесь — реальная угроза, а не теоретическая. Stage’и запроса связаны отношением «производитель -> потребитель»: одна стадия читает данные, другая их обрабатывает дальше. Потребитель не завершится, пока производитель не отдаст ему все данные. Если потребитель успел захватить все очереди и потоки воркеров, а производителю не осталось ни одного места под сплит, — система зависает: потребитель ждёт того, что физически некому произвести. Это не «медленно», а «навсегда». Anti-starvation именно поэтому резервирует место заранее, до распределения основной массы, а не пытается разрулить затор по факту его возникновения — разрулить заклинивший deadlock уже нечем.
Минимум сплитов на стадию — это намеренно небольшое число. Цель брони — не дать стадии много, а гарантировать, что она не получит ноль. Основной объём по-прежнему распределяется динамически, по фактической скорости воркеров. Тут видна общая инженерная идея: системе нужны и оптимизация в среднем (динамическая балансировка под скорость воркеров), и защита худшего случая (бронь против голодания). Одна динамика без брони эффективна, пока всем хватает, и ломается на краю; одна бронь без динамики безопасна, но не использует разницу в скорости нод. Node scheduler совмещает оба: бронь снимает катастрофические сценарии, динамика выжимает производительность из остального.
Что увидеть и на что смотреть
Состояние очередей видно в Web UI на странице запроса: у каждой стадии показаны queued splits (то же, что pending) и running splits. Полезные наблюдения:
- Стабильно большое число queued splits при малом running — driver’ов мало относительно потока сплитов; стоит посмотреть
task.concurrency(следующий урок). - Running splits, проседающие до нуля у одной стадии при заполненных очередях другой, — потенциальная картина голодания; здесь и работает anti-starvation, не давая просесть в ноль навсегда.
- Сильный перекос queued splits между воркерами — либо сплиты очень разного размера, либо включена
topologyи часть нод перетянула локальные данные.
В EXPLAIN ANALYZE прямого счётчика очереди нет, но разница между Scheduled time (полное время, включая ожидание) и CPU time у стадии косвенно говорит о простое: большой разрыв — стадия много ждала, в том числе в очередях.
Fragment 2 [SOURCE]
CPU: 12.4s, Scheduled: 41.7s, Input: 150000000 rows
Здесь Scheduled втрое больше CPU — стадия значительную часть времени не считала, а ждала: данных по конвейеру, освобождения driver’ов, дозалива очереди.
Попробуй сам
На песочнице курса (Trino 481):
-
Запустите join двух таблиц TPC-H:
EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf10.orders o JOIN tpch.sf10.lineitem l ON o.orderkey = l.orderkey;. Найдите два source-фрагмента (два скана). Сравните у нихScheduledиCPUtime. У какого больше разрыв и почему — свяжите с тем, что одна таблица крупнее другой. -
Откройте Web UI, повторите тот же запрос и во время его выполнения посмотрите на стадии: видны ли queued и running splits, как они меняются. Зафиксируйте, у какой стадии running splits держится выше.
-
Рассуждение: объясните своими словами, почему без anti-starvation join двух больших таблиц мог бы зависнуть навсегда, а не просто исполняться медленно. В ответе используйте слова «бронь», «производитель», «потребитель».