Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 32 мин
Продвинутый
SparkSessionExtensionsCatalyst RulesLogicalPlanTreeNodeOptimizerAnalyzer

Кастомные правила Catalyst: SparkSessionExtensions

Catalyst — расширяемый оптимизатор. Слово “расширяемый” здесь не маркетинговое: начиная с Spark 2.2 существует официальный API, который позволяет добавлять собственные правила на любой фазе конвейера. Именно этим пользуются Iceberg Extensions, Delta Lake, Hudi, и ряд облачных платформ — чтобы добавить синтаксис ALTER TABLE SET TBLPROPERTIES, перехватить MERGE INTO, или переписать план до того, как стандартный оптимизатор до него доберётся.

Этот урок — полное руководство по работе с SparkSessionExtensions: какие точки расширения существуют, как писать Rule[LogicalPlan], как безопасно трансформировать дерево, и где заканчиваются возможности и начинаются риски.

SparkSessionExtensions: точки расширения

SparkSessionExtensions — это реестр функций-расширений. Вы передаёте его в SparkSession.Builder.withExtensions() или через конфигурацию:

// Способ 1: Программно
val spark = SparkSession.builder()
  .withExtensions(ext => {
    ext.injectOptimizerRule(session => MyOptimizerRule(session))
    ext.injectResolutionRule(session => MyAnalyzerRule)
    ext.injectParser((session, parser) => new MyCustomParser(parser))
    ext.injectPlannerStrategy(session => MyPlannerStrategy(session))
  })
  .getOrCreate()

// Способ 2: Через конфигурацию (для prod-деплоя без изменения кода)
// spark.sql.extensions=com.example.MyExtensionsClass
// MyExtensionsClass должен иметь публичный no-arg конструктор и реализовывать
// Function1[SparkSessionExtensions, Unit]

Полный список точек расширения в SparkSessionExtensions (Spark 4.0):

МетодФаза конвейераКогда вызывается
injectResolutionRuleAnalyzer, Resolution batchПосле парсинга, при резолвинге имён
injectPostHocResolutionRuleAnalyzer, Post-hoc batchПосле основной резолюции
injectCheckRuleAnalyzer, Check batchВалидация перед оптимизацией
injectOptimizerRuleOptimizerПосле анализа, перед физическим планированием
injectParserParserПри парсинге SQL-текста
injectPlannerStrategySparkPlannerПри выборе физических операторов
injectColumnarPost-planningПосле физического планирования (columnar правила)
injectQueryStagePrepRuleAQEПеред каждой AQE-стадией

Анатомия Rule[LogicalPlan]

Каждое правило — это объект, реализующий трейт Rule[LogicalPlan]:

// org.apache.spark.sql.catalyst.rules.Rule
abstract class Rule[TreeType <: TreeNode[_]] {
  val ruleName: String = ...

  def apply(plan: TreeType): TreeType
}

Метод apply принимает план-дерево и возвращает (возможно, изменённое) план-дерево. Optimizer применяет правила итеративно до фиксированной точки — пока apply возвращает тот же план (по структуре).

Для трансформации дерева используются методы TreeNode:

// Трансформация сверху вниз (pre-order): правило применяется к узлу, потом к его детям
plan.transformDown {
  case Filter(condition, child) =>
    // Трансформируем: если условие всегда истинно -- убираем Filter
    if (condition == Literal.TrueLiteral) child else Filter(condition, child)
}

// Трансформация снизу вверх (post-order): сначала дети, потом родитель
plan.transformUp {
  case Project(Nil, child) => child  // Пустой проект -- убираем
}

// Трансформация только выражений внутри узлов (не рекурсивно по плану)
plan.transformAllExpressions {
  case Add(Literal(a: Int, _), Literal(b: Int, _)) =>
    Literal(a + b)  // Constant folding
}
WARNING

transformDown vs transformUp — это не просто порядок обхода. Если ваше правило вводит НОВЫЕ узлы, которые сами должны быть трансформированы этим же правилом, используйте transformDown: родительский узел будет обработан, его замена попадёт в обход, и будет обработана снова. transformUp этого не делает — новые узлы в родительской позиции уже не обходятся.

Пример 1: правило оптимизатора — Redundant Distinct Elimination

Напишем правило, которое убирает двойной DISTINCT:

// SELECT DISTINCT * FROM (SELECT DISTINCT * FROM t) -- двойной DISTINCT бессмысленен
package com.example.rules

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.SparkSession

