Learning Platform
Глоссарий Troubleshooting
Урок 09.06 · 23 мин
Средний
cbodynamic-filteringpartition-pruningjoin

Dynamic filtering: фильтры с build-side и partition pruning

Этот урок завершает модуль про CBO самой эффектной его оптимизацией. Все предыдущие — статистика, join reordering, distribution, pushdown — работают на этапе планирования: решения принимаются до старта запроса. Dynamic filtering работает иначе. Это оптимизация времени исполнения: фильтр, которого на этапе планирования ещё не существовало, рождается в середине запроса из уже посчитанных данных и применяется к тому, что ещё не прочитано.

Звучит как магия, но механика строгая. Разберём её — и поймём, почему dynamic filtering особенно важен для типичной аналитической схемы «звезда».


Проблема: join читает данные, которые тут же выбросит

Возьмём каноничный аналитический запрос. Большая fact-таблица sales — миллиард строк продаж. Маленькая dimension-таблица stores — справочник магазинов. Нужны продажи только магазинов одного города:

SELECT s.amount, st.store_name
FROM sales s
JOIN stores st ON s.store_id = st.store_id
WHERE st.city = 'Berlin';

Как это исполняется наивно. Trino фильтрует stores по city = 'Berlin' — остаётся, скажем, 12 магазинов. Затем джойнит: читает всю fact-таблицу sales, миллиард строк, и для каждой проверяет, есть ли её store_id среди 12 берлинских.

Расточительность очевидна. Из миллиарда строк sales совпадут лишь те, что относятся к 12 магазинам, — может быть, доля процента. Но прочитан с диска и прогнан через join был весь миллиард. Фильтр city = 'Berlin' относится к stores, и статически, на этапе планирования, протолкнуть его в скан sales нельзя — в sales нет столбца city. Связь между city и нужными store_id в sales обнаруживается только в момент исполнения join’а.

Наивный join: читаем весь fact, выбрасываем почти всё
stores отфильтрованWHERE city = 'Berlin' оставил 12 магазинов — build-сторона join'а
join по store_id
sales: читается весьБез dynamic filtering сканируется весь миллиард строк fact-таблицы, хотя совпадёт доля процента
join отсеивает
Результат: доля процентаСовпали лишь строки 12 магазинов — почти весь прочитанный миллиард выброшен впустую

Идея: построить фильтр из build-side во время исполнения

Dynamic filtering решает это так. Build-сторона join’а — отфильтрованная stores — обрабатывается раньше probe-стороны: чтобы строить хэш-таблицу, build-сторону надо прочитать первой. А значит, к моменту, когда дело дойдёт до чтения sales, Trino уже знает конкретные значения store_id тех самых 12 берлинских магазинов.

Из этого знания Trino строит динамический фильтр — рантайм-предикат «store_id IN (множество из 12 значений)» — и применяет его к скану sales. Теперь скан sales читает не весь миллиард, а только строки, чей store_id входит в это множество. Фильтр родился из данных build-стороны во время исполнения и был применён к ещё не прочитанной probe-стороне.

Spark AQE: Dynamic Partition Pruning ClickHouse: GLOBAL JOIN для распределённых запросов

Последовательность строгая:

  1. Прочитать и отфильтровать build-сторону (stores с city = 'Berlin') — 12 строк.
  2. Собрать с неё значения join-ключа store_id — множество из 12 значений.
  3. Сформировать динамический предикат s.store_id IN (...).
  4. Применить его к скану probe-стороны sales — читать только подходящие строки.
Dynamic filtering: фильтр из build-side
Build-сторона: stores, city='Berlin'Build-сторона обрабатывается первой — иначе не построить хэш-таблицу join'а
собрать значения store_id
Динамический фильтр store_id IN (...)Рантайм-предикат, построенный из реальных значений join-ключа build-стороны во время исполнения
применить к скану sales
Скан sales читает только нужноеProbe-сторона читает только строки с подходящим store_id — вместо всего миллиарда

Точные значения или диапазон min/max

