Learning Platform
Глоссарий Troubleshooting
Урок 07.06 · 18 мин
Продвинутый
CodeGenWhole-StageJaninoOperator FusionVolcano Model

Whole-Stage CodeGen: Фузия операторов

Проблема: Volcano Iterator Model

До появления Whole-Stage CodeGen (Spark < 2.0), Spark использовал классическую Volcano Iterator Model — каждый оператор в плане запроса реализовывал интерфейс Iterator с методом next():

Volcano Iterator Model
Aggregate.next()
next()
Filter.next()
next()
Project.next()
next()
Scan.next()

Каждый вызов next() возвращал одну строку, передавая её вверх по цепочке. Проблемы этого подхода:

  1. Виртуальные вызовы — каждый next() — это виртуальный вызов через vtable, который JVM не может инлайнить
  2. Boxing/Unboxing — примитивные типы (int, long) оборачиваются в объекты (Integer, Long) при передаче между операторами
  3. Branch misprediction — переключение между операторами сбивает branch predictor CPU
  4. Нет loop fusion — каждый оператор работает в своём цикле, данные не остаются в CPU cache

Для 1 миллиарда строк с 5 операторами — это 5 миллиардов виртуальных вызовов и столько же boxing/unboxing операций.

Решение: Whole-Stage Code Generation

Whole-Stage CodeGen (WSCG) — технология, которая объединяет (fuses) несколько операторов в один Java-метод. Вместо цепочки next() вызовов генерируется единый цикл, который обрабатывает данные без межоператорных вызовов.

Как это работает

  1. Catalyst анализирует Physical Plan и находит pipeline — последовательность операторов, которые можно объединить
  2. Генерируется Java-код, реализующий весь pipeline в одном методе
  3. Код компилируется через Janino — легковесный Java compiler (не javac)
  4. Скомпилированный байткод выполняется напрямую

Пример: до и после CodeGen

Без WSCG (Volcano Model):

// Каждый оператор — отдельный объект с виртуальным next()
while (scan.hasNext()) {
    Row row = scan.next();           // виртуальный вызов
    Row filtered = filter.next(row);  // виртуальный вызов
    if (filtered != null) {
        Row projected = project.next(filtered); // виртуальный вызов
        aggregate.add(projected);     // виртуальный вызов
    }
}

С WSCG (сгенерированный код):

// Один метод, без виртуальных вызовов
while (scan_hasNext) {
    long age = scan_getLong(2);        // прямой доступ, без boxing
    if (age > 30) {                    // inline filter
        String name = scan_getString(1); // прямой доступ
        agg_buffer[hash(name)] += 1;   // inline aggregate
    }
}

Результат: нет виртуальных вызовов, нет boxing, данные остаются в CPU registers — ускорение в 10x+ для вычислительно-интенсивных операций.

Janino: почему не javac?

Spark использует Janino — встроенный Java compiler, который работает в процессе JVM:

АспектjavacJanino
ПроцессВнешний процессВ том же JVM
Скорость компиляции100+ мс1-10 мс
ЗависимостиJDK installationLightweight JAR
Runtime overheadFork processInline compilation

Janino компилирует сгенерированный Java-код в байткод за миллисекунды, что критично для интерактивных запросов.

Чтение explain(): asterisk-операторы

В выводе explain() операторы с Whole-Stage CodeGen помечены звёздочкой (*):

df = spark.sql("""
    SELECT dept_id, COUNT(*) as cnt, AVG(age) as avg_age
    FROM employees
    WHERE age > 30
    GROUP BY dept_id
""")

