Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 28 мин
Продвинутый
RDDAbstractionPartitionsLineageDataFrame internals

RDD как абстракция: пять свойств

Apache Spark 4.0 устроен как слоёный пирог. На вершине — DataFrame API, Spark SQL, Structured Streaming. В самом низу — RDD[T], абстракция, не менявшая своего контракта со Spark 1.0. Понимание этого контракта — обязательное условие для работы на уровне production-дебаггинга: когда вы читаете физический план через explain("formatted"), вы видите операции, которые в итоге компилируются в RDD-граф. Когда executor падает с OOM, вы должны знать, сколько партиций держит lineage в памяти и почему.

Этот урок — про RDD как математическую абстракцию. Пять обязательных методов, которые любая реализация RDD обязана определить, и что за ними скрывается на уровне JVM.

Что такое RDD

RDD[T] (Resilient Distributed Dataset) — это неизменяемая, партиционированная коллекция элементов типа T, которая умеет пересчитывать свои партиции при сбое. Ключевое слово — «умеет пересчитывать»: RDD сам знает, как восстановить свои данные, потому что хранит ссылки на родительские RDD и функцию вычисления.

Абстрактный базовый класс живёт в org.apache.spark.rdd.RDD (пакет core/src/main/scala/org/apache/spark/rdd/). Любая реализация — MapPartitionsRDD, ShuffledRDD, HadoopRDD, ParallelCollectionRDD — наследует этот класс и переопределяет пять методов.

// Упрощённый контракт абстрактного RDD
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

  // Метод 1: список партиций
  protected def getPartitions: Array[Partition]

  // Метод 2: вычисление одной партиции
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // Метод 3: зависимости от родителей
  protected def getDependencies: Seq[Dependency[_]] = deps

  // Метод 4 (опциональный): партиционер
  val partitioner: Option[Partitioner] = None

  // Метод 5 (опциональный): предпочтительные локации
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}

Это весь контракт. Пять методов — и вы реализовали полноценный распределённый датасет. Разберём каждый.

Метод 1: getPartitions

getPartitions возвращает массив объектов Partition — дескрипторов, которые описывают логические куски данных. Важно: Partition — это не сами данные, а только адрес. Для HadoopRDD это путь к HDFS-блоку и offsets; для KafkaRDD это топик, партиция Kafka и диапазон offsets; для ParallelCollectionRDD это индекс в массиве данных.

// Partition для параллельной коллекции
class ParallelCollectionPartition[T: ClassTag](
    var rddId: Long,
    val slice: Int,          // номер партиции
    var values: Seq[T]       // сами данные (маленький датасет)
) extends Partition {
  def index: Int = slice
}

Вызов getPartitions происходит один раз при первом обращении к rdd.partitions, результат кешируется в поле partitions_. Это важно: DAGScheduler вызывает rdd.partitions при построении стадий, и дорогостоящий вызов к Hive Metastore или файловой системе происходит ровно один раз.

Число элементов в getPartitions определяет параллелизм вычислений: каждая партиция станет одним Task. Если у вас 2000 партиций — будет создано 2000 Task в одном Stage.

Структура партиционирования RDD
RDD[T]Логический датасет. Хранит ссылку на массив Partition-объектов. Partition — это дескриптор, не данные.
Partition 0Partition объект: rddId, index, metadata. Для HadoopRDD содержит путь + byte-range в HDFS-файле. Сериализуется и шлётся на executor.
Partition 1Каждая партиция — один Task в Stage. Task получает Partition-дескриптор и вызывает compute(partition, taskContext).
Partition NКоличество партиций определяет степень параллелизма. Для Parquet — число файлов * число row groups. Можно изменить через repartition/coalesce.
Task 0 -> compute(p0)DAGScheduler создаёт Task per Partition. TaskScheduler отправляет Task на executor с data locality. Executor вызывает rdd.iterator(partition, context).

Метод 2: compute

compute(split: Partition, context: TaskContext): Iterator[T] — это сердце RDD. Метод принимает дескриптор партиции, открывает источник данных и возвращает ленивый итератор элементов.