object EliminateRedundantDistinct extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    // Distinct(Distinct(child)) -> Distinct(child)
    case d @ Distinct(inner @ Distinct(_)) =>
      logDebug(s"Eliminating redundant Distinct: $d")
      inner  // Убираем внешний Distinct -- он ничего не добавляет

    // Project над Distinct, если проект не меняет строки -- Distinct снизу избыточен
    // Это осторожный случай: только если Project -- pure column references
    case project @ Project(projectList, Distinct(child))
        if projectList.forall(_.isInstanceOf[AttributeReference]) =>
      Distinct(Project(projectList, child))  // Опускаем Distinct под Project
  }
}

Регистрируем:

val spark = SparkSession.builder()
  .withExtensions(ext =>
    ext.injectOptimizerRule(_ => EliminateRedundantDistinct)
  )
  .getOrCreate()

// Проверяем
spark.sql("SELECT DISTINCT * FROM (SELECT DISTINCT id, name FROM users)")
     .explain("extended")
// В Optimized Logical Plan должен быть только один Distinct

Пример 2: правило анализатора — автоматический tenant filter

Реальный production-паттерн: мультитенантная система, где каждый запрос должен автоматически получать фильтр WHERE tenant_id = current_tenant(). Без этого правила каждый разработчик должен помнить добавить фильтр вручную.

package com.example.rules

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.SparkSession

// Правило АНАЛИЗАТОРА (injectResolutionRule) -- работает с Analyzed LogicalPlan
// Важно: к моменту вызова имена таблиц/колонок уже резолвены
class TenantIsolationRule(spark: SparkSession) extends Rule[LogicalPlan] {

  // Таблицы, к которым применяем tenant isolation
  private val tenantTables = Set("orders", "customers", "transactions")

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    // SubqueryAlias появляется когда Analyzer связывает имя таблицы с её Relation
    case relation @ SubqueryAlias(id, _)
        if tenantTables.contains(id.name.toLowerCase) &&
           !hasExistingTenantFilter(relation) =>

      // Получаем текущий tenant из конфигурации или SparkContext
      val currentTenant = getCurrentTenant()

      // Находим атрибут tenant_id в схеме relation
      val tenantIdAttr = relation.output.find(_.name == "tenant_id").getOrElse {
        throw new AnalysisException(
          s"Table ${id.name} does not have tenant_id column, but TenantIsolationRule is active"
        )
      }

      // Добавляем Filter поверх relation
      Filter(
        EqualTo(tenantIdAttr, Literal(currentTenant)),
        relation
      )
  }

  // Проверяем, нет ли уже фильтра по tenant_id
  // Это предотвращает двойное добавление при итеративном применении правила
  private def hasExistingTenantFilter(plan: LogicalPlan): Boolean = plan.find {
    case Filter(EqualTo(attr: AttributeReference, _), _)
        if attr.name == "tenant_id" => true
    case _ => false
  }.isDefined

  private def getCurrentTenant(): String =
    spark.conf.getOption("app.currentTenant")
         .getOrElse(throw new IllegalStateException("No current tenant configured"))
}
NOTE

resolveOperatorsDown (в отличие от transformDown) применяется только к уже resolved операторам и пропускает unresolved. Это важно в Analyzer-правилах: если вы случайно обращаетесь к атрибутам unresolved-узла, вы получите NullPointerException или некорректный план. Всегда используйте resolveOperatorsDown в правилах анализатора.

Пример 3: кастомный парсер — VACUUM WITH STATS

injectParser позволяет расширять SQL-синтаксис. Это мощный инструмент: так Delta Lake добавляет OPTIMIZE, VACUUM, RESTORE TO VERSION, DESCRIBE HISTORY.

package com.example.parser

import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.SparkSession

// LogicalPlan для нашей новой команды
case class VacuumWithStats(
  tableName: String,
  retentionHours: Int,
  collectStats: Boolean
) extends LogicalPlan {
  override def output: Seq[Attribute] = Nil
  override def children: Seq[LogicalPlan] = Nil
}