df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- *HashAggregate(keys=[dept_id#7], functions=[count(1), avg(age#6)])
   +- Exchange hashpartitioning(dept_id#7, 200), ENSURE_REQUIREMENTS, [plan_id=55]
      +- *HashAggregate(keys=[dept_id#7], functions=[partial_count(1), partial_avg(age#6)])
         +- *Filter (isnotnull(age#6) AND (age#6 > 30))
            +- *LocalTableScan [age#6, dept_id#7]

Разберем, что здесь происходит:

  • *Filter и *LocalTableScan — объединены в один codegen pipeline (фильтр + чтение)
  • *HashAggregate (partial) — частичная агрегация до shuffle, тоже в codegen pipeline
  • Exchange — shuffle (перераспределение по dept_id). Без звёздочки — WSCG не может пересечь shuffle boundary
  • *HashAggregate (final) — финальная агрегация после shuffle, новый codegen pipeline
WARNING

Граница CodeGen: Shuffle Boundary. Whole-Stage CodeGen не может объединить операторы через shuffle boundary. Shuffle требует материализации данных на диск и перераспределения между executors — это физический барьер для фузии операторов. Поэтому в explain() вы увидите два отдельных pipeline: до и после Exchange.

Другие границы CodeGen

Помимо shuffle, WSCG не может объединить:

  • BroadcastExchange — отправка данных на все executors
  • Sort в некоторых конфигурациях (зависит от размера данных)
  • Операторы с UDF — пользовательские функции (Python UDF, Java UDF) прерывают codegen pipeline
# UDF прерывает codegen — избегайте, если возможно
from pyspark.sql.functions import udf

@udf("string")
def custom_transform(name):
    return name.upper()

# Этот pipeline НЕ будет полностью в codegen
df.filter(df.age > 30).withColumn("upper_name", custom_transform(df.name))
TIP

Используйте встроенные функции Spark (upper(), lower(), concat()) вместо UDF. Встроенные функции интегрированы в codegen pipeline и работают на порядок быстрее.

Конфигурация WSCG

# Whole-Stage CodeGen включен по умолчанию (с Spark 2.0)
spark.conf.get("spark.sql.codegen.wholeStage")  # true

# Максимальное количество полей для codegen
# Если в строке > 100 полей, codegen отключается
spark.conf.get("spark.sql.codegen.maxFields")  # 100

# Максимальный размер сгенерированного кода (8KB по умолчанию)
spark.conf.get("spark.sql.codegen.hugeMethodLimit")  # 8000

Spark3.5+ Whole-Stage CodeGen стабилен и включен по умолчанию с Spark 2.0. В Spark 4.0 GA (2025) и 4.1 (текущая) он поддерживает все основные операторы: Filter, Project, HashAggregate, SortMergeJoin, BroadcastHashJoin, Sort. Поведение совместимо со Spark 3.5 LTS.

Анти-паттерн: отключение CodeGen

# НЕ делайте этого в production!
spark.conf.set("spark.sql.codegen.wholeStage", "false")

Отключение WSCG оправдано только для отладки: чтобы сравнить производительность с и без codegen, или чтобы изолировать проблему в сгенерированном коде. В production отключение codegen может замедлить запросы в 10x.

Влияние на производительность

СценарийБез WSCGС WSCGУскорение
Scan + Filter (1B rows)45 сек4 сек11x
Aggregate (100M groups)120 сек15 сек8x
Join + Filter + Project90 сек12 сек7.5x
Scan only (columnar)30 сек2 сек15x

Наибольшее ускорение на columnar scans (Parquet/ORC), где codegen генерирует vectorized чтение.

Итоги модуля: Catalyst + Tungsten

За 6 уроков мы прошли полный путь запроса в Spark:

  1. Catalyst Optimizer трансформирует SQL/DataFrame в оптимизированный план через 5 стадий
  2. Analyzer разрешает ссылки через Catalog
  3. Optimizer применяет правила (predicate pushdown, column pruning, constant folding, CBO)
  4. SparkPlanner выбирает физические стратегии (BroadcastHashJoin vs SortMergeJoin)
  5. Tungsten хранит данные в компактном бинарном формате (UnsafeRow), минимизируя GC
  6. Whole-Stage CodeGen объединяет операторы в единый Java-метод, устраняя overhead Volcano Model

Понимание этих механизмов — основа для чтения explain() планов и диагностики медленных запросов.

Проверка знанийKnowledge check
Почему Whole-Stage CodeGen не может объединить операторы через shuffle boundary?
ОтветAnswer
Shuffle boundary — это физический барьер: данные материализуются на диск, перераспределяются между executors через сеть и читаются заново. CodeGen объединяет операторы в единый Java-метод, который работает с данными в памяти одного executor. Через shuffle данные покидают executor, поэтому единый метод невозможен — нужны два отдельных pipeline: до shuffle (partial aggregation) и после shuffle (final aggregation). В explain() это видно как два отдельных блока *-операторов, разделённых Exchange.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Что означает символ * (звёздочка) перед оператором в выводе explain()? Например: *(1) Filter (age > 30).

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6