Слово «ленивый» здесь ключевое. Iterator[T] — это Scala-интерфейс с методами hasNext и next. Данные не материализуются в память целиком. Вместо этого каждый вызов next() читает следующий элемент из источника. Это фундаментальное свойство: вся цепочка трансформаций поверх RDD работает как composited iterator, не создавая промежуточных коллекций.

// HadoopRDD.compute — читает один HDFS-блок
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    val split = theSplit.asInstanceOf[HadoopPartition]
    // открываем InputFormat, создаём RecordReader для блока
    val reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    override def getNext(): (K, V) = {
      // читаем следующую запись из InputFormat
      finished = !reader.next(key, value)
      (key, value)
    }
    override def close(): Unit = reader.close()
  }
  new InterruptibleIterator[(K, V)](context, iter)
}

Важная деталь: TaskContext — не просто параметр. Через него compute регистрирует TaskCompletionListener для закрытия ресурсов (file handle, connection). Если Task будет прерван, Spark вызовет всех listeners в обратном порядке. Вот почему утечки file handle в кастомных RDD — классическая ошибка: забыли зарегистрировать cleanup в TaskContext.

Внутри Spark compute никогда не вызывается напрямую. Вместо него вызывается rdd.iterator(partition, context), который сначала проверяет кеш (StorageLevel), и только при cache miss идёт в compute. Это обеспечивает прозрачную работу persist().

Метод 3: getDependencies

getDependencies возвращает список зависимостей от родительских RDD. Это и есть lineage — граф восстановления. Dependency бывает двух видов: NarrowDependency (каждая выходная партиция зависит от одной входной) и ShuffleDependency (каждая выходная партиция может зависеть от всех входных). Деталям посвящён отдельный урок, но вот как это выглядит для простой map:

// MapPartitionsRDD создаётся при вызове rdd.map(f) или rdd.filter(f)
class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // функция трансформации
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false
) extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def getDependencies: Seq[Dependency[_]] =
    new OneToOneDependency(prev) :: Nil  // narrow: партиция i зависит только от партиции i родителя
}

DAGScheduler обходит граф зависимостей рекурсивно, чтобы построить DAG стадий. ShuffleDependency становится границей стадии; NarrowDependency — нет. Подробнее — в уроке 03.

Метод 4: partitioner

partitioner: Option[Partitioner] — опциональное поле, которое сообщает: «элементы этого RDD распределены по партициям по этому правилу». Это критически важно для оптимизации: если два RDD имеют одинаковый партиционер, их join не требует shuffle.

Партиционер устанавливается операциями, которые явно управляют распределением: groupByKey, reduceByKey, partitionBy. После map партиционер теряется (если только не передать preservesPartitioning = true). Именно поэтому mapValues сохраняет партиционер, а map — нет: mapValues только трогает значение, ключ (а значит, партиция) остаётся тем же.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

pairs = sc.parallelize([(1, "a"), (2, "b"), (1, "c")], 4)
grouped = pairs.groupByKey(numPartitions=4)

print(grouped.partitioner)
# Out: <pyspark.rdd.Partitioner object>
# Это HashPartitioner(4)

# После map партиционер исчезает
mapped = grouped.map(lambda x: (x[0], list(x[1])))
print(mapped.partitioner)
# Out: None — shuffle может понадобиться снова

Отсутствие партиционера — частая причина лишних shuffle. Инструмент диагностики: rdd.partitioner в PySpark или метод toDebugString() для просмотра всего lineage с указанием, где установлены партиционеры.

Метод 5: getPreferredLocations

getPreferredLocations(split: Partition): Seq[String] возвращает список hostname-ов или executor-ов, на которых желательно вычислять данную партицию. TaskScheduler использует эти хинты для data locality: если executor, держащий нужный HDFS-блок, свободен, Task отправится туда. Это позволяет избежать сетевой передачи данных.

// HadoopRDD: локации блоков из HDFS
override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition]
  val locs = hsplit.inputSplit.value match {
    case lsplit: FileSplit => lsplit.getLocations  // HDFS block locations
    case _ => Seq.empty
  }
  locs.filter(_ != "localhost")
}

Для in-memory RDD (ParallelCollectionRDD) метод возвращает пустой список — данных в HDFS нет, везде одинаково хорошо. Для Kafka это будет IP брокера с данной партицией топика.

