Learning Platform
Глоссарий Troubleshooting
Урок 17.01 · 18 мин
Продвинутый
GlossaryInternalsReferenceExecution ModelSchedulerShuffleMemoryCatalystStreaming

Глоссарий internals Spark

Этот справочник охватывает термины, необходимые для работы с кодовой базой Spark, чтения Spark UI, интерпретации explain() и диагностики production-проблем. Термины сгруппированы по подсистемам, а не по алфавиту: так проще понять, как они связаны между собой.


1. Модель выполнения и базовые абстракции

RDD (Resilient Distributed Dataset) Базовая абстракция Spark: неизменяемая, ленивая, распределённая коллекция записей. RDD описывает как вычислить данные (lineage), а не хранит сами данные. Живёт в пакете org.apache.spark.rdd.

Lineage (родословная) Граф зависимостей RDD, позволяющий восстановить потерянный раздел без глобального checkpoint. Хранится как DAG объектов Dependency. При потере партиции Spark поднимается по lineage и пересчитывает только нужную ветку.

Narrow dependency (узкая зависимость) Зависимость, при которой каждый раздел дочернего RDD зависит не более чем от одного раздела родительского. Примеры: map, filter, union. Позволяет объединять операции в одну стадию без shuffle.

Wide dependency (широкая зависимость) Зависимость, при которой раздел дочернего RDD зависит от нескольких разделов родительского. Возникает при groupByKey, join, repartition. Порождает границу стадии и shuffle.

Stage (стадия) Набор задач, которые можно выполнить без shuffle. Стадии разделяются wide dependencies. Различают ShuffleMapStage (пишет данные на диск для следующей стадии) и ResultStage (возвращает результат драйверу или записывает в sink).

Task (задача) Единица работы, выполняемая на одной партиции одного executor’а. Соответствует одному разделу выходных данных стадии. Сериализуется и отправляется с драйвера через TaskScheduler.

Job (задание) Полный граф вычислений от action-вызова (collect, save) до источников данных. Один вызов action порождает ровно один job. Разбивается на стадии DAGScheduler’ом.

Partition (партиция, раздел) Логический фрагмент данных, обрабатываемый одной задачей. Число партиций определяет параллелизм. Физически может соответствовать блоку HDFS, диапазону Kafka offset’ов или произвольному диапазону строк.

Partitioner Функция, отображающая ключ на номер раздела. Реализации в Spark: HashPartitioner (ключ % N) и RangePartitioner (выборочная сортировка для диапазонного разбиения). Сохраняется как метаданные RDD, позволяя Spark пропускать shuffle при join двух RDD с одним и тем же Partitioner.

Action (действие) Операция, материализующая результат RDD или DataFrame: collect, count, save, foreach. Инициирует submit job’а в SparkContext. Противопоставляется трансформациям (lazy).

Transformation (трансформация) Ленивая операция, строящая новый RDD/Dataset поверх существующего. Выполняется только при вызове action. Делятся на narrow (map, filter) и wide (groupBy, join).


2. Scheduler и планировщик задач

DAGScheduler Компонент драйвера, разбивающий job на стадии по wide dependency-границам и отправляющий их в TaskScheduler. Живёт в org.apache.spark.scheduler.DAGScheduler. Отвечает за fault tolerance на уровне стадий: при потере shuffle output перезапускает ShuffleMapStage.

TaskScheduler Компонент драйвера, принимающий TaskSet от DAGScheduler и назначающий задачи конкретным executor’ам с учётом locality preferences. Интерфейс TaskScheduler, реализация TaskSchedulerImpl. Работает поверх SchedulerBackend.

SchedulerBackend Интерфейс взаимодействия TaskScheduler с менеджером кластера (YARN, Kubernetes, Standalone). Реализации: YarnSchedulerBackend, KubernetesClusterSchedulerBackend. Отвечает за запрос executor’ов и передачу им задач через RPC.

TaskSet Набор задач одной стадии, переданный DAGScheduler в TaskScheduler за один раз. При повторном запуске стадии создаётся новый TaskSet с теми же задачами.

TaskSetManager Управляет очередью задач внутри одного TaskSet: отслеживает завершение, повторные запуски, locality уровни. Реализует алгоритм locality-aware scheduling.

Locality level (уровень локальности) Приоритет размещения задачи рядом с данными. Порядок убывающей предпочтительности: PROCESS_LOCAL (данные в JVM executor’а) -> NODE_LOCAL (данные на том же узле) -> RACK_LOCAL (тот же rack) -> ANY (любой узел). Spark ждёт spark.locality.wait перед понижением уровня.

