Lineage и отказоустойчивость
Spark не хранит данные на диске по умолчанию — ни промежуточные результаты, ни shuffle-выходы после их потребления. При сбое executor-а данные исчезают. Тем не менее Spark восстанавливается без повтора всего Job. Механизм — lineage: каждый RDD помнит, из каких родителей и какой трансформацией он получен, и умеет пересчитать потерянную партицию.
Это архитектурный выбор с серьёзными trade-offs. Lineage даёт отказоустойчивость без репликации данных (сравни с Hadoop HDFS, который трижды реплицирует каждый блок), но делает стоимость восстановления зависимой от глубины и топологии графа. Senior data-инженер обязан понимать эту зависимость.
Что такое lineage graph
DAGScheduler в Spark 4.0 строит граф зависимостей, обходя цепочку RDD от конечного к источнику. Это не просто «граф зависимостей» — это инструкция восстановления. Если партиция p в RDD R потеряна, Spark смотрит на R.getDependencies(), находит родительский RDD и его партиции, от которых зависит p, и рекурсивно восстанавливает их.
Граф строится при вызове action и живёт на уровне Driver в DAGScheduler. На executor-ах хранится только текущий Task с замкнутой функцией — линейдж там не материализован.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Строим длинный lineage
raw = sc.textFile("hdfs:///logs/*.gz", minPartitions=200)
parsed = raw.map(parse_log_line)
filtered = parsed.filter(lambda r: r.level == "ERROR")
keyed = filtered.map(lambda r: (r.service, 1))
counted = keyed.reduceByKey(lambda a, b: a + b)
# Посмотрим на lineage
print(counted.toDebugString().decode())
Вывод toDebugString покажет вложенную структуру с отступами: каждый уровень отступа — это один RDD в lineage. Символ (200) указывает число партиций:
(200) PythonRDD[6] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[5] at reduceByKey ...
| ShuffledRDD[4] at reduceByKey ... <-- shuffle boundary
+-(200) PythonRDD[3] at RDD at PythonRDD.scala:53 []
| MapPartitionsRDD[2] at filter ...
| MapPartitionsRDD[1] at map ...
| HadoopRDD[0] at textFile ...
ShuffledRDD — это граница Stage. До неё — одна Stage (Stage 0), после — другая (Stage 1).
Механизм восстановления при сбое executor-а
Когда executor падает в середине Stage, DAGScheduler получает FetchFailed или ExecutorLossReason. Он помечает все Tasks этого executor-а как FAILED и планирует их повтор. Число повторов контролируется spark.task.maxFailures (default 4 в Spark 4.0, был 4 и в Spark 3.x).
Повтор Task означает вызов compute() заново на другом executor-е. Поскольку compute() идёт по lineage, он рекурсивно вытягивает нужные партиции родительских RDD. Если родительские партиции уже в памяти (MEMORY_ONLY) — берёт оттуда. Если нет — снова вычисляет через compute() ещё выше по lineage.
Narrow зависимости: дешёвое восстановление
При NarrowDependency каждая выходная партиция зависит от одной (или фиксированного числа) входных партиций. Восстановление партиции p требует пересчёта только одной «ветви» lineage — от исходного RDD до p.
Источник: P0 P1 P2 P3 P4 ... P199
| | | | |
map: P0 P1 P2 P3 P4 ... P199
| | | | |
filter: P0 P1 P2 P3 P4 ... P199
| |
P0 X P2 <- Executor упал, P1 потеряна
Для восстановления filter P1 нужен только map P1, а для map P1 — только source P1. Остальные 199 партиций не затронуты. Это O(lineage_depth) пересчёт одной ветки — дёшево.
Примеры операций с narrow зависимостью: map, flatMap, filter, mapPartitions, sample, union, coalesce (без shuffle).
Wide зависимости: дорогое восстановление
При ShuffleDependency каждая выходная партиция зависит от всех входных партиций — она содержит данные из каждой из них. Для восстановления одной выходной партиции нужны shuffle-данные от всех M map-tasks.
Если map-side executor пал после завершения Stage 0 (shuffle write) — его shuffle files недоступны. FetchFailed при чтении заставляет DAGScheduler переосмыслить план: возможно, нужно пересчитать часть Stage 0.
Stage 0 (map-side): P0 P1 P2 P3 ... P199
Shuffle write: ↓ ↓ ↓ ↓ ↓
Reduce task P0 читает: [bucket из P0] + [bucket из P1] + ... + [bucket из P199]
= данные от ВСЕХ P0..P199 Stage 0
Если P42 Stage 0 потеряна — P0 reduce не может завершиться
Нужно пересчитать P42 Stage 0 (одну map-task)
В Spark 4.0 при FetchFailed DAGScheduler реализует fine-grained recomputation: пересчитывает только те партиции Stage 0, shuffle output которых недоступен (через MapOutputTracker). До Spark 3.0 пересчитывалась вся Stage 0. Это значительное улучшение для больших shuffle.
RDD Checkpointing: обрезаем lineage
Если lineage очень длинный (итеративные алгоритмы — ML, GraphX) или содержит широкие зависимости в нескольких местах, стоимость восстановления становится неприемлемой. Для этого существует checkpoint — механизм материализации RDD в надёжное хранилище с обрезкой lineage.
Spark 4.0 поддерживает два вида checkpoint:
Reliable checkpoint (в HDFS или S3)
sc.setCheckpointDir("hdfs:///spark-checkpoints/my-job")
iterative_rdd = start_rdd
for i in range(100): # 100 итераций PageRank
iterative_rdd = iterative_rdd.map(pagerank_step)
if i % 10 == 0:
# Материализуем в HDFS, обрезаем lineage
iterative_rdd.checkpoint()
iterative_rdd.count() # ВАЖНО: нужно вызвать action!
checkpoint() только помечает RDD как «должен быть checkpointed». Фактическая запись происходит при первом action после вызова checkpoint(). Поэтому паттерн rdd.checkpoint(); rdd.count() — обязательный: иначе следующий action пересчитает lineage дважды (один раз для checkpoint, один раз для результата), что вдвое дороже.
После успешного checkpoint в hdfs:///spark-checkpoints/ создаётся директория с Parquet/sequence файлами партиций. Поле rdd.dependencies заменяется на ReliableCheckpointRDD — новый RDD, который читает из HDFS вместо пересчёта lineage.
Local checkpoint (в памяти executor-а)
# Более быстрый, но не надёжный
iterative_rdd = iterative_rdd.localCheckpoint()
iterative_rdd.count()
localCheckpoint() сохраняет партиции в памяти/диске executor-ов (через StorageLevel.MEMORY_AND_DISK). Это не гарантирует надёжность — при сбое executor-а checkpoint потеряется и Spark снова обратится к lineage. Зато значительно быстрее, чем запись в HDFS. Подходит для коротких итеративных алгоритмов, где полная надёжность не нужна.
Реализован через LocalRDDCheckpointData в core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala.
Типичная ошибка с checkpoint: вызов rdd.checkpoint() без последующего action. В этом случае при следующем action над этим RDD Spark вычислит lineage дважды: первый раз — для записи checkpoint, второй — для получения результата. Это практически удваивает время выполнения. Всегда делай rdd.checkpoint() -> rdd.count() (или любой другой action) -> затем используй rdd дальше.
Когда checkpoint необходим: производственные сценарии
Сценарий 1: Итеративные ML-алгоритмы
MLlib реализует алгоритмы (ALS, LDA, LogisticRegression с SGD) через итеративные операции над RDD. После 50-100 итераций lineage становится настолько длинным, что при сбое пересчёт занимает часы. MLlib автоматически checkpoint-ит RDD каждые checkpointInterval итераций (default 10 для большинства алгоритмов):
// Внутри MLlib (ALS.scala):
if (sc.getCheckpointDir.isDefined && (iter % checkpointInterval == 0)) {
factors.checkpoint()
}
Сценарий 2: Каскады wide зависимостей
Если пайплайн содержит несколько join или groupByKey, каждый из которых создаёт ShuffleDependency, то при сбое executor-а в последней стадии Spark может пересчитать несколько предыдущих стадий. Checkpoint после дорогого join «фиксирует» результат и обрезает lineage:
# Дорогой многошаговый пайплайн
step1 = raw.groupByKey(200) # ShuffleDependency
step2 = step1.join(reference, 200) # ShuffleDependency
step3 = step2.map(enrich) # NarrowDependency
# Checkpoint после дорогих широких операций
sc.setCheckpointDir("hdfs:///checkpoints")
step2.checkpoint()
step2.count() # материализуем
# Теперь step3.lineage начинается от HDFS, не от raw
result = step3.filter(is_valid).reduceByKey(merge)
Сценарий 3: Длинные streaming-сессии
Spark Streaming (DStream) автоматически checkpoint-ит metadata каждые checkpointInterval батчей. Без этого при рестарте стриминг-приложения невозможно восстановить состояние (offset Kafka, состояние updateStateByKey).
persist() vs checkpoint(): важное различие
persist(StorageLevel) и checkpoint() — разные механизмы с разными гарантиями:
| Аспект | persist() | checkpoint() |
|---|---|---|
| Где хранится | Память / диск executor | HDFS / S3 (reliable) или executor (local) |
| Обрезает lineage | Нет | Да |
| Гарантии при сбое executor | Нет (репересчёт через lineage) | Да (reliable checkpoint) |
| Скорость первого вычисления | Такое же (вычисляет при первом action) | Дополнительный pass (запись в HDFS) |
| Использование | Повторное использование RDD | Итеративные алгоритмы, длинный lineage |
persist() кешируют данные, но lineage остаётся нетронутым. Если executor с кешем падает, Spark пересчитывает через lineage. checkpoint() (reliable) убирает lineage полностью — после checkpoint Spark не знает, как данные были получены, только где они лежат.
Из-за этого важен порядок: rdd.persist().checkpoint(). Сначала persist в памяти, потом checkpoint в HDFS. Тогда checkpoint читает из памяти (быстро), а не пересчитывает через lineage.
MapOutputTracker и локация shuffle-данных
Когда Stage завершается, каждый map-Task записывает shuffle-данные на локальный диск executor-а и регистрирует их локацию в MapOutputTracker (Driver-компонент). При FetchFailed DAGScheduler запрашивает у MapOutputTracker, какие map-outputs потеряны, и перезапускает именно их.
MapOutputTrackerMaster (Driver):
stageId=0 -> mapId=42 -> BlockManagerId(executor-1, host1, port)
mapId=43 -> BlockManagerId(executor-2, host2, port)
...
При падении executor-1:
Все map outputs executor-1 помечаются недоступными
DAGScheduler resubmit только потерянные map tasks (Spark 4.0 fine-grained recovery)
До Spark 3.0 потеря одного executor в Stage 0 приводила к полному resubmit Stage 0. В Spark 3.1+ (SPARK-23243) реализован partial recomputation: только потерянные partition из предыдущей Stage пересчитываются.
Попробуй сам
Эксперимент с длинным lineage и checkpoint:
from pyspark import SparkContext
import time
sc = SparkContext.getOrCreate()
sc.setCheckpointDir("/tmp/spark-checkpoints")
# Создаём намеренно длинный lineage (50 итераций map)
rdd = sc.parallelize(range(1000), 10)
for i in range(50):
rdd = rdd.map(lambda x: x + 1)
print("Lineage без checkpoint:")
print(rdd.toDebugString().decode()[:500]) # Первые 500 символов
# Увидим 50+ уровней вложенности
# Теперь с checkpoint каждые 10 итераций
rdd2 = sc.parallelize(range(1000), 10)
for i in range(50):
rdd2 = rdd2.map(lambda x: x + 1)
if (i + 1) % 10 == 0:
rdd2.checkpoint()
rdd2.count() # Материализация checkpoint
print("\nLineage с checkpoint каждые 10 итераций:")
print(rdd2.toDebugString().decode()[:500])
# Lineage начнётся от последнего checkpoint, не от parallelize
# Сравним время вычисления после "потери" (simulate через unpersist)
start = time.time()
result1 = rdd.sum()
print(f"\nrdd.sum() без checkpoint: {time.time() - start:.2f}s")
start = time.time()
result2 = rdd2.sum()
print(f"rdd2.sum() с checkpoint: {time.time() - start:.2f}s")
# Второй должен быть заметно быстрее (читает из /tmp вместо пересчёта)