Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 24 мин
Начальный
sparkrdddataframelazy evaluationpartitions

В прошлом уроке мы разобрались, зачем нужна распределённая обработка. Теперь — главный инструмент, который её реализует в большинстве компаний: Apache Spark. Spark родился в 2009 году в Berkeley AMPLab как академический проект, в 2014 стал top-level Apache, и с тех пор стал де-факто стандартом для batch и streaming обработки больших данных.

Этот урок даёт концептуальную картину: что такое Spark, как он распределяет работу, какие у него ключевые понятия. Это intro — мы НЕ погружаемся в Catalyst optimizer, Adaptive Query Execution, tuning производительности, production-grade конфигурации. Для глубины есть наш отдельный spark-course на платформе.

Что такое Spark на одной странице

Spark — это распределённый движок обработки данных, который умеет:

  • Читать огромные датасеты из распределённых хранилищ (HDFS, S3, Parquet/ORC файлы, Kafka, JDBC-источники).
  • Выполнять трансформации (filter, map, group, join, aggregate) параллельно на кластере.
  • Писать результат обратно в хранилища.
  • Поддерживать SQL, DataFrame API, MLlib, GraphX, Structured Streaming.

Spark работает поверх кластера машин. Один экземпляр Spark-программы охватывает:

Архитектура Spark: driver и executors

Driver — координатор, который читает код и строит план. Executors — рабочие машины, которые выполняют части плана параллельно.

DriverкоординаторГлавный процесс Spark-программы. Запускает приложение, парсит код, строит DAG задач, координирует выполнение, собирает результаты. На driver запускается SparkSession и main().
schedule
Cluster managerYARN / K8s / MesosВнешний планировщик ресурсов: YARN (Hadoop), Kubernetes, Standalone, Mesos. Driver запрашивает у него executors, manager выделяет машины.
Executor 1worker nodeJVM-процесс на физической машине. Имеет ядра CPU и память. Выполняет tasks, которые ему присылает driver.
Executor 2worker nodeЕщё один воркер. Параллельно с executor 1 обрабатывает свою партицию данных.
Executor Nworker nodeИ так далее — десятки или сотни executors в типичном production-кластере.
Partition AданныеКаждый executor обрабатывает один или несколько partition'ов входных данных. Партиция — порция данных, обрабатываемая одной task.
Partition Bданные
Partition Xданные

Когда ты запускаешь Spark-приложение, происходит:

  1. Driver-процесс стартует на одной машине (твой ноутбук, edge-нода кластера, контейнер K8s).
  2. Driver разбирает твой код (Python, Scala, Java, R) и строит logical plan трансформаций.
  3. Driver запрашивает у cluster manager (YARN, Kubernetes, standalone) executors — рабочие JVM-процессы на нескольких машинах.
  4. Driver разбивает план на tasks — единицы работы по одной партиции данных. Каждая task назначается на executor.
  5. Executors параллельно выполняют tasks. Иногда между стадиями делают shuffle — обмен данными между executors.
  6. Финальный результат собирается на driver или пишется в storage.

RDD, DataFrame, Spark SQL: эволюция API

Spark предлагает несколько уровней абстракции. Они появлялись исторически, и каждый следующий проще предыдущего.

RDD (Resilient Distributed Dataset) — самый низкий уровень. Это «коллекция» объектов, распределённая по executors. Можно вызывать map, filter, reduce — функциональное программирование над распределённым датасетом.

# RDD — низкоуровневый API
rdd = sc.textFile("s3://logs/2026-05-17/*.log")
filtered = rdd.filter(lambda line: "ERROR" in line)
counts = filtered.map(lambda line: 1).reduce(lambda a, b: a + b)

RDD даёт максимум контроля, но code менее читабелен, и оптимизатор Spark не может анализировать lambda-функции — он видит их как чёрный ящик. Сейчас RDD используют редко.

DataFrame — табличная абстракция с колонками и типами, как pandas или SQL-таблица. Это высокоуровневый API, который Spark оптимизирует через Catalyst optimizer.

