Learning Platform
Глоссарий Troubleshooting
Урок 15.01 · 28 мин
Продвинутый
Source CodeRepository StructureDAGSchedulerCatalystNavigation

Тур по кодовой базе Spark

Читать исходники большого распределённого движка — навык, который резко отличает senior-инженера от того, кто работает только по документации. Документация запаздывает, описывает happy path и умалчивает о corner cases. Исходный код — это единственная истина о том, что происходит внутри. Этот урок — практический путеводитель по репозиторию apache/spark на базе Spark 4.0.


Верхнеуровневая структура репозитория

Репозиторий организован в модули первого уровня. Каждый модуль — это отдельный Maven/SBT-проект со своим pom.xml. После клонирования (git clone https://github.com/apache/spark) вы увидите:

apache/spark/
├── core/               # Spark Core: RDD, scheduler, executor, shuffle
├── sql/                # SQL/DataFrame движок (подмодули внутри)
│   ├── api/            # Публичный SQL API: SparkSession, DataFrame, Dataset
│   ├── catalyst/       # Оптимизатор: парсер, анализатор, оптимизатор, planner
│   ├── core/           # Физическое выполнение SQL: физические операторы, кодогенерация
│   └── hive/           # Hive-интеграция (Hive metastore, HiveQL)
├── streaming/          # Structured Streaming runtime
├── mllib/              # MLlib: базовые алгоритмы и pipeline API
├── connect/            # Spark Connect: gRPC-протокол клиент/сервер
├── resource-managers/  # Интеграции с YARN, Kubernetes, Mesos
│   ├── yarn/
│   └── kubernetes/
├── common/             # Общие утилиты, используемые несколькими модулями
│   ├── network-common/ # Netty-based транспортный слой
│   ├── unsafe/         # sun.misc.Unsafe обёртки, off-heap операции
│   └── sketch/         # вероятностные структуры данных (Count-Min Sketch, HLL)
├── python/             # PySpark: py4j bridge, pyspark.* пакеты
├── R/                  # SparkR
├── graphx/             # Graph processing
├── launcher/           # SparkLauncher API для программного запуска приложений
├── repl/               # spark-shell REPL
├── tools/              # spark-class, spark-submit runner scripts
├── bin/                # Исполняемые скрипты (spark-submit, spark-shell, spark-sql)
├── sbin/               # Сервисные скрипты (start-master.sh, start-worker.sh)
├── dev/                # Инструменты разработки: make-distribution.sh, lint, тесты CI
└── build/              # Embedded Maven wrapper (build/mvn)

Важная деталь Spark 4.0: Scala 2.12 удалена, всё написано на Scala 2.13. Соответственно, артефакты называются spark-core_2.13, spark-sql_2.13 и т.д.

Граф зависимостей модулей Spark 4.0

connect

spark-connect: gRPC-сервер Spark Connect. Принимает запросы от тонких клиентов (Python, Scala). Зависит от sql/api, sql/core.

mllib

spark-mllib: алгоритмы ML и pipeline API. Зависит от spark-core, sql/core.

streaming

spark-streaming: Structured Streaming runtime. Зависит от sql/core для микро-batch исполнения.
зависят от

sql/core

sql/core: физические операторы (SortMergeJoin, HashAgg, FileScan), whole-stage codegen, execution engine. Это основной execution layer поверх spark-core.

sql/api

sql/api: публичный API — SparkSession, DataFrame, Dataset, Column, Row. Минимум зависимостей, стабильный ABI.
зависит от

sql/catalyst

sql/catalyst: ядро оптимизатора. TreeNode, LogicalPlan, Expression, Rule, RuleExecutor, SparkPlanner. Не зависит от spark-core — чистая оптимизационная логика.
зависит от

core

spark-core: фундамент. RDD, DAGScheduler, TaskScheduler, Executor, BlockManager, ShuffleManager, SparkContext, SparkEnv. Самый большой модуль (~200K строк Scala).
зависит от

network-common

network-common: Netty-based RPC и streaming transport. Используется для driver-executor коммуникации, shuffle fetch, block transfer.

unsafe

unsafe: обёртки над sun.misc.Unsafe для off-heap операций. Platform.allocateMemory, MemoryBlock, UnsafeRow. Нет зависимостей от core.

Модуль core: сердце движка

core/src/main/scala/org/apache/spark/ — это самая важная директория для понимания рантайма. Навигация:

core/src/main/scala/org/apache/spark/
├── scheduler/
│   ├── DAGScheduler.scala       # ~2500 строк. Превращает RDD-граф в stages и TaskSets
│   ├── TaskScheduler.scala      # Интерфейс: submitTasks, killTask, executorLost
│   ├── TaskSchedulerImpl.scala  # Реализация: locality levels, speculation, blacklist
│   ├── TaskSetManager.scala     # Управление одним TaskSet: retry, locality-aware назначение
│   ├── Stage.scala              # Абстракция Stage (ShuffleMapStage, ResultStage)
│   ├── ShuffleMapStage.scala    # Stage с shuffle output
│   ├── ResultStage.scala        # Финальная Stage с action
│   ├── Job.scala                # Job = action + callback
│   └── SchedulerBackend.scala   # Интерфейс к cluster manager (YARN, K8s, Standalone)
├── executor/
│   ├── Executor.scala           # Worker process: запускает TaskRunner threads
│   ├── TaskRunner.scala         # Выполняет одну Task: десериализация, запуск, метрики
│   └── CoarseGrainedExecutorBackend.scala  # RPC-endpoint executor'а
├── storage/
│   ├── BlockManager.scala       # Управление блоками: RDD cache, broadcast, shuffle
│   ├── BlockManagerMaster.scala # Driver-side: реестр всех BlockManager'ов
│   ├── DiskStore.scala          # Запись/чтение блоков на диск
│   └── MemoryStore.scala        # In-memory хранение блоков (heap и off-heap)
├── shuffle/
│   ├── ShuffleManager.scala     # Интерфейс (SortShuffleManager по умолчанию)
│   ├── sort/SortShuffleManager.scala   # sort-based shuffle: IndexShuffleBlockResolver
│   └── BlockStoreShuffleReader.scala   # Читает shuffle blocks с executor'ов
├── memory/
│   ├── MemoryManager.scala      # Абстракция: execution vs storage memory
│   ├── UnifiedMemoryManager.scala  # Дефолтная реализация: unified pool с кражей памяти
│   └── TaskMemoryManager.scala  # Управляет памятью одной Task (spill логика)
└── rpc/
    ├── RpcEnv.scala             # Абстракция RPC environment
    └── netty/NettyRpcEnv.scala  # Netty-реализация: inbox/outbox, dispatcher

Центральный класс всего рантайма — DAGScheduler.scala. Его метод handleJobSubmitted — точка входа для любого action. Стоит прочесть хотя бы сигнатуры методов: submitStage, submitMissingTasks, handleTaskCompletion, handleExecutorLost. Это скелет всей логики отказоустойчивости.


Модуль sql/catalyst: мозг оптимизатора

sql/catalyst — чистый оптимизационный движок без зависимостей от рантайма Spark. Структура:

sql/catalyst/src/main/scala/org/apache/spark/sql/
├── catalyst/
│   ├── trees/
│   │   ├── TreeNode.scala       # Базовый класс всех AST-узлов: transform, collect, mapChildren
│   │   └── TreePattern.scala    # Enum паттернов для быстрой фильтрации в transform
│   ├── plans/
│   │   ├── logical/
│   │   │   ├── LogicalPlan.scala          # Абстрактный логический план
│   │   │   ├── basicLogicalOperators.scala # Filter, Project, Join, Aggregate, Union...
│   │   │   └── object/                    # Dataset API операторы
│   │   └── physical/
│   │       └── SparkPlan.scala            # Абстрактный физический план
│   ├── expressions/
│   │   ├── Expression.scala     # Базовый класс выражений (eval, genCode)
│   │   ├── arithmetic.scala     # Add, Subtract, Multiply...
│   │   ├── predicates.scala     # And, Or, Not, EqualTo, In...
│   │   ├── aggregate/           # Sum, Count, Max, First...
│   │   └── codegen/             # ExprCode, CodegenContext
│   ├── analysis/
│   │   ├── Analyzer.scala       # Резолвит UnresolvedAttribute, UnresolvedRelation
│   │   ├── FunctionRegistry.scala  # Реестр встроенных и UDF функций
│   │   └── Catalog.scala        # Абстракция метаданных
│   ├── optimizer/
│   │   ├── Optimizer.scala      # RuleExecutor с батчами правил (итеративно до fixed point)
│   │   ├── expressions.scala    # ConstantFolding, SimplifyBooleanExpressions...
│   │   ├── joins.scala          # ReorderJoin, EliminateOuterJoin, PushdownLeftSemiAntiJoin
│   │   └── subquery.scala       # PullupCorrelatedPredicates, RewriteCorrelatedScalarSubquery
│   └── rules/
│       └── RuleExecutor.scala   # Движок применения правил: runBatches, fixed-point iteration

Как работает RuleExecutor: он берёт список Batch-объектов (каждый содержит список Rule). Для каждого батча он применяет все правила по кругу, пока план не перестанет меняться или не будет достигнут maxIterations. Именно поэтому Catalyst называют fixed-point optimizer.

Структура sql/catalyst: путь запроса

Parser

Parser (AbstractSqlParser): ANTLR4-грамматика sql/catalyst/src/main/antlr4/. Превращает SQL-строку в UnresolvedLogicalPlan с UnresolvedRelation, UnresolvedAttribute.

Analyzer

Analyzer (Analyzer.scala): ResolvesRelations, ResolvesReferences, ResolveAggAliasInGroupBy... Около 50 правил в нескольких батчах. Использует SessionCatalog для резолвинга таблиц.

Optimizer

Optimizer (Optimizer.scala): наследует RuleExecutor[LogicalPlan]. Около 100+ правил в 10 батчах. ConstantFolding, PushDownPredicate, ColumnPruning, ReorderJoin (CBO).

SparkPlanner

SparkPlanner (sql/core): стратегии выбора физических операторов. FileSourceStrategy, JoinSelection, Aggregation, BasicOperators. Выдаёт SparkPlan.

Модуль sql/core: физическое выполнение

sql/core содержит физические операторы и инфраструктуру выполнения:

sql/core/src/main/scala/org/apache/spark/sql/
├── execution/
│   ├── SparkStrategies.scala    # Набор стратегий: FileSourceStrategy, JoinSelection...
│   ├── joins/
│   │   ├── SortMergeJoinExec.scala       # Sort-merge join физический оператор
│   │   ├── BroadcastHashJoinExec.scala   # Broadcast hash join
│   │   └── ShuffledHashJoinExec.scala    # Shuffled hash join
│   ├── aggregate/
│   │   ├── HashAggregateExec.scala       # Hash-based aggregation (Stage 1)
│   │   └── SortAggregateExec.scala       # Sort-based fallback
│   ├── exchange/
│   │   ├── ShuffleExchangeExec.scala     # Shuffle: создаёт shuffle boundaries
│   │   └── BroadcastExchangeExec.scala   # Broadcast: собирает данные и рассылает
│   ├── datasources/
│   │   ├── FileSourceScanExec.scala      # Чтение файлов: Parquet, ORC, CSV...
│   │   └── v2/                           # DataSource V2 API
│   └── WholeStageCodegenExec.scala       # Whole-stage codegen контейнер
├── catalyst/
│   └── optimizer/
│       └── SparkOptimizer.scala # Spark-специфичные правила поверх Catalyst Optimizer
└── SparkSession.scala           # Публичная точка входа: sql(), read, createDataFrame

Важный паттерн: WholeStageCodegenExec оборачивает цепочку CodegenSupport-операторов. Метод doCodeGen() рекурсивно обходит дерево и генерирует один Java-метод. Это объясняет, почему *(1) в выводе explain() охватывает несколько операторов.


Где искать конкретные классы

Вот конкретные цепочки поиска для типичных задач:

Задача: понять, как Spark решает, делать BroadcastHashJoin или SortMergeJoin.

Файл: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala, объект JoinSelection. Логика: если одна сторона меньше spark.sql.autoBroadcastJoinThreshold (дефолт 10 МБ) — BroadcastHashJoinExec. Если нет hint и стороны большие — SortMergeJoinExec.

Задача: понять, как применяется predicate pushdown.

Файл: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala, правило PushDownPredicates. И для datasource: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, PushPredicateThroughNonJoin.

Задача: понять, как TaskSchedulerImpl выбирает locality level.

Файл: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala, метод resourceOfferSingleTaskSet. Порядок: PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY. Ожидание между уровнями задаётся spark.locality.wait (дефолт 3 секунды).

Задача: понять, как UnifiedMemoryManager крадёт execution memory у storage.

Файл: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala, метод acquireExecutionMemory. Он вызывает evictBlocksToFreeSpace на MemoryStore, вытесняя наименее недавно используемые cached RDD-блоки.


Навигация: практические инструменты

Читать код Spark в браузере через github.com — мучение. Есть лучшие варианты.

IntelliJ IDEA (рекомендуется для Scala):

# Генерация IntelliJ проекта
./build/mvn -DskipTests -Pideaproj idea:idea

# Или через SBT (быстрее для навигации):
./build/sbt "project core" gen-idea

После открытия в IDEA — Ctrl+N (Navigate -> Class) позволяет прыгать прямо к DAGScheduler, SortMergeJoinExec и т.д. Ctrl+B (Go to Declaration) работает по всему мультимодульному проекту. Alt+F7 (Find Usages) показывает все места, где используется метод.

Поиск по исходникам:

# Найти все реализации ShuffleManager:
grep -r "extends ShuffleManager" core/src/main/scala/

# Найти все места, где submitStage вызывается:
grep -rn "submitStage(" core/src/main/scala/

# Найти все Rule[LogicalPlan]:
find sql/catalyst/src/main/scala -name "*.scala" \
    -exec grep -l "extends Rule\[LogicalPlan\]" {} \;

Чтение истории изменений конкретного класса:

git log --oneline -- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | head -20

Это показывает последние JIRA-номера, по которым можно найти контекст изменений.


Тесты как документация

Одно из лучших мест для понимания поведения движка — тесты. Они содержат конкретные сценарии с ожидаемым поведением.

core/src/test/scala/org/apache/spark/scheduler/
├── DAGSchedulerSuite.scala      # ~4000 строк. Тест на каждый corner case DAGScheduler
└── TaskSchedulerImplSuite.scala # Locality levels, blacklisting, speculation

sql/core/src/test/scala/org/apache/spark/sql/execution/
├── joins/SortMergeJoinSuite.scala
└── adaptive/AdaptiveQueryExecSuite.scala  # AQE: тесты skew join, dynamic coalesce

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/
├── FilterPushdownSuite.scala
└── JoinReorderSuite.scala

DAGSchedulerSuite особенно ценен: в нём симулируются отказы executor’ов, retry stage’ов, fetch failures при shuffle. Читая тест, вы одновременно видите API и ожидаемое поведение при различных failure mode.


Структура Spark Connect (новое в 4.0)

Spark Connect — полноценный gRPC-сервер, позволяющий тонким клиентам подключаться к Spark без зависимости от всего Spark jar. В 4.0 это стандартный способ использования Spark:

connect/
├── common/src/main/protobuf/spark/connect/
│   ├── relations.proto          # Логические планы: Scan, Filter, Project, Join...
│   ├── expressions.proto        # Выражения: Literal, Attribute, Call...
│   └── commands.proto           # CreateDataFrame, WriteData...
├── server/src/main/scala/org/apache/spark/sql/connect/service/
│   ├── SparkConnectService.scala        # gRPC-сервис: executePlan, analyzePlan
│   └── SparkConnectStreamHandler.scala  # Стриминг результатов
└── client/jvm/src/main/scala/org/apache/spark/sql/connect/client/
    └── SparkConnectClient.scala         # Клиентская библиотека

Интересная особенность: .proto-файлы — это фактически спецификация протокола между клиентом и сервером. Если вы понимаете relations.proto, вы понимаете, как каждая DataFrame-операция транслируется в сетевой протокол.


Попробуй сам

1. Найди реализацию конкретного оптимизатора. Клонируй репозиторий, найди файл с правилом ColumnPruning в sql/catalyst. Прочитай метод apply. Убедись, что понимаешь: когда срабатывает это правило и что именно оно делает с планом.

2. Трассировка action до физического плана. Открой SparkSession.scala в IntelliJ. Пройди по вызовам: sql() -> Dataset.ofRows() -> QueryExecution -> optimizedPlan -> sparkPlan. Запомни, в каком классе начинается физическое планирование.

3. Прочти тест для понимания retry. Открой DAGSchedulerSuite.scala, найди тест с FetchFailed. Убедись, что понимаешь, почему при FetchFailed Spark повторяет не только упавшую stage, но и parent stage.

# Быстрая проверка через grep:
grep -n "FetchFailed" \
  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | head -10
Проверка знанийKnowledge check
Вы видите в production, что Spark-приложение крашится с ошибкой в соединении двух больших таблиц. Вы хотите понять, почему Spark выбрал SortMergeJoin вместо ShuffledHashJoin. В каком конкретном файле и классе искать логику этого решения? Каков дефолтный параметр, который контролирует выбор BroadcastHashJoin?
ОтветAnswer
Логика выбора между физическими стратегиями join находится в файле sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala, объект JoinSelection (extends Strategy). Именно там Spark смотрит на размер сторон join и hints (broadcast, merge, shuffle_hash). Для выбора BroadcastHashJoin параметр spark.sql.autoBroadcastJoinThreshold контролирует порог (дефолт 10 MB): если статистика одной стороны меньше этого значения, Spark выбирает BroadcastHashJoin. ShuffledHashJoin используется при определённых условиях (стороны не слишком большие, нет sort requirement). SortMergeJoin — безопасный fallback для произвольно больших входных данных. Если у вас нет статистик (ANALYZE TABLE не запущен) — Spark не может использовать CBO и при сомнении выбирает SortMergeJoin как наиболее предсказуемый по памяти.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Вы хотите понять, почему Spark для конкретного запроса выбрал SortMergeJoin вместо BroadcastHashJoin, хотя одна таблица кажется небольшой. В каком модуле и файле находится логика этого решения в репозитории apache/spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4