Learning Platform
Глоссарий Troubleshooting
Урок 03.02 · 26 мин
Продвинутый
LineageFault ToleranceCheckpointRecoveryRDD DAG

Lineage и отказоустойчивость

Spark не хранит данные на диске по умолчанию — ни промежуточные результаты, ни shuffle-выходы после их потребления. При сбое executor-а данные исчезают. Тем не менее Spark восстанавливается без повтора всего Job. Механизм — lineage: каждый RDD помнит, из каких родителей и какой трансформацией он получен, и умеет пересчитать потерянную партицию.

Это архитектурный выбор с серьёзными trade-offs. Lineage даёт отказоустойчивость без репликации данных (сравни с Hadoop HDFS, который трижды реплицирует каждый блок), но делает стоимость восстановления зависимой от глубины и топологии графа. Senior data-инженер обязан понимать эту зависимость.

Что такое lineage graph

DAGScheduler в Spark 4.0 строит граф зависимостей, обходя цепочку RDD от конечного к источнику. Это не просто «граф зависимостей» — это инструкция восстановления. Если партиция p в RDD R потеряна, Spark смотрит на R.getDependencies(), находит родительский RDD и его партиции, от которых зависит p, и рекурсивно восстанавливает их.

Граф строится при вызове action и живёт на уровне Driver в DAGScheduler. На executor-ах хранится только текущий Task с замкнутой функцией — линейдж там не материализован.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Строим длинный lineage
raw = sc.textFile("hdfs:///logs/*.gz", minPartitions=200)
parsed = raw.map(parse_log_line)
filtered = parsed.filter(lambda r: r.level == "ERROR")
keyed = filtered.map(lambda r: (r.service, 1))
counted = keyed.reduceByKey(lambda a, b: a + b)

# Посмотрим на lineage
print(counted.toDebugString().decode())

Вывод toDebugString покажет вложенную структуру с отступами: каждый уровень отступа — это один RDD в lineage. Символ (200) указывает число партиций:

(200) PythonRDD[6] at RDD at PythonRDD.scala:53 []
 |    MapPartitionsRDD[5] at reduceByKey ...
 |    ShuffledRDD[4] at reduceByKey ...     <-- shuffle boundary
 +-(200) PythonRDD[3] at RDD at PythonRDD.scala:53 []
    |    MapPartitionsRDD[2] at filter ...
    |    MapPartitionsRDD[1] at map ...
    |    HadoopRDD[0] at textFile ...

ShuffledRDD — это граница Stage. До неё — одна Stage (Stage 0), после — другая (Stage 1).

Механизм восстановления при сбое executor-а

Когда executor падает в середине Stage, DAGScheduler получает FetchFailed или ExecutorLossReason. Он помечает все Tasks этого executor-а как FAILED и планирует их повтор. Число повторов контролируется spark.task.maxFailures (default 4 в Spark 4.0, был 4 и в Spark 3.x).

Повтор Task означает вызов compute() заново на другом executor-е. Поскольку compute() идёт по lineage, он рекурсивно вытягивает нужные партиции родительских RDD. Если родительские партиции уже в памяти (MEMORY_ONLY) — берёт оттуда. Если нет — снова вычисляет через compute() ещё выше по lineage.

Driver / DAGScheduler
Executor 1 (fallen)
Executor 2
Executor 3
Task 42 (Stage 1, Partition 42)FetchFailed / ExecLostMark Task 42 FAILED, resubmitResubmit Stage 0 tasks (если потеряны)Stage 0 tasks completeTask 42 retry (attempt 2)Task 42 SUCCESS

Narrow зависимости: дешёвое восстановление

При NarrowDependency каждая выходная партиция зависит от одной (или фиксированного числа) входных партиций. Восстановление партиции p требует пересчёта только одной «ветви» lineage — от исходного RDD до p.

Источник:     P0  P1  P2  P3  P4 ... P199
                |   |   |   |   |
  map:          P0  P1  P2  P3  P4 ... P199
                |   |   |   |   |
  filter:       P0  P1  P2  P3  P4 ... P199
                |       |
                P0  X  P2  <- Executor упал, P1 потеряна