Множество значений с build-стороны не всегда мало. Если build-сторона отфильтровалась слабо и в ней не 12, а миллион значений store_id, держать их все как точное множество-предикат дорого по памяти.

Поэтому у dynamic filtering две стратегии сбора:

  • Точное множество значений. Пока различных значений join-ключа на build-стороне немного — Trino собирает их полным точным списком. Динамический фильтр получается максимально избирательным: IN (конкретные значения).
  • Диапазон min/max. Если различных значений слишком много (превышены внутренние лимиты на число и размер собираемых значений), Trino не держит весь список, а сводит фильтр к диапазону: минимум и максимум join-ключа. Предикат становится «store_id BETWEEN min AND max» — менее избирательный, чем точный список, но дешёвый и всё равно отсекающий часть данных fact-таблицы за пределами диапазона.

Есть ограничение: фильтр по диапазону min/max не применяется к типам DOUBLE, REAL и к типам, для которых упорядочивание не определено, — для них работает только режим точного множества.

СтратегияКогдаКакой предикатИзбирательность
Точное множестворазличных значений немногоIN (значения)высокая
Диапазон min/maxразличных значений многоBETWEEN min AND maxумеренная

Partition pruning: фильтр срабатывает ещё до чтения

Самый сильный эффект dynamic filtering даёт на партиционированной fact-таблице. Здесь динамический фильтр применяется не построчно при чтении, а ещё раньше — на этапе выбора, какие данные вообще читать.

Вспомним из шестого модуля: координатор перечисляет сплиты fact-таблицы лениво, через SplitSource. Динамический фильтр готов в середине запроса — и, поскольку перечисление сплитов ленивое, к этому моменту ещё не все сплиты sales перечислены. Координатор применяет динамический фильтр прямо при дальнейшем split enumeration: если fact-таблица партиционирована по store_id (или по коррелирующему столбцу), партиции, чьи значения не попадают в динамический фильтр, не порождают сплитов вообще. Их файлы не открываются, не читаются, не доходят до воркеров.

Это и есть dynamic partition pruning — динамическое отсечение партиций. Разница с обычным построчным dynamic filtering принципиальна. Построчный фильтр всё-таки открывает файл и отбрасывает строки при чтении. Partition pruning не открывает файлы ненужных партиций вовсе — экономится сам дисковый I/O, а не только обработка прочитанного.

Тот же механизм отсечения работает и внутри файлов колоночных форматов: в Hive/Iceberg с ORC и Parquet динамические фильтры проталкиваются в reader, и тот по статистике пропускает целые stripe’ы или row group’и, не подходящие под фильтр.

Dynamic partition pruning: ненужные партиции не читаются
Динамический фильтр готовМножество store_id с build-стороны собрано в середине запроса
координатор перечисляет сплиты sales лениво
Партиции в фильтреПартиции, чьи значения попадают в динамический фильтр, порождают сплиты и читаются
Партиции вне фильтраПартиции вне фильтра не порождают сплитов вообще — их файлы не открываются, дисковый I/O экономится

Где применяется и где видно

Dynamic filtering применяется не ко всем join’ам — есть условия. Он работает для INNER и RIGHT join’ов с условиями =, <, <=, >, >=, IS NOT DISTINCT FROM, а также для semi-join (подзапрос IN). Сторона, с которой собирается фильтр, — всегда build-сторона; для broadcast join’а фильтр собирается до партиционирования build-стороны и проталкивается локально, для partitioned join’а — после партиционирования по join-ключам и рассылается воркерам.

Увидеть dynamic filtering можно в EXPLAIN и в Web UI. В плане у join-узла появляется dynamicFilterAssignments — назначение динамического фильтра, а у скана probe-стороны — dynamicFilters в списке примененных к нему фильтров. В Web UI на странице запроса есть раздел dynamicFiltersStats — он показывает, сколько динамических фильтров собрано и насколько они сократили чтение.

WARNING