class ExtendedSqlParser(
  session: SparkSession,
  delegate: ParserInterface  // Стандартный парсер Spark
) extends ParserInterface {

  override def parsePlan(sqlText: String): LogicalPlan = {
    // Пробуем распарсить нашим синтаксисом
    if (sqlText.trim.toUpperCase.startsWith("VACUUM")) {
      parseVacuumStatement(sqlText)
    } else {
      // Всё остальное -- делегируем стандартному парсеру
      delegate.parsePlan(sqlText)
    }
  }

  private def parseVacuumStatement(sql: String): LogicalPlan = {
    // VACUUM tableName [RETAIN n HOURS] [WITH STATS]
    val regex = """VACUUM\s+(\w+)(?:\s+RETAIN\s+(\d+)\s+HOURS)?(\s+WITH\s+STATS)?""".r
    sql.trim.toUpperCase match {
      case regex(table, retentionStr, withStats) =>
        VacuumWithStats(
          tableName = table.toLowerCase,
          retentionHours = Option(retentionStr).map(_.toInt).getOrElse(168), // 7 дней
          collectStats = withStats != null
        )
      case _ =>
        throw new ParseException(s"Cannot parse VACUUM statement: $sql")
    }
  }

  // Все остальные методы делегируем стандартному парсеру
  override def parseExpression(sqlText: String) = delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String) = delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String) = delegate.parseFunctionIdentifier(sqlText)
  override def parseMultipartIdentifier(sqlText: String) = delegate.parseMultipartIdentifier(sqlText)
  override def parseTableSchema(sqlText: String) = delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String) = delegate.parseDataType(sqlText)
  override def parseQuery(sqlText: String) = delegate.parseQuery(sqlText)
}

Регистрация и использование:

val spark = SparkSession.builder()
  .withExtensions(ext =>
    ext.injectParser((session, delegate) =>
      new ExtendedSqlParser(session, delegate)
    )
  )
  .getOrCreate()

// Теперь работает наш синтаксис
spark.sql("VACUUM myTable RETAIN 24 HOURS WITH STATS")
// Возвращает VacuumWithStats -- нужно ещё добавить executor через injectPlannerStrategy

Жизненный цикл правила в Optimizer

Понимание того, как Optimizer применяет правила, критично для написания корректных правил.

Optimizer выполняет batches (группы правил):

Batch "Finish Analysis"        -- однократный проход (maxIterations=1)
  ├── EliminateResolvedHint
  ├── EliminateSubqueryAliases
  └── ...

Batch "Union"                  -- однократный
  └── CombineUnions

Batch "Operator Optimization"  -- до фиксированной точки (maxIterations=100)
  ├── NullPropagation
  ├── ConstantFolding
  ├── LikeSimplification
  ├── BooleanSimplification
  ├── SimplifyConditionals
  ├── ... (десятки встроенных правил)
  └── [ваши injectOptimizerRule правила добавляются ЗДЕСЬ]

Batch "Early Filter and Projection Push-Down"  -- однократный
  ├── SamplePushDown
  ├── PushPredicateThroughNonJoin
  └── ...

Ваше кастомное правило, добавленное через injectOptimizerRule, будет частью batch “Operator Optimization” — того самого, который применяется до фиксированной точки. Это значит: если ваше правило изменило план, Optimizer пройдёт всю пачку правил ещё раз, включая ваше. Это продолжается до тех пор, пока ни одно правило не изменит план.

Место кастомных правил в конвейере Catalyst
ParserSQL-текст парсится стандартным парсером (или вашим через injectParser). Результат -- Unresolved Logical Plan с UnresolvedRelation, UnresolvedAttribute
Unresolved Logical Plan
AnalyzerAnalyzer резолвит имена через Catalog. Сначала Resolution rules (injectResolutionRule), потом Post-hoc rules (injectPostHocResolutionRule), потом Check rules (injectCheckRule)
Analyzed Logical Plan
Optimizer + ваши правилаOptimizer применяет батчи правил. Ваши injectOptimizerRule правила добавляются в batch 'Operator Optimization' -- итеративный, до fixed point. Это основное место для оптимизаций
Optimized Logical Plan
SparkPlanner + ваши StrategySparkPlanner выбирает физические операторы через Strategies. Ваши injectPlannerStrategy стратегии обходят Optimized LogicalPlan и преобразуют его в физические SparkPlan
Physical Plan (SparkPlan)
PrepareForExecutionCollapseCodegenStages объединяет кодогенерируемые операторы. PrepareForExecution финализирует план (InsertAdaptiveSparkPlan для AQE). Ваши injectColumnar и injectQueryStagePrepRule вызываются здесь
Executed Plan

TreeNode API: полный арсенал трансформаций

Для сложных правил важно знать все инструменты TreeNode:

// Найти первый узел, удовлетворяющий условию
plan.find { case _: Filter => true; case _ => false }

// Проверить наличие узла
plan.exists { case _: Sort => true; case _ => false }

