Learning Platform
Глоссарий Troubleshooting
Урок 02.02 · 26 мин
Продвинутый
ArchitectureDriverExecutorDAGSchedulerCatalystTungstenMental model

Прежде чем нырнуть в любой конкретный модуль — shuffle internals, codegen, AQE — нужна цельная карта. Без неё каждый следующий урок будет набором изолированных фактов. С ней — каждый новый слой становится деталью уже понятного механизма.

Эта карта — mental model архитектуры Spark. Не полная спецификация движка, а структура, которая объясняет: кто за что отвечает, как запрос проходит сверху вниз, где именно живут проблемы из прошлого урока.

Три уровня: API, планирование, исполнение

Spark устроен в три крупных уровня:

  1. API-уровень — то, что пишет инженер: DataFrame/Dataset, SQL, RDD. Декларативное или функциональное описание трансформаций.
  2. Уровень планирования — то, что делает драйвер: принимает код, строит план, оптимизирует, разбивает на задачи.
  3. Уровень исполнения — то, что делают executor-ы: исполняют задачи, читают данные, пишут shuffle-файлы.

Первые два уровня живут в driver process. Третий — в executor process-ах на worker nodes.

Driver: мозг приложения

Driver — это ваша программа. Когда вы запускаете spark-submit или открываете Spark Connect сессию из Python, вы работаете с driver процессом.

Driver содержит:

  • SparkContext / SparkSession — входная точка API. Именно здесь строятся все DataFrame-трансформации.
  • Catalyst Optimizer — принимает логический план (дерево трансформаций), анализирует, оптимизирует, превращает в физический план.
  • DAGScheduler — принимает физический план, разбивает на stages по shuffle-границам, строит DAG зависимостей.
  • TaskScheduler — получает stages от DAGScheduler, разбивает каждую на tasks (по числу партиций), назначает tasks executor-ам с учётом data locality.
  • SchedulerBackend — коммуникация с cluster manager-ом: запрос ресурсов, heartbeats, регистрация executor-ов.

Driver — один на приложение. Он является single point of coordination: все metadata, все планы, весь DAG-граф живут здесь. Если driver упал — приложение мертво.

NOTE

В Spark 4.0 с Spark Connect часть работы переносится: клиентская библиотека строит unresolved logical plan на клиентской стороне и отправляет его на сервер через gRPC. Но Catalyst-анализ и DAGScheduler по-прежнему живут на server-стороне (Spark Connect Server), которая является фактическим driver-ом.

Executor: руки кластера

Executor — долгоживущий JVM-процесс на worker node. Один executor на приложение на одну машину (настраивается, но это default).

Executor содержит:

  • Thread pool — пул потоков, каждый поток исполняет одну task в каждый момент.
  • MemoryManager — управляет памятью между execution и storage регионами.
  • BlockManager — хранит partition data (cached RDDs, broadcast variables, shuffle data). Общается с другими BlockManager-ами через peer-to-peer.
  • ShuffleManager — пишет shuffle-файлы на map-стороне, читает shuffle-данные на reduce-стороне.

Executor-ы общаются с driver-ом через Netty RPC (в Spark 4.0 — по умолчанию, с TLS-шифрованием в production). Driver назначает задачи через RPC, executor отчитывается о прогрессе и результатах.

Cluster Manager: оркестратор ресурсов

Cluster manager стоит над и сбоку от Spark-приложения. Spark поддерживает три модели:

  • Standalone — собственный cluster manager Spark. Простой, хорошо для dev/test.
  • YARN — Resource Manager Hadoop. Стандарт для on-premise кластеров.
  • Kubernetes — в Spark 4.0 production-ready. Каждый driver и каждый executor — отдельный Pod.
  • Mesos — deprecated, удалён в Spark 4.0.

Driver запрашивает у cluster manager ресурсы (executor-ы), cluster manager аллоцирует их на worker nodes и запускает executor процессы. После этого driver работает напрямую с executor-ами — cluster manager больше не в критическом пути.

Полный путь запроса: от SQL до байтов

Теперь пройдём полный путь конкретного запроса:

result = spark.sql("""
    SELECT department, SUM(salary) AS total
    FROM employees
    WHERE country = 'RU'
    GROUP BY department
""")
result.write.parquet("/output/salaries")
Путь запроса через движок Spark
1. SQL ParserANTLR4-парсер строит AbstractSyntaxTree из SQL-строки. Создаёт UnresolvedLogicalPlan: UnresolvedRelation('employees'), UnresolvedAttribute('country'), UnresolvedFunction('SUM'). Ничто ещё не resolved — это просто дерево символов.
2. AnalyzerРезолвит имена через Catalog. UnresolvedRelation('employees') становится конкретным HiveTableRelation или DataSourceV2Relation. Проверяет типы колонок, приводит типы при необходимости. Результат: Analyzed LogicalPlan.
3. Logical OptimizerПрименяет правила RBO: PredicatePushdown (WHERE country='RU' двигается ближе к scan), ColumnPruning (читаем только department, salary, country), ConstantFolding. Применяет CBO: собирает статистику, выбирает порядок join если нужен. До 100+ правил, применяются в фиксированной точке.
4. SparkPlannerПревращает LogicalPlan в PhysicalPlan: LogicalAggregate -> HashAggregateExec, LogicalScan -> FileSourceScanExec (Parquet). Несколько физических стратегий на каждую логическую операцию, выбирается по cost модели.
5. Whole-Stage CodeGenTungsten: объединяет несколько физических операторов (Filter + Project + Aggregate partial) в один Java-класс, генерирует исходный код, компилирует через Janino. Итог — один tight loop без виртуальных вызовов между операторами.
6. DAGSchedulerПринимает RDD DAG (который является представлением физического плана). Разбивает по shuffle-границам (wide dependencies) на Stages. Создаёт DAG зависимостей stage. Stage с groupBy потребует shuffle -> две stages: Stage 0 (scan+filter+partial_agg) и Stage 1 (final_agg после shuffle).
7. TaskScheduler + LocalityКаждый Stage -> набор Tasks (по числу партиций). TaskScheduler учитывает data locality: хочет запустить задачу на executor-е, который на том же узле что и данные (HDFS / S3 cache). При недоступности — RACK_LOCAL, затем ANY.
8. Executor: Task executionExecutor получает сериализованную TaskDescription. Десериализует, запускает в thread pool. Читает Parquet-файлы через FileSourceScanExec, применяет filter и projection через генерированный код, пишет shuffle-файлы для Stage 1.
9. Shuffle + Stage 1После Stage 0: shuffle read. Каждый reduce-task читает shuffle-данные со всех map-executor-ов (через HTTP или direct), сортирует по ключу, применяет final aggregation. Пишет результат в output location (Parquet).

Девять шагов от SQL-строки до байтов в хранилище. В каждом шаге — отдельный subsystem со своей логикой и своими точками отказа. Этот курс разбирает каждый из них.

Слои движка и 17 модулей курса

Посмотрим на ту же архитектуру через призму модулей курса:

Карта модулей курса по слоям движка
Модуль 00: Введение в курсФреймворк курса, как работать с материалом, инструменты.
Модуль 01: Философия internalsЗачем знать движок. Mental model архитектуры — этот урок.
DataFrame / Dataset / SQL APIВерхний слой — то что пишет инженер.
Модуль 06: Catalyst Optimizer + Tungsten EngineЧетыре фазы Catalyst: Parser -> Analyzer -> Optimizer -> Planner. Tungsten: memory layout, codegen. Как запрос превращается в физический план.
Модуль 07: Сериализация и row formatКак Spark представляет строки в памяти. InternalRow vs UnsafeRow. Encoders, Kryo, Java serialization. Off-heap row format в Tungsten.
Модуль 08: Adaptive Query ExecutionAQE в Spark 4.0: runtime plan changes, auto partition coalescing, skew join handling. Как optimizer работает с реальной статистикой, а не расчётной.
RDD LayerУстойчивые распределённые датасеты. Базовая абстракция для всего выше.
Модуль 02: RDD и модель выполненияRDD как структура данных: partitions, dependencies (narrow vs wide), preferred locations. Lineage, fault tolerance через recomputation. Как DataFrame компилируется в RDD.
DAGScheduler / TaskSchedulerПланирование stages и задач.
Модуль 03: Архитектура и scheduler internalsDAGScheduler: построение DAG, Stage boundaries, job/stage/task lifecycle. TaskScheduler: locality, speculative execution. Executor lifecycle. Spark UI как зеркало scheduler-а.
Shuffle + MemoryДанные в движении и в покое.
Модуль 04: Shuffle internalsSort-based shuffle vs hash shuffle. ShuffleWriter: sort, spill, merge. ShuffleReader: fetch, merge, sort. Shuffle файловая система. Tune: spark.shuffle.* параметры.
Модуль 05: Memory + Storage internalsUnifiedMemoryManager: execution region vs storage region, eviction. Off-heap memory. BlockManager: cache management, eviction policy. Tungsten off-heap. GC tuning.
Специальные рантаймыStreaming и новые execution paths.
Модуль 09: Structured Streaming internalsMicroBatchExecution vs ContinuousExecution. Epoch-based processing. Offset management. Stateful operators: MapGroupsWithState, flatMapGroupsWithState. WAL.
Модуль 10: Arrow + Spark ConnectApache Arrow как zero-copy exchange format. Spark Connect: gRPC protocol, client-server split, plan serialization. Python client internals в Spark 4.0.
Расширение и операцииКак расширять и эксплуатировать движок.
Модуль 11: Расширение SparkDataSourceV2, Catalyst extensions, custom rules. Как сторонние коннекторы интегрируются в движок. SparkSessionExtensions API.
Модуль 12: External Shuffle ServiceESS: зачем нужен, как работает, когда без него нельзя. Dynamic Resource Allocation с ESS. Kubernetes и ESS.
Модуль 13: Alternative EnginesPhoton (Databricks), Gluten+Velox, Native Spark. Почему они быстрее и чем платят за это. Как выбирать.
Модуль 14: Исходники и отладкаКак читать Spark source code. Ключевые пакеты. Remote debugging. Как воспроизводить production-проблемы локально.
Модуль 15: КапстоунПрактический дебаг internals: реальные production scenarios, анализ от симптомов к причинам.
Модуль 16: ПриложениеСправочник: ключевые конфиги, Spark UI метрики, чек-листы оптимизации.

Курс идёт по слоям сверху вниз до M05, затем возвращается вверх с более глубоким пониманием — оптимизационный стек (M06-M08) имеет смысл только после понимания базовой архитектуры.

RDD: базовая абстракция, которую скрывает DataFrame

DataFrame API — высокоуровневый и удобный. Но в runtime каждый DataFrame компилируется в RDD. RDD (Resilient Distributed Dataset) — это базовая абстракция данных Spark, и понимать её нужно даже если вы никогда не пишете RDD-код напрямую.

RDD — это:

  • Коллекция партиций (getPartitions(): Array[Partition]).
  • Функция вычисления каждой партиции (compute(split: Partition, context: TaskContext): Iterator[T]).
  • Список зависимостей от родительских RDD (getDependencies(): Seq[Dependency[_]]).
  • Опциональный Partitioner (для ключ-значение RDD).
  • Предпочтительные locations для задач (getPreferredLocations).

Когда вы пишете df.filter(...).groupBy(...).agg(...), Catalyst в конечном счёте генерирует RDD, где:

  • filter и project — narrow transformations, они входят в один stage.
  • groupBy — wide transformation (shuffle), она создаёт новый stage boundary.

DAGScheduler работает именно с RDD-графом, не с DataFrame. Он видит ShuffledRDD и знает: здесь граница stage.

Driver и Executor: базовая модель

Как data locality работает на практике

Data locality — попытка TaskScheduler запустить задачу там, где живут данные. Пять уровней по приоритету:

  • PROCESS_LOCAL — данные в памяти того же JVM-процесса (кешированный RDD).
  • NODE_LOCAL — данные на том же физическом узле (HDFS блок на той же машине).
  • NO_PREF — данные не имеют предпочтения (например, S3 — все узлы равно далеко).
  • RACK_LOCAL — данные на узле в том же rack.
  • ANY — данные где угодно.

TaskScheduler ждёт spark.locality.wait (по умолчанию 3 секунды), надеясь на более высокий уровень locality. Если не дождался — понижает уровень и ждёт ещё. Это важно: при маленьких задачах на большом кластере, где S3 — основное хранилище (locality = NO_PREF для всех), ожидание locality добавляет задержку без выигрыша.

Знание этого механизма объясняет, почему иногда снижение spark.locality.wait до 0 ускоряет jobs на S3-based кластерах.

Память: два региона и их баланс

Executor memory делится на три части в Spark 4.0 с UnifiedMemoryManager:

Total executor memory = spark.executor.memory (heap)
                      + spark.executor.memoryOverhead (off-heap)

Heap делится на:

  • Reserved Memory (300 MB — захардкожено). Для системных нужд Spark.
  • User Memory ((1 - spark.memory.fraction) от usable heap). Для пользовательских структур данных в коде.
  • Spark Memory Pool (spark.memory.fraction × usable heap, default 60%). Управляется UnifiedMemoryManager.

Spark Memory Pool динамически делится между:

  • Execution Region — shuffle buffers, sort buffers, aggregation hash tables.
  • Storage Region — кешированные RDD, broadcast variables.

Динамический баланс: execution может “занять” storage region (вытеснив кеш), если нужна память для shuffle. Это значит: при интенсивном shuffle ваш df.cache() может быть частично вытеснен, и при следующем обращении к кешированному DataFrame Spark пересчитает потерянные партиции. Без понимания этого механизма кажется, что cache работает непредсказуемо.

Spark Connect и будущее архитектуры

В Spark 4.0 Spark Connect — не экспериментальная фича, а основной путь подключения Python-клиентов. Это архитектурно важное изменение.

Традиционно: клиентский код Python -> PySpark -> Py4J -> Java driver в том же процессе. Spark Connect: клиентский код Python -> gRPC -> Spark Connect Server (отдельный процесс, фактически driver).

Это значит:

  • Клиент и driver изолированы. Падение driver-а не убивает Python-процесс и наоборот.
  • Клиент может быть на другой машине. Подключаться к shared Spark cluster.
  • Логика построения плана теперь разделена: клиент строит unresolved plan (Relation proto), сервер получает, анализирует, оптимизирует.

Для internals это означает: часть Catalyst-работы (построение plan tree) теперь происходит в Python-клиенте (через pyspark-side plan building), финальный анализ — на сервере. Граница между “клиент” и “движок” сместилась.

Что будет в следующих модулях

После этого урока у вас есть карта. Следующий шаг — идти по этой карте:

Модуль 02 разберёт RDD-слой: как работает fault tolerance через lineage, как partitioner влияет на shuffle, как DataFrame компилируется в конкретные RDD-трансформации.

Модуль 03 — DAGScheduler и TaskScheduler internals: как строится DAG stages, как работает locality, что происходит при task failure и retry.

Модуль 04 — Shuffle internals: что именно происходит в shuffle, почему spill дорог, как настраивать.

Модуль 05 — Memory internals: UnifiedMemoryManager, как execution и storage борются за память, GC tuning.

Модули 06-08 — оптимизационный стек: к этому моменту вы будете понимать, что именно оптимизирует Catalyst и на каком уровне.

WARNING

Не пытайтесь запомнить всю карту сейчас. Используйте этот урок как reference: когда в M04 вы будете читать про SortShuffleWriter, вы сможете вернуться сюда и понять, в каком слое это живёт. Mental model работает как индекс, а не как encyclopedia.

Попробуй сам

Это упражнение помогает сверить mental model с реальностью движка.

Откройте Spark shell (или PySpark notebook) с Spark 4.0:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

Создайте простой DataFrame и посмотрите, как он компилируется:

df = spark.range(1000000)
filtered = df.filter("id % 7 == 0")
grouped = filtered.groupBy((filtered.id % 100).alias("bucket")).count()

# Посмотрите физический план
grouped.explain("formatted")

# Посмотрите RDD lineage
grouped.rdd.toDebugString()

Задачи для рефлексии:

  1. В выводе explain("formatted") найдите оператор HashAggregate. Их два — partial и final. Почему два, а не один? Какой из них соответствует map-side задачам в Stage 0?

  2. Найдите в плане Exchange (или ShuffleExchangeExec). Это граница stages. Сколько stages будет в этом job-е?

  3. Запустите grouped.collect() и откройте Spark UI (обычно localhost:4040). Зайдите в вкладку Stages. Проверьте: совпадает ли число stages с вашей оценкой из пункта 2?

  4. В деталях Stage 0 посмотрите на колонку “Shuffle Write”. Это данные, которые были записаны на диск для Stage 1. Примерно сколько?

  5. Запустите тот же запрос с spark.sql.adaptive.enabled = false и с true. Посмотрите, изменилось ли число партиций финального stage. Как AQE изменил план?

Если у вас есть Spark Connect (Spark 4.0 сервер), попробуйте то же самое через SparkSession.builder.remote("sc://localhost").getOrCreate() и сравните, как меняется путь построения плана.

Spark UI: полный разбор
Проверка знанийKnowledge check
Вы запускаете следующий PySpark job: читаете таблицу orders из S3 (300 GB Parquet, 1200 файлов), джойните с таблицей customers (800 MB Parquet), фильтруете по дате, группируете по customer_id и считаете SUM(amount). Explain plan показывает SortMergeJoin (не broadcast). Cluster: 20 executor-ов, 4 core каждый. spark.sql.shuffle.partitions = 200 (default). Нарисуйте (опишите словами) DAG этого job-а: сколько stages, что происходит в каждом, где shuffle boundaries. Затем: почему Catalyst выбрал SortMergeJoin, и при каком условии он выбрал бы BroadcastHashJoin?
ОтветAnswer
DAG из 3 stages: Stage 0 — scan orders (1200 Parquet-файлов) + filter по дате + project нужных колонок. 1200 задач (по файлу). Shuffle write в конце: данные сортируются по join key (customer_id) и записываются в 200 shuffle-файлов. Stage 1 — scan customers (параллельно со Stage 0) + project. N задач (по числу файлов customers, ~800 MB / avg file size). Shuffle write: сортировка по customer_id, 200 файлов. Stage 2 (зависит от Stage 0 + Stage 1): 200 задач (spark.sql.shuffle.partitions). Каждая задача читает данные orders и customers для своего диапазона customer_id (shuffle read), делает sort-merge join, применяет groupBy + SUM(amount), пишет результат на S3. Итого: 3 stages, 2 shuffle boundaries (join и groupBy могут быть объединены оптимизатором в один shuffle если ключи совпадают). Catalyst выбрал SortMergeJoin потому что customers 800 MB > autoBroadcastJoinThreshold (по умолчанию 10 MB в Spark 4.0). BroadcastHashJoin был бы выбран если бы customers был меньше порога, или если явно указать hint: customers.hint('broadcast'). Важно: статистика должна быть актуальной — если ANALYZE TABLE не запускался, Catalyst может ошибиться с размером.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Вы запускаете запрос с двумя join-операциями и одной groupBy. Explain plan показывает: FileScan -> Filter -> Exchange -> SortMergeJoin -> Exchange -> HashAggregate (partial) -> Exchange -> HashAggregate (final). Сколько stages создаст DAGScheduler?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 2