TaskScheduler реализует PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY — иерархию locality. Если preferred executor недоступен, Spark ждёт spark.locality.wait (default 3s) и пробует следующий уровень. Если смотреть на Spark UI и видеть много ANY locality — значит, либо данные в памяти на других executor-ах, либо partitioner не учитывает расположение данных.

Пять свойств RDD и их роль в планировании
getPartitionsArray[Partition] — дескрипторы кусков данных. DAGScheduler: сколько Task создать. Вызывается один раз, кешируется.
computeIterator[T] — ленивое чтение данных партиции. Вызывается на executor per Task. Никогда напрямую: через rdd.iterator() с проверкой кеша.
getDependenciesSeq[Dependency] — lineage граф. DAGScheduler обходит для построения Stage границ. ShuffleDependency = граница Stage. NarrowDependency = пайплайнится.
partitionerOption[Partitioner] — правило распределения по партициям. Если два RDD имеют одинаковый partitioner — join без shuffle (co-partitioned). groupByKey/reduceByKey устанавливают.
getPreferredLocationsSeq[String] — hostname/executor где данные физически расположены. TaskScheduler: data locality PROCESS_LOCAL -> NODE_LOCAL -> ANY. spark.locality.wait=3s default.

RDD и DataFrame: один фундамент

Когда вы пишете df.filter(col("age") > 30).groupBy("city").count(), это не «обходит» RDD. Catalyst компилирует DataFrame-план в физический план, который в итоге создаёт RDD[InternalRow]. Каждый физический оператор — FilterExec, HashAggregateExec — является подклассом SparkPlan и реализует метод doExecute(): RDD[InternalRow].

// FilterExec.doExecute() — упрощённо
override protected def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val predicate = Predicate.create(condition, child.output)
    predicate.initialize(index)
    iter.filter { row =>
      val r = predicate.eval(row)
      if (r) numOutputRows += 1
      r
    }
  }
}

child.execute() — это вызов doExecute() на дочернем плане, который тоже вернёт RDD. Так строится граф RDD снизу вверх. Итоговый результат — RDD[InternalRow], где InternalRow — это и есть UnsafeRow в памяти Tungsten.

Что DataFrame даёт сверх RDD:

  • Схема: типизированные колонки, StructType, StructField.
  • Оптимизация: Catalyst Rules, CBO, predicate pushdown в источники.
  • Whole-stage codegen: вместо цепочки RDD-итераторов — один сгенерированный Java-метод.
  • AQE: адаптивная перепланировка во время выполнения.

Что теряется при переходе с RDD на DataFrame:

  • Произвольный тип T в RDD[T]: DataFrame всегда RDD[Row] или RDD[InternalRow].
  • Контроль над партиционированием «из кода» (нужен repartition / hint).
  • Возможность легко написать кастомный Partitioner.
  • Прямой контроль над сериализацией.

Это объясняет, почему в серьёзных продакшн-кодебасах ещё встречается RDD API: кастомные источники данных, тонкая настройка партиционирования, операции, которые Catalyst не умеет оптимизировать.

Immutability и ленивость: не просто дизайн

Immutability RDD — это не академическая чистота. Это прямое следствие распределённой природы системы. Если бы RDD был изменяемым, то после сбоя одного executor-а и пересчёта партиции на другом результат мог бы отличаться от оригинального (из-за side effects). Неизменяемость гарантирует, что пересчёт даст тот же результат.

Ленивость (lazy evaluation) — не про производительность само по себе, а про возможность оптимизации. Когда Spark знает весь граф трансформаций до выполнения, он может:

  • Объединить несколько filter в один проход.
  • Применить predicate pushdown в источник данных.
  • Выбрать физическую стратегию join на основе статистик.
  • Построить оптимальный план выполнения с учётом данных из предыдущих стадий (AQE).

Eager evaluation лишила бы Spark всех этих возможностей.

WARNING

Распространённая ошибка — вызов rdd.count() или rdd.collect() внутри трансформации для «отладки». Это прерывает ленивый граф, запускает Job посреди другого Job и приводит к вложенным вычислениям. В production это вызывало deadlock в TaskScheduler до Spark 3.x. Для дебаггинга используйте rdd.toDebugString и rdd.partitions.length — они не запускают вычислений.

