Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 27 мин
Продвинутый
DependencyNarrowWideShuffleDependencyStage boundariesPipelining

Narrow и wide зависимости

Каждый вызов трансформации над RDD создаёт новый объект Dependency. Именно через эти объекты DAGScheduler строит план: какие операции можно запустить параллельно без перемещения данных, где нужен shuffle, какие Task можно объединить в один Stage. Понимание иерархии классов зависимостей — это не академическое упражнение, а прямой инструмент чтения explain() и Spark UI.

Иерархия классов зависимостей

Все зависимости в Spark 4.0 — это подклассы абстрактного Dependency[T] из пакета org.apache.spark. Иерархия:

Dependency[T]
  ├── NarrowDependency[T]          -- абстрактный
  │     ├── OneToOneDependency[T]  -- map, filter, flatMap, coalesce (без shuffle)
  │     └── RangeDependency[T]     -- union (диапазоны партиций)
  └── ShuffleDependency[K, V, C]  -- groupByKey, reduceByKey, join, repartition

Единственный метод, который обязана определить NarrowDependency, — это getParents(partitionId: Int): Seq[Int]: какие партиции родителя нужны для вычисления данной выходной партиции. Для OneToOneDependency это всегда Seq(partitionId) — тривиально.

ShuffleDependency не имеет getParents — по определению каждая выходная партиция зависит от всех входных.

OneToOneDependency: самая частая narrow

OneToOneDependency используется в MapPartitionsRDD — это то, что возникает при map, filter, flatMap, mapPartitions. Реализация тривиальна:

// core/src/main/scala/org/apache/spark/Dependency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

getParents(42) возвращает List(42) — выходная партиция 42 зависит только от входной партиции 42. Это то самое свойство, которое позволяет пайплайнить операции: executor, обрабатывающий партицию 42, никогда не ждёт данных от другого executor-а.

При этом число партиций выхода всегда равно числу партиций входа. rdd.map(f) с 200 партициями даст RDD с 200 партициями.

RangeDependency: union двух RDD

union — специальный случай. Когда два RDD объединяются через union, результирующий UnionRDD не перемещает данные, просто объединяет партиции. Партиции RDD A занимают диапазон [0, A.numPartitions), партиции RDD B — [A.numPartitions, A.numPartitions + B.numPartitions).

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
    extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil
  }
}

Для union(rddA(200 partitions), rddB(300 partitions)):

  • Партиции 0..199 результата зависят от партиций 0..199 rddA (RangeDependency с outStart=0, inStart=0, length=200)
  • Партиции 200..499 результата зависят от партиций 0..299 rddB (RangeDependency с outStart=200, inStart=0, length=300)

Никакого shuffle. union — это O(1) операция, она только создаёт новый объект UnionRDD с двумя RangeDependency.

NarrowDependency: pipeline без shuffle
Source RDDHadoopRDD или ParallelCollectionRDD. Каждая Partition = один блок данных. Нет зависимостей.
map(f) -> MapPartitionsRDDOneToOneDependency: partition i зависит только от partition i родителя. Пайплайнится: compute P0 -> map P0 за один проход итератора.
filter(p) -> MapPartitionsRDDOneToOneDependency снова. Три операции в одном Stage без материализации промежуточных данных. Итератор: source.P0 -> map.P0 -> filter.P0 за один проход.

ShuffleDependency: граница Stage

ShuffleDependency — принципиально другой зверь. Он не реализует NarrowDependency и не имеет getParents. Вместо этого он содержит параметры shuffle: партиционер, сериализатор, агрегатор:

// Упрощённая сигнатура ShuffleDependency
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor
) extends Dependency[Product2[K, C]] {

  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)
}

shuffleId — уникальный идентификатор shuffle операции. По нему MapOutputTracker отслеживает, какие map-outputs доступны. ShuffleHandle — это регистрация в ShuffleManager (SortShuffleManager в Spark 4.0 по умолчанию), который управляет физическим записью и чтением shuffle-файлов.

При каждом создании ShuffleDependency (при вызове groupByKey, reduceByKey, join, repartition и т.д.) DAGScheduler видит это как границу Stage. Текущий Stage заканчивается ShuffleMapStage, следующий Stage начинается с ShuffledRDD.

Как DAGScheduler использует зависимости

Алгоритм построения Stage-графа в DAGScheduler.createResultStage:

1. Начать с finalRDD (тот, у которого вызвали action)
2. Обойти граф зависимостей рекурсивно (DFS)
3. При встрече ShuffleDependency:
   - Создать ShuffleMapStage для родительского RDD
   - Зарегистрировать текущий Stage как зависящий от ShuffleMapStage
4. При встрече NarrowDependency:
   - Продолжить обход внутри текущего Stage (pipelining)
5. ResultStage = Stage финального RDD

В коде это DAGScheduler.getOrCreateShuffleMapStage (вызывается рекурсивно). Поиск ShuffleDependency реализован в RDD.getNarrowAncestors — метод обходит только Narrow-зависимости в рамках одного Stage.