Dynamic filtering эффективен ровно тогда, когда build-сторона действительно селективна — сильно отфильтрована и даёт небольшое множество значений join-ключа. Если build-сторона почти не отфильтрована (например, в stores без WHERE или с неселективным условием), динамический фильтр охватит почти все значения store_id, отсекать в sales будет нечего, а на сбор фильтра уйдут ресурсы. Польза dynamic filtering прямо пропорциональна селективности build-стороны: на схеме «звезда» с маленькими отфильтрованными dimension-таблицами он даёт огромный выигрыш, на join’е двух почти неотфильтрованных больших таблиц — почти ничего. Это не рычаг, который надо крутить, — он работает автоматически; понимать его нужно, чтобы знать, когда ждать от него эффекта.


Попробуй сам

На песочнице курса (Trino 481):

  1. Выполните EXPLAIN для join с селективным фильтром по маленькой таблице: EXPLAIN SELECT l.* FROM tpch.sf10.lineitem l JOIN tpch.sf1.orders o ON l.orderkey = o.orderkey WHERE o.orderpriority = '1-URGENT';. Найдите в плане у join-узла dynamicFilterAssignments и у скана lineitemdynamicFilters. Сформулируйте, какая таблица — build-сторона и откуда собирается фильтр.

  2. Запустите этот же запрос как EXPLAIN ANALYZE и затем откройте его в Web UI. Найдите раздел dynamicFiltersStats. Зафиксируйте, насколько динамический фильтр сократил объём прочитанных данных lineitem.

  3. Рассуждение в двух абзацах. Первый: объясните, почему фильтр WHERE st.city = 'Berlin' нельзя статически, на этапе планирования, протолкнуть в скан fact-таблицы sales, и почему dynamic filtering обходит это ограничение. Второй: в чём разница между построчным dynamic filtering и dynamic partition pruning по тому, что именно экономится — обработка или дисковый I/O.


Проверка знанийKnowledge check
Как работает dynamic filtering, чем он отличается от статического pushdown по моменту работы, и почему его эффективность зависит от селективности build-стороны?
ОтветAnswer
Dynamic filtering — это оптимизация времени исполнения: фильтр, которого на этапе планирования ещё не существовало, рождается в середине запроса из уже посчитанных данных и применяется к тому, что ещё не прочитано. Этим он отличается от статического pushdown и других оптимизаций CBO, которые принимают все решения на этапе планирования, до старта запроса. Механика опирается на порядок исполнения join'а: build-сторона (например, маленькая dimension-таблица stores, отфильтрованная по city = 'Berlin') обрабатывается раньше probe-стороны, потому что по ней строится хэш-таблица. Значит, к моменту чтения большой probe-стороны (fact-таблицы sales) Trino уже знает конкретные значения join-ключа build-стороны. Из них он строит динамический предикат — store_id IN (множество значений) или, если значений слишком много, диапазон BETWEEN min AND max — и применяет его к скану probe-стороны: читаются только подходящие строки вместо всего объёма. Это решает проблему, которую статический pushdown решить не может: фильтр city = 'Berlin' относится к stores, и протолкнуть его в скан sales на этапе планирования нельзя — в sales нет столбца city, связь между city и нужными store_id обнаруживается только при исполнении join'а. На партиционированной fact-таблице эффект сильнее всего: поскольку координатор перечисляет сплиты лениво, динамический фильтр применяется прямо при дальнейшем split enumeration, и партиции вне фильтра не порождают сплитов вовсе — их файлы не открываются, экономится сам дисковый I/O, а не только обработка прочитанного; это dynamic partition pruning. Эффективность dynamic filtering прямо пропорциональна селективности build-стороны: если build-сторона сильно отфильтрована и даёт небольшое множество значений ключа, динамический фильтр резко сокращает чтение probe-стороны; если же build-сторона почти не отфильтрована, фильтр охватит почти все значения, отсекать будет нечего, а ресурсы на сбор фильтра потратятся. Поэтому на схеме «звезда» с маленькими отфильтрованными dimension-таблицами dynamic filtering даёт огромный выигрыш, а на join'е двух почти неотфильтрованных больших таблиц — почти ничего.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Чем dynamic filtering принципиально отличается от статического pushdown и других оптимизаций CBO?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6