Параллельное сканирование
В прошлом уроке мы разобрали morsel-driven parallelism в общем виде: диспетчер раздаёт потокам мелкие morsel-ы из общего пула. Теперь применим эту модель к самому первому оператору почти любого запроса — к сканированию таблицы. Сканирование — это source большинства pipeline, и от того, насколько хорошо оно параллелится, зависит скорость всего запроса. Этот урок про то, как DuckDB читает одну таблицу всеми потоками одновременно и почему этим потокам почти не нужно друг с другом договариваться.
Сканирование — это source, и его надо параллелить первым
Вспомним прошлый урок про pipelines: у каждого pipeline есть source, и чаще всего source — это сканирование таблицы. Сканирование стоит в самом начале конвейера и подаёт данные всем операторам выше.
Отсюда простое следствие. Если source выдаёт данные медленно, в один поток, то операторы выше будут голодать — им нечего обрабатывать, они ждут source. Сколько ядер ни дай фильтру и агрегации, они упрутся в скорость подачи. Чтобы параллелизм всего запроса заработал, параллелить надо в первую очередь сам source — сканирование. Параллельное сканирование — фундамент, на котором стоит параллелизм остального pipeline.
К счастью, сканирование параллелится особенно хорошо — лучше, чем join или агрегация. Причина в его природе: чтение данных не создаёт общего состояния. Поток, читающий одну часть таблицы, ничего не должен сообщать потоку, читающему другую часть. Им нечего делить — каждый просто читает свой кусок. Это разительно отличается от hash join, где потоки строят общую хеш-таблицу, или от агрегации, где потоки сводят значения в общие группы. Сканирование — параллельная операция почти без точек соприкосновения.
Структура хранилища уже нарезана для параллелизма
Чтобы раздать сканирование потокам, таблицу нужно поделить на куски. И здесь DuckDB ничего не изобретает заново — нужная нарезка уже существует в storage-формате.
Вспомним модуль про хранилище. Таблица физически разбита на row groups — горизонтальные полосы примерно по 122 880 строк. Каждая row group самодостаточна: свои колоночные сегменты, своё сжатие, своя статистика. Чтобы прочитать row group, не нужно ничего знать про соседние row groups.
Это свойство — независимость row groups — и есть готовая основа параллельного сканирования. Куски, по которым раздаётся работа, не приходится вырезать на лету: storage-формат уже нарезал таблицу на независимые единицы. Морсель сканирования естественно строится из row groups (и более мелких частей внутри них). Связь, заложенная ещё в модуле про хранилище, замыкается здесь: row group проектировался как единица параллельной работы — и параллельное сканирование именно ею и оперирует.
Чтение по байтовым диапазонам
Спустимся ещё на уровень ниже — к самому вводу-выводу. Когда поток получил morsel сканирования, ему нужно прочитать соответствующие данные из файла .duckdb.
И тут пригодится свойство storage-формата из модуля про хранилище: файл разбит на блоки фиксированного размера, и каждый блок адресуется простой арифметикой — его смещение в файле есть block_id, умноженный на размер блока. То есть для любой row group и любого её колоночного сегмента DuckDB точно знает байтовый диапазон в файле — с какого байта по какой эти данные лежат.
Это превращает «прочитать morsel» в «прочитать конкретный байтовый диапазон файла». А чтение разных, непересекающихся диапазонов одного файла — операция, которую потоки выполняют полностью независимо. Поток A читает байты с 10-го по 20-й мегабайт, поток B — с 40-го по 50-й. Их диапазоны не пересекаются, они не пишут, а только читают — никакой координации между ними не нужно вообще. Каждый поток выдаёт операционной системе свой запрос на чтение своего участка файла.
Более того, такая раскладка дружелюбна к диску. Современные SSD и дисковые контроллеры обрабатывают несколько независимых запросов на чтение параллельно и эффективнее, чем один поток, дёргающий диск по очереди. Несколько потоков, каждый со своим байтовым диапазоном, естественно создают параллельную нагрузку на ввод-вывод — и пропускная способность диска используется полнее.
Почему координация почти не нужна
Соберём, в чём именно проявляется «почти отсутствие координации» при параллельном сканировании, — потому что это и делает скан таким хорошо масштабируемым оператором.
Потоки не делят данные. Каждый поток читает свой morsel — свои row groups, свои байтовые диапазоны. Один поток не нуждается в данных, прочитанных другим. Нет общей структуры, которую все наполняют, нет значений, которые надо свести вместе.
Единственная точка соприкосновения — диспетчер morsel-ов. Та самая координация из прошлого урока: общий пул morsel-ов, из которого потоки берут следующий кусок. Поток, дочитав свой morsel сканирования, приходит к диспетчеру за следующим. Вот и вся синхронизация — короткое обращение к пулу, чтобы взять номер очередного куска. Это касание измеряется не данными, а единственным числом «какой morsel следующий».
Сравните это с join и агрегацией. В hash join потоки строят общую хеш-таблицу — это серьёзная общая структура, требующая аккуратной синхронизации. В групповой агрегации потоки сводят значения в общие группы — снова общее состояние. Сканирование от всего этого свободно: его общее состояние — лишь счётчик «какой morsel выдать следующим». Поэтому скан масштабируется почти идеально: добавили ядро — оно просто включилось в разбор пула morsel-ов и читает свои байтовые диапазоны, ни с кем не конкурируя за общие данные.
Параллельное сканирование — самый чистый случай morsel-driven parallelism. Единица работы (morsel из row groups) и единица ввода-вывода (байтовый диапазон файла) совпадают и полностью независимы между потоками. Единственное общее состояние — диспетчер, выдающий номер следующего morsel-а. Поэтому скан масштабируется по ядрам почти линейно — лучше, чем операторы с общей структурой данных вроде hash join.
Где параллельный скан упирается в потолок
Параллельное сканирование масштабируется отлично, но не бесконечно — у него есть естественный потолок, и полезно понимать какой.
Этот потолок — пропускная способность диска. Если таблица не в кэше операционной системы и читается с диска, то суммарная скорость, с которой все потоки вместе поднимают байты, ограничена тем, как быстро отдаёт байты сам носитель. Допустим, восемь потоков уже выбирают всю пропускную способность SSD — тогда добавление девятого и десятого потока скан не ускорит: байты уже текут с диска на максимуме, лишние потоки просто ждут ввод-вывод. Узкое звено переехало с CPU на диск.
Здесь и срабатывает компрессия из прошлого модуля — и видно, как два механизма работают вместе. Сжатие уменьшает объём байт, которые надо поднять с диска. Меньше байт при той же пропускной способности диска — значит потолок «упёрлись в диск» отодвигается, и больше потоков успевают заниматься полезной работой, прежде чем ввод-вывод станет ограничением. Параллельное сканирование раздаёт работу потокам, а lightweight-сжатие уменьшает саму работу ввода-вывода — вместе они и дают быстрый скан.
А когда данные уже в памяти — в кэше ОС после первого чтения или в in-memory базе — дискового потолка нет вообще, и параллельное сканирование упирается уже в скорость CPU и пропускную способность памяти. Тогда оно масштабируется по ядрам ещё ближе к линейному.
Попробуй сам
Исследуйте масштабирование сканирования.
- Создайте крупную таблицу:
CREATE TABLE scan_big AS SELECT range AS id, (random() * 100)::INTEGER AS v, range::VARCHAR AS s FROM range(40000000);иCHECKPOINT. - Включите таймер (
.timer on) и выполните простой полный скан с агрегацией:SELECT sum(v) FROM scan_big;. Запишите время на всех ядрах (threadsпо умолчанию). - Ограничьте до одного потока:
SET threads = 1;, повторите. Сравните со временем на всех ядрах — во сколько раз быстрее многопоточный скан? - Постепенно увеличивайте:
SET threads = 2;,4,8(до вашего числа ядер), каждый раз повторяя запрос. Где ускорение начинает замедляться? Это может быть признаком, что вы упёрлись в пропускную способность диска или памяти. - Прогрейте кэш: выполните скан дважды подряд — второй раз данные уже в кэше ОС. Сравните время «холодного» и «горячего» скана. Объясните разницу через дисковый потолок: при горячем кэше его нет.
- Посмотрите
EXPLAIN ANALYZE SELECT sum(v) FROM scan_big;— у оператора TABLE_SCAN видно время и число строк. Прикиньте, какую долю всего времени запроса занимает именно сканирование.
Этот эксперимент даёт почувствовать, что сканирование параллелится почти линейно — пока не упрётся в диск или память, и тогда дальнейшее добавление потоков перестаёт помогать.
Trino: splits как единица параллельного чтения