Speculative execution (спекулятивное выполнение) Механизм запуска дублирующей копии медленной задачи на другом executor’е. Включается при spark.speculation=true. Задача считается медленной, если выполняется дольше медианного времени задач стадии, умноженного на spark.speculation.multiplier. Победитель отменяет проигравшего.

Dynamic allocation (динамическое выделение) Механизм добавления и освобождения executor’ов в зависимости от длины очереди задач. При наличии ожидающих задач запрашивает новые executor’ы; при простое idle executor’а дольше spark.dynamicAllocation.executorIdleTimeout — освобождает его. Требует External Shuffle Service для сохранения shuffle файлов после смерти executor’а.


3. Shuffle

Shuffle (перемешивание) Перераспределение данных между партициями через запись на диск и последующее чтение по сети. Самая дорогая операция в Spark: требует сериализации, записи map output, передачи по сети, десериализации на стороне reduce.

ShuffleManager Pluggable-интерфейс управления shuffle. Единственная реализация в Spark 4.x: SortShuffleManager. Прежний HashShuffleManager удалён в Spark 2.0.

SortShuffleManager Стандартный shuffle-менеджер Spark. Использует внешнюю сортировку (ExternalSorter): данные сортируются по partition ID, накапливаются в памяти, при переполнении spillируются на диск и сливаются в один файл на mapper. Результат: один .data-файл и один .index-файл на mapper.

Bypass merge-sort shuffle writer Оптимизированный путь внутри SortShuffleManager для случаев с малым числом output-партиций (менее spark.shuffle.sort.bypassMergeThreshold, по умолчанию 200) и без map-side aggregation. Каждая партиция пишется во временный файл, затем файлы конкатенируются. Позволяет избежать сортировки.

Serialized shuffle (UnsafeShuffleWriter) Оптимизированный shuffle-writer для случаев, когда сериализатор поддерживает relocation объектов (Kryo, Java-сериализатор). Работает с сериализованными двоичными записями напрямую, не десериализуя их при spill. Снижает GC-давление.

Map output (map output) Файлы, записанные mapper’ом на локальный диск executor’а. Адреса файлов регистрируются в MapOutputTracker. Reducer запрашивает эти адреса у MapOutputTracker и загружает нужные блоки по сети через BlockTransferService.

MapOutputTracker Компонент драйвера (MapOutputTrackerMaster) и executor’а (MapOutputTrackerWorker), хранящий информацию о расположении map output блоков. Драйверская часть авторитетна; worker’ы кэшируют результаты lookup’ов.

ExternalShuffleService (ESS) Отдельный демон на каждом узле кластера, хранящий shuffle файлы после завершения или смерти executor’а. Обязателен для Dynamic Allocation. В Kubernetes-режиме заменяется на Remote Shuffle Service или Push-based shuffle.

Push-based shuffle Механизм (Spark 3.2+), при котором mapper’ы проталкивают shuffle блоки на ESS заранее, до запуска reducer’ов. ESS мержит блоки одной партиции и отдаёт одним куском. Снижает число random IO при чтении и количество соединений. Управляется spark.shuffle.push.enabled.

Shuffle spill (сброс на диск) Запись накопленных в памяти данных shuffle на диск при нехватке памяти. Реализуется через ExternalSorter.spill(). Каждый spill создаёт временный файл; после завершения mapper’а все spill-файлы и данные в памяти мержатся в итоговый файл.

BlockTransferService Сервис передачи блоков данных между executor’ами по сети. Реализация: NettyBlockTransferService (на основе Netty). Используется как для shuffle fetch, так и для передачи broadcast-переменных и кэшированных блоков.


4. Память

Unified Memory Manager Менеджер памяти Spark (с версии 1.6): разделяет память executor’а на Execution Memory и Storage Memory с динамической границей между ними. Оба пула конкурируют за общую “unified” область. Живёт в org.apache.spark.memory.UnifiedMemoryManager.

Execution Memory Часть unified-пула, используемая для вычислений: hash tables join’ов, буферы сортировки, spill-буферы. При нехватке может вытеснять Storage Memory, если та не закреплена.

Storage Memory Часть unified-пула для кэшированных RDD/Dataset, broadcast-переменных и unroll-буферов. При нехватке execution memory может сжиматься путём выгрузки некритичных блоков из кэша.

spark.memory.fraction Доля heap JVM (после вычета Reserved Memory ~300 MB), отведённая под unified-пул. По умолчанию 0.6. Оставшиеся 0.4 отведены под пользовательские объекты (UDF, overhead).

