Learning Platform
Глоссарий Troubleshooting
Урок 09.05 · 23 мин
Средний
parallelismaggregationgroup-byinternals

Параллельная агрегация

GROUP BY — сердце аналитического запроса. «Сумма продаж по регионам», «число событий по дням», «средний чек по сегментам» — всё это группирующая агрегация. И как hash join, агрегация сталкивает потоки на общей структуре: значения групп надо свести вместе. Этот урок про то, как DuckDB считает GROUP BY параллельно — про локальную агрегацию в каждом потоке, про partitioned hash aggregate и про то, почему здесь работает уже знакомый приём «локально, потом объединить», но с важным добавлением.

Что делает группирующая агрегация

Сначала механика GROUP BY без параллелизма. Запрос SELECT region, sum(amount) FROM sales GROUP BY region строит для каждого уникального значения region накопленное состояние — здесь это текущая сумма amount.

Работает агрегация через хеш-таблицу, и это другая хеш-таблица, не та, что в join. В join хеш-таблица хранила строки. В агрегации хеш-таблица хранит группы: ключ — значение группирующей колонки (region), значение — накапливаемое состояние агрегата (текущая сумма). Движок идёт по входным строкам: для строки берёт её region, находит в хеш-таблице группу с этим ключом и обновляет её состояние — прибавляет amount к сумме. Группы нет — создаётся новая. Обработав весь вход, хеш-таблица содержит финальные значения всех групп.

Связь с pipelines: групповая агрегация — pipeline breaker. Финальное значение группы не готово, пока не обработана последняя входная строка — она может попасть в любую группу и изменить её итог. Поэтому агрегация сначала поглощает весь вход целиком и лишь потом отдаёт результат.

Шаг 1: локальная агрегация в каждом потоке

Параллелим. Та же ловушка, что в hash join: если все потоки обновляют одну общую хеш-таблицу групп, они конкурируют за неё, нужны блокировки, и параллелизм вырождается. Решение начинается так же — приёмом «сначала локально».

Каждый поток строит свою локальную хеш-таблицу групп. Поток берёт свои morsel-ы входных строк и агрегирует их в свою личную хеш-таблицу: считает суммы по регионам только для своих строк. Хеш-таблица личная — конкуренции нет, блокировки не нужны, потоки агрегируют параллельно и независимо. Перекос между ними сглаживает morsel-driven раздача.

Но дальше — отличие от join, и его важно понять. В hash join результаты потоков были непересекающимися кусками одной хеш-таблицы: каждая строка build-стороны попадала ровно в одну локальную таблицу, и слияние было простой склейкой. В агрегации не так. Группа region = 'EU' встретится у каждого потока: в потоке A какая-то часть EU-строк, в потоке B другая часть, в потоке C третья. После локальной агрегации 'EU' есть в хеш-таблице каждого потока — со своей частичной суммой.

Значит просто склеить локальные хеш-таблицы нельзя. Финальная сумма по 'EU' — это сумма частичных EU-сумм из всех потоков. Локальные результаты надо не склеить, а свести по группам: одинаковые группы из разных потоков объединить, сложив их состояния. Это слияние сложнее, чем в join, и если делать его в один поток — оно само станет узким местом.

Локальная агрегация: одна группа есть у многих потоков
Поток A: локальная HTПоток A агрегировал свои morsel-ы. В его хеш-таблице есть частичная сумма по EU, US и другим группам — только по строкам потока A.
Поток B: локальная HTПоток B агрегировал свои строки. У него тоже есть EU и US — со своими частичными суммами.
Поток C: локальная HTИ у потока C те же группы со своими частями. Финальная сумма по EU — это сумма частичных EU из A, B, C.

Шаг 2: partitioned hash aggregate

Чтобы и слияние шло параллельно, DuckDB применяет partitioned hash aggregate — партиционированную хеш-агрегацию. Это ключевая идея урока.

Замысел такой: разделить пространство групп на партиции по хешу ключа. Все возможные значения группирующей колонки заранее раскладываются по N партициям — какая группа в какую партицию, определяет хеш её ключа. Например, группа 'EU' по хешу всегда попадает в партицию 2, 'US' — всегда в партицию 5, и так для всех групп, у каждого потока одинаково.