Пример с тремя операциями:

rdd = sc.textFile("hdfs:///data/", 200)      # HadoopRDD, 200 partitions
words = rdd.flatMap(lambda l: l.split())     # MapPartitionsRDD, OneToOneDependency
pairs = words.map(lambda w: (w, 1))          # MapPartitionsRDD, OneToOneDependency
counted = pairs.reduceByKey(lambda a,b: a+b) # ShuffledRDD + MapPartitionsRDD

counted.saveAsTextFile("hdfs:///output/")

Stage-граф:

  • Stage 0 (ShuffleMapStage): HadoopRDD -> flatMap -> map — три RDD, два OneToOneDependency, пайплайнятся в один Task.
  • Stage 1 (ResultStage): ShuffledRDD -> reduceByKey result — один ShuffleMapStage сначала, потом чтение и запись.

Граница между Stage 0 и Stage 1 — это ShuffleDependency внутри reduceByKey.

Pipelining narrow-операций

Самое важное следствие narrow зависимостей — пайплайнинг. Когда executor получает Task для Stage 0, он не читает все данные в память, не применяет flatMap, не записывает промежуточный результат, затем не читает снова для map. Вместо этого:

// Псевдокод Task execution для Stage 0:
for each record in HadoopRDD.compute(partition):
    for each word in flatMap(record):          // стрим через flatMap
        yield (word, 1)                         // и сразу map
        // результат идёт в ShuffleWriter
        // никакой материализации в памяти

Это возможно благодаря тому, что MapPartitionsRDD.compute(partition) просто оборачивает итератор родителя:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
  // firstParent.iterator вернёт Iterator от HadoopRDD
  // f оборачивает его своей логикой (flatMap)
  // результат — ещё один ленивый Iterator

Весь пайплайн — это вложенные итераторы. Данные «текут» через цепочку трансформаций по одному элементу (или по chunk-у, если оператор буферизует). Память нужна только под текущий chunk, а не под всю партицию.

TIP

Это объясняет разницу в потреблении памяти между rdd.map(f).filter(p) и rdd.map(f).persist().filter(p). Первый вариант держит в памяти только current element (O(1)). Второй — всю партицию после map (O(N)). В production persist() оправдан только если RDD используется многократно.

Pipelining в контексте DataFrame

Когда Catalyst компилирует DataFrame в физический план, он тоже опирается на narrow vs wide. Физические операторы FilterExec, ProjectExec, UnionExec — все реализуют doExecute() через MapPartitionsRDD (OneToOneDependency). Операторы SortMergeJoinExec, HashAggregateExec с предшествующим Exchange — через ShuffleDependency.

Но в DataFrame есть дополнительный уровень оптимизации поверх итераторного пайплайнинга — whole-stage codegen. Вместо цепочки вложенных итераторов Catalyst генерирует один Java-метод, который обрабатывает все операторы одного Stage в одном tight loop. Это устраняет накладные расходы на виртуальные вызовы next() между операторами. Подробно — в уроке 05.

Диагностика через Spark UI

В Spark UI вкладка DAG Visualization рисует граф Stage-ов. Каждый блок — Stage, стрелки со словом «shuffle» — это ShuffleDependency. Внутри каждого блока видны операции, пайплайнированные в один Stage.

Для чтения через explain:

df = spark.read.parquet("hdfs:///data/*.parquet")
result = df.filter(df.amount > 100) \
           .groupBy("city") \
           .agg({"amount": "sum"}) \
           .filter(df.city != "Unknown")

result.explain("formatted")

В выводе explain("formatted") секции с *(N) обозначают codegen-стадии (внутри одного Stage), Exchange — это ShuffleDependency, всё до Exchange пайплайнится.

Repartition vs coalesce: почему важен тип зависимости

repartition(N) создаёт ShuffleDependency (перемешивает данные), coalesce(N, shuffle=False) создаёт narrow зависимость (просто объединяет партиции на одном executor-е).

# repartition: shuffle, данные перемешиваются равномерно
rdd.repartition(50)
# -> ShuffledRDD с ShuffleDependency
# -> Новый Stage, shuffle по сети
# -> Равномерное распределение

# coalesce без shuffle: narrow, объединяет партиции
rdd.coalesce(50)
# -> CoalescedRDD с NarrowDependency (объединяет N:1)
# -> Тот же Stage, нет сетевого трафика
# -> Может быть неравномерное распределение

coalesce(shuffle=False) — важная оптимизация при уменьшении числа партиций: нет shuffle, нет нового Stage. Но если текущие партиции неравномерны по размеру, coalesce просто объединит маленькие и большие вместе, что создаст data skew. repartition равномерен, но требует полного shuffle.

В Spark 4.0 AQE умеет автоматически коалесцировать shuffle-партиции после shuffle, без явного coalesce — через spark.sql.adaptive.coalescePartitions.enabled=true (default true).