Для восстановления filter P1 нужен только map P1, а для map P1 — только source P1. Остальные 199 партиций не затронуты. Это O(lineage_depth) пересчёт одной ветки — дёшево.

Примеры операций с narrow зависимостью: map, flatMap, filter, mapPartitions, sample, union, coalesce (без shuffle).

Wide зависимости: дорогое восстановление

При ShuffleDependency каждая выходная партиция зависит от всех входных партиций — она содержит данные из каждой из них. Для восстановления одной выходной партиции нужны shuffle-данные от всех M map-tasks.

Если map-side executor пал после завершения Stage 0 (shuffle write) — его shuffle files недоступны. FetchFailed при чтении заставляет DAGScheduler переосмыслить план: возможно, нужно пересчитать часть Stage 0.

Stage 0 (map-side):   P0  P1  P2  P3  ... P199
Shuffle write:     ↓   ↓   ↓   ↓           ↓
Reduce task P0 читает: [bucket из P0] + [bucket из P1] + ... + [bucket из P199]
           = данные от ВСЕХ P0..P199 Stage 0

Если P42 Stage 0 потеряна — P0 reduce не может завершиться
Нужно пересчитать P42 Stage 0 (одну map-task)

В Spark 4.0 при FetchFailed DAGScheduler реализует fine-grained recomputation: пересчитывает только те партиции Stage 0, shuffle output которых недоступен (через MapOutputTracker). До Spark 3.0 пересчитывалась вся Stage 0. Это значительное улучшение для больших shuffle.

Стоимость восстановления: narrow vs wide
Narrow: восстановить P3OneToOneDependency. P3 зависит только от P3 родителя. Пересчёт: одна цепочка compute() вверх по lineage. O(depth) - очень дёшево.
Пересчитываем: 1 веткуsrc.P3 -> map.P3 -> filter.P3 -> result.P3. 3 вызова compute(). Остальные 199 партиций не трогаем.
Wide: восстановить P3ShuffleDependency. P3 reduce-партиция зависит от bucket-3 в каждой из 200 map-партиций. Если map-executor упал — нужно пересчитать его map-партиции.
Пересчитываем: все map-partitions с потерямиSpark 4.0: fine-grained — только потерянные map partitions. До Spark 3.0: вся предыдущая Stage. Стоимость пропорциональна числу потерянных map tasks.

RDD Checkpointing: обрезаем lineage

Если lineage очень длинный (итеративные алгоритмы — ML, GraphX) или содержит широкие зависимости в нескольких местах, стоимость восстановления становится неприемлемой. Для этого существует checkpoint — механизм материализации RDD в надёжное хранилище с обрезкой lineage.

Spark 4.0 поддерживает два вида checkpoint:

Reliable checkpoint (в HDFS или S3)

sc.setCheckpointDir("hdfs:///spark-checkpoints/my-job")

iterative_rdd = start_rdd
for i in range(100):  # 100 итераций PageRank
    iterative_rdd = iterative_rdd.map(pagerank_step)
    if i % 10 == 0:
        # Материализуем в HDFS, обрезаем lineage
        iterative_rdd.checkpoint()
        iterative_rdd.count()  # ВАЖНО: нужно вызвать action!

checkpoint() только помечает RDD как «должен быть checkpointed». Фактическая запись происходит при первом action после вызова checkpoint(). Поэтому паттерн rdd.checkpoint(); rdd.count() — обязательный: иначе следующий action пересчитает lineage дважды (один раз для checkpoint, один раз для результата), что вдвое дороже.

После успешного checkpoint в hdfs:///spark-checkpoints/ создаётся директория с Parquet/sequence файлами партиций. Поле rdd.dependencies заменяется на ReliableCheckpointRDD — новый RDD, который читает из HDFS вместо пересчёта lineage.

Local checkpoint (в памяти executor-а)

# Более быстрый, но не надёжный
iterative_rdd = iterative_rdd.localCheckpoint()
iterative_rdd.count()