spark.memory.storageFraction Доля unified-пула, которую Storage Memory защищает от вытеснения Execution Memory. По умолчанию 0.5. Это нижняя граница, но не фиксированное разделение: при свободной памяти оба пула могут использовать весь unified-объём.

Off-heap memory Память вне JVM heap, управляемая через sun.misc.Unsafe. Не облагается GC. Используется UnsafeRow, Tungsten-аллокатором. Активируется через spark.memory.offHeap.enabled=true с указанием spark.memory.offHeap.size.

MemoryConsumer Абстракция внутри TaskMemoryManager для компонентов, потребляющих память: ExternalSorter, BytesToBytesMap (hash join), ShuffleExternalSorter. Каждый MemoryConsumer реализует spill() для освобождения памяти по запросу.

TaskMemoryManager Управляет памятью одной задачи: выделяет и освобождает страницы, треккает потребителей. При нехватке памяти просит MemoryConsumer’ов выполнить spill.

MemoryMode Перечисление: ON_HEAP или OFF_HEAP. Определяет, откуда аллоцируется память для конкретного MemoryConsumer.

Reserved Memory Фиксированный резерв (~300 MB в Spark 3+), исключаемый из heap перед расчётом unified-пула. Гарантирует, что Spark-компоненты не конкурируют с unified-пулом.


5. Catalyst и Tungsten

Catalyst Optimizer Расширяемый оптимизатор запросов Spark SQL. Работает с деревьями (TreeNode) и правилами (Rule[T]). Применяет наборы правил итеративно до fixed point. Состоит из Analyzer, Optimizer, SparkPlanner.

Logical Plan Абстрактное дерево реляционных операторов (Scan, Filter, Project, Join, Aggregate) без привязки к физическим деталям выполнения. Проходит стадии Unresolved -> Analyzed -> Optimized.

Physical Plan (SparkPlan) Дерево физических операторов с конкретными стратегиями выполнения: SortMergeJoin, BroadcastHashJoin, HashAggregate. Порождается SparkPlanner из оптимизированного логического плана. Реализует метод execute(), возвращающий RDD[InternalRow].

Analyzer Фаза Catalyst, разрешающая имена таблиц и колонок через Catalog. Превращает UnresolvedRelation в конкретный LogicalRelation, UnresolvedAttribute в AttributeReference.

Optimizer Фаза Catalyst, применяющая RBO-правила (predicate pushdown, column pruning, constant folding) и CBO-правила (порядок join’ов при spark.sql.cbo.enabled). Работает с оптимизированным logical планом.

Rule-based optimization (RBO) Оптимизации, применяемые всегда независимо от статистик: предикатный pushdown, column pruning, constant folding, boolean simplification. Детерминированы и безопасны в любых условиях.

Cost-based optimization (CBO) Оптимизации, основанные на статистиках таблиц (размер, число строк, NDV колонок). Включают выбор порядка join’ов (ReorderJoin). Требует предварительного ANALYZE TABLE. Управляется spark.sql.cbo.enabled.

AQE (Adaptive Query Execution) Механизм переоптимизации плана в runtime на основе реальной статистики shuffle. Включён по умолчанию (Spark 3.2+). Три основные фичи: partition coalescing, dynamic join switching, skew join handling.

QueryStage Единица выполнения в AQE: фрагмент физического плана до shuffle-границы. После завершения стадии AQE собирает её статистики и переоптимизирует оставшийся план.

ExchangeCoordinator (устарел) Предшественник AQE в Spark 2.x для coalescing shuffle партиций. Заменён полноценным AQE framework’ом в Spark 3.0.

Tungsten Проект внутри Spark по ручному управлению памятью и кодогенерации. Включает: UnsafeRow (бинарный row-format), off-heap allocation, cache-friendly алгоритмы (hash map с linear probing), Whole-Stage CodeGen.

UnsafeRow Бинарный row-формат Tungsten: null bitmap + fixed-length fields + variable-length data в одном непрерывном байтовом буфере. Позволяет передавать строки без сериализации/десериализации между операторами и складывать в off-heap память. Поля доступны по fixed offset.

Whole-Stage CodeGen (WSCG) Оптимизация Tungsten, генерирующая единый Java-метод для цепочки операторов (обычно map+filter+project). Устраняет virtual dispatch между операторами и позволяет JIT компилятору оптимизировать весь pipeline. Управляется spark.sql.codegen.wholeStage.

InternalRow Абстрактный базовый класс строки внутри Spark SQL: UnsafeRow (off-heap бинарный), GenericInternalRow (JVM-объекты). Используется как единица обмена данными между физическими операторами.

