Narrow и wide зависимости
Каждый вызов трансформации над RDD создаёт новый объект Dependency. Именно через эти объекты DAGScheduler строит план: какие операции можно запустить параллельно без перемещения данных, где нужен shuffle, какие Task можно объединить в один Stage. Понимание иерархии классов зависимостей — это не академическое упражнение, а прямой инструмент чтения explain() и Spark UI.
Иерархия классов зависимостей
Все зависимости в Spark 4.0 — это подклассы абстрактного Dependency[T] из пакета org.apache.spark. Иерархия:
Dependency[T]
├── NarrowDependency[T] -- абстрактный
│ ├── OneToOneDependency[T] -- map, filter, flatMap, coalesce (без shuffle)
│ └── RangeDependency[T] -- union (диапазоны партиций)
└── ShuffleDependency[K, V, C] -- groupByKey, reduceByKey, join, repartition
Единственный метод, который обязана определить NarrowDependency, — это getParents(partitionId: Int): Seq[Int]: какие партиции родителя нужны для вычисления данной выходной партиции. Для OneToOneDependency это всегда Seq(partitionId) — тривиально.
ShuffleDependency не имеет getParents — по определению каждая выходная партиция зависит от всех входных.
OneToOneDependency: самая частая narrow
OneToOneDependency используется в MapPartitionsRDD — это то, что возникает при map, filter, flatMap, mapPartitions. Реализация тривиальна:
// core/src/main/scala/org/apache/spark/Dependency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
getParents(42) возвращает List(42) — выходная партиция 42 зависит только от входной партиции 42. Это то самое свойство, которое позволяет пайплайнить операции: executor, обрабатывающий партицию 42, никогда не ждёт данных от другого executor-а.
При этом число партиций выхода всегда равно числу партиций входа. rdd.map(f) с 200 партициями даст RDD с 200 партициями.
RangeDependency: union двух RDD
union — специальный случай. Когда два RDD объединяются через union, результирующий UnionRDD не перемещает данные, просто объединяет партиции. Партиции RDD A занимают диапазон [0, A.numPartitions), партиции RDD B — [A.numPartitions, A.numPartitions + B.numPartitions).
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length)
List(partitionId - outStart + inStart)
else
Nil
}
}
Для union(rddA(200 partitions), rddB(300 partitions)):
- Партиции 0..199 результата зависят от партиций 0..199 rddA (RangeDependency с outStart=0, inStart=0, length=200)
- Партиции 200..499 результата зависят от партиций 0..299 rddB (RangeDependency с outStart=200, inStart=0, length=300)
Никакого shuffle. union — это O(1) операция, она только создаёт новый объект UnionRDD с двумя RangeDependency.
ShuffleDependency: граница Stage
ShuffleDependency — принципиально другой зверь. Он не реализует NarrowDependency и не имеет getParents. Вместо этого он содержит параметры shuffle: партиционер, сериализатор, агрегатор:
// Упрощённая сигнатура ShuffleDependency
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor
) extends Dependency[Product2[K, C]] {
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
}
shuffleId — уникальный идентификатор shuffle операции. По нему MapOutputTracker отслеживает, какие map-outputs доступны. ShuffleHandle — это регистрация в ShuffleManager (SortShuffleManager в Spark 4.0 по умолчанию), который управляет физическим записью и чтением shuffle-файлов.
При каждом создании ShuffleDependency (при вызове groupByKey, reduceByKey, join, repartition и т.д.) DAGScheduler видит это как границу Stage. Текущий Stage заканчивается ShuffleMapStage, следующий Stage начинается с ShuffledRDD.
Как DAGScheduler использует зависимости
Алгоритм построения Stage-графа в DAGScheduler.createResultStage:
1. Начать с finalRDD (тот, у которого вызвали action)
2. Обойти граф зависимостей рекурсивно (DFS)
3. При встрече ShuffleDependency:
- Создать ShuffleMapStage для родительского RDD
- Зарегистрировать текущий Stage как зависящий от ShuffleMapStage
4. При встрече NarrowDependency:
- Продолжить обход внутри текущего Stage (pipelining)
5. ResultStage = Stage финального RDD
В коде это DAGScheduler.getOrCreateShuffleMapStage (вызывается рекурсивно). Поиск ShuffleDependency реализован в RDD.getNarrowAncestors — метод обходит только Narrow-зависимости в рамках одного Stage.
Пример с тремя операциями:
rdd = sc.textFile("hdfs:///data/", 200) # HadoopRDD, 200 partitions
words = rdd.flatMap(lambda l: l.split()) # MapPartitionsRDD, OneToOneDependency
pairs = words.map(lambda w: (w, 1)) # MapPartitionsRDD, OneToOneDependency
counted = pairs.reduceByKey(lambda a,b: a+b) # ShuffledRDD + MapPartitionsRDD
counted.saveAsTextFile("hdfs:///output/")
Stage-граф:
- Stage 0 (ShuffleMapStage):
HadoopRDD -> flatMap -> map— три RDD, два OneToOneDependency, пайплайнятся в один Task. - Stage 1 (ResultStage):
ShuffledRDD -> reduceByKey result— один ShuffleMapStage сначала, потом чтение и запись.
Граница между Stage 0 и Stage 1 — это ShuffleDependency внутри reduceByKey.
Pipelining narrow-операций
Самое важное следствие narrow зависимостей — пайплайнинг. Когда executor получает Task для Stage 0, он не читает все данные в память, не применяет flatMap, не записывает промежуточный результат, затем не читает снова для map. Вместо этого:
// Псевдокод Task execution для Stage 0:
for each record in HadoopRDD.compute(partition):
for each word in flatMap(record): // стрим через flatMap
yield (word, 1) // и сразу map
// результат идёт в ShuffleWriter
// никакой материализации в памяти
Это возможно благодаря тому, что MapPartitionsRDD.compute(partition) просто оборачивает итератор родителя:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
// firstParent.iterator вернёт Iterator от HadoopRDD
// f оборачивает его своей логикой (flatMap)
// результат — ещё один ленивый Iterator
Весь пайплайн — это вложенные итераторы. Данные «текут» через цепочку трансформаций по одному элементу (или по chunk-у, если оператор буферизует). Память нужна только под текущий chunk, а не под всю партицию.
Это объясняет разницу в потреблении памяти между rdd.map(f).filter(p) и rdd.map(f).persist().filter(p). Первый вариант держит в памяти только current element (O(1)). Второй — всю партицию после map (O(N)). В production persist() оправдан только если RDD используется многократно.
Pipelining в контексте DataFrame
Когда Catalyst компилирует DataFrame в физический план, он тоже опирается на narrow vs wide. Физические операторы FilterExec, ProjectExec, UnionExec — все реализуют doExecute() через MapPartitionsRDD (OneToOneDependency). Операторы SortMergeJoinExec, HashAggregateExec с предшествующим Exchange — через ShuffleDependency.
Но в DataFrame есть дополнительный уровень оптимизации поверх итераторного пайплайнинга — whole-stage codegen. Вместо цепочки вложенных итераторов Catalyst генерирует один Java-метод, который обрабатывает все операторы одного Stage в одном tight loop. Это устраняет накладные расходы на виртуальные вызовы next() между операторами. Подробно — в уроке 05.
Диагностика через Spark UI
В Spark UI вкладка DAG Visualization рисует граф Stage-ов. Каждый блок — Stage, стрелки со словом «shuffle» — это ShuffleDependency. Внутри каждого блока видны операции, пайплайнированные в один Stage.
Для чтения через explain:
df = spark.read.parquet("hdfs:///data/*.parquet")
result = df.filter(df.amount > 100) \
.groupBy("city") \
.agg({"amount": "sum"}) \
.filter(df.city != "Unknown")
result.explain("formatted")
В выводе explain("formatted") секции с *(N) обозначают codegen-стадии (внутри одного Stage), Exchange — это ShuffleDependency, всё до Exchange пайплайнится.
Repartition vs coalesce: почему важен тип зависимости
repartition(N) создаёт ShuffleDependency (перемешивает данные), coalesce(N, shuffle=False) создаёт narrow зависимость (просто объединяет партиции на одном executor-е).
# repartition: shuffle, данные перемешиваются равномерно
rdd.repartition(50)
# -> ShuffledRDD с ShuffleDependency
# -> Новый Stage, shuffle по сети
# -> Равномерное распределение
# coalesce без shuffle: narrow, объединяет партиции
rdd.coalesce(50)
# -> CoalescedRDD с NarrowDependency (объединяет N:1)
# -> Тот же Stage, нет сетевого трафика
# -> Может быть неравномерное распределение
coalesce(shuffle=False) — важная оптимизация при уменьшении числа партиций: нет shuffle, нет нового Stage. Но если текущие партиции неравномерны по размеру, coalesce просто объединит маленькие и большие вместе, что создаст data skew. repartition равномерен, но требует полного shuffle.
В Spark 4.0 AQE умеет автоматически коалесцировать shuffle-партиции после shuffle, без явного coalesce — через spark.sql.adaptive.coalescePartitions.enabled=true (default true).
Диагностика: выявление ненужных shuffle
Частая проблема в production: shuffle там, где его быть не должно. Например, два groupByKey подряд с одинаковым числом партиций могут привести к двум shuffle, хотя второй лишний, если данные уже правильно партиционированы.
# Лишний shuffle: первый groupByKey устанавливает HashPartitioner(100)
# Второй groupByKey видит, что партиционер уже подходящий — shuffle НЕ нужен
rdd1 = pairs.groupByKey(100) # ShuffleDependency -> HashPartitioner(100)
rdd2 = rdd1.mapValues(list) # сохраняет партиционер
rdd3 = rdd2.groupByKey(100) # Spark проверяет: partitioner уже HashPartitioner(100)?
# Если да — никакого shuffle!
# Проверить:
print(rdd1.partitioner) # HashPartitioner(100)
print(rdd3.partitioner) # HashPartitioner(100), и НЕ было shuffle
# НО: если изменить число партиций:
rdd4 = rdd2.groupByKey(200) # Другое число партиций -> новый shuffle!
Полная диагностика: explain("formatted") для DataFrame или toDebugString() для RDD. Ищите количество Exchange (shuffle) в плане — каждый Exchange стоит дорого.
Попробуй сам
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("dependencies").getOrCreate()
sc = spark.sparkContext
# Создаём RDD с разными типами зависимостей
data = sc.parallelize(range(1000), 20)
# Narrow dependencies: все в одном Stage
step1 = data.map(lambda x: (x % 10, x)) # OneToOneDependency
step2 = step1.filter(lambda kv: kv[1] > 100) # OneToOneDependency
step3 = step2.mapValues(lambda v: v * 2) # OneToOneDependency, сохраняет partitioner
# ShuffleDependency: новый Stage
step4 = step3.groupByKey(10) # ShuffleDependency
# Narrow после shuffle
step5 = step4.mapValues(list) # OneToOneDependency, сохраняет partitioner
# Второй shuffle
step6 = step5.flatMap(lambda kv: [(kv[0], v) for v in kv[1]]) # OneToOneDependency -- НО теряет партиционер!
step7 = step6.reduceByKey(lambda a, b: a + b, 10) # ShuffleDependency (нужен shuffle!)
# Посмотрим на lineage
print(step7.toDebugString().decode())
# Должны увидеть 2 ShuffledRDD в lineage
# Проверим партиционеры
print(f"step3 partitioner: {step3.partitioner}") # None (после mapValues без groupByKey)
print(f"step4 partitioner: {step4.partitioner}") # HashPartitioner(10)
print(f"step5 partitioner: {step5.partitioner}") # HashPartitioner(10) -- сохранён!
print(f"step6 partitioner: {step6.partitioner}") # None -- flatMap сбросил!
print(f"step7 partitioner: {step7.partitioner}") # HashPartitioner(10)
# Посчитаем shuffle операции через explain на DataFrame-версии
df_step7 = step7.toDF(["key", "value"])
df_step7.explain("formatted")
# Найдите Exchange в плане -- их должно быть 2