Тур по кодовой базе Spark
Читать исходники большого распределённого движка — навык, который резко отличает senior-инженера от того, кто работает только по документации. Документация запаздывает, описывает happy path и умалчивает о corner cases. Исходный код — это единственная истина о том, что происходит внутри. Этот урок — практический путеводитель по репозиторию apache/spark на базе Spark 4.0.
Верхнеуровневая структура репозитория
Репозиторий организован в модули первого уровня. Каждый модуль — это отдельный Maven/SBT-проект со своим pom.xml. После клонирования (git clone https://github.com/apache/spark) вы увидите:
apache/spark/
├── core/ # Spark Core: RDD, scheduler, executor, shuffle
├── sql/ # SQL/DataFrame движок (подмодули внутри)
│ ├── api/ # Публичный SQL API: SparkSession, DataFrame, Dataset
│ ├── catalyst/ # Оптимизатор: парсер, анализатор, оптимизатор, planner
│ ├── core/ # Физическое выполнение SQL: физические операторы, кодогенерация
│ └── hive/ # Hive-интеграция (Hive metastore, HiveQL)
├── streaming/ # Structured Streaming runtime
├── mllib/ # MLlib: базовые алгоритмы и pipeline API
├── connect/ # Spark Connect: gRPC-протокол клиент/сервер
├── resource-managers/ # Интеграции с YARN, Kubernetes, Mesos
│ ├── yarn/
│ └── kubernetes/
├── common/ # Общие утилиты, используемые несколькими модулями
│ ├── network-common/ # Netty-based транспортный слой
│ ├── unsafe/ # sun.misc.Unsafe обёртки, off-heap операции
│ └── sketch/ # вероятностные структуры данных (Count-Min Sketch, HLL)
├── python/ # PySpark: py4j bridge, pyspark.* пакеты
├── R/ # SparkR
├── graphx/ # Graph processing
├── launcher/ # SparkLauncher API для программного запуска приложений
├── repl/ # spark-shell REPL
├── tools/ # spark-class, spark-submit runner scripts
├── bin/ # Исполняемые скрипты (spark-submit, spark-shell, spark-sql)
├── sbin/ # Сервисные скрипты (start-master.sh, start-worker.sh)
├── dev/ # Инструменты разработки: make-distribution.sh, lint, тесты CI
└── build/ # Embedded Maven wrapper (build/mvn)
Важная деталь Spark 4.0: Scala 2.12 удалена, всё написано на Scala 2.13. Соответственно, артефакты называются spark-core_2.13, spark-sql_2.13 и т.д.
connect
spark-connect: gRPC-сервер Spark Connect. Принимает запросы от тонких клиентов (Python, Scala). Зависит от sql/api, sql/core.mllib
spark-mllib: алгоритмы ML и pipeline API. Зависит от spark-core, sql/core.streaming
spark-streaming: Structured Streaming runtime. Зависит от sql/core для микро-batch исполнения.sql/core
sql/core: физические операторы (SortMergeJoin, HashAgg, FileScan), whole-stage codegen, execution engine. Это основной execution layer поверх spark-core.sql/api
sql/api: публичный API — SparkSession, DataFrame, Dataset, Column, Row. Минимум зависимостей, стабильный ABI.sql/catalyst
sql/catalyst: ядро оптимизатора. TreeNode, LogicalPlan, Expression, Rule, RuleExecutor, SparkPlanner. Не зависит от spark-core — чистая оптимизационная логика.core
spark-core: фундамент. RDD, DAGScheduler, TaskScheduler, Executor, BlockManager, ShuffleManager, SparkContext, SparkEnv. Самый большой модуль (~200K строк Scala).network-common
network-common: Netty-based RPC и streaming transport. Используется для driver-executor коммуникации, shuffle fetch, block transfer.unsafe
unsafe: обёртки над sun.misc.Unsafe для off-heap операций. Platform.allocateMemory, MemoryBlock, UnsafeRow. Нет зависимостей от core.Модуль core: сердце движка
core/src/main/scala/org/apache/spark/ — это самая важная директория для понимания рантайма. Навигация:
core/src/main/scala/org/apache/spark/
├── scheduler/
│ ├── DAGScheduler.scala # ~2500 строк. Превращает RDD-граф в stages и TaskSets
│ ├── TaskScheduler.scala # Интерфейс: submitTasks, killTask, executorLost
│ ├── TaskSchedulerImpl.scala # Реализация: locality levels, speculation, blacklist
│ ├── TaskSetManager.scala # Управление одним TaskSet: retry, locality-aware назначение
│ ├── Stage.scala # Абстракция Stage (ShuffleMapStage, ResultStage)
│ ├── ShuffleMapStage.scala # Stage с shuffle output
│ ├── ResultStage.scala # Финальная Stage с action
│ ├── Job.scala # Job = action + callback
│ └── SchedulerBackend.scala # Интерфейс к cluster manager (YARN, K8s, Standalone)
├── executor/
│ ├── Executor.scala # Worker process: запускает TaskRunner threads
│ ├── TaskRunner.scala # Выполняет одну Task: десериализация, запуск, метрики
│ └── CoarseGrainedExecutorBackend.scala # RPC-endpoint executor'а
├── storage/
│ ├── BlockManager.scala # Управление блоками: RDD cache, broadcast, shuffle
│ ├── BlockManagerMaster.scala # Driver-side: реестр всех BlockManager'ов
│ ├── DiskStore.scala # Запись/чтение блоков на диск
│ └── MemoryStore.scala # In-memory хранение блоков (heap и off-heap)
├── shuffle/
│ ├── ShuffleManager.scala # Интерфейс (SortShuffleManager по умолчанию)
│ ├── sort/SortShuffleManager.scala # sort-based shuffle: IndexShuffleBlockResolver
│ └── BlockStoreShuffleReader.scala # Читает shuffle blocks с executor'ов
├── memory/
│ ├── MemoryManager.scala # Абстракция: execution vs storage memory
│ ├── UnifiedMemoryManager.scala # Дефолтная реализация: unified pool с кражей памяти
│ └── TaskMemoryManager.scala # Управляет памятью одной Task (spill логика)
└── rpc/
├── RpcEnv.scala # Абстракция RPC environment
└── netty/NettyRpcEnv.scala # Netty-реализация: inbox/outbox, dispatcher
Центральный класс всего рантайма — DAGScheduler.scala. Его метод handleJobSubmitted — точка входа для любого action. Стоит прочесть хотя бы сигнатуры методов: submitStage, submitMissingTasks, handleTaskCompletion, handleExecutorLost. Это скелет всей логики отказоустойчивости.
Модуль sql/catalyst: мозг оптимизатора
sql/catalyst — чистый оптимизационный движок без зависимостей от рантайма Spark. Структура:
sql/catalyst/src/main/scala/org/apache/spark/sql/
├── catalyst/
│ ├── trees/
│ │ ├── TreeNode.scala # Базовый класс всех AST-узлов: transform, collect, mapChildren
│ │ └── TreePattern.scala # Enum паттернов для быстрой фильтрации в transform
│ ├── plans/
│ │ ├── logical/
│ │ │ ├── LogicalPlan.scala # Абстрактный логический план
│ │ │ ├── basicLogicalOperators.scala # Filter, Project, Join, Aggregate, Union...
│ │ │ └── object/ # Dataset API операторы
│ │ └── physical/
│ │ └── SparkPlan.scala # Абстрактный физический план
│ ├── expressions/
│ │ ├── Expression.scala # Базовый класс выражений (eval, genCode)
│ │ ├── arithmetic.scala # Add, Subtract, Multiply...
│ │ ├── predicates.scala # And, Or, Not, EqualTo, In...
│ │ ├── aggregate/ # Sum, Count, Max, First...
│ │ └── codegen/ # ExprCode, CodegenContext
│ ├── analysis/
│ │ ├── Analyzer.scala # Резолвит UnresolvedAttribute, UnresolvedRelation
│ │ ├── FunctionRegistry.scala # Реестр встроенных и UDF функций
│ │ └── Catalog.scala # Абстракция метаданных
│ ├── optimizer/
│ │ ├── Optimizer.scala # RuleExecutor с батчами правил (итеративно до fixed point)
│ │ ├── expressions.scala # ConstantFolding, SimplifyBooleanExpressions...
│ │ ├── joins.scala # ReorderJoin, EliminateOuterJoin, PushdownLeftSemiAntiJoin
│ │ └── subquery.scala # PullupCorrelatedPredicates, RewriteCorrelatedScalarSubquery
│ └── rules/
│ └── RuleExecutor.scala # Движок применения правил: runBatches, fixed-point iteration
Как работает RuleExecutor: он берёт список Batch-объектов (каждый содержит список Rule). Для каждого батча он применяет все правила по кругу, пока план не перестанет меняться или не будет достигнут maxIterations. Именно поэтому Catalyst называют fixed-point optimizer.
Parser
Parser (AbstractSqlParser): ANTLR4-грамматика sql/catalyst/src/main/antlr4/. Превращает SQL-строку в UnresolvedLogicalPlan с UnresolvedRelation, UnresolvedAttribute.Analyzer
Analyzer (Analyzer.scala): ResolvesRelations, ResolvesReferences, ResolveAggAliasInGroupBy... Около 50 правил в нескольких батчах. Использует SessionCatalog для резолвинга таблиц.Optimizer
Optimizer (Optimizer.scala): наследует RuleExecutor[LogicalPlan]. Около 100+ правил в 10 батчах. ConstantFolding, PushDownPredicate, ColumnPruning, ReorderJoin (CBO).SparkPlanner
SparkPlanner (sql/core): стратегии выбора физических операторов. FileSourceStrategy, JoinSelection, Aggregation, BasicOperators. Выдаёт SparkPlan.Модуль sql/core: физическое выполнение
sql/core содержит физические операторы и инфраструктуру выполнения:
sql/core/src/main/scala/org/apache/spark/sql/
├── execution/
│ ├── SparkStrategies.scala # Набор стратегий: FileSourceStrategy, JoinSelection...
│ ├── joins/
│ │ ├── SortMergeJoinExec.scala # Sort-merge join физический оператор
│ │ ├── BroadcastHashJoinExec.scala # Broadcast hash join
│ │ └── ShuffledHashJoinExec.scala # Shuffled hash join
│ ├── aggregate/
│ │ ├── HashAggregateExec.scala # Hash-based aggregation (Stage 1)
│ │ └── SortAggregateExec.scala # Sort-based fallback
│ ├── exchange/
│ │ ├── ShuffleExchangeExec.scala # Shuffle: создаёт shuffle boundaries
│ │ └── BroadcastExchangeExec.scala # Broadcast: собирает данные и рассылает
│ ├── datasources/
│ │ ├── FileSourceScanExec.scala # Чтение файлов: Parquet, ORC, CSV...
│ │ └── v2/ # DataSource V2 API
│ └── WholeStageCodegenExec.scala # Whole-stage codegen контейнер
├── catalyst/
│ └── optimizer/
│ └── SparkOptimizer.scala # Spark-специфичные правила поверх Catalyst Optimizer
└── SparkSession.scala # Публичная точка входа: sql(), read, createDataFrame
Важный паттерн: WholeStageCodegenExec оборачивает цепочку CodegenSupport-операторов. Метод doCodeGen() рекурсивно обходит дерево и генерирует один Java-метод. Это объясняет, почему *(1) в выводе explain() охватывает несколько операторов.
Где искать конкретные классы
Вот конкретные цепочки поиска для типичных задач:
Задача: понять, как Spark решает, делать BroadcastHashJoin или SortMergeJoin.
Файл: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala, объект JoinSelection. Логика: если одна сторона меньше spark.sql.autoBroadcastJoinThreshold (дефолт 10 МБ) — BroadcastHashJoinExec. Если нет hint и стороны большие — SortMergeJoinExec.
Задача: понять, как применяется predicate pushdown.
Файл: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala, правило PushDownPredicates. И для datasource: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, PushPredicateThroughNonJoin.
Задача: понять, как TaskSchedulerImpl выбирает locality level.
Файл: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala, метод resourceOfferSingleTaskSet. Порядок: PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY. Ожидание между уровнями задаётся spark.locality.wait (дефолт 3 секунды).
Задача: понять, как UnifiedMemoryManager крадёт execution memory у storage.
Файл: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala, метод acquireExecutionMemory. Он вызывает evictBlocksToFreeSpace на MemoryStore, вытесняя наименее недавно используемые cached RDD-блоки.
Навигация: практические инструменты
Читать код Spark в браузере через github.com — мучение. Есть лучшие варианты.
IntelliJ IDEA (рекомендуется для Scala):
# Генерация IntelliJ проекта
./build/mvn -DskipTests -Pideaproj idea:idea
# Или через SBT (быстрее для навигации):
./build/sbt "project core" gen-idea
После открытия в IDEA — Ctrl+N (Navigate -> Class) позволяет прыгать прямо к DAGScheduler, SortMergeJoinExec и т.д. Ctrl+B (Go to Declaration) работает по всему мультимодульному проекту. Alt+F7 (Find Usages) показывает все места, где используется метод.
Поиск по исходникам:
# Найти все реализации ShuffleManager:
grep -r "extends ShuffleManager" core/src/main/scala/
# Найти все места, где submitStage вызывается:
grep -rn "submitStage(" core/src/main/scala/
# Найти все Rule[LogicalPlan]:
find sql/catalyst/src/main/scala -name "*.scala" \
-exec grep -l "extends Rule\[LogicalPlan\]" {} \;
Чтение истории изменений конкретного класса:
git log --oneline -- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | head -20
Это показывает последние JIRA-номера, по которым можно найти контекст изменений.
Тесты как документация
Одно из лучших мест для понимания поведения движка — тесты. Они содержат конкретные сценарии с ожидаемым поведением.
core/src/test/scala/org/apache/spark/scheduler/
├── DAGSchedulerSuite.scala # ~4000 строк. Тест на каждый corner case DAGScheduler
└── TaskSchedulerImplSuite.scala # Locality levels, blacklisting, speculation
sql/core/src/test/scala/org/apache/spark/sql/execution/
├── joins/SortMergeJoinSuite.scala
└── adaptive/AdaptiveQueryExecSuite.scala # AQE: тесты skew join, dynamic coalesce
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/
├── FilterPushdownSuite.scala
└── JoinReorderSuite.scala
DAGSchedulerSuite особенно ценен: в нём симулируются отказы executor’ов, retry stage’ов, fetch failures при shuffle. Читая тест, вы одновременно видите API и ожидаемое поведение при различных failure mode.
Структура Spark Connect (новое в 4.0)
Spark Connect — полноценный gRPC-сервер, позволяющий тонким клиентам подключаться к Spark без зависимости от всего Spark jar. В 4.0 это стандартный способ использования Spark:
connect/
├── common/src/main/protobuf/spark/connect/
│ ├── relations.proto # Логические планы: Scan, Filter, Project, Join...
│ ├── expressions.proto # Выражения: Literal, Attribute, Call...
│ └── commands.proto # CreateDataFrame, WriteData...
├── server/src/main/scala/org/apache/spark/sql/connect/service/
│ ├── SparkConnectService.scala # gRPC-сервис: executePlan, analyzePlan
│ └── SparkConnectStreamHandler.scala # Стриминг результатов
└── client/jvm/src/main/scala/org/apache/spark/sql/connect/client/
└── SparkConnectClient.scala # Клиентская библиотека
Интересная особенность: .proto-файлы — это фактически спецификация протокола между клиентом и сервером. Если вы понимаете relations.proto, вы понимаете, как каждая DataFrame-операция транслируется в сетевой протокол.
Попробуй сам
1. Найди реализацию конкретного оптимизатора.
Клонируй репозиторий, найди файл с правилом ColumnPruning в sql/catalyst. Прочитай метод apply. Убедись, что понимаешь: когда срабатывает это правило и что именно оно делает с планом.
2. Трассировка action до физического плана.
Открой SparkSession.scala в IntelliJ. Пройди по вызовам: sql() -> Dataset.ofRows() -> QueryExecution -> optimizedPlan -> sparkPlan. Запомни, в каком классе начинается физическое планирование.
3. Прочти тест для понимания retry.
Открой DAGSchedulerSuite.scala, найди тест с FetchFailed. Убедись, что понимаешь, почему при FetchFailed Spark повторяет не только упавшую stage, но и parent stage.
# Быстрая проверка через grep:
grep -n "FetchFailed" \
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | head -10