External hash aggregation: партиционирование и спилл групп
GROUP BY — одна из самых частых операций в аналитике и одна из самых требовательных к памяти. Чтобы сгруппировать строки, движку нужно где-то хранить состояние каждой группы. Если групп миллионы или их состояние тяжёлое, это состояние может не поместиться в RAM. External hash aggregation — механизм, который позволяет GROUP BY отработать в этом случае, спиллив часть работы на диск.
Этот урок разбирает hash-агрегацию изнутри: как она работает в памяти, что именно ломается при нехватке памяти, и как партиционирование с двухфазной обработкой решают проблему. Внешняя агрегация появилась в DuckDB ещё в версии 0.9 и с тех пор отлаженно прогоняет агрегаты на датасетах, многократно превышающих RAM.
Hash-агрегация в памяти
Сначала — как GROUP BY работает, когда памяти достаточно. DuckDB использует hash-агрегацию: строит hash-таблицу, где ключ — значения группирующих колонок, а значение — накапливаемое состояние агрегата.
Запрос SELECT country, count(*), sum(amount) FROM sales GROUP BY country обрабатывается так:
- Создаётся пустая hash-таблица.
- Движок сканирует
salesчанками по ~2048 строк. - Для каждой строки вычисляется хеш от
country, находится (или создаётся) запись группы в hash-таблице. - В записи группы обновляется состояние: счётчик
countинкрементируется, кsumприбавляетсяamount. - После прохода всей таблицы hash-таблица содержит финальное состояние каждой группы — это и есть результат.
Ключевой момент: память под агрегацию зависит не от числа строк, а от числа групп и тяжести их состояния. GROUP BY country на миллиарде строк — это всего ~200 групп, hash-таблица крошечная. А GROUP BY user_id на тех же строках — это десятки миллионов групп, и hash-таблица становится огромной. Именно второй случай упирается в память.
Что ломается при нехватке памяти
Проблема наивной hash-агрегации: вся hash-таблица должна быть в RAM целиком, потому что любая новая строка может попасть в любую группу. Нельзя «дочитать одну группу до конца» — строки нужной группы разбросаны по всей таблице, они встречаются вперемешку.
Если групп так много, что hash-таблица не влезает в memory_limit, наивный движок здесь падает с OOM. Нужен способ разбить задачу на части, каждая из которых помещается в память.
Партиционирование: разделяй и агрегируй
Идея external hash aggregation — разбить группы на партиции по хешу, и обрабатывать партиции по очереди, по одной за раз.
Наблюдение, которое всё делает возможным: если две строки попадают в одну группу, у них одинаковый group-ключ, а значит и одинаковый хеш. Следовательно, все строки одной группы всегда попадают в одну и ту же партицию по хешу. Группа не может «размазаться» по партициям. Это значит, что партиции можно агрегировать независимо: партиция содержит все строки всех своих групп целиком.
Механизм такой. DuckDB делит пространство хешей на N партиций (через старшие биты хеша). На первой фазе строки раскидываются по партициям. Партиции, помещающиеся в памяти, агрегируются сразу; партиции, для которых памяти не хватает, спиллятся в temp-файлы. На второй фазе спилленные партиции по одной поднимаются с диска и доагрегируются — каждая партиция по отдельности заведомо меньше всей задачи и помещается в память.
Почему это работает в фиксированной памяти: размер одной партиции — это примерно общий размер / N. Увеличивая N, DuckDB делает партиции достаточно мелкими, чтобы каждая в отдельности поместилась в memory_limit. Если даже одна партиция слишком велика, её можно партиционировать рекурсивно — разбить на под-партиции тем же приёмом. В итоге задача любого размера сводится к серии помещающихся в память кусков.
Партиционирование по хешу — ключ к корректности. Поскольку строки одной группы всегда имеют одинаковый хеш, они гарантированно попадают в одну партицию. Поэтому каждую партицию можно агрегировать полностью независимо от других: в ней лежат все строки всех её групп. Никакой группе не нужны данные из соседней партиции — финальные значения собираются попартиционно без дополнительного слияния групп между партициями.
Параллелизм и партиционирование
External hash aggregation хорошо ложится на многопоточность. Партиции независимы, поэтому разные потоки могут агрегировать разные партиции одновременно. DuckDB и для in-memory агрегации использует partitioned hash aggregate: каждый поток строит локальные частичные результаты, которые затем объединяются по партициям.
Связь с памятью прямая: все потоки делят общий memory_limit. Восемь потоков, каждый со своей растущей частичной hash-таблицей, суммарно должны уложиться в лимит. Чем больше потоков, тем меньше памяти на каждого и тем раньше начнётся спилл. Это конкретное проявление общего правила из урока про buffer manager.
Видим внешнюю агрегацию в работе
Воспроизведём ситуацию, где групп заведомо слишком много для маленького лимита.
-- 60 млн строк, 10 млн уникальных user_id -> 10 млн групп
CREATE TABLE events AS
SELECT range AS id,
range % 10000000 AS user_id,
random() * 100 AS amount
FROM range(60000000);
SET memory_limit = '400MB';
SET temp_directory = '/tmp/agg-spill';
-- группировка по 10 млн групп при лимите 400 МБ
SELECT user_id, count(*) AS cnt, sum(amount) AS total
FROM events
GROUP BY user_id
ORDER BY total DESC
LIMIT 5;
-- запрос ОТРАБОТАЕТ: hash-таблица на 10 млн групп
-- не влезла в 400 МБ, но партиции спиллились и доагрегировались
SET memory_limit = '400MB';
EXPLAIN ANALYZE
SELECT user_id, count(*) FROM events GROUP BY user_id;
В выводе оператор HASH_GROUP_BY будет помечен как работающий в external-режиме. Во время выполнения в /tmp/agg-spill появятся файлы спилленных партиций, которые исчезнут по завершении. Подними memory_limit до значения, в которое влезают 10 млн групп, повтори — метка external пропадёт, файлов не будет: агрегация прошла целиком в RAM.
Историческая отметка: именно этот механизм позволил DuckDB прогнать все запросы из набора в 50 ГБ на ноутбуке с 16 ГБ RAM — без external aggregation такие GROUP BY падали бы с OOM.
Ограничение: не всякое состояние спиллится
Важная честная оговорка. External hash aggregation отлично работает для агрегатов с компактным фиксированным состоянием: count, sum, avg, min, max — состояние группы это несколько чисел, его легко записать в spill-файл и поднять обратно.
Но некоторые агрегаты имеют сложное промежуточное состояние, которое на момент написания не все умеют спиллить. Например, агрегаты, которые внутри себя накапливают список или множество значений переменного размера. Если групп с таким тяжёлым несжимаемым состоянием очень много, теоретически возможен OOM даже при включённой внешней агрегации — это известное ограничение, упомянутое в документации DuckDB.
Практический вывод: для подавляющего большинства агрегатных запросов (обычные count/sum/avg/min/max по многим группам) external aggregation работает надёжно. Если же запрос с экзотическим агрегатом по огромному числу групп всё-таки падает по памяти — это сигнал упростить агрегат, уменьшить число групп предварительной фильтрацией или разбить запрос.
| Тип агрегата | Состояние группы | Спиллится надёжно? |
|---|---|---|
| count, sum, avg | несколько чисел | да |
| min, max | одно значение | да |
| Агрегаты со списком/множеством значений | переменный размер, может быть большим | не всегда — возможен OOM при множестве групп |
Попробуй сам
- Создай таблицу с большим числом групп:
CREATE TABLE t AS SELECT range AS id, range % 8000000 AS g, random() AS v FROM range(50000000);— это 8 млн групп. - Задай
SET temp_directory = '/tmp/agg-demo';иSET memory_limit = '350MB';ВыполниSELECT g, count(*), sum(v) FROM t GROUP BY g;(можно сLIMITдля краткости вывода). Запрос должен отработать. - Во время выполнения посмотри
/tmp/agg-demoиз другого терминала — есть ли файлы спилленных партиций? Что с ними после завершения? - Сделай
EXPLAIN ANALYZEтого же запроса при350MBи при большом лимите (например,6GB). Сравни метку режима уHASH_GROUP_BY— external в первом случае и её отсутствие во втором. - Сравни этот запрос с
SELECT count(*) FROM t GROUP BY (g % 50);— здесь всего 50 групп. Почему второй запрос не спиллит даже при350MB, хотя строк столько же? Сформулируй, от чего зависит память агрегации.