Параллельная агрегация
GROUP BY — сердце аналитического запроса. «Сумма продаж по регионам», «число событий по дням», «средний чек по сегментам» — всё это группирующая агрегация. И как hash join, агрегация сталкивает потоки на общей структуре: значения групп надо свести вместе. Этот урок про то, как DuckDB считает GROUP BY параллельно — про локальную агрегацию в каждом потоке, про partitioned hash aggregate и про то, почему здесь работает уже знакомый приём «локально, потом объединить», но с важным добавлением.
Что делает группирующая агрегация
Сначала механика GROUP BY без параллелизма. Запрос SELECT region, sum(amount) FROM sales GROUP BY region строит для каждого уникального значения region накопленное состояние — здесь это текущая сумма amount.
Работает агрегация через хеш-таблицу, и это другая хеш-таблица, не та, что в join. В join хеш-таблица хранила строки. В агрегации хеш-таблица хранит группы: ключ — значение группирующей колонки (region), значение — накапливаемое состояние агрегата (текущая сумма). Движок идёт по входным строкам: для строки берёт её region, находит в хеш-таблице группу с этим ключом и обновляет её состояние — прибавляет amount к сумме. Группы нет — создаётся новая. Обработав весь вход, хеш-таблица содержит финальные значения всех групп.
Связь с pipelines: групповая агрегация — pipeline breaker. Финальное значение группы не готово, пока не обработана последняя входная строка — она может попасть в любую группу и изменить её итог. Поэтому агрегация сначала поглощает весь вход целиком и лишь потом отдаёт результат.
Шаг 1: локальная агрегация в каждом потоке
Параллелим. Та же ловушка, что в hash join: если все потоки обновляют одну общую хеш-таблицу групп, они конкурируют за неё, нужны блокировки, и параллелизм вырождается. Решение начинается так же — приёмом «сначала локально».
Каждый поток строит свою локальную хеш-таблицу групп. Поток берёт свои morsel-ы входных строк и агрегирует их в свою личную хеш-таблицу: считает суммы по регионам только для своих строк. Хеш-таблица личная — конкуренции нет, блокировки не нужны, потоки агрегируют параллельно и независимо. Перекос между ними сглаживает morsel-driven раздача.
Но дальше — отличие от join, и его важно понять. В hash join результаты потоков были непересекающимися кусками одной хеш-таблицы: каждая строка build-стороны попадала ровно в одну локальную таблицу, и слияние было простой склейкой. В агрегации не так. Группа region = 'EU' встретится у каждого потока: в потоке A какая-то часть EU-строк, в потоке B другая часть, в потоке C третья. После локальной агрегации 'EU' есть в хеш-таблице каждого потока — со своей частичной суммой.
Значит просто склеить локальные хеш-таблицы нельзя. Финальная сумма по 'EU' — это сумма частичных EU-сумм из всех потоков. Локальные результаты надо не склеить, а свести по группам: одинаковые группы из разных потоков объединить, сложив их состояния. Это слияние сложнее, чем в join, и если делать его в один поток — оно само станет узким местом.
Шаг 2: partitioned hash aggregate
Чтобы и слияние шло параллельно, DuckDB применяет partitioned hash aggregate — партиционированную хеш-агрегацию. Это ключевая идея урока.
Замысел такой: разделить пространство групп на партиции по хешу ключа. Все возможные значения группирующей колонки заранее раскладываются по N партициям — какая группа в какую партицию, определяет хеш её ключа. Например, группа 'EU' по хешу всегда попадает в партицию 2, 'US' — всегда в партицию 5, и так для всех групп, у каждого потока одинаково.
Это меняет устройство локальной хеш-таблицы. Личная хеш-таблица потока теперь не монолитная — она разбита на те же N партиций. Когда поток агрегирует строку, он по хешу её ключа кладёт частичное состояние в соответствующую партицию своей локальной таблицы. Поток A держит свои партиции 0..N, поток B — свои партиции 0..N, и нумерация партиций у них согласована.
И вот что это даёт на слиянии. Зафиксируем партицию 2 — в неё, по построению хеша, попадают одни и те же группы у всех потоков ('EU' и прочие группы с таким хешем). Чтобы получить финальные значения групп партиции 2, нужно свести вместе только партиции номер 2 из всех потоков — и больше ничего. Партиция 2 полностью независима от партиции 5: их группы не пересекаются, хеш развёл их по разным партициям.
А раз партиции независимы — слияние само параллелится по партициям. Поток A сводит партицию 2 (берёт партицию 2 у всех потоков, складывает состояния групп), поток B сводит партицию 5, поток C сводит партицию 7. Они не мешают друг другу: разные партиции — разные группы. Перекос на этом шаге снова сглаживает morsel-driven раздача — теперь единицей работы выступает партиция.
Партиционирование превратило неудобное «свести все группы из всех потоков» в набор независимых задач «свести одну партицию» — и эти задачи раздаются потокам.
Partitioned hash aggregate — это приём «локально, потом объединить» с добавкой партиционирования, и добавка эта принципиальна. В hash join слияние локальных хеш-таблиц было простой склейкой непересекающихся кусков. В агрегации одна группа размазана по всем потокам, поэтому слияние — это сведение по группам, и без партиционирования оно стало бы последовательным узким местом. Партиционирование по хешу ключа делает партиции независимыми — и слияние тоже параллелится. Тот же приём встречается в out-of-core агрегации: партиции дают ещё и единицы спилла на диск.
Почему агрегация особенно выигрывает в DuckDB
Группирующая агрегация — настолько частая операция в аналитике, что её параллельность во многом определяет общее впечатление от скорости движка. Соберём, что даёт partitioned hash aggregate на практике.
Параллельны обе стадии. Локальная агрегация (поглощение входа) идёт без конкуренции — каждый поток в своей хеш-таблице. Слияние тоже идёт параллельно — по независимым партициям. Узкого однопоточного места нет ни на одной стадии. Это и есть смысл партиционирования: оно убрало последовательный шаг, который иначе ограничивал бы масштабирование.
Хорошее масштабирование почти на любом числе групп. И когда групп мало (десяток регионов), и когда их миллионы (агрегация по идентификатору пользователя) — схема работает: партиции независимы, потоки сводят их параллельно, добавление ядер ускоряет и поглощение входа, и слияние.
Низкая стоимость синхронизации. Координация потоков сведена к раздаче partition-задач на слиянии — а это, как и в morsel-driven, дешёвое обращение к диспетчеру за номером следующей единицы работы. Никаких блокировок в горячем цикле обновления групп.
Увидеть параллельную агрегацию в плане можно через EXPLAIN — оператор группирующей агрегации виден явно:
EXPLAIN SELECT region, sum(amount) FROM sales GROUP BY region;
Фрагмент вывода:
PHYSICAL_PLAN
HASH_GROUP_BY
Groups: region
Aggregates: sum(amount)
TABLE_SCAN sales
Оператор HASH_GROUP_BY — это и есть партиционированная хеш-агрегация. Под капотом он исполняется так, как описано: каждый поток сначала агрегирует свои morsel-ы в локальную партиционированную хеш-таблицу, затем потоки параллельно сводят независимые партиции в финальный результат. В плане это один узел, но за ним — двухстадийный параллельный алгоритм.
Попробуй сам
Понаблюдайте за масштабированием агрегации на разном числе групп.
- Создайте крупную таблицу:
CREATE TABLE sales AS SELECT range AS id, (range % 20)::INTEGER AS region, (range % 5000000)::INTEGER AS user_id, (random()*1000)::INTEGER AS amount FROM range(40000000);иCHECKPOINT. - Включите таймер (
.timer on). Выполните агрегацию с малым числом групп:SELECT region, sum(amount) FROM sales GROUP BY region;— здесь всего 20 групп. Запишите время на всех ядрах. - Выполните агрегацию с огромным числом групп:
SELECT user_id, sum(amount) FROM sales GROUP BY user_id;— здесь миллионы групп. Запишите время на всех ядрах. - Ограничьте до одного потока (
SET threads = 1;) и повторите оба запроса. Сравните ускорение от многопоточности для случая 20 групп и для случая миллионов групп. Масштабируется ли агрегация в обоих случаях? - Выполните
EXPLAIN SELECT region, sum(amount) FROM sales GROUP BY region;и найдите операторHASH_GROUP_BY. ЗатемEXPLAIN ANALYZEтого же запроса — посмотрите его время. - Поразмышляйте: почему partitioned hash aggregate масштабируется и на 20 группах, и на миллионах? Что было бы со слиянием, если бы партиций не было и финальное сведение групп шло в один поток?
Этот эксперимент показывает, что группирующая агрегация в DuckDB параллельна на обеих стадиях и держит масштабирование независимо от числа групп.
Trino: partial aggregation и shuffle для GROUP BY