RDD как абстракция: пять свойств
Apache Spark 4.0 устроен как слоёный пирог. На вершине — DataFrame API, Spark SQL, Structured Streaming. В самом низу — RDD[T], абстракция, не менявшая своего контракта со Spark 1.0. Понимание этого контракта — обязательное условие для работы на уровне production-дебаггинга: когда вы читаете физический план через explain("formatted"), вы видите операции, которые в итоге компилируются в RDD-граф. Когда executor падает с OOM, вы должны знать, сколько партиций держит lineage в памяти и почему.
Этот урок — про RDD как математическую абстракцию. Пять обязательных методов, которые любая реализация RDD обязана определить, и что за ними скрывается на уровне JVM.
Что такое RDD
RDD[T] (Resilient Distributed Dataset) — это неизменяемая, партиционированная коллекция элементов типа T, которая умеет пересчитывать свои партиции при сбое. Ключевое слово — «умеет пересчитывать»: RDD сам знает, как восстановить свои данные, потому что хранит ссылки на родительские RDD и функцию вычисления.
Абстрактный базовый класс живёт в org.apache.spark.rdd.RDD (пакет core/src/main/scala/org/apache/spark/rdd/). Любая реализация — MapPartitionsRDD, ShuffledRDD, HadoopRDD, ParallelCollectionRDD — наследует этот класс и переопределяет пять методов.
// Упрощённый контракт абстрактного RDD
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// Метод 1: список партиций
protected def getPartitions: Array[Partition]
// Метод 2: вычисление одной партиции
def compute(split: Partition, context: TaskContext): Iterator[T]
// Метод 3: зависимости от родителей
protected def getDependencies: Seq[Dependency[_]] = deps
// Метод 4 (опциональный): партиционер
val partitioner: Option[Partitioner] = None
// Метод 5 (опциональный): предпочтительные локации
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}
Это весь контракт. Пять методов — и вы реализовали полноценный распределённый датасет. Разберём каждый.
Метод 1: getPartitions
getPartitions возвращает массив объектов Partition — дескрипторов, которые описывают логические куски данных. Важно: Partition — это не сами данные, а только адрес. Для HadoopRDD это путь к HDFS-блоку и offsets; для KafkaRDD это топик, партиция Kafka и диапазон offsets; для ParallelCollectionRDD это индекс в массиве данных.
// Partition для параллельной коллекции
class ParallelCollectionPartition[T: ClassTag](
var rddId: Long,
val slice: Int, // номер партиции
var values: Seq[T] // сами данные (маленький датасет)
) extends Partition {
def index: Int = slice
}
Вызов getPartitions происходит один раз при первом обращении к rdd.partitions, результат кешируется в поле partitions_. Это важно: DAGScheduler вызывает rdd.partitions при построении стадий, и дорогостоящий вызов к Hive Metastore или файловой системе происходит ровно один раз.
Число элементов в getPartitions определяет параллелизм вычислений: каждая партиция станет одним Task. Если у вас 2000 партиций — будет создано 2000 Task в одном Stage.
Метод 2: compute
compute(split: Partition, context: TaskContext): Iterator[T] — это сердце RDD. Метод принимает дескриптор партиции, открывает источник данных и возвращает ленивый итератор элементов.
Слово «ленивый» здесь ключевое. Iterator[T] — это Scala-интерфейс с методами hasNext и next. Данные не материализуются в память целиком. Вместо этого каждый вызов next() читает следующий элемент из источника. Это фундаментальное свойство: вся цепочка трансформаций поверх RDD работает как composited iterator, не создавая промежуточных коллекций.
// HadoopRDD.compute — читает один HDFS-блок
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
// открываем InputFormat, создаём RecordReader для блока
val reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
override def getNext(): (K, V) = {
// читаем следующую запись из InputFormat
finished = !reader.next(key, value)
(key, value)
}
override def close(): Unit = reader.close()
}
new InterruptibleIterator[(K, V)](context, iter)
}
Важная деталь: TaskContext — не просто параметр. Через него compute регистрирует TaskCompletionListener для закрытия ресурсов (file handle, connection). Если Task будет прерван, Spark вызовет всех listeners в обратном порядке. Вот почему утечки file handle в кастомных RDD — классическая ошибка: забыли зарегистрировать cleanup в TaskContext.
Внутри Spark compute никогда не вызывается напрямую. Вместо него вызывается rdd.iterator(partition, context), который сначала проверяет кеш (StorageLevel), и только при cache miss идёт в compute. Это обеспечивает прозрачную работу persist().
Метод 3: getDependencies
getDependencies возвращает список зависимостей от родительских RDD. Это и есть lineage — граф восстановления. Dependency бывает двух видов: NarrowDependency (каждая выходная партиция зависит от одной входной) и ShuffleDependency (каждая выходная партиция может зависеть от всех входных). Деталям посвящён отдельный урок, но вот как это выглядит для простой map:
// MapPartitionsRDD создаётся при вызове rdd.map(f) или rdd.filter(f)
class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // функция трансформации
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false
) extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def getDependencies: Seq[Dependency[_]] =
new OneToOneDependency(prev) :: Nil // narrow: партиция i зависит только от партиции i родителя
}
DAGScheduler обходит граф зависимостей рекурсивно, чтобы построить DAG стадий. ShuffleDependency становится границей стадии; NarrowDependency — нет. Подробнее — в уроке 03.
Метод 4: partitioner
partitioner: Option[Partitioner] — опциональное поле, которое сообщает: «элементы этого RDD распределены по партициям по этому правилу». Это критически важно для оптимизации: если два RDD имеют одинаковый партиционер, их join не требует shuffle.
Партиционер устанавливается операциями, которые явно управляют распределением: groupByKey, reduceByKey, partitionBy. После map партиционер теряется (если только не передать preservesPartitioning = true). Именно поэтому mapValues сохраняет партиционер, а map — нет: mapValues только трогает значение, ключ (а значит, партиция) остаётся тем же.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
pairs = sc.parallelize([(1, "a"), (2, "b"), (1, "c")], 4)
grouped = pairs.groupByKey(numPartitions=4)
print(grouped.partitioner)
# Out: <pyspark.rdd.Partitioner object>
# Это HashPartitioner(4)
# После map партиционер исчезает
mapped = grouped.map(lambda x: (x[0], list(x[1])))
print(mapped.partitioner)
# Out: None — shuffle может понадобиться снова
Отсутствие партиционера — частая причина лишних shuffle. Инструмент диагностики: rdd.partitioner в PySpark или метод toDebugString() для просмотра всего lineage с указанием, где установлены партиционеры.
Метод 5: getPreferredLocations
getPreferredLocations(split: Partition): Seq[String] возвращает список hostname-ов или executor-ов, на которых желательно вычислять данную партицию. TaskScheduler использует эти хинты для data locality: если executor, держащий нужный HDFS-блок, свободен, Task отправится туда. Это позволяет избежать сетевой передачи данных.
// HadoopRDD: локации блоков из HDFS
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition]
val locs = hsplit.inputSplit.value match {
case lsplit: FileSplit => lsplit.getLocations // HDFS block locations
case _ => Seq.empty
}
locs.filter(_ != "localhost")
}
Для in-memory RDD (ParallelCollectionRDD) метод возвращает пустой список — данных в HDFS нет, везде одинаково хорошо. Для Kafka это будет IP брокера с данной партицией топика.
TaskScheduler реализует PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY — иерархию locality. Если preferred executor недоступен, Spark ждёт spark.locality.wait (default 3s) и пробует следующий уровень. Если смотреть на Spark UI и видеть много ANY locality — значит, либо данные в памяти на других executor-ах, либо partitioner не учитывает расположение данных.
RDD и DataFrame: один фундамент
Когда вы пишете df.filter(col("age") > 30).groupBy("city").count(), это не «обходит» RDD. Catalyst компилирует DataFrame-план в физический план, который в итоге создаёт RDD[InternalRow]. Каждый физический оператор — FilterExec, HashAggregateExec — является подклассом SparkPlan и реализует метод doExecute(): RDD[InternalRow].
// FilterExec.doExecute() — упрощённо
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val predicate = Predicate.create(condition, child.output)
predicate.initialize(index)
iter.filter { row =>
val r = predicate.eval(row)
if (r) numOutputRows += 1
r
}
}
}
child.execute() — это вызов doExecute() на дочернем плане, который тоже вернёт RDD. Так строится граф RDD снизу вверх. Итоговый результат — RDD[InternalRow], где InternalRow — это и есть UnsafeRow в памяти Tungsten.
Что DataFrame даёт сверх RDD:
- Схема: типизированные колонки,
StructType,StructField. - Оптимизация: Catalyst Rules, CBO, predicate pushdown в источники.
- Whole-stage codegen: вместо цепочки RDD-итераторов — один сгенерированный Java-метод.
- AQE: адаптивная перепланировка во время выполнения.
Что теряется при переходе с RDD на DataFrame:
- Произвольный тип
TвRDD[T]: DataFrame всегдаRDD[Row]илиRDD[InternalRow]. - Контроль над партиционированием «из кода» (нужен
repartition/hint). - Возможность легко написать кастомный
Partitioner. - Прямой контроль над сериализацией.
Это объясняет, почему в серьёзных продакшн-кодебасах ещё встречается RDD API: кастомные источники данных, тонкая настройка партиционирования, операции, которые Catalyst не умеет оптимизировать.
Immutability и ленивость: не просто дизайн
Immutability RDD — это не академическая чистота. Это прямое следствие распределённой природы системы. Если бы RDD был изменяемым, то после сбоя одного executor-а и пересчёта партиции на другом результат мог бы отличаться от оригинального (из-за side effects). Неизменяемость гарантирует, что пересчёт даст тот же результат.
Ленивость (lazy evaluation) — не про производительность само по себе, а про возможность оптимизации. Когда Spark знает весь граф трансформаций до выполнения, он может:
- Объединить несколько
filterв один проход. - Применить predicate pushdown в источник данных.
- Выбрать физическую стратегию join на основе статистик.
- Построить оптимальный план выполнения с учётом данных из предыдущих стадий (AQE).
Eager evaluation лишила бы Spark всех этих возможностей.
Распространённая ошибка — вызов rdd.count() или rdd.collect() внутри трансформации для «отладки». Это прерывает ленивый граф, запускает Job посреди другого Job и приводит к вложенным вычислениям. В production это вызывало deadlock в TaskScheduler до Spark 3.x. Для дебаггинга используйте rdd.toDebugString и rdd.partitions.length — они не запускают вычислений.
RDD API в Spark 4.0: статус
В Spark 4.0 RDD API полностью сохранён, но команда Spark открыто рекомендует DataFrame/Dataset для новых проектов. Причина — не функциональность, а производительность: whole-stage codegen и AQE работают только с DataFrame. Однако несколько сценариев, где RDD остаётся незаменимым:
- Нестандартные типы данных:
RDD[MyDomainObject]с кастомным Kryo-сериализатором. - GraphX: весь GraphX построен на
RDD[(VertexId, VD)]иRDD[Edge[ED]]. - Кастомные источники для Spark Streaming (устаревший DStream API).
- Тонкая настройка partition affinity:
rdd.preferredLocationsс кастомной логикой.
Попробуй сам
Создай простой кастомный RDD, который генерирует числа Фибоначчи. Это покажет, как работают все пять методов контракта:
from pyspark import SparkContext, RDD
from pyspark.rdd import RDD
sc = SparkContext.getOrCreate()
# Встроенный parallelize создаёт ParallelCollectionRDD
nums = sc.parallelize(range(1, 1001), 10) # 10 партиций
# Посмотрим на структуру
print(f"Число партиций: {nums.getNumPartitions()}") # 10
print(f"Партиционер: {nums.partitioner}") # None
print(f"Тип: {type(nums)}")
# ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287
# Lineage через toDebugString
mapped = nums.map(lambda x: x * 2).filter(lambda x: x % 6 == 0)
print(mapped.toDebugString().decode())
# (10) PythonRDD[2] at filter at <stdin>:1 []
# | PythonRDD[1] at map at <stdin>:1 []
# | ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287 []
# Посмотрим на зависимости через внутренний API
# (только в Scala/Java напрямую, в PySpark через explain):
rdd_with_key = nums.map(lambda x: (x % 4, x))
grouped = rdd_with_key.groupByKey(4)
print(f"Партиционер после groupByKey: {grouped.partitioner}")
# <pyspark.rdd.Partitioner object at 0x...>
# Потеря партиционера через map
after_map = grouped.map(lambda kv: kv)
print(f"Партиционер после map: {after_map.partitioner}")
# None -- Spark не знает, сохранил ли map ключи
# Сохранение партиционера через mapValues
after_mapvalues = grouped.mapValues(list)
print(f"Партиционер после mapValues: {after_mapvalues.partitioner}")
# <pyspark.rdd.Partitioner object> -- сохранён!
Ожидаемый результат: mapValues сохраняет партиционер, map — нет. Это объясняет, почему в цепочке RDD-операций правильно использовать mapValues/flatMapValues вместо map там, где ключ не меняется.