Диагностика: выявление ненужных shuffle

Частая проблема в production: shuffle там, где его быть не должно. Например, два groupByKey подряд с одинаковым числом партиций могут привести к двум shuffle, хотя второй лишний, если данные уже правильно партиционированы.

# Лишний shuffle: первый groupByKey устанавливает HashPartitioner(100)
# Второй groupByKey видит, что партиционер уже подходящий — shuffle НЕ нужен
rdd1 = pairs.groupByKey(100)   # ShuffleDependency -> HashPartitioner(100)
rdd2 = rdd1.mapValues(list)    # сохраняет партиционер
rdd3 = rdd2.groupByKey(100)    # Spark проверяет: partitioner уже HashPartitioner(100)?
                                # Если да — никакого shuffle!

# Проверить:
print(rdd1.partitioner)  # HashPartitioner(100)
print(rdd3.partitioner)  # HashPartitioner(100), и НЕ было shuffle

# НО: если изменить число партиций:
rdd4 = rdd2.groupByKey(200)    # Другое число партиций -> новый shuffle!

Полная диагностика: explain("formatted") для DataFrame или toDebugString() для RDD. Ищите количество Exchange (shuffle) в плане — каждый Exchange стоит дорого.

Попробуй сам

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dependencies").getOrCreate()
sc = spark.sparkContext

# Создаём RDD с разными типами зависимостей
data = sc.parallelize(range(1000), 20)

# Narrow dependencies: все в одном Stage
step1 = data.map(lambda x: (x % 10, x))          # OneToOneDependency
step2 = step1.filter(lambda kv: kv[1] > 100)      # OneToOneDependency
step3 = step2.mapValues(lambda v: v * 2)           # OneToOneDependency, сохраняет partitioner

# ShuffleDependency: новый Stage
step4 = step3.groupByKey(10)  # ShuffleDependency

# Narrow после shuffle
step5 = step4.mapValues(list)  # OneToOneDependency, сохраняет partitioner

# Второй shuffle
step6 = step5.flatMap(lambda kv: [(kv[0], v) for v in kv[1]])  # OneToOneDependency -- НО теряет партиционер!
step7 = step6.reduceByKey(lambda a, b: a + b, 10)               # ShuffleDependency (нужен shuffle!)

# Посмотрим на lineage
print(step7.toDebugString().decode())
# Должны увидеть 2 ShuffledRDD в lineage

# Проверим партиционеры
print(f"step3 partitioner: {step3.partitioner}")   # None (после mapValues без groupByKey)
print(f"step4 partitioner: {step4.partitioner}")   # HashPartitioner(10)
print(f"step5 partitioner: {step5.partitioner}")   # HashPartitioner(10) -- сохранён!
print(f"step6 partitioner: {step6.partitioner}")   # None -- flatMap сбросил!
print(f"step7 partitioner: {step7.partitioner}")   # HashPartitioner(10)

# Посчитаем shuffle операции через explain на DataFrame-версии
df_step7 = step7.toDF(["key", "value"])
df_step7.explain("formatted")
# Найдите Exchange в плане -- их должно быть 2
Проверка знанийKnowledge check
Вы анализируете production-джоб: rdd_orders.groupByKey(200).join(rdd_customers.groupByKey(200), 200). В Spark UI видите 4 Stage вместо ожидаемых 3. Что могло создать лишний Stage? Как переписать код, чтобы было ровно 3 Stage?
ОтветAnswer
Четыре Stage вместо трёх — скорее всего из-за двух отдельных groupByKey перед join. План: Stage 0 = groupByKey на orders (ShuffleDependency, HashPartitioner(200)); Stage 1 = groupByKey на customers (ShuffleDependency, HashPartitioner(200)); Stage 2 = join результатов Stage 0 и Stage 1 (ещё один ShuffleDependency, потому что join видит два отдельных RDD с результатами groupByKey). Stage 3 = ResultStage с финальными вычислениями. Причина лишнего Stage: groupByKey создаёт ShuffleDependency, а потом join тоже создаёт ShuffleDependency поверх — итого 3 shuffle вместо 2. Правильное решение: не делать groupByKey перед join, если join и так выполнит shuffle. Если нужен reduceByKey перед join — использовать co-partitioning: rdd_orders.partitionBy(HashPartitioner(200)) и rdd_customers.partitionBy(HashPartitioner(200)) сохранить через persist(). Тогда join увидит одинаковые partitioner у обоих RDD и не создаст новый shuffle. Итого: 2 Stage для partitionBy + 1 Stage для join = 3 Stage. Либо использовать DataFrame API с broadcast hint, если customers маленький — тогда вообще один Stage.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Пайплайн: rdd.map(f).filter(p).union(other_rdd).groupByKey(200).mapValues(g). Сколько Stage создаст DAGScheduler, и какая операция является границей Stage?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5