Catalyst Optimizer: Обзор архитектуры
Зачем нужен оптимизатор?
Когда вы пишете SQL-запрос или строите цепочку DataFrame-трансформаций, Spark не выполняет ваш код буквально. Вместо этого каждый запрос проходит через Catalyst Optimizer — многофазный оптимизатор, который перестраивает план выполнения для максимальной производительности.
Рассмотрим простой пример:
SELECT name FROM users WHERE age > 30
Без оптимизатора Spark мог бы сначала прочитать все колонки таблицы users, затем отфильтровать строки по age > 30, и только потом отбросить ненужные колонки. С Catalyst тот же запрос выполнится значительно эффективнее: фильтр сдвинется ближе к источнику данных, а ненужные колонки не будут прочитаны вообще.
Что такое Catalyst?
Catalyst — это расширяемый оптимизатор запросов, который использует гибридный подход:
- Rule-based optimization (RBO) — набор детерминированных правил трансформации: predicate pushdown, column pruning, constant folding. Эти правила применяются всегда и гарантированно улучшают план.
- Cost-based optimization (CBO) — статистический подход для выбора оптимальных стратегий (например, порядок join-операций). CBO использует метаданные таблиц: размер, количество строк, распределение значений.
Частая ошибка: Catalyst — это НЕ один этап оптимизации. Это многофазный конвейер из пяти последовательных стадий, каждая из которых трансформирует план запроса. Понимание этих стадий — ключ к чтению explain() вывода и диагностике медленных запросов.
Конвейер Catalyst: 5 стадий
Каждый запрос в Spark проходит через следующие стадии:
- SQL / DataFrame API — ваш код парсится в дерево выражений
- Unresolved Logical Plan — парсер создает план с нерезолвенными ссылками (Parser)
- Analyzed Logical Plan — Analyzer разрешает имена таблиц и колонок через Catalog
- Optimized Logical Plan — Optimizer применяет правила трансформации (RBO + CBO)
- Physical Plan — SparkPlanner выбирает конкретные стратегии выполнения
Пример: путь запроса через Catalyst
Проследим, как простой запрос трансформируется на каждой стадии:
result = spark.sql("""
SELECT name FROM users WHERE age > 30
""")
# Посмотрим полный план
result.explain(True)
Стадия 1 — Parser: Текст SQL разбирается в дерево. Таблица users становится UnresolvedRelation('users'), колонка age — UnresolvedAttribute('age').
Стадия 2 — Analyzer: Catalyst обращается к Catalog (метаданному хранилищу) и узнает, что таблица users существует в базе default, колонка age имеет тип Int, а name — тип String.
Стадия 3 — Optimizer: Применяются правила оптимизации:
- ColumnPruning — из таблицы читаются только
nameиage(без остальных колонок) - PushDownPredicate — фильтр
age > 30сдвигается к источнику данных
Стадия 4 — SparkPlanner: Логический план превращается в физический план с конкретными операторами: FileScan parquet вместо абстрактного Relation, Filter с конкретным predicate.
Стадия 5 — Code Generation: Whole-Stage CodeGen объединяет операторы в единый Java-метод, минимизируя виртуальные вызовы.
DataFrame API и SQL — один и тот же путь
Важно понимать: DataFrame API и SQL-запросы проходят через одинаковый конвейер Catalyst. Два эквивалентных запроса:
# DataFrame API
df.filter(df.age > 30).select("name")
# SQL
spark.sql("SELECT name FROM users WHERE age > 30")
оба генерируют идентичный Optimized Logical Plan. Spark не отдает предпочтение ни одному из API — оптимизатор работает на уровне логического плана, который абстрагирован от синтаксиса.
Расширяемость Catalyst
Catalyst построен на основе деревьев (trees) и правил (rules). Каждый план — это дерево узлов (nodes), а каждая оптимизация — это правило, которое трансформирует дерево. Это архитектурное решение делает Catalyst расширяемым: вы можете добавлять собственные правила оптимизации через SparkSessionExtensions.
Внутренне каждое правило реализует паттерн TreeNode.transform:
// Пример правила: заменяет x AND true → x
object SimplifyBooleanExpressions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case And(left, Literal(true, BooleanType)) => left
}
}
Optimizer итеративно применяет набор правил до фиксированной точки (fixed point) — пока план перестанет изменяться.
Как увидеть все стадии? Используйте df.explain(True) (или df.explain("extended") в Spark 3.0+). Вывод покажет 4 секции: Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan. Мы детально разберем каждую секцию в следующих уроках.
Что дальше?
В следующем уроке мы подробно разберем первые две стадии: как Parser создает Unresolved Logical Plan и как Analyzer разрешает ссылки через Catalog. Вы научитесь читать секции Parsed Logical Plan и Analyzed Logical Plan в выводе explain(true).
Лабораторная работа
Лабораторная даёт пощупать руками всё, что мы обсудили теоретически: разные режимы explain, срабатывание правил Catalyst, генерацию whole-stage codegen, перепланирование AQE и бинарное представление UnsafeRow в Tungsten. Так абстрактный pipeline Logical → Physical становится наблюдаемым.
cd labs/execution-plans
docker compose up -d
Полное описание и шаги проверки — в labs/execution-plans/README.md.