Правила оптимизации: Как Catalyst ускоряет запросы
Optimizer: третья стадия Catalyst
После того как Analyzer создал Resolved Logical Plan, Optimizer перестраивает его для повышения производительности. Optimizer применяет десятки правил трансформации, каждое из которых работает по принципу pattern matching: находит определенную структуру в дереве плана и заменяет её на более эффективную.
Критически важно: Optimizer не меняет семантику запроса. Результат выполнения оптимизированного плана всегда идентичен результату неоптимизированного.
Ключевые правила оптимизации
1. PushDownPredicate (Predicate Pushdown)
Суть: Перемещает фильтры (WHERE) как можно ближе к источнику данных.
До оптимизации:
Project [name, dept_name]
+- Filter (age > 30)
+- Join Inner (dept_id = dept_id)
:- Scan employees [id, name, age, dept_id]
+- Scan departments [dept_id, dept_name]
После PushDownPredicate:
Project [name, dept_name]
+- Join Inner (dept_id = dept_id)
:- Filter (age > 30)
: +- Scan employees [id, name, age, dept_id]
+- Scan departments [dept_id, dept_name]
Фильтр age > 30 сдвинулся до join-операции. Это критическое улучшение: если из 10 миллионов строк employees только 3 миллиона удовлетворяют условию age > 30, join будет обрабатывать в 3 раза меньше данных.
Для файловых источников (Parquet, ORC) predicate pushdown идёт ещё глубже — фильтр передается на уровень чтения файлов, и целые row groups (блоки по 128MB) пропускаются через min/max statistics.
2. ColumnPruning (Column Pruning)
Суть: Убирает колонки, которые не нужны для финального результата.
Рассмотрим запрос:
SELECT name, dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
Таблица employees содержит колонки [id, name, age, dept_id], но в результате нужны только name и dept_id (для join). ColumnPruning исключает id и age из чтения:
-- До: Scan employees [id, name, age, dept_id]
-- После: Scan employees [name, dept_id]
Для колоночных форматов (Parquet, ORC) это означает, что физически не читаются ненужные колонки с диска — огромная экономия I/O.
3. ConstantFolding (Constant Folding)
Суть: Вычисляет константные выражения на этапе планирования, а не при выполнении.
-- До: WHERE price > 100 * 1.2
-- После: WHERE price > 120.0
Без ConstantFolding выражение 100 * 1.2 вычислялось бы для каждой строки таблицы. С ConstantFolding оно вычисляется один раз при компиляции плана.
4. BooleanSimplification
Суть: Упрощает логические выражения.
-- x AND true → x
-- x OR false → x
-- x AND false → false (весь фильтр убирается)
-- NOT(NOT(x)) → x
-- x AND x → x
5. CombineFilters
Суть: Объединяет последовательные Filter узлы в один.
# Пользователь написал:
df.filter(df.age > 30).filter(df.age < 60)
# До CombineFilters:
# Filter (age < 60)
# Filter (age > 30)
# Scan
# После CombineFilters:
# Filter (age > 30 AND age < 60)
# Scan
Один Filter с составным предикатом эффективнее двух вложенных: меньше проходов по данным, лучше codegen.
6. ReorderJoin (CBO)
Суть: Переставляет порядок join-операций на основе статистик таблиц.
Для запроса с тремя таблицами:
SELECT * FROM big_table b
JOIN medium_table m ON b.id = m.id
JOIN small_table s ON m.id = s.id
Без CBO Spark выполнит join в порядке написания: (big JOIN medium) JOIN small. С CBO (если статистики собраны), Spark может изменить порядок на (small JOIN medium) JOIN big, что генерирует значительно меньше промежуточных данных.
# Собрать статистики для CBO
spark.sql("ANALYZE TABLE big_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE medium_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE small_table COMPUTE STATISTICS")
CBO в Spark включен по умолчанию (spark.sql.cbo.enabled=true), но работает только если собраны статистики таблиц. Без ANALYZE TABLE CBO не может оценить размеры таблиц и порядок join останется как в запросе.
Чтение Optimized Logical Plan
Продолжим наш сквозной пример:
result = spark.sql("""
SELECT name, dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
WHERE e.age > 30
""")
result.explain(True)
Секция == Optimized Logical Plan ==:
== Optimized Logical Plan ==
Project [name#5, dept_name#12]
+- Join Inner, (dept_id#7 = dept_id#11)
:- Project [name#5, dept_id#7]
: +- Filter (isnotnull(dept_id#7) AND isnotnull(age#6) AND (age#6 > 30))
: +- LocalRelation [id#4, name#5, age#6, dept_id#7]
+- Project [dept_id#11, dept_name#12]
+- Filter isnotnull(dept_id#11)
+- LocalRelation [dept_id#11, dept_name#12]
Что произошло по сравнению с Analyzed планом:
- PushDownPredicate — фильтр
age > 30сдвинулся под join (теперь он в веткеemployees) - ColumnPruning — из
employeesчитаются толькоnameиdept_id(плюсageдля фильтра) - Null-safety — добавлены
isnotnull()проверки для join-ключей (строки сnullвdept_idне могут участвовать в inner join)
Порядок применения правил
Optimizer применяет правила пакетами (batches). Каждый пакет содержит набор правил, которые применяются итеративно до фиксированной точки:
- Finish Analysis — финальная очистка после Analyzer
- Union — оптимизации UNION операций
- Subquery — оптимизации подзапросов
- Replace Operators — замена высокоуровневых операторов
- Aggregate — оптимизации агрегаций
- Operator Optimization — основной блок: PushDownPredicate, ColumnPruning, ConstantFolding, BooleanSimplification, CombineFilters
- Join Reorder — CBO-based перестановка join
Каждый пакет применяется до тех пор, пока план не перестанет изменяться (fixed point) или пока не будет достигнут лимит итераций (spark.sql.optimizer.maxIterations, по умолчанию 100).
Когда CBO превосходит RBO?
Rule-based оптимизации (RBO) — это “всегда правильные” трансформации. Но есть ситуации, где только CBO может выбрать оптимальный план:
| Решение | RBO | CBO |
|---|---|---|
| Predicate pushdown | Всегда применяет | — |
| Column pruning | Всегда применяет | — |
| Порядок join | Оставляет как есть | Выбирает по статистикам |
| Broadcast vs SortMerge join | По порогу (10MB) | По реальным размерам |
| Фильтр selectivity | Не оценивает | Учитывает распределение значений |