Learning Platform
Глоссарий Troubleshooting
Урок 07.02 · 22 мин
Средний
splitssplitsourceconnectorlazy-generation

SplitSource: как сплиты поступают от коннектора

В прошлом уроке мы говорили: node scheduler берёт батч сплитов и раскладывает по воркерам. Но откуда этот батч берётся и почему именно батч, а не сразу весь список? Если бы Trino, начиная запрос к таблице из 200 тысяч Parquet-файлов, сначала перечислил все 200 тысяч сплитов и сложил их в память, то первый сплит начал бы исполняться только после того, как координатор обошёл всё хранилище. Для широких таблиц это десятки секунд простоя до начала работы — и мегабайты структур в heap координатора.

Trino так не делает. Между коннектором и node scheduler стоит абстракция SplitSource — ленивый, потоковый источник сплитов. Координатор запрашивает у него сплиты порциями, ровно по мере того, как готов их раздавать. Этот урок — про устройство SplitSource, про batch-генерацию и про то, что вообще считается сплитом для разных коннекторов.


Что такое split, если совсем точно

Split (сплит) — это описание куска данных, который один task может прочитать независимо. Подчеркнём: split — это не сами данные, а лёгкий объект-указатель. Он содержит ровно то, что нужно коннектору, чтобы позже физически добыть этот кусок: путь к файлу, диапазон байтов внутри файла, идентификатор партиции, иногда список предпочтительных адресов.

Что именно становится единицей сплита, решает коннектор, и это решение зависит от природы источника:

КоннекторЧто такое один split
Hive / Iceberg / Delta поверх ParquetФайл целиком либо диапазон байтов внутри крупного файла (по row groups)
Hive поверх HDFSЧасто блок HDFS — отсюда и предпочтительные адреса дата-нод
PostgreSQL / MySQL (JDBC)Обычно один split на всю таблицу — JDBC-источник читается одним соединением
KafkaОдин split на партицию топика (или её диапазон offset’ов)
tpch / tpcdsЛогический диапазон строк генерируемого набора
Parquet: row groups и почему один файл часто дает один сплит Kafka: партиция топика как естественная граница сплита

Из этой таблицы видно две вещи. Первая: число сплитов напрямую определяет потолок параллелизма source-стадии — нельзя занять чтением больше driver’ов, чем есть сплитов. Таблица в один большой Parquet-файл, который коннектор не умеет резать, даст один сплит и будет читаться в один поток, как бы много воркеров ни стояло. Вторая: для JDBC-источника параллелизма на чтении обычно нет вовсе — отсюда правило, что федеративный join большой таблицы из PostgreSQL почти всегда упирается в это единственное соединение.

Split — это указатель, а не данные
Файл данныхФизический Parquet-файл в объектном хранилище, например s3://lake/orders/part-0007.parquet
коннектор описывает
SplitЛёгкий объект: путь к файлу, диапазон байтов, partition id. Несколько килобайт, не сами данные
позже: PageSource
ЧтениеТолько когда driver берёт сплит в работу, коннектор открывает PageSource и физически читает байты

SplitManager порождает SplitSource

Когда координатор спланировал запрос и дошёл до source-стадии, он обращается к коннектору за сплитами. В Connector SPI за это отвечает компонент ConnectorSplitManager. Его метод getSplits не возвращает список сплитов — он возвращает объект SplitSource.

Это принципиальное отличие. Список — это «всё, посчитанное прямо сейчас». SplitSource — это «обещание отдавать сплиты, когда попросят». У SplitSource два ключевых метода в логике работы:

  • getNextBatch(maxSize) — отдать следующую порцию сплитов, не больше maxSize штук. Возвращает результат асинхронно (future): порция может быть ещё не готова, потому что коннектор как раз листает директорию в S3.
  • isFinished() — сигнал, что сплиты кончились и больше batch’ей не будет.

Координатор крутит цикл: пока isFinished() ложно и есть куда раздавать — зовёт getNextBatch, получает порцию, отдаёт node scheduler’у. SplitManager при этом внутри может работать в несколько потоков, листать хранилище параллельно, применять фильтры по партициям — всё это спрятано за интерфейсом SplitSource.

