SplitSource: как сплиты поступают от коннектора
В прошлом уроке мы говорили: node scheduler берёт батч сплитов и раскладывает по воркерам. Но откуда этот батч берётся и почему именно батч, а не сразу весь список? Если бы Trino, начиная запрос к таблице из 200 тысяч Parquet-файлов, сначала перечислил все 200 тысяч сплитов и сложил их в память, то первый сплит начал бы исполняться только после того, как координатор обошёл всё хранилище. Для широких таблиц это десятки секунд простоя до начала работы — и мегабайты структур в heap координатора.
Trino так не делает. Между коннектором и node scheduler стоит абстракция SplitSource — ленивый, потоковый источник сплитов. Координатор запрашивает у него сплиты порциями, ровно по мере того, как готов их раздавать. Этот урок — про устройство SplitSource, про batch-генерацию и про то, что вообще считается сплитом для разных коннекторов.
Что такое split, если совсем точно
Split (сплит) — это описание куска данных, который один task может прочитать независимо. Подчеркнём: split — это не сами данные, а лёгкий объект-указатель. Он содержит ровно то, что нужно коннектору, чтобы позже физически добыть этот кусок: путь к файлу, диапазон байтов внутри файла, идентификатор партиции, иногда список предпочтительных адресов.
Что именно становится единицей сплита, решает коннектор, и это решение зависит от природы источника:
| Коннектор | Что такое один split |
|---|---|
| Hive / Iceberg / Delta поверх Parquet | Файл целиком либо диапазон байтов внутри крупного файла (по row groups) |
| Hive поверх HDFS | Часто блок HDFS — отсюда и предпочтительные адреса дата-нод |
| PostgreSQL / MySQL (JDBC) | Обычно один split на всю таблицу — JDBC-источник читается одним соединением |
| Kafka | Один split на партицию топика (или её диапазон offset’ов) |
| tpch / tpcds | Логический диапазон строк генерируемого набора |
Из этой таблицы видно две вещи. Первая: число сплитов напрямую определяет потолок параллелизма source-стадии — нельзя занять чтением больше driver’ов, чем есть сплитов. Таблица в один большой Parquet-файл, который коннектор не умеет резать, даст один сплит и будет читаться в один поток, как бы много воркеров ни стояло. Вторая: для JDBC-источника параллелизма на чтении обычно нет вовсе — отсюда правило, что федеративный join большой таблицы из PostgreSQL почти всегда упирается в это единственное соединение.
SplitManager порождает SplitSource
Когда координатор спланировал запрос и дошёл до source-стадии, он обращается к коннектору за сплитами. В Connector SPI за это отвечает компонент ConnectorSplitManager. Его метод getSplits не возвращает список сплитов — он возвращает объект SplitSource.
Это принципиальное отличие. Список — это «всё, посчитанное прямо сейчас». SplitSource — это «обещание отдавать сплиты, когда попросят». У SplitSource два ключевых метода в логике работы:
getNextBatch(maxSize)— отдать следующую порцию сплитов, не большеmaxSizeштук. Возвращает результат асинхронно (future): порция может быть ещё не готова, потому что коннектор как раз листает директорию в S3.isFinished()— сигнал, что сплиты кончились и больше batch’ей не будет.
Координатор крутит цикл: пока isFinished() ложно и есть куда раздавать — зовёт getNextBatch, получает порцию, отдаёт node scheduler’у. SplitManager при этом внутри может работать в несколько потоков, листать хранилище параллельно, применять фильтры по партициям — всё это спрятано за интерфейсом SplitSource.
Координатор SplitSource (от коннектора)
| |
|---- getNextBatch(1000) -------------->| листает S3, режет файлы
|<--- batch: 1000 splits, finished=no --|
| раздаёт node scheduler'у |
|---- getNextBatch(1000) -------------->| очереди воркеров заполнены?
| (ждёт, пока освободятся очереди) |
|---- getNextBatch(1000) -------------->|
|<--- batch: 340 splits, finished=YES --|
| source-стадия больше сплитов не ждёт |
Почему генерация ленивая и batch’ами
Ленивость даёт три выигрыша, и все три важны для «до железа» понимания.
Время до первого результата. Как только пришёл первый батч, node scheduler уже назначает сплиты, воркеры уже читают данные. Параллельно с этим коннектор продолжает листать хранилище и готовить следующие батчи. Перечисление сплитов и их обработка идут внахлёст, а не последовательно. Для запроса с LIMIT это особенно ярко: запрос может завершиться, прочитав первые сотни строк, так и не заставив коннектор перечислить остальные сплиты — они просто не понадобились.
Память координатора. Несгенерированный сплит не занимает heap. Для таблицы в сотни тысяч файлов разница между «держим список из 200 тысяч объектов» и «держим батч из тысячи плюс курсор» — это десятки или сотни мегабайт, которые иначе пришлось бы закладывать в -Xmx координатора просто под перечисление.
Backpressure. Координатор зовёт getNextBatch не на максимальной скорости, а ровно тогда, когда в очередях воркеров освободилось место. Если воркеры не успевают обрабатывать, новые батчи не запрашиваются, и коннектор не тратит ресурсы на перечисление сплитов, которые ещё некуда деть. Темп генерации сам подстраивается под темп потребления.
Где batch-генерация особенно заметна
Partition pruning. Когда таблица партиционирована, а в запросе есть WHERE по партиционирующему столбцу, SplitManager не должен порождать сплиты для отсечённых партиций. Он отфильтровывает партиции до перечисления файлов внутри них. SplitSource в этом случае физически не листает директории ненужных партиций — экономия не только памяти, но и обращений к хранилищу.
Dynamic filtering. Это тема восьмого модуля, но связь зафиксируем здесь. Динамический фильтр становится известен уже в ходе исполнения — после того, как обработана build-сторона join’а. Поскольку SplitSource ленив, к моменту, когда фильтр готов, ещё не все сплиты fact-таблицы перечислены. Координатор применяет динамический фильтр прямо при дальнейшем split enumeration и отбрасывает партиции, которые join всё равно отсеет. Будь генерация eager, все сплиты уже были бы перечислены, и эта оптимизация на этапе перечисления стала бы невозможна.
LIMIT и ранний останов. Запрос SELECT * FROM huge_table LIMIT 100 завершается, как только набраны 100 строк. SplitSource в этот момент закрывается, оставшиеся сплиты просто не порождаются. Сравните с подходом, где список сплитов посчитан целиком заранее: там вся работа по перечислению уже потрачена впустую.
Здесь стоит уточнить общую механику раннего останова. Trino — pipeline-движок: данные стримятся через операторы по мере чтения, а не материализуются целиком. Оператор LIMIT считает прошедшие строки и, как только их 100, сообщает вверх по конвейеру, что больше данных не нужно. Этот сигнал доходит до source-стадии, та закрывает SplitSource, и цепочка останавливается. Ленивость SplitSource — необходимое звено этой цепочки: если бы сплиты материализовались заранее, сигнал «хватит» пришёл бы уже после того, как координатор обошёл всё хранилище. Ленивая генерация и pipeline-исполнение работают вместе — одно без другого не дало бы мгновенного LIMIT на огромной таблице.
Размер батча, который запрашивает координатор, не фиксирован жёстко: он подстраивается под то, сколько сплитов воркеры готовы принять, и под потолок очередей. Не стоит думать о нём как о настраиваемой константе — это динамическая величина внутри цикла «запросил батч -> раздал -> запросил ещё». Важно само свойство: координатор никогда не просит у SplitSource больше, чем способен сейчас разместить.
Что увидит инженер
Число сплитов, которое в итоге сгенерировал коннектор, видно в EXPLAIN ANALYZE и в Web UI. В Web UI на странице запроса у source-стадии есть счётчик total splits. В выводе EXPLAIN ANALYZE объём входа source-фрагмента косвенно отражает, на сколько кусков коннектор порезал данные:
EXPLAIN ANALYZE
SELECT count(*) FROM tpch.sf100.lineitem;
Fragment 1 [SOURCE]
CPU: 38.40s, Scheduled: 1.62m, Input: 600037902 rows (0B)
ScanProject[table = tpch:lineitem:sf100]
Input avg.: 9375592.00 rows, Input std.dev.: 0.69%
Если число сплитов подозрительно мало — например, у большой таблицы один-два сплита, — это сигнал, что коннектор не смог распараллелить чтение: один большой нечленимый файл, либо JDBC-источник. Тогда никакое добавление воркеров скорость чтения не поднимет, и решать проблему нужно на уровне раскладки данных (больше файлов разумного размера) или на уровне коннектора.
Попробуй сам
На песочнице курса (Trino 481):
-
Выполните
EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf1.lineitem;и затем то же дляtpch.sf100.lineitem. СравнитеScheduledtime у source-фрагмента. Объясните, почему рост на два порядка по данным не даёт ровно двукратного-стократного роста времени: что в схеме «batch-генерация + параллельная обработка» сглаживает зависимость. -
Рассуждение: у вас есть таблица Iceberg, состоящая из одного Parquet-файла на 4 ГБ, и таблица из 400 файлов по 10 МБ. Суммарный объём одинаков. Какая прочитается быстрее на кластере из 4 воркеров и почему — свяжите ответ с тем, сколько сплитов породит SplitSource в каждом случае и как это ограничивает параллелизм.
-
Объясните своими словами, почему
SELECT * FROM tpch.sf100.orders LIMIT 5возвращается почти мгновенно, хотя таблица огромна. Что произошло с SplitSource в момент, когда набрались 5 строк.