SparkPlanner Преобразует оптимизированный LogicalPlan в SparkPlan, применяя Strategy объекты (наборы паттернов). Примеры стратегий: JoinSelection, BasicOperators, DataSourceStrategy.


6. Structured Streaming

Micro-batch execution Модель выполнения Structured Streaming: входящие данные разбиваются на маленькие batch’и фиксированного trigger-интервала. Каждый batch — полноценный Spark job. Латентность определяется размером batch. Управляется StreamExecution.

Continuous processing Альтернативная модель выполнения Structured Streaming (экспериментальная): каждая запись обрабатывается немедленно без накопления в batch. Поддерживает только простые stateless трансформации и map-like операции. Латентность — единицы миллисекунд.

StreamExecution Главный координатор streaming job’а: управляет trigger’ами, запрашивает новые данные у source, планирует и запускает batch’и, записывает offsets в checkpoint. Реализации: MicroBatchExecution и ContinuousExecution.

Offset (смещение) Позиция в источнике данных, определяющая, какие данные были обработаны. Конкретный тип зависит от источника: KafkaSourceOffset, FileStreamSourceOffset. Сохраняется в checkpoint для обеспечения exactly-once семантики.

Checkpoint (контрольная точка) Директория в надёжном хранилище (HDFS, S3, GCS), куда Structured Streaming записывает offset’ы, метаданные стадий и state store snapshots для восстановления после сбоя. Без checkpoint не гарантируется exactly-once.

State Store Хранилище состояния для stateful операций (windowed aggregation, stream-stream join, mapGroupsWithState). По умолчанию реализован как HDFSBackedStateStore (in-memory hash map + дельта-файлы на HDFS). Живёт по одному экземпляру на партицию.

Watermark (водяной знак) Оценка максимального отставания событий во времени. При достижении watermark W Spark гарантирует, что все события с event time <= W уже получены, и финализирует соответствующие окна. Управляется withWatermark().

Trigger (триггер) Настройка, определяющая периодичность запуска batch’ей в micro-batch режиме. Типы: ProcessingTime (фиксированный интервал), Once (один batch и стоп), AvailableNow (все накопленные данные за N batch’ей), Continuous (continuous processing).


7. Расширение движка

SparkSessionExtensions API для добавления пользовательских правил, функций и стратегий в Catalyst без форка Spark. Позволяет: инжектировать Rule[LogicalPlan], Rule[SparkPlan], Strategy, custom parsers, custom functions. Регистрируется через spark.withExtensions() или spark.sql.extensions.

DataSource V2 (DSv2) Современный API источников данных (FLIP-203): TableProvider, ScanBuilder, ReadPartition. Поддерживает filter pushdown, column pruning, partitioning pushdown на уровне API. Заменяет устаревший RelationProvider / BaseRelation.

Catalog Plugin API API регистрации внешних каталогов (Spark 3.0+): CatalogPlugin, TableCatalog, FunctionCatalog. Позволяет использовать Hive Metastore, Apache Iceberg catalog, Delta Lake catalog как первоклассные Spark-каталоги.

Exchange (обмен) Физический оператор Spark SQL, вставляемый Catalyst в план для обеспечения требуемого распределения данных: ShuffleExchangeExec (shuffle), BroadcastExchangeExec (broadcast join). AQE может переиспользовать Exchange (Exchange reuse), вставляя ReusedExchangeExec.

CustomShuffleReader Физический оператор AQE, оборачивающий ShuffleQueryStage и изменяющий число/размер читаемых партиций. Используется для coalescing (CoalesceShufflePartitions) и skew handling (OptimizeSkewedJoin).

Spark Connect RPC-протокол (Spark 3.4+) на основе gRPC/Protocol Buffers, разделяющий клиент и сервер. Клиентская библиотека отправляет сериализованные Unresolved Logical Plan’ы на сервер, который выполняет их и стримит результаты. Позволяет клиентам на Python/Go/Rust работать с Spark без JVM.

Проверка знанийKnowledge check
Чем ShuffleMapStage отличается от ResultStage, и почему это различие важно для fault tolerance?
ОтветAnswer
ShuffleMapStage производит промежуточные данные (map output) и записывает их на диск для следующей стадии. ResultStage непосредственно возвращает результат драйверу или записывает в sink. При потере shuffle output файлов (например, при смерти executor) DAGScheduler перезапускает ShuffleMapStage, но не ResultStage -- это ключевое различие. ResultStage перезапускается только при провале самой задачи, а не при потере данных downstream. Именно поэтому External Shuffle Service важен для Dynamic Allocation: он хранит map output после освобождения executor, предотвращая перезапуск ShuffleMapStage.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 2