Координатор                          SplitSource (от коннектора)
   |                                       |
   |---- getNextBatch(1000) -------------->|  листает S3, режет файлы
   |<--- batch: 1000 splits, finished=no --|
   |  раздаёт node scheduler'у             |
   |---- getNextBatch(1000) -------------->|  очереди воркеров заполнены?
   |  (ждёт, пока освободятся очереди)      |
   |---- getNextBatch(1000) -------------->|
   |<--- batch: 340 splits, finished=YES --|
   |  source-стадия больше сплитов не ждёт |

Почему генерация ленивая и batch’ами

Ленивость даёт три выигрыша, и все три важны для «до железа» понимания.

Время до первого результата. Как только пришёл первый батч, node scheduler уже назначает сплиты, воркеры уже читают данные. Параллельно с этим коннектор продолжает листать хранилище и готовить следующие батчи. Перечисление сплитов и их обработка идут внахлёст, а не последовательно. Для запроса с LIMIT это особенно ярко: запрос может завершиться, прочитав первые сотни строк, так и не заставив коннектор перечислить остальные сплиты — они просто не понадобились.

Память координатора. Несгенерированный сплит не занимает heap. Для таблицы в сотни тысяч файлов разница между «держим список из 200 тысяч объектов» и «держим батч из тысячи плюс курсор» — это десятки или сотни мегабайт, которые иначе пришлось бы закладывать в -Xmx координатора просто под перечисление.

Backpressure. Координатор зовёт getNextBatch не на максимальной скорости, а ровно тогда, когда в очередях воркеров освободилось место. Если воркеры не успевают обрабатывать, новые батчи не запрашиваются, и коннектор не тратит ресурсы на перечисление сплитов, которые ещё некуда деть. Темп генерации сам подстраивается под темп потребления.

Eager против lazy: перечисление сплитов
Eager (как НЕ работает Trino)Гипотетический наивный подход: сначала перечислить все сплиты до единого, потом начать обработку
Trino выбрал другое
Lazy batch (как работает Trino)Перечисление сплитов и их обработка идут параллельно, внахлёст. Память ограничена размером батча и курсором

Где batch-генерация особенно заметна

Partition pruning. Когда таблица партиционирована, а в запросе есть WHERE по партиционирующему столбцу, SplitManager не должен порождать сплиты для отсечённых партиций. Он отфильтровывает партиции до перечисления файлов внутри них. SplitSource в этом случае физически не листает директории ненужных партиций — экономия не только памяти, но и обращений к хранилищу.

Dynamic filtering. Это тема восьмого модуля, но связь зафиксируем здесь. Динамический фильтр становится известен уже в ходе исполнения — после того, как обработана build-сторона join’а. Поскольку SplitSource ленив, к моменту, когда фильтр готов, ещё не все сплиты fact-таблицы перечислены. Координатор применяет динамический фильтр прямо при дальнейшем split enumeration и отбрасывает партиции, которые join всё равно отсеет. Будь генерация eager, все сплиты уже были бы перечислены, и эта оптимизация на этапе перечисления стала бы невозможна.

LIMIT и ранний останов. Запрос SELECT * FROM huge_table LIMIT 100 завершается, как только набраны 100 строк. SplitSource в этот момент закрывается, оставшиеся сплиты просто не порождаются. Сравните с подходом, где список сплитов посчитан целиком заранее: там вся работа по перечислению уже потрачена впустую.

DuckDB: pipeline-исполнение и ранний останов в аналитическом движке

Здесь стоит уточнить общую механику раннего останова. Trino — pipeline-движок: данные стримятся через операторы по мере чтения, а не материализуются целиком. Оператор LIMIT считает прошедшие строки и, как только их 100, сообщает вверх по конвейеру, что больше данных не нужно. Этот сигнал доходит до source-стадии, та закрывает SplitSource, и цепочка останавливается. Ленивость SplitSource — необходимое звено этой цепочки: если бы сплиты материализовались заранее, сигнал «хватит» пришёл бы уже после того, как координатор обошёл всё хранилище. Ленивая генерация и pipeline-исполнение работают вместе — одно без другого не дало бы мгновенного LIMIT на огромной таблице.