Это меняет устройство локальной хеш-таблицы. Личная хеш-таблица потока теперь не монолитная — она разбита на те же N партиций. Когда поток агрегирует строку, он по хешу её ключа кладёт частичное состояние в соответствующую партицию своей локальной таблицы. Поток A держит свои партиции 0..N, поток B — свои партиции 0..N, и нумерация партиций у них согласована.

И вот что это даёт на слиянии. Зафиксируем партицию 2 — в неё, по построению хеша, попадают одни и те же группы у всех потоков ('EU' и прочие группы с таким хешем). Чтобы получить финальные значения групп партиции 2, нужно свести вместе только партиции номер 2 из всех потоков — и больше ничего. Партиция 2 полностью независима от партиции 5: их группы не пересекаются, хеш развёл их по разным партициям.

А раз партиции независимы — слияние само параллелится по партициям. Поток A сводит партицию 2 (берёт партицию 2 у всех потоков, складывает состояния групп), поток B сводит партицию 5, поток C сводит партицию 7. Они не мешают друг другу: разные партиции — разные группы. Перекос на этом шаге снова сглаживает morsel-driven раздача — теперь единицей работы выступает партиция.

Партиционирование превратило неудобное «свести все группы из всех потоков» в набор независимых задач «свести одну партицию» — и эти задачи раздаются потокам.

Partitioned hash aggregate: слияние по независимым партициям
Локальные HT, разбитые на партиции по хешу ключаХеш-таблица каждого потока разбита на N партиций. Хеш ключа группы определяет партицию — у всех потоков согласованно.
слияние по партициям, параллельно
Партицию 0 сводит поток XБерёт партицию 0 у всех потоков, складывает частичные состояния групп. Партиция 0 независима от других.
Партицию 1 сводит поток YБерёт партицию 1 у всех потоков. Её группы не пересекаются с группами других партиций.
Партицию 2 сводит поток ZБерёт партицию 2 у всех потоков. Слияние партиций идёт параллельно — узкого места нет.
TIP

Partitioned hash aggregate — это приём «локально, потом объединить» с добавкой партиционирования, и добавка эта принципиальна. В hash join слияние локальных хеш-таблиц было простой склейкой непересекающихся кусков. В агрегации одна группа размазана по всем потокам, поэтому слияние — это сведение по группам, и без партиционирования оно стало бы последовательным узким местом. Партиционирование по хешу ключа делает партиции независимыми — и слияние тоже параллелится. Тот же приём встречается в out-of-core агрегации: партиции дают ещё и единицы спилла на диск.

Почему агрегация особенно выигрывает в DuckDB

Группирующая агрегация — настолько частая операция в аналитике, что её параллельность во многом определяет общее впечатление от скорости движка. Соберём, что даёт partitioned hash aggregate на практике.

Параллельны обе стадии. Локальная агрегация (поглощение входа) идёт без конкуренции — каждый поток в своей хеш-таблице. Слияние тоже идёт параллельно — по независимым партициям. Узкого однопоточного места нет ни на одной стадии. Это и есть смысл партиционирования: оно убрало последовательный шаг, который иначе ограничивал бы масштабирование.

Хорошее масштабирование почти на любом числе групп. И когда групп мало (десяток регионов), и когда их миллионы (агрегация по идентификатору пользователя) — схема работает: партиции независимы, потоки сводят их параллельно, добавление ядер ускоряет и поглощение входа, и слияние.

Низкая стоимость синхронизации. Координация потоков сведена к раздаче partition-задач на слиянии — а это, как и в morsel-driven, дешёвое обращение к диспетчеру за номером следующей единицы работы. Никаких блокировок в горячем цикле обновления групп.

Увидеть параллельную агрегацию в плане можно через EXPLAIN — оператор группирующей агрегации виден явно:

EXPLAIN SELECT region, sum(amount) FROM sales GROUP BY region;

Фрагмент вывода:

PHYSICAL_PLAN
  HASH_GROUP_BY
    Groups: region
    Aggregates: sum(amount)
    TABLE_SCAN sales

