Прежде чем нырнуть в любой конкретный модуль — shuffle internals, codegen, AQE — нужна цельная карта. Без неё каждый следующий урок будет набором изолированных фактов. С ней — каждый новый слой становится деталью уже понятного механизма.
Эта карта — mental model архитектуры Spark. Не полная спецификация движка, а структура, которая объясняет: кто за что отвечает, как запрос проходит сверху вниз, где именно живут проблемы из прошлого урока.
Три уровня: API, планирование, исполнение
Spark устроен в три крупных уровня:
- API-уровень — то, что пишет инженер: DataFrame/Dataset, SQL, RDD. Декларативное или функциональное описание трансформаций.
- Уровень планирования — то, что делает драйвер: принимает код, строит план, оптимизирует, разбивает на задачи.
- Уровень исполнения — то, что делают executor-ы: исполняют задачи, читают данные, пишут shuffle-файлы.
Первые два уровня живут в driver process. Третий — в executor process-ах на worker nodes.
Driver: мозг приложения
Driver — это ваша программа. Когда вы запускаете spark-submit или открываете Spark Connect сессию из Python, вы работаете с driver процессом.
Driver содержит:
- SparkContext / SparkSession — входная точка API. Именно здесь строятся все DataFrame-трансформации.
- Catalyst Optimizer — принимает логический план (дерево трансформаций), анализирует, оптимизирует, превращает в физический план.
- DAGScheduler — принимает физический план, разбивает на stages по shuffle-границам, строит DAG зависимостей.
- TaskScheduler — получает stages от DAGScheduler, разбивает каждую на tasks (по числу партиций), назначает tasks executor-ам с учётом data locality.
- SchedulerBackend — коммуникация с cluster manager-ом: запрос ресурсов, heartbeats, регистрация executor-ов.
Driver — один на приложение. Он является single point of coordination: все metadata, все планы, весь DAG-граф живут здесь. Если driver упал — приложение мертво.
В Spark 4.0 с Spark Connect часть работы переносится: клиентская библиотека строит unresolved logical plan на клиентской стороне и отправляет его на сервер через gRPC. Но Catalyst-анализ и DAGScheduler по-прежнему живут на server-стороне (Spark Connect Server), которая является фактическим driver-ом.
Executor: руки кластера
Executor — долгоживущий JVM-процесс на worker node. Один executor на приложение на одну машину (настраивается, но это default).
Executor содержит:
- Thread pool — пул потоков, каждый поток исполняет одну task в каждый момент.
- MemoryManager — управляет памятью между execution и storage регионами.
- BlockManager — хранит partition data (cached RDDs, broadcast variables, shuffle data). Общается с другими BlockManager-ами через peer-to-peer.
- ShuffleManager — пишет shuffle-файлы на map-стороне, читает shuffle-данные на reduce-стороне.
Executor-ы общаются с driver-ом через Netty RPC (в Spark 4.0 — по умолчанию, с TLS-шифрованием в production). Driver назначает задачи через RPC, executor отчитывается о прогрессе и результатах.
Cluster Manager: оркестратор ресурсов
Cluster manager стоит над и сбоку от Spark-приложения. Spark поддерживает три модели:
- Standalone — собственный cluster manager Spark. Простой, хорошо для dev/test.
- YARN — Resource Manager Hadoop. Стандарт для on-premise кластеров.
- Kubernetes — в Spark 4.0 production-ready. Каждый driver и каждый executor — отдельный Pod.
- Mesos — deprecated, удалён в Spark 4.0.
Driver запрашивает у cluster manager ресурсы (executor-ы), cluster manager аллоцирует их на worker nodes и запускает executor процессы. После этого driver работает напрямую с executor-ами — cluster manager больше не в критическом пути.
Полный путь запроса: от SQL до байтов
Теперь пройдём полный путь конкретного запроса:
result = spark.sql("""
SELECT department, SUM(salary) AS total
FROM employees
WHERE country = 'RU'
GROUP BY department
""")
result.write.parquet("/output/salaries")
Девять шагов от SQL-строки до байтов в хранилище. В каждом шаге — отдельный subsystem со своей логикой и своими точками отказа. Этот курс разбирает каждый из них.
Слои движка и 17 модулей курса
Посмотрим на ту же архитектуру через призму модулей курса:
Курс идёт по слоям сверху вниз до M05, затем возвращается вверх с более глубоким пониманием — оптимизационный стек (M06-M08) имеет смысл только после понимания базовой архитектуры.
RDD: базовая абстракция, которую скрывает DataFrame
DataFrame API — высокоуровневый и удобный. Но в runtime каждый DataFrame компилируется в RDD. RDD (Resilient Distributed Dataset) — это базовая абстракция данных Spark, и понимать её нужно даже если вы никогда не пишете RDD-код напрямую.
RDD — это:
- Коллекция партиций (
getPartitions(): Array[Partition]). - Функция вычисления каждой партиции (
compute(split: Partition, context: TaskContext): Iterator[T]). - Список зависимостей от родительских RDD (
getDependencies(): Seq[Dependency[_]]). - Опциональный Partitioner (для ключ-значение RDD).
- Предпочтительные locations для задач (
getPreferredLocations).
Когда вы пишете df.filter(...).groupBy(...).agg(...), Catalyst в конечном счёте генерирует RDD, где:
filterиproject— narrow transformations, они входят в один stage.groupBy— wide transformation (shuffle), она создаёт новый stage boundary.
DAGScheduler работает именно с RDD-графом, не с DataFrame. Он видит ShuffledRDD и знает: здесь граница stage.
Как data locality работает на практике
Data locality — попытка TaskScheduler запустить задачу там, где живут данные. Пять уровней по приоритету:
- PROCESS_LOCAL — данные в памяти того же JVM-процесса (кешированный RDD).
- NODE_LOCAL — данные на том же физическом узле (HDFS блок на той же машине).
- NO_PREF — данные не имеют предпочтения (например, S3 — все узлы равно далеко).
- RACK_LOCAL — данные на узле в том же rack.
- ANY — данные где угодно.
TaskScheduler ждёт spark.locality.wait (по умолчанию 3 секунды), надеясь на более высокий уровень locality. Если не дождался — понижает уровень и ждёт ещё. Это важно: при маленьких задачах на большом кластере, где S3 — основное хранилище (locality = NO_PREF для всех), ожидание locality добавляет задержку без выигрыша.
Знание этого механизма объясняет, почему иногда снижение spark.locality.wait до 0 ускоряет jobs на S3-based кластерах.
Память: два региона и их баланс
Executor memory делится на три части в Spark 4.0 с UnifiedMemoryManager:
Total executor memory = spark.executor.memory (heap)
+ spark.executor.memoryOverhead (off-heap)
Heap делится на:
- Reserved Memory (300 MB — захардкожено). Для системных нужд Spark.
- User Memory (
(1 - spark.memory.fraction)от usable heap). Для пользовательских структур данных в коде. - Spark Memory Pool (
spark.memory.fraction× usable heap, default 60%). Управляется UnifiedMemoryManager.
Spark Memory Pool динамически делится между:
- Execution Region — shuffle buffers, sort buffers, aggregation hash tables.
- Storage Region — кешированные RDD, broadcast variables.
Динамический баланс: execution может “занять” storage region (вытеснив кеш), если нужна память для shuffle. Это значит: при интенсивном shuffle ваш df.cache() может быть частично вытеснен, и при следующем обращении к кешированному DataFrame Spark пересчитает потерянные партиции. Без понимания этого механизма кажется, что cache работает непредсказуемо.
Spark Connect и будущее архитектуры
В Spark 4.0 Spark Connect — не экспериментальная фича, а основной путь подключения Python-клиентов. Это архитектурно важное изменение.
Традиционно: клиентский код Python -> PySpark -> Py4J -> Java driver в том же процессе. Spark Connect: клиентский код Python -> gRPC -> Spark Connect Server (отдельный процесс, фактически driver).
Это значит:
- Клиент и driver изолированы. Падение driver-а не убивает Python-процесс и наоборот.
- Клиент может быть на другой машине. Подключаться к shared Spark cluster.
- Логика построения плана теперь разделена: клиент строит unresolved plan (Relation proto), сервер получает, анализирует, оптимизирует.
Для internals это означает: часть Catalyst-работы (построение plan tree) теперь происходит в Python-клиенте (через pyspark-side plan building), финальный анализ — на сервере. Граница между “клиент” и “движок” сместилась.
Что будет в следующих модулях
После этого урока у вас есть карта. Следующий шаг — идти по этой карте:
Модуль 02 разберёт RDD-слой: как работает fault tolerance через lineage, как partitioner влияет на shuffle, как DataFrame компилируется в конкретные RDD-трансформации.
Модуль 03 — DAGScheduler и TaskScheduler internals: как строится DAG stages, как работает locality, что происходит при task failure и retry.
Модуль 04 — Shuffle internals: что именно происходит в shuffle, почему spill дорог, как настраивать.
Модуль 05 — Memory internals: UnifiedMemoryManager, как execution и storage борются за память, GC tuning.
Модули 06-08 — оптимизационный стек: к этому моменту вы будете понимать, что именно оптимизирует Catalyst и на каком уровне.
Не пытайтесь запомнить всю карту сейчас. Используйте этот урок как reference: когда в M04 вы будете читать про SortShuffleWriter, вы сможете вернуться сюда и понять, в каком слое это живёт. Mental model работает как индекс, а не как encyclopedia.
Попробуй сам
Это упражнение помогает сверить mental model с реальностью движка.
Откройте Spark shell (или PySpark notebook) с Spark 4.0:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext
Создайте простой DataFrame и посмотрите, как он компилируется:
df = spark.range(1000000)
filtered = df.filter("id % 7 == 0")
grouped = filtered.groupBy((filtered.id % 100).alias("bucket")).count()
# Посмотрите физический план
grouped.explain("formatted")
# Посмотрите RDD lineage
grouped.rdd.toDebugString()
Задачи для рефлексии:
-
В выводе
explain("formatted")найдите операторHashAggregate. Их два — partial и final. Почему два, а не один? Какой из них соответствует map-side задачам в Stage 0? -
Найдите в плане
Exchange(илиShuffleExchangeExec). Это граница stages. Сколько stages будет в этом job-е? -
Запустите
grouped.collect()и откройте Spark UI (обычно localhost:4040). Зайдите в вкладку Stages. Проверьте: совпадает ли число stages с вашей оценкой из пункта 2? -
В деталях Stage 0 посмотрите на колонку “Shuffle Write”. Это данные, которые были записаны на диск для Stage 1. Примерно сколько?
-
Запустите тот же запрос с
spark.sql.adaptive.enabled = falseи сtrue. Посмотрите, изменилось ли число партиций финального stage. Как AQE изменил план?
Если у вас есть Spark Connect (Spark 4.0 сервер), попробуйте то же самое через SparkSession.builder.remote("sc://localhost").getOrCreate() и сравните, как меняется путь построения плана.