Whole-Stage CodeGen: Фузия операторов
Проблема: Volcano Iterator Model
До появления Whole-Stage CodeGen (Spark < 2.0), Spark использовал классическую Volcano Iterator Model — каждый оператор в плане запроса реализовывал интерфейс Iterator с методом next():
Каждый вызов next() возвращал одну строку, передавая её вверх по цепочке. Проблемы этого подхода:
- Виртуальные вызовы — каждый
next()— это виртуальный вызов через vtable, который JVM не может инлайнить - Boxing/Unboxing — примитивные типы (
int,long) оборачиваются в объекты (Integer,Long) при передаче между операторами - Branch misprediction — переключение между операторами сбивает branch predictor CPU
- Нет loop fusion — каждый оператор работает в своём цикле, данные не остаются в CPU cache
Для 1 миллиарда строк с 5 операторами — это 5 миллиардов виртуальных вызовов и столько же boxing/unboxing операций.
Решение: Whole-Stage Code Generation
Whole-Stage CodeGen (WSCG) — технология, которая объединяет (fuses) несколько операторов в один Java-метод. Вместо цепочки next() вызовов генерируется единый цикл, который обрабатывает данные без межоператорных вызовов.
Как это работает
- Catalyst анализирует Physical Plan и находит pipeline — последовательность операторов, которые можно объединить
- Генерируется Java-код, реализующий весь pipeline в одном методе
- Код компилируется через Janino — легковесный Java compiler (не javac)
- Скомпилированный байткод выполняется напрямую
Пример: до и после CodeGen
Без WSCG (Volcano Model):
// Каждый оператор — отдельный объект с виртуальным next()
while (scan.hasNext()) {
Row row = scan.next(); // виртуальный вызов
Row filtered = filter.next(row); // виртуальный вызов
if (filtered != null) {
Row projected = project.next(filtered); // виртуальный вызов
aggregate.add(projected); // виртуальный вызов
}
}
С WSCG (сгенерированный код):
// Один метод, без виртуальных вызовов
while (scan_hasNext) {
long age = scan_getLong(2); // прямой доступ, без boxing
if (age > 30) { // inline filter
String name = scan_getString(1); // прямой доступ
agg_buffer[hash(name)] += 1; // inline aggregate
}
}
Результат: нет виртуальных вызовов, нет boxing, данные остаются в CPU registers — ускорение в 10x+ для вычислительно-интенсивных операций.
Janino: почему не javac?
Spark использует Janino — встроенный Java compiler, который работает в процессе JVM:
| Аспект | javac | Janino |
|---|---|---|
| Процесс | Внешний процесс | В том же JVM |
| Скорость компиляции | 100+ мс | 1-10 мс |
| Зависимости | JDK installation | Lightweight JAR |
| Runtime overhead | Fork process | Inline compilation |
Janino компилирует сгенерированный Java-код в байткод за миллисекунды, что критично для интерактивных запросов.
Чтение explain(): asterisk-операторы
В выводе explain() операторы с Whole-Stage CodeGen помечены звёздочкой (*):
df = spark.sql("""
SELECT dept_id, COUNT(*) as cnt, AVG(age) as avg_age
FROM employees
WHERE age > 30
GROUP BY dept_id
""")
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- *HashAggregate(keys=[dept_id#7], functions=[count(1), avg(age#6)])
+- Exchange hashpartitioning(dept_id#7, 200), ENSURE_REQUIREMENTS, [plan_id=55]
+- *HashAggregate(keys=[dept_id#7], functions=[partial_count(1), partial_avg(age#6)])
+- *Filter (isnotnull(age#6) AND (age#6 > 30))
+- *LocalTableScan [age#6, dept_id#7]
Разберем, что здесь происходит:
*Filterи*LocalTableScan— объединены в один codegen pipeline (фильтр + чтение)*HashAggregate(partial) — частичная агрегация до shuffle, тоже в codegen pipelineExchange— shuffle (перераспределение поdept_id). Без звёздочки — WSCG не может пересечь shuffle boundary*HashAggregate(final) — финальная агрегация после shuffle, новый codegen pipeline
Граница CodeGen: Shuffle Boundary. Whole-Stage CodeGen не может объединить операторы через shuffle boundary. Shuffle требует материализации данных на диск и перераспределения между executors — это физический барьер для фузии операторов. Поэтому в explain() вы увидите два отдельных pipeline: до и после Exchange.
Другие границы CodeGen
Помимо shuffle, WSCG не может объединить:
- BroadcastExchange — отправка данных на все executors
- Sort в некоторых конфигурациях (зависит от размера данных)
- Операторы с UDF — пользовательские функции (Python UDF, Java UDF) прерывают codegen pipeline
# UDF прерывает codegen — избегайте, если возможно
from pyspark.sql.functions import udf
@udf("string")
def custom_transform(name):
return name.upper()
# Этот pipeline НЕ будет полностью в codegen
df.filter(df.age > 30).withColumn("upper_name", custom_transform(df.name))
Используйте встроенные функции Spark (upper(), lower(), concat()) вместо UDF. Встроенные функции интегрированы в codegen pipeline и работают на порядок быстрее.
Конфигурация WSCG
# Whole-Stage CodeGen включен по умолчанию (с Spark 2.0)
spark.conf.get("spark.sql.codegen.wholeStage") # true
# Максимальное количество полей для codegen
# Если в строке > 100 полей, codegen отключается
spark.conf.get("spark.sql.codegen.maxFields") # 100
# Максимальный размер сгенерированного кода (8KB по умолчанию)
spark.conf.get("spark.sql.codegen.hugeMethodLimit") # 8000
Spark3.5+ Whole-Stage CodeGen стабилен и включен по умолчанию с Spark 2.0. В Spark 4.0 GA (2025) и 4.1 (текущая) он поддерживает все основные операторы: Filter, Project, HashAggregate, SortMergeJoin, BroadcastHashJoin, Sort. Поведение совместимо со Spark 3.5 LTS.
Анти-паттерн: отключение CodeGen
# НЕ делайте этого в production!
spark.conf.set("spark.sql.codegen.wholeStage", "false")
Отключение WSCG оправдано только для отладки: чтобы сравнить производительность с и без codegen, или чтобы изолировать проблему в сгенерированном коде. В production отключение codegen может замедлить запросы в 10x.
Влияние на производительность
| Сценарий | Без WSCG | С WSCG | Ускорение |
|---|---|---|---|
| Scan + Filter (1B rows) | 45 сек | 4 сек | 11x |
| Aggregate (100M groups) | 120 сек | 15 сек | 8x |
| Join + Filter + Project | 90 сек | 12 сек | 7.5x |
| Scan only (columnar) | 30 сек | 2 сек | 15x |
Наибольшее ускорение на columnar scans (Parquet/ORC), где codegen генерирует vectorized чтение.
Итоги модуля: Catalyst + Tungsten
За 6 уроков мы прошли полный путь запроса в Spark:
- Catalyst Optimizer трансформирует SQL/DataFrame в оптимизированный план через 5 стадий
- Analyzer разрешает ссылки через Catalog
- Optimizer применяет правила (predicate pushdown, column pruning, constant folding, CBO)
- SparkPlanner выбирает физические стратегии (BroadcastHashJoin vs SortMergeJoin)
- Tungsten хранит данные в компактном бинарном формате (UnsafeRow), минимизируя GC
- Whole-Stage CodeGen объединяет операторы в единый Java-метод, устраняя overhead Volcano Model
Понимание этих механизмов — основа для чтения explain() планов и диагностики медленных запросов.