Оператор HASH_GROUP_BY — это и есть партиционированная хеш-агрегация. Под капотом он исполняется так, как описано: каждый поток сначала агрегирует свои morsel-ы в локальную партиционированную хеш-таблицу, затем потоки параллельно сводят независимые партиции в финальный результат. В плане это один узел, но за ним — двухстадийный параллельный алгоритм.

Попробуй сам

Понаблюдайте за масштабированием агрегации на разном числе групп.

  1. Создайте крупную таблицу: CREATE TABLE sales AS SELECT range AS id, (range % 20)::INTEGER AS region, (range % 5000000)::INTEGER AS user_id, (random()*1000)::INTEGER AS amount FROM range(40000000); и CHECKPOINT.
  2. Включите таймер (.timer on). Выполните агрегацию с малым числом групп: SELECT region, sum(amount) FROM sales GROUP BY region; — здесь всего 20 групп. Запишите время на всех ядрах.
  3. Выполните агрегацию с огромным числом групп: SELECT user_id, sum(amount) FROM sales GROUP BY user_id; — здесь миллионы групп. Запишите время на всех ядрах.
  4. Ограничьте до одного потока (SET threads = 1;) и повторите оба запроса. Сравните ускорение от многопоточности для случая 20 групп и для случая миллионов групп. Масштабируется ли агрегация в обоих случаях?
  5. Выполните EXPLAIN SELECT region, sum(amount) FROM sales GROUP BY region; и найдите оператор HASH_GROUP_BY. Затем EXPLAIN ANALYZE того же запроса — посмотрите его время.
  6. Поразмышляйте: почему partitioned hash aggregate масштабируется и на 20 группах, и на миллионах? Что было бы со слиянием, если бы партиций не было и финальное сведение групп шло в один поток?

Этот эксперимент показывает, что группирующая агрегация в DuckDB параллельна на обеих стадиях и держит масштабирование независимо от числа групп.

Trino: partial aggregation и shuffle для GROUP BY
Проверка знанийKnowledge check
Почему слияние результатов параллельной агрегации сложнее, чем в hash join, и как partitioned hash aggregate делает это слияние тоже параллельным?
ОтветAnswer
Группирующая агрегация считает GROUP BY через хеш-таблицу групп: ключ — значение группирующей колонки, значение — накапливаемое состояние агрегата (например текущая сумма). Параллелится она приёмом сначала локально: каждый поток строит свою личную хеш-таблицу групп и агрегирует в неё свои morsel-ы — конкуренции нет, блокировки не нужны. Но слияние здесь сложнее, чем в hash join. В join результаты потоков были непересекающимися кусками одной хеш-таблицы — каждая строка build-стороны попадала ровно в одну локальную таблицу, и слияние было простой склейкой. В агрегации одна и та же группа встречается у всех потоков: группа EU есть в локальной хеш-таблице каждого потока со своей частичной суммой, потому что EU-строки разошлись по разным потокам. Поэтому локальные результаты нельзя склеить — их надо свести по группам: одинаковые группы из разных потоков объединить, сложив состояния. Если делать это в один поток, слияние само станет узким местом, ограничивающим масштабирование. Partitioned hash aggregate решает это партиционированием. Пространство групп заранее делится на N партиций по хешу ключа: какая группа в какую партицию — определяет хеш, у всех потоков согласованно (EU всегда в партицию 2, US всегда в партицию 5). Локальная хеш-таблица каждого потока разбита на те же N партиций, и поток кладёт частичное состояние группы в партицию по хешу её ключа. Это даёт независимость партиций: в партицию 2 у всех потоков попадают одни и те же группы, и чтобы получить финальные значения групп партиции 2, нужно свести только партиции номер 2 из всех потоков — партиция 2 не пересекается с партицией 5, хеш развёл их группы. А раз партиции независимы, слияние само параллелится по партициям: один поток сводит партицию 2, другой партицию 5, третий партицию 7, не мешая друг другу, и единицей работы при раздаче выступает партиция. Партиционирование превратило неудобное сведение всех групп из всех потоков в набор независимых задач свести одну партицию — поэтому параллельны обе стадии агрегации, и поглощение входа, и слияние, без однопоточного узкого места.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что хранит хеш-таблица в группирующей агрегации (в отличие от хеш-таблицы в hash join)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6