localCheckpoint() сохраняет партиции в памяти/диске executor-ов (через StorageLevel.MEMORY_AND_DISK). Это не гарантирует надёжность — при сбое executor-а checkpoint потеряется и Spark снова обратится к lineage. Зато значительно быстрее, чем запись в HDFS. Подходит для коротких итеративных алгоритмов, где полная надёжность не нужна.

Реализован через LocalRDDCheckpointData в core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala.

WARNING

Типичная ошибка с checkpoint: вызов rdd.checkpoint() без последующего action. В этом случае при следующем action над этим RDD Spark вычислит lineage дважды: первый раз — для записи checkpoint, второй — для получения результата. Это практически удваивает время выполнения. Всегда делай rdd.checkpoint() -> rdd.count() (или любой другой action) -> затем используй rdd дальше.

Когда checkpoint необходим: производственные сценарии

Сценарий 1: Итеративные ML-алгоритмы

MLlib реализует алгоритмы (ALS, LDA, LogisticRegression с SGD) через итеративные операции над RDD. После 50-100 итераций lineage становится настолько длинным, что при сбое пересчёт занимает часы. MLlib автоматически checkpoint-ит RDD каждые checkpointInterval итераций (default 10 для большинства алгоритмов):

// Внутри MLlib (ALS.scala):
if (sc.getCheckpointDir.isDefined && (iter % checkpointInterval == 0)) {
  factors.checkpoint()
}

Сценарий 2: Каскады wide зависимостей

Если пайплайн содержит несколько join или groupByKey, каждый из которых создаёт ShuffleDependency, то при сбое executor-а в последней стадии Spark может пересчитать несколько предыдущих стадий. Checkpoint после дорогого join «фиксирует» результат и обрезает lineage:

# Дорогой многошаговый пайплайн
step1 = raw.groupByKey(200)         # ShuffleDependency
step2 = step1.join(reference, 200)  # ShuffleDependency
step3 = step2.map(enrich)           # NarrowDependency

# Checkpoint после дорогих широких операций
sc.setCheckpointDir("hdfs:///checkpoints")
step2.checkpoint()
step2.count()  # материализуем

# Теперь step3.lineage начинается от HDFS, не от raw
result = step3.filter(is_valid).reduceByKey(merge)

Сценарий 3: Длинные streaming-сессии

Spark Streaming (DStream) автоматически checkpoint-ит metadata каждые checkpointInterval батчей. Без этого при рестарте стриминг-приложения невозможно восстановить состояние (offset Kafka, состояние updateStateByKey).

persist() vs checkpoint(): важное различие

persist(StorageLevel) и checkpoint() — разные механизмы с разными гарантиями:

Аспектpersist()checkpoint()
Где хранитсяПамять / диск executorHDFS / S3 (reliable) или executor (local)
Обрезает lineageНетДа
Гарантии при сбое executorНет (репересчёт через lineage)Да (reliable checkpoint)
Скорость первого вычисленияТакое же (вычисляет при первом action)Дополнительный pass (запись в HDFS)
ИспользованиеПовторное использование RDDИтеративные алгоритмы, длинный lineage

persist() кешируют данные, но lineage остаётся нетронутым. Если executor с кешем падает, Spark пересчитывает через lineage. checkpoint() (reliable) убирает lineage полностью — после checkpoint Spark не знает, как данные были получены, только где они лежат.

Из-за этого важен порядок: rdd.persist().checkpoint(). Сначала persist в памяти, потом checkpoint в HDFS. Тогда checkpoint читает из памяти (быстро), а не пересчитывает через lineage.

MapOutputTracker и локация shuffle-данных

Когда Stage завершается, каждый map-Task записывает shuffle-данные на локальный диск executor-а и регистрирует их локацию в MapOutputTracker (Driver-компонент). При FetchFailed DAGScheduler запрашивает у MapOutputTracker, какие map-outputs потеряны, и перезапускает именно их.

MapOutputTrackerMaster (Driver):
  stageId=0 -> mapId=42 -> BlockManagerId(executor-1, host1, port)
              mapId=43 -> BlockManagerId(executor-2, host2, port)
              ...