// Собрать все узлы определённого типа
val allFilters: Seq[Filter] = plan.collect { case f: Filter => f }

// Найти с контекстом (родительский узел доступен)
plan.collectWithSubqueries { case s: ScalarSubquery => s }

// Трансформировать только поддеревья, без subqueries
plan.transformWithSubqueries {
  case Filter(cond, child) => Filter(simplify(cond), child)
}

// Трансформировать выражения внутри всего дерева
plan.transformAllExpressionsDown {
  case Cast(child, dataType, _, _) if dataType == child.dataType => child
}

// foreach для side effects (логирование, сбор статистики)
plan.foreach {
  case agg: Aggregate => println(s"Found aggregate: ${agg.simpleString(10)}")
  case _ =>
}

Правила анализатора: резолюция custom функций

Ещё один production-паттерн: регистрация кастомных SQL-функций, которые должны быть резолвены особым образом. Например, функция current_tenant(), которая заменяется на конкретное значение из конфигурации:

object ResolveCurrentTenant extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsDown {
    // UnresolvedFunction появляется когда парсер встречает вызов неизвестной функции
    case UnresolvedFunction(
          Seq("current_tenant"),
          Seq(),
          isDistinct = false,
          filter = None,
          ignoreNulls = false) =>
      // Заменяем на Literal с текущим tenant
      val tenant = SparkEnv.get.conf.get("spark.app.tenant", "default")
      Literal(tenant, StringType)
  }
}

После этого правила SELECT current_tenant() вернёт строку, а не ошибку “Undefined function: current_tenant”.

Риски и ограничения кастомных правил

Кастомные правила — мощный инструмент, но со значительными рисками:

1. Coupling с внутренними классами Spark. LogicalPlan, Filter, Project, Aggregate — это @DeveloperApi или вообще internal. При обновлении Spark они могут измениться. Реальный случай: Spark 3.4 переименовал GlobalLimit в Limit с другой структурой. Правила, которые pattern-matching-ают GlobalLimit, перестали работать молча (план просто не трансформировался).

2. Бесконечные циклы в итеративном Optimizer. Если ваше правило при каждом применении создаёт план, который снова соответствует условию — Optimizer будет его применять снова и снова до достижения maxIterations (100 по умолчанию), после чего выведет WARN и остановится. Пример ошибки:

// БАГ: правило создаёт Filter, который сам соответствует условию
object BuggyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case Filter(cond, child) if !cond.isInstanceOf[And] =>
      // Создаём новый Filter -- он снова соответствует case Filter(...)!
      Filter(And(cond, Literal.TrueLiteral), child)
  }
}
// Optimizer применит это правило 100 раз, добавляя AND TRUE снова и снова

Исправление: добавьте guard-условие, проверяющее что трансформация действительно нужна.

3. Некорректные атрибуты после трансформации. Если ваше правило добавляет новый Project или Filter, новые выражения должны ссылаться на атрибуты из output дочерних узлов. Ссылка на атрибут с неправильным exprId приведёт к AnalysisException: Resolved attribute(s) ... missing from child.

4. Thread-safety. Правило может быть использовано из нескольких потоков (несколько параллельных SparkSession-ов используют один SparkContext). Если правило содержит mutable состояние — это race condition.

5. Стоимость правил. Каждое правило вызывается при каждом запросе. Правило, которое делает RPC-вызов (например, к внешнему сервису авторизации) за 50мс — это 50мс к latency каждого SQL-запроса. Правила должны быть максимально быстрыми: только дерево-трансформации, никаких I/O.

Отладка кастомных правил

// Включить детальное логирование Optimizer
spark.conf.set("spark.sql.optimizer.excludedRules", "")  // не исключать правила
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
// Теперь в логах видно каждое изменение плана с указанием правила

// Посмотреть план на каждой стадии
val df = spark.sql("SELECT DISTINCT name FROM (SELECT DISTINCT name FROM users)")
println("=== Unresolved ===")
println(df.queryExecution.logical)

println("=== Analyzed ===")
println(df.queryExecution.analyzed)

println("=== Optimized ===")
println(df.queryExecution.optimizedPlan)

println("=== Physical ===")
println(df.queryExecution.executedPlan)

// Проверить, что ваше правило применилось
// Ищем в Optimized план отсутствие двойного Distinct

Попробуй сам

Реализуем простое, но полезное правило: замена COUNT(DISTINCT col) на APPROX_COUNT_DISTINCT(col) при наличии конфигурационного флага. Это реальный паттерн для аналитики, где точный подсчёт не нужен, но скорость критична:

# Пишем правило на Python через py4j (для демонстрации -- в prod лучше Scala)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("catalyst-rules-demo") \
    .master("local[*]") \
    .getOrCreate()

# Создаём тестовые данные
import random
data = [(i, f"user_{i % 100}", random.choice(["A", "B", "C"]), float(i % 1000))
        for i in range(10000)]
df = spark.createDataFrame(data, ["id", "name", "category", "amount"])
df.createOrReplaceTempView("events")

# Запрос с COUNT DISTINCT
q1 = spark.sql("SELECT category, COUNT(DISTINCT name) as unique_users FROM events GROUP BY category")

# Посмотрим на оптимизированный план
print("=== План с COUNT DISTINCT ===")
q1.explain("formatted")

# В плане увидим:
# Aggregate [category#1], [category#1, count(distinct name#2) AS unique_users#5L]

# Теперь эквивалентный запрос с APPROX_COUNT_DISTINCT
q2 = spark.sql("SELECT category, APPROX_COUNT_DISTINCT(name) as unique_users FROM events GROUP BY category")

print("=== План с APPROX_COUNT_DISTINCT ===")
q2.explain("formatted")

# APPROX_COUNT_DISTINCT использует HLL (HyperLogLog) -- один pass без sort
# Для миллионов уникальных значений это в 10-100x быстрее

# Замеряем разницу
import time

start = time.time()
q1.collect()
print(f"COUNT DISTINCT: {time.time() - start:.2f}s")

start = time.time()
q2.collect()
print(f"APPROX_COUNT_DISTINCT: {time.time() - start:.2f}s")

# На больших данных с высокой кардинальностью разница значительна
# (до 10x) потому что COUNT DISTINCT требует sort + hash для точного подсчёта

Для тестирования правила на уровне ScalaTest:

class MyOptimizerRuleSpec extends QueryTest with SharedSparkSession {

  test("EliminateRedundantDistinct removes double distinct") {
    withExtensions(ext => ext.injectOptimizerRule(_ => EliminateRedundantDistinct)) {
      val df = spark.sql("SELECT DISTINCT * FROM (SELECT DISTINCT id, name FROM users)")
      val plan = df.queryExecution.optimizedPlan

      // В оптимизированном плане должен быть только один Distinct
      val distinctNodes = plan.collect { case d: Distinct => d }
      assert(distinctNodes.length == 1,
        s"Expected 1 Distinct but found ${distinctNodes.length}: $plan")
    }
  }
}
TIP

При разработке правила включайте spark.sql.planChangeLog.level=TRACE — это покажет каждое изменение плана с именем правила, которое его вызвало. Это незаменимо когда несколько правил взаимодействуют неожиданным образом. После отладки уровень нужно снизить — TRACE генерирует гигабайты логов при нагрузке.

Spark SQL Catalog: как работает резолюция имён
Проверка знанийKnowledge check
Вы написали правило оптимизатора, которое через injectOptimizerRule добавляет Filter с предикатом безопасности к каждому Scan. После деплоя вы замечаете: первые запросы работают корректно, но через несколько часов работы кластера часть запросов перестаёт применять фильтр безопасности. При перезапуске SparkSession всё снова работает нормально. Что вероятнее всего происходит и как это исправить?
ОтветAnswer
Наиболее вероятная причина: правило содержит mutable состояние, которое изменяется между вызовами. Например, кэш уже обработанных таблиц (Set[String]) без синхронизации, который в условиях concurrent SparkSession'ов заполняется некорректно -- последующие вызовы видят таблицу как "уже обработанную" и пропускают добавление фильтра. Второй вариант: правило читает tenant/user из ThreadLocal или SparkContext.localProperties, которые в executor-thread теряются. Третий: правило мутирует один экземпляр LogicalPlan (shared между запросами через Spark SQL кэш), добавляя фильтр один раз и потом считая план "уже обработанным". Диагностика: включить planChangeLog.level=DEBUG и найти запросы, в которых правило не сработало -- сравнить логи с успешными запросами. Исправление: 1) Сделать правило stateless -- никакого mutable состояния на уровне объекта правила. 2) Идемпотентность: правило должно добавлять фильтр даже если оно уже было применено к этому logical plan object -- проверка через hasExistingSecurityFilter. 3) Если состояние нужно -- хранить его в SparkContext.localProperties (thread-bound к driver-thread текущего запроса).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Правило добавлено через injectOptimizerRule. При каком условии Optimizer перестанет применять это правило к плану?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5