RDD API в Spark 4.0: статус

В Spark 4.0 RDD API полностью сохранён, но команда Spark открыто рекомендует DataFrame/Dataset для новых проектов. Причина — не функциональность, а производительность: whole-stage codegen и AQE работают только с DataFrame. Однако несколько сценариев, где RDD остаётся незаменимым:

  1. Нестандартные типы данных: RDD[MyDomainObject] с кастомным Kryo-сериализатором.
  2. GraphX: весь GraphX построен на RDD[(VertexId, VD)] и RDD[Edge[ED]].
  3. Кастомные источники для Spark Streaming (устаревший DStream API).
  4. Тонкая настройка partition affinity: rdd.preferredLocations с кастомной логикой.

Попробуй сам

Создай простой кастомный RDD, который генерирует числа Фибоначчи. Это покажет, как работают все пять методов контракта:

from pyspark import SparkContext, RDD
from pyspark.rdd import RDD

sc = SparkContext.getOrCreate()

# Встроенный parallelize создаёт ParallelCollectionRDD
nums = sc.parallelize(range(1, 1001), 10)  # 10 партиций

# Посмотрим на структуру
print(f"Число партиций: {nums.getNumPartitions()}")  # 10
print(f"Партиционер: {nums.partitioner}")             # None
print(f"Тип: {type(nums)}")
# ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

# Lineage через toDebugString
mapped = nums.map(lambda x: x * 2).filter(lambda x: x % 6 == 0)
print(mapped.toDebugString().decode())
# (10) PythonRDD[2] at filter at <stdin>:1 []
#  |   PythonRDD[1] at map at <stdin>:1 []
#  |   ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287 []

# Посмотрим на зависимости через внутренний API
# (только в Scala/Java напрямую, в PySpark через explain):
rdd_with_key = nums.map(lambda x: (x % 4, x))
grouped = rdd_with_key.groupByKey(4)
print(f"Партиционер после groupByKey: {grouped.partitioner}")
# <pyspark.rdd.Partitioner object at 0x...>

# Потеря партиционера через map
after_map = grouped.map(lambda kv: kv)
print(f"Партиционер после map: {after_map.partitioner}")
# None -- Spark не знает, сохранил ли map ключи

# Сохранение партиционера через mapValues
after_mapvalues = grouped.mapValues(list)
print(f"Партиционер после mapValues: {after_mapvalues.partitioner}")
# <pyspark.rdd.Partitioner object> -- сохранён!

Ожидаемый результат: mapValues сохраняет партиционер, map — нет. Это объясняет, почему в цепочке RDD-операций правильно использовать mapValues/flatMapValues вместо map там, где ключ не меняется.

Проверка знанийKnowledge check
Вы пишете PySpark-код: rdd1 = sc.parallelize(data, 200).groupByKey(100). rdd2 = rdd1.mapValues(process_fn). rdd3 = rdd1.join(rdd2). Сколько shuffle-операций произойдёт при вызове rdd3.count()? Обоснуйте через пять свойств RDD.
ОтветAnswer
Произойдёт ровно одна shuffle-операция — при groupByKey(100). Вот почему: rdd1 после groupByKey имеет HashPartitioner(100) (свойство 4: partitioner). rdd2 = rdd1.mapValues(fn) — mapValues сохраняет partitioner родителя, поэтому rdd2.partitioner тоже HashPartitioner(100). При join(rdd2) DAGScheduler видит, что оба входных RDD имеют одинаковый партиционер с одинаковым числом партиций. Это называется co-partitioned join. Зависимость будет OneToOneDependency (narrow), а не ShuffleDependency. DAGScheduler не создаст новую Stage для join. Итого: один shuffle (groupByKey), один Stage для join без перемещения данных. Если бы вместо mapValues использовался map — партиционер rdd2 стал бы None, и join потребовал бы второй shuffle.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Вы вызываете rdd.partitions на HadoopRDD, указывающем на 1000 HDFS-файлов. Сколько раз Spark обращается к HDFS NameNode за информацией о блоках?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5