NOTE

Размер батча, который запрашивает координатор, не фиксирован жёстко: он подстраивается под то, сколько сплитов воркеры готовы принять, и под потолок очередей. Не стоит думать о нём как о настраиваемой константе — это динамическая величина внутри цикла «запросил батч -> раздал -> запросил ещё». Важно само свойство: координатор никогда не просит у SplitSource больше, чем способен сейчас разместить.


Что увидит инженер

Число сплитов, которое в итоге сгенерировал коннектор, видно в EXPLAIN ANALYZE и в Web UI. В Web UI на странице запроса у source-стадии есть счётчик total splits. В выводе EXPLAIN ANALYZE объём входа source-фрагмента косвенно отражает, на сколько кусков коннектор порезал данные:

EXPLAIN ANALYZE
SELECT count(*) FROM tpch.sf100.lineitem;
Fragment 1 [SOURCE]
    CPU: 38.40s, Scheduled: 1.62m, Input: 600037902 rows (0B)
    ScanProject[table = tpch:lineitem:sf100]
        Input avg.: 9375592.00 rows, Input std.dev.: 0.69%

Если число сплитов подозрительно мало — например, у большой таблицы один-два сплита, — это сигнал, что коннектор не смог распараллелить чтение: один большой нечленимый файл, либо JDBC-источник. Тогда никакое добавление воркеров скорость чтения не поднимет, и решать проблему нужно на уровне раскладки данных (больше файлов разумного размера) или на уровне коннектора.


Попробуй сам

На песочнице курса (Trino 481):

  1. Выполните EXPLAIN ANALYZE SELECT count(*) FROM tpch.sf1.lineitem; и затем то же для tpch.sf100.lineitem. Сравните Scheduled time у source-фрагмента. Объясните, почему рост на два порядка по данным не даёт ровно двукратного-стократного роста времени: что в схеме «batch-генерация + параллельная обработка» сглаживает зависимость.

  2. Рассуждение: у вас есть таблица Iceberg, состоящая из одного Parquet-файла на 4 ГБ, и таблица из 400 файлов по 10 МБ. Суммарный объём одинаков. Какая прочитается быстрее на кластере из 4 воркеров и почему — свяжите ответ с тем, сколько сплитов породит SplitSource в каждом случае и как это ограничивает параллелизм.

  3. Объясните своими словами, почему SELECT * FROM tpch.sf100.orders LIMIT 5 возвращается почти мгновенно, хотя таблица огромна. Что произошло с SplitSource в момент, когда набрались 5 строк.


Проверка знанийKnowledge check
Почему ConnectorSplitManager возвращает объект SplitSource, а не готовый список сплитов? Какие конкретные выгоды это даёт и как связано с partition pruning и LIMIT?
ОтветAnswer
SplitSource — это ленивый потоковый источник: координатор запрашивает у него сплиты порциями через getNextBatch по мере готовности их раздавать, вместо того чтобы получить весь список сразу. Это даёт три выгоды. Первая — время до первого результата: как только пришёл первый батч, воркеры уже читают данные, а коннектор параллельно готовит следующие батчи; перечисление и обработка идут внахлёст. Вторая — память координатора: несгенерированный сплит не занимает heap, для таблицы в сотни тысяч файлов это экономия десятков-сотен мегабайт. Третья — backpressure: координатор зовёт getNextBatch только когда в очередях воркеров есть место, и темп генерации сам подстраивается под темп обработки. С partition pruning связь в том, что SplitSource физически не листает директории отсечённых WHERE-условием партиций. С LIMIT — запрос завершается, как только набрано нужное число строк, и оставшиеся сплиты просто не порождаются; при eager-подходе вся работа по их перечислению была бы потрачена впустую.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое split в Trino?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5