При падении executor-1:
  Все map outputs executor-1 помечаются недоступными
  DAGScheduler resubmit только потерянные map tasks (Spark 4.0 fine-grained recovery)

До Spark 3.0 потеря одного executor в Stage 0 приводила к полному resubmit Stage 0. В Spark 3.1+ (SPARK-23243) реализован partial recomputation: только потерянные partition из предыдущей Stage пересчитываются.

Попробуй сам

Эксперимент с длинным lineage и checkpoint:

from pyspark import SparkContext
import time

sc = SparkContext.getOrCreate()
sc.setCheckpointDir("/tmp/spark-checkpoints")

# Создаём намеренно длинный lineage (50 итераций map)
rdd = sc.parallelize(range(1000), 10)
for i in range(50):
    rdd = rdd.map(lambda x: x + 1)

print("Lineage без checkpoint:")
print(rdd.toDebugString().decode()[:500])  # Первые 500 символов
# Увидим 50+ уровней вложенности

# Теперь с checkpoint каждые 10 итераций
rdd2 = sc.parallelize(range(1000), 10)
for i in range(50):
    rdd2 = rdd2.map(lambda x: x + 1)
    if (i + 1) % 10 == 0:
        rdd2.checkpoint()
        rdd2.count()  # Материализация checkpoint

print("\nLineage с checkpoint каждые 10 итераций:")
print(rdd2.toDebugString().decode()[:500])
# Lineage начнётся от последнего checkpoint, не от parallelize

# Сравним время вычисления после "потери" (simulate через unpersist)
start = time.time()
result1 = rdd.sum()
print(f"\nrdd.sum() без checkpoint: {time.time() - start:.2f}s")

start = time.time()
result2 = rdd2.sum()
print(f"rdd2.sum() с checkpoint: {time.time() - start:.2f}s")
# Второй должен быть заметно быстрее (читает из /tmp вместо пересчёта)
Проверка знанийKnowledge check
Production-сценарий: ваш Spark-Job делает groupByKey(500) -> map(enrich) -> join(reference, 500) -> reduceByKey(500) на 100GB данных. В конце Stage 3 (reduceByKey) один executor падает при 80% завершённости. Опишите: что именно пересчитает Spark (Spark 4.0), и как правильно расставить checkpoint, чтобы минимизировать стоимость пересчёта при следующем сбое.
ОтветAnswer
При сбое executor в Stage 3 (reduceByKey): Spark помечает потерянные Tasks Stage 3 как FAILED. DAGScheduler проверяет MapOutputTracker — у потерянного executor были map-outputs Stage 3 (shuffle write для reduceByKey). Эти outputs недоступны. Fine-grained recomputation (Spark 4.0): пересчитываются только потерянные partitions Stage 3 (не вся stage). Но для пересчёта partition в Stage 3 нужны shuffle-данные Stage 2 (join output). Если executor Stage 2 жив — читает оттуда. Если тоже упал — DAGScheduler смотрит выше. В итоге: пересчёт может каскадироваться до Stage 0 (groupByKey). Правильная расстановка checkpoint: (1) После groupByKey: rdd_grouped.persist(MEMORY_AND_DISK).checkpoint(); rdd_grouped.count() — этот RDD вычислительно дорогой (широкая зависимость) и используется дальше. (2) После join: rdd_joined.persist(MEMORY_AND_DISK).checkpoint(); rdd_joined.count() — join — самая дорогая операция, его результат стоит фиксировать. С этими двумя checkpoint при сбое в Stage 3 Spark пересчитает только потерянные reduce-partitions, читая join-результат из HDFS checkpoint — без каскадного подъёма к Stage 0. Экономия: вместо пересчёта 3 дорогих шаффлов — только один partial recompute Stage 3.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Spark 4.0 Job: Stage 0 (200 map tasks) -> shuffle -> Stage 1 (200 reduce tasks). Stage 1 завершена на 90% (180 tasks). Executor с 5 map-outputs Stage 0 падает. Что именно пересчитает Spark при fine-grained recomputation?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5