Модель вычисления RDD
Понимание того, как Spark исполняет RDD — не «запускает job», а именно физически вычисляет партицию на executor-е — это необходимое условие для написания эффективного PySpark-кода и для понимания того, почему whole-stage codegen дает такой выигрыш по сравнению с классическими RDD-операциями.
Iterator-based compute model
Базовая модель вычисления RDD — это цепочка ленивых итераторов (iterator pipeline). Когда executor получает Task, он вызывает rdd.iterator(partition, taskContext). Этот метод либо возвращает кешированные данные из BlockManager, либо вызывает rdd.compute(partition, taskContext), которая возвращает Iterator[T].
Ключевое свойство: Iterator[T] — ленивый. Данные не читаются и не вычисляются до вызова next(). Это означает, что вся цепочка трансформаций над RDD образует стек вложенных итераторов:
Task.runTask()
└── finalRDD.iterator(partition, ctx) // RDD N
└── rdd_N_minus_1.iterator(...) // RDD N-1
└── ...
└── rdd_2.iterator(...) // RDD 2
└── sourceRDD.compute(partition, ctx) // Source: читает HDFS
Когда ShuffleWriter (в ShuffleMapTask) или ResultTask.runTask вызывает next() на верхнем итераторе, запрос «падает вниз» по стеку до источника, который возвращает один элемент. Этот элемент проходит через все трансформации снизу вверх и выдаётся наружу.
// Псевдокод ResultTask.runTask (упрощённо):
override def runTask(context: TaskContext): U = {
val func = job.func // finalize function (e.g., collect)
val (rdd, dep) = ser.deserialize[...] // десериализуем RDD из plan
func(context, rdd.iterator(partition, context))
// rdd.iterator возвращает ленивый Iterator
// func потребляет его элемент за элементом
// данные НИКОГДА не материализуются полностью в памяти
}
Из этого следует важное следствие: peak memory для одной партиции RDD — это максимальный размер данных в одном «буфере» в цепочке, а не весь размер партиции. Для map и filter это буквально один элемент. Для groupByKey — вся группа (именно поэтому groupByKey опасен: итератор materialize все значения группы в память).
map vs mapPartitions: разница в гранулярности
map(f) и mapPartitions(f) — внешне похожи, но mapPartitions передаёт функции весь итератор партиции, а не один элемент. Это принципиально для случаев, когда операция имеет overhead на инициализацию:
# map: f вызывается для каждого элемента
# overhead per element: один вызов Python callable
rdd.map(lambda x: x * 2)
# mapPartitions: f вызывается один раз на партицию, получает Iterator
# можно инициализировать дорогой ресурс один раз на партицию
def process_partition(iterator):
# Инициализация дорогого ресурса один раз на партицию
db_conn = DatabaseConnection(host="db.internal", port=5432)
model = load_ml_model("s3://models/v1.pkl") # 500MB модель
try:
for record in iterator:
yield model.predict(record, db_conn) # используем ресурс
finally:
db_conn.close() # гарантированное закрытие
rdd.mapPartitions(process_partition)
Если бы использовался map, load_ml_model вызывался бы для каждого элемента — это катастрофически медленно для 500MB модели. mapPartitions инициализирует модель один раз и использует её для всех элементов партиции.
Разница в накладных расходах:
map(f): [f(e1), f(e2), ..., f(eN)] -- N вызовов f
mapPartitions(f): f([e1, e2, ..., eN]) -- 1 вызов f, f возвращает Iterator
На уровне JVM mapPartitions создаёт один MapPartitionsRDD, функция которого принимает Iterator[T] и возвращает Iterator[U]. Это один вызов f на Task, а не N. Для JVM это важно: меньше объектов в heap (один closure-объект вместо N), меньше давление на GC.
Практическое правило: используй mapPartitions когда: (1) нужно инициализировать соединение с базой или загрузить модель ML, (2) операция имеет setup-overhead более 1мс на элемент, (3) нужен доступ ко всем элементам партиции для batch-обработки. Используй map когда операция чистая (pure function) и stateless — Spark сможет оптимально управлять памятью.
mapPartitionsWithIndex: нужен номер партиции
mapPartitionsWithIndex(f, preservesPartitioning) передаёт функции не только итератор, но и индекс партиции. Используется для:
- Debugging: какая именно партиция обрабатывается.
- Skewed processing: разная логика для разных партиций (например, partition 0 читает master-данные).
- Диагностики data skew: подсчёт элементов по партициям.
# Диагностика data skew: сколько элементов в каждой партиции?
def count_per_partition(idx, iterator):
count = sum(1 for _ in iterator)
yield (idx, count)
partition_counts = rdd.mapPartitionsWithIndex(count_per_partition)
counts = partition_counts.collect()
print(sorted(counts, key=lambda x: x[1], reverse=True)[:10])
# Топ-10 самых больших партиций
# Пример: [(42, 50000), (17, 48000), (99, 2), ...]
# Если разброс > 10x — data skew!
Как Catalyst компилирует DataFrame в RDD[InternalRow]
Это центральный вопрос всего урока. DataFrame не «обходит» RDD — он компилируется в граф RDD. Точнее, в RDD[InternalRow], где InternalRow — это binary-encoded строка в формате Tungsten (UnsafeRow).
Путь от DataFrame-операции до RDD:
df.filter(col("age") > 30).groupBy("city").count()
|
v
Catalyst: Unresolved Plan -> Analyzed Plan -> Optimized Plan -> Physical Plan
|
v
Physical Plan: Project -> Filter -> HashAggregate -> Exchange -> HashAggregate -> FileScan
|
v
SparkPlan.execute(): каждый оператор вызывает doExecute()
|
v
RDD[InternalRow] граф:
FileScanRDD -> MapPartitionsRDD (filter) -> MapPartitionsRDD (partial-agg)
-> ShuffledRowRDD -> MapPartitionsRDD (final-agg)
Посмотрим на конкретный физический оператор FilterExec:
// sql/core/src/main/scala/org/apache/spark/sql/execution/FilterExec.scala
case class FilterExec(condition: Expression, child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputRows = longMetric("numInputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
// Создаём интерпретируемый predicate для этой партиции
val predicate = Predicate.create(condition, child.output)
predicate.initialize(index) // инициализация UDF state, если есть
iter.filter { row =>
numInputRows += 1
val result = predicate.eval(row)
if (result) numOutputRows += 1
result
}
}
}
// ... также реализует CodegenSupport для whole-stage codegen
}
child.execute() — это doExecute() дочернего плана (например, FileScan). Результат — RDD[InternalRow]. Поверх него mapPartitionsWithIndexInternal (это тот же mapPartitions, но с индексом) применяет фильтр. Итого: два RDD, OneToOneDependency.
Когда executor запускает Task для Stage с FileScan -> Filter -> Project, он вызывает цепочку итераторов. Но в реальности на уровне DataFrame с включённым whole-stage codegen этого не происходит.
Whole-stage codegen: обход классических RDD-итераторов
Whole-stage codegen (WSCG) — это оптимизация, которая генерирует один Java-метод вместо цепочки вложенных итераторов для операторов одного Stage.
Представим физический план для scan -> filter -> project:
Без WSCG (классические RDD-итераторы):
for each row in ProjectIterator.next():
row = FilterIterator.next()
if predicate(FilterIterator.sourceIterator.next()):
return row
Накладные расходы:
- Виртуальный вызов next() для каждого итератора
- Boxing/unboxing примитивных типов (Int -> Integer)
- JVM virtual dispatch для каждого predicate
С WSCG (generated code):
// Один tight loop, один метод, inline все операции:
for (int i = 0; i < batch.numRows(); i++) {
int age = batch.getInt(i, AGE_ORDINAL); // direct memory access
if (age > 30) {
// прямая запись в output batch
outputBatch.setInt(outputRow, CITY_ORDINAL, batch.getInt(i, CITY_ORDINAL));
outputRow++;
}
}
Преимущества:
- Нет виртуальных вызовов
- Нет boxing
- CPU-friendly tight loop (branch predictor работает хорошо)
- Vectorizable (JIT может применить SIMD)
Это реализуется через интерфейс CodegenSupport:
trait CodegenSupport extends SparkPlan {
// Возвращает Java-код для consume() - обработки одной строки
def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
// Возвращает Java-код для produce() - итерации над данными
def doProduce(ctx: CodegenContext): String
}
Операторы, реализующие CodegenSupport, не используют классический doExecute() в режиме WSCG. Вместо этого WholeStageCodegenExec (обёртка верхнего уровня) генерирует код всего Stage и компилирует его через Janino (Java-компилятор в runtime). Код кешируется в CodeCache по JVM — повторные выполнения одного плана не перекомпилируют.
// WholeStageCodegenExec.doExecute()
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
// Compile generated Java code via Janino
val references = ctx.references.toArray
val (clazz, _) = CodeGenerator.compile(cleanedSource)
// Создаём RDD, где каждый Task выполняет скомпилированный метод
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(iter))
new Iterator[InternalRow] {
override def hasNext: Boolean = buffer.hasNext
override def next: InternalRow = buffer.next()
}
}
}
Обратите внимание: child.execute().mapPartitionsWithIndexInternal — WSCG всё ещё создаёт RDD! Но этот RDD содержит один mapPartitions с скомпилированным tight loop вместо цепочки итераторов. Это и есть «обход»: не полное избавление от RDD, а устранение итераторного стека внутри одного mapPartitions.
InternalRow: что происходит с данными внутри
InternalRow — абстрактный базовый класс для строки данных внутри Spark SQL. Два конкретных представления:
GenericInternalRow: массив Java-объектовArray[Any]. Медленно (boxing), но удобно для тестов и небольших данных.UnsafeRow: непрерывный блок байт в памяти Tungsten. Нет boxing, нет GC overhead, сериализуется в/из shuffle без копирования.
В production весь DataFrame pipeline работает с UnsafeRow. UnsafeRow.get(ordinal, dataType) читает значение напрямую из памяти по смещению:
// UnsafeRow.getInt(ordinal):
def getInt(ordinal: Int): Int = {
Platform.getInt(baseObject, baseOffset + ordinal * 8L + bitSetWidthInBytes)
// прямой доступ к памяти через sun.misc.Unsafe
// ordinal * 8L = смещение фиксированного поля
// +bitSetWidthInBytes = после null bitmap
}
Именно поэтому Platform.getInt с известным offset работает быстрее, чем HashMap.get(columnName) или Row.getAs[Int]("column") — нет lookup по имени, нет boxing.
В generated WSCG-коде обращение к колонке выглядит как:
// Сгенерированный Java-код для filter age > 30:
boolean isNull_0 = i.isNullAt(1); // колонка age ordinal=1
int value_0 = isNull_0 ? -1 : (i.getInt(1)); // direct memory access
boolean result = (!isNull_0) && (value_0 > 30);
Это буквально inline-код без виртуальных вызовов. JIT компилирует это в машинный код с прямым memory read.
RDD API в pipeline: PySpark overhead
Есть один важный нюанс для PySpark: когда вы используете PySpark RDD API (rdd.map, rdd.filter, rdd.mapPartitions), данные сериализуются из JVM в Python и обратно. Это Pickle или CloudPickle сериализация.
JVM executor Python worker
| |
| --- serialize (Pickle) ----------> |
| f(data) -- ваша Python функция
| <-- serialize (Pickle) ---------- |
| |
Это означает:
- Накладные расходы сериализации/десериализации для каждой партиции.
- Python GIL ограничивает параллелизм внутри executor.
DataFrame.filter(col("age") > 30)— JVM-only, никакого Python.rdd.filter(lambda row: row["age"] > 30)— Python round-trip per row.
Вот почему DataFrame API с built-in функциями (col(), lit(), when(), функции из pyspark.sql.functions) в production существенно быстрее PySpark RDD. Для пользовательских функций — использовать pandas_udf (Apache Arrow serialization) вместо обычных udf.
Смешивание DataFrame и RDD API создаёт неочевидные границы сериализации. Паттерн df.rdd.map(python_fn).toDF() форсирует полный round-trip через Python даже если python_fn делает простую трансформацию. Catalyst не видит внутренности Python-замыкания и не может оптимизировать. В production этот паттерн нужно заменять встроенными функциями Spark SQL или pandas_udf.
Что смотреть в Spark UI
Для диагностики модели вычисления:
-
Вкладка Stages — время CPU vs shuffle read/write. Если CPU время маленькое, а shuffle время большое — проблема в партиционировании или лишних shuffle.
-
DAG Visualization — число Stage, наличие лишних Exchange (shuffle). Каждый Exchange — потенциальная область оптимизации.
-
Tasks metrics —
Input Size / Records,Shuffle Read Size. Выбросы (одна task работает 10x дольше остальных) — data skew. -
SQL tab — Plan с аннотациями WSCG. Операторы в
*(N)— whole-stage codegen. Операторы без звёздочки — интерпретируемый режим (например, Python UDF, некоторые сложные expressions).
# Включение расширенного explain для WSCG диагностики
df.explain("codegen")
# Показывает сгенерированный Java-код для каждого codegen-stage
# Ищите: tight for-loop, прямые getInt/getLong вызовы = хороший WSCG
# Ищите: Iterator chains = WSCG не смог применить
Попробуй сам
Сравнение производительности: RDD vs DataFrame, map vs mapPartitions:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
spark = SparkSession.builder \
.appName("compute-model") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
sc = spark.sparkContext
# Данные: 10 миллионов записей
n = 10_000_000
rdd_data = sc.parallelize(range(n), 100)
# Бенчмарк 1: RDD map с lambda (Python round-trip per element)
start = time.time()
result1 = rdd_data.map(lambda x: (x % 1000, x * 2)).filter(lambda kv: kv[1] > 5000).count()
t1 = time.time() - start
# Бенчмарк 2: RDD mapPartitions (один Python call per partition)
def process_partition(iterator):
for x in iterator:
v = x * 2
if v > 5000:
yield (x % 1000, v)
start = time.time()
result2 = rdd_data.mapPartitions(process_partition).count()
t2 = time.time() - start
# Бенчмарк 3: DataFrame с built-in функциями (JVM-only, WSCG)
df = spark.range(n).toDF("id")
start = time.time()
result3 = df.withColumn("val", col("id") * 2) \
.filter(col("val") > 5000) \
.groupBy(col("id") % 1000) \
.count() \
.count()
t3 = time.time() - start
print(f"RDD map+filter: {t1:.2f}s")
print(f"RDD mapPartitions: {t2:.2f}s (ускорение: {t1/t2:.1f}x)")
print(f"DataFrame (WSCG): {t3:.2f}s (ускорение: {t1/t3:.1f}x)")
# Посмотрим план DataFrame
df_plan = df.withColumn("val", col("id") * 2).filter(col("val") > 5000)
df_plan.explain("formatted")
# Обратите внимание на *(N) -- whole-stage codegen operators
Ожидаемые результаты: mapPartitions быстрее map на 20-40% за счёт сокращения Python overhead. DataFrame с WSCG быстрее RDD map в 3-10x за счёт JVM-native execution без Python serialization.