# DataFrame — рекомендуемый API
df = spark.read.parquet("s3://orders/")
result = df.filter(df.country == "RU") \
    .groupBy("category") \
    .agg(F.sum("amount").alias("revenue"))
result.show()

DataFrame — стандарт для большинства задач в 2026 году. Catalyst анализирует план, переставляет операции, выбирает оптимальные join-стратегии, делает predicate pushdown в источник.

Spark SQL — то же самое, но в виде SQL-запроса.

df.createOrReplaceTempView("orders")
result = spark.sql("""
    SELECT category, SUM(amount) AS revenue
    FROM orders
    WHERE country = 'RU'
    GROUP BY category
""")

SQL и DataFrame API эквивалентны — компилируются в одинаковый план. Выбор — вопрос вкуса и контекста.

NOTE

В большинстве production-кода DataFrame и Spark SQL используются вперемешку. Сложные трансформации удобно писать DataFrame, простые SELECT — SQL. Catalyst всё равно превратит их в одинаковый оптимизированный план.

Lazy evaluation: ключевая особенность

Spark ленив. Когда ты пишешь:

df = spark.read.parquet("s3://orders/")
filtered = df.filter(df.country == "RU")
grouped = filtered.groupBy("category").agg(F.sum("amount"))

— Spark ничего не выполняет. Он только строит план. Никакие данные не читаются, executors не работают.

Выполнение начинается только когда ты вызываешь action — операцию, которая возвращает результат или пишет данные:

grouped.show()                              # action — печатает в stdout
grouped.write.parquet("s3://result/")       # action — пишет файлы
result = grouped.collect()                  # action — собирает на driver
count = grouped.count()                     # action — возвращает число

Только в этот момент Spark анализирует весь накопленный план, оптимизирует его (через Catalyst), и запускает на executors.

Transformations vs actions

  • Transformation — операция, которая возвращает новый DataFrame, не выполняя работу. filter, select, groupBy, join, withColumn. Ленивая.
  • Action — операция, которая запускает выполнение. show, collect, count, write, take. Не ленивая.
# Это transformation chain — ничего не выполняется
big_df = spark.read.parquet("...")
step1 = big_df.filter(...)
step2 = step1.groupBy(...).agg(...)
step3 = step2.join(other_df, "id")

# Это action — все три трансформации выполнятся за один проход
step3.write.parquet("...")
Lazy evaluation Spark

Трансформации только строят план. Action запускает оптимизатор и выполнение.

readparquetСоздание DataFrame из файлов. Это transformation, ничего не читается.
filtercountry = RUДобавляет фильтр к плану. Не выполняется.
groupBycategoryДобавляет агрегацию к плану.
aggsum(amount)Добавляет агрегатную функцию.
actionwrite.parquet()Action триггерит Catalyst оптимизатор и физическое выполнение. Только сейчас начинается работа на кластере.
Catalystoptimize planCatalyst переставляет операции, делает predicate pushdown, выбирает join-стратегии.
Tasksexecutors runDriver разбивает план на tasks и рассылает по executors. Те читают данные, фильтруют, агрегируют, пишут результат.

Partitions

Partition в Spark — это порция данных, обрабатываемая одной task на одном executor. Это базовая единица параллелизма.

Когда Spark читает 100 ГБ parquet-файлов, он разбивает их на партиции (обычно по размеру блока, например 128 МБ). При 100 ГБ это примерно 800 партиций. Если в кластере 100 executors с 8 ядрами каждый, есть 800 параллельных слотов — каждый executor параллельно обрабатывает 8 партиций.

Количество партиций критично для производительности:

  • Слишком мало партиций — слабый параллелизм, executors простаивают.
  • Слишком много партиций — overhead на координацию, маленькие tasks.

Партиции можно изменять явно: df.repartition(N) создаёт новый DataFrame с N партициями. df.coalesce(N) уменьшает число партиций без shuffle.

Простой пример: count by group

Давай посмотрим конкретный сценарий end-to-end. Есть лог заказов в S3 в формате Parquet, нужно посчитать количество заказов по странам.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Стартуем Spark-сессию (driver)
spark = SparkSession.builder \
    .appName("orders-by-country") \
    .getOrCreate()

# Читаем DataFrame (transformation, lazy)
orders = spark.read.parquet("s3://my-bucket/orders/dt=2026-05-17/")

# Фильтр и группировка (transformations, lazy)
result = orders \
    .filter(orders.status == "completed") \
    .groupBy("country") \
    .agg(F.count("*").alias("orders_count"))

# Action — запускает выполнение
result.show()

Что произойдёт:

  1. Driver запускает сессию.
  2. Catalyst анализирует план: read parquet -> filter status -> groupBy country -> count.
  3. Catalyst оптимизирует: filter pushed down к parquet (если есть partition column status, читаются только нужные файлы).
  4. Driver делит работу: каждая task читает свою партицию parquet (например, 800 task на 100 ГБ).
  5. Executors параллельно читают, фильтруют, считают локальные count по country.
  6. Shuffle: данные перераспределяются по country между executors (все строки с country=RU попадают на один executor).
  7. Финальный count на каждом executor по своей country.
  8. Driver собирает результат и печатает.

Это и есть основной flow Spark-job-а.

Что осталось за кадром

Этот урок — обзорный. Глубокий Spark — это:

  • Catalyst optimizer — детали работы, как читать explain plan, как помогать оптимизатору.
  • Adaptive Query Execution (AQE) — runtime-перепланировка запроса.
  • Shuffle internals — как минимизировать дорогой shuffle.
  • Join strategies — broadcast, sort-merge, shuffle-hash, выбор стратегии.
  • Memory management — executor memory, off-heap, spill to disk.
  • Tuning — конфигурации, partition sizing, skew handling.
  • Structured Streaming — Spark в streaming-режиме.
  • Production patterns — мониторинг, retries, idempotency, schema evolution.

Всё это — отдельная большая тема. В нашем spark-course на платформе мы разбираем это поэтапно с диаграммами, real-world примерами и hands-on заданиями. Если ты планируешь работать с Spark серьёзно — следующий шаг туда.

Spark internals: Catalyst optimizer, shuffle и memory management Spark Structured Streaming: тот же API для streaming задач
TIP

Учить Spark поверхностно через DataFrame API — нормальный старт. Но в продакшене ты быстро упрёшься в shuffle, skew, OOM, медленные joins. Это всё решается через понимание внутренностей, и без этого Spark-инженер не может профессионально работать. Это и есть smysl углублённого курса.

Попробуй сам

Подумай, как бы ты использовал Spark в задаче, которую раньше делал в Python+pandas. Например, у тебя есть 50 файлов CSV по 1 ГБ каждый, нужно их объединить, отфильтровать, посчитать агрегаты, записать в Parquet. В pandas это бы заняло часы и упёрлось в RAM. В Spark на 10 executors — минуты. Какие transformations и actions ты бы написал? Где было бы shuffle? Это и есть базовый workflow Spark-инженера.

Проверка знанийKnowledge check
В чём ключевая особенность lazy evaluation в Spark и зачем она нужна?
ОтветAnswer
Lazy evaluation означает, что transformations (filter, groupBy, join, agg) только строят план выполнения, но не запускают работу. Реальное вычисление начинается только при вызове action (show, write, count, collect). Это нужно для оптимизации: Catalyst optimizer видит весь накопленный план целиком и может его переставить — сделать predicate pushdown, объединить операции, выбрать оптимальную join-стратегию. Если бы Spark выполнял каждый transformation немедленно, такая оптимизация была бы невозможна. Lazy evaluation — это плата за то, что Spark может обрабатывать терабайты быстрее, чем «наивный» eager-engine.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какую роль играют driver и executors в архитектуре Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4