Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 14 мин
Средний
Hash PartitioningRange PartitioningCustom PartitionerPartition Pruning

Стратегии партиционирования

Партиционирование — один из ключевых инструментов оптимизации в Spark. Правильное партиционирование может сократить объём сканируемых данных в десятки раз и устранить shuffle при join. Неправильное — создаёт тысячи мелких файлов и убивает производительность.

Два уровня партиционирования

В Spark существует два принципиально разных типа партиционирования:

ТипГде работаетКогда применяется
In-memory partitioningВнутри Spark (DataFrame)repartition(), coalesce(), после shuffle
Filesystem partitioningНа диске (Parquet/ORC)partitionBy() при записи

Эти два механизма независимы друг от друга. In-memory партиционирование определяет, как данные распределены между tasks. Filesystem партиционирование определяет структуру директорий на диске.

Hash Partitioning

Hash partitioning распределяет данные по формуле hash(key) % numPartitions:

# In-memory: repartition по столбцу использует hash
df.repartition(16, "customer_id")

# Все записи с одинаковым customer_id гарантированно
# попадают в одну партицию

Свойства:

  • Равномерное распределение при высокой кардинальности ключа
  • Гарантия: одинаковые ключи — одна партиция
  • Используется в SortMergeJoin и GroupBy (Exchange hashpartitioning в explain())

Проблема: если один ключ имеет непропорционально много записей (data skew), одна партиция будет значительно больше остальных. Об этом подробнее — в уроке о Data Skew.

Range Partitioning

Range partitioning сортирует данные по ключу и делит на диапазоны:

# Range partitioning: данные сортируются и делятся на диапазоны
df.repartitionByRange(16, "order_date")

# Partition 0: 2024-01-01 .. 2024-01-23
# Partition 1: 2024-01-23 .. 2024-02-15
# ...
# Partition 15: 2024-11-10 .. 2024-12-31

Свойства:

  • Данные внутри партиции отсортированы
  • Границы определяются sampling (Spark берёт выборку для определения диапазонов)
  • Полезен для range-запросов (WHERE date BETWEEN ... AND ...)

Когда использовать:

  • Данные часто фильтруются по диапазону (даты, числовые ID)
  • Нужна глобальная сортировка (orderBy + write)

Filesystem Partitioning (partitionBy)

partitionBy() при записи создаёт директорию на каждое уникальное значение:

# Запись с filesystem partitioning
df.write.partitionBy("year", "month").parquet("/data/events/")

# Создаёт структуру:
# /data/events/year=2024/month=01/part-00000.parquet
# /data/events/year=2024/month=02/part-00000.parquet
# /data/events/year=2024/month=12/part-00000.parquet
# /data/events/year=2025/month=01/part-00000.parquet

Это не то же самое, что in-memory repartition. Filesystem partitioning определяет физическую структуру файлов на диске и включает partition pruning при чтении.

Partition Pruning

Partition pruning — это оптимизация, при которой Spark пропускает целые директории при фильтрации по partition column:

# Чтение partitioned таблицы с фильтром
df = spark.read.parquet("/data/events/")
result = df.filter(col("year") == 2024).filter(col("month") == 3)
result.explain()
== Physical Plan ==
*(1) FileScan parquet [event_id, event_type, ...]
     PushedFilters: []
     PartitionFilters: [year = 2024, month = 3]
     ReadSchema: struct<event_id:long, event_type:string, ...>

PartitionFilters показывает, что Spark читает только /data/events/year=2024/month=03/. Если у вас 5 лет данных (60 месяцев), partition pruning пропускает 59 из 60 директорий — сканирование сокращается в 60 раз.

Before (без partitioning):

FileScan parquet [events] -- 500 ГБ, полное сканирование
Duration: 12 минут

After (с partitionBy year, month):

FileScan parquet [events] -- 8.3 ГБ (1/60), partition pruning
Duration: 12 секунд
TIP

Partition pruning работает только с точными предикатами. Фильтры типа col("year") == 2024 или col("month").isin(1, 2, 3) активируют pruning. Но col("event_date") > "2024-03-01" по не-partition столбцу event_date не активирует pruning, даже если данные partitioned по year/month. Фильтруйте именно по partition columns.

Выбор Partition Key: руководство по кардинальности

Выбор partition key — критическое решение. Неправильный выбор создаёт проблемы, которые сложно исправить без полной перезаписи данных.

КардинальностьПримерыРезультат partitionByРекомендация
Очень низкая (2-10)status, region2-10 директорий, файлы по 50+ ГБХорошо как второй уровень
Низкая (10-100)month, department10-100 директорий, файлы 1-10 ГБИдеально для partitionBy
Средняя (100-10K)day, cityСотни-тысячи директорий, файлы 10-100 МБДопустимо, следите за размером файлов
Высокая (10K-1M)user_id, order_idМиллионы мелких файловНе используйте для partitionBy

Анти-паттерн: партиционирование по высокой кардинальности

# ОПАСНО: user_id имеет 10 миллионов уникальных значений
df.write.partitionBy("user_id").parquet("/data/events/")

# Результат: 10 миллионов директорий, каждая с файлами по 1 КБ
# - Listing этих директорий занимает 20+ минут
# - Namenode HDFS/S3 listing timeout
# - Каждый read создаёт миллионы tasks
# ПРАВИЛЬНО: партиционирование по году + месяцу
df.write.partitionBy("year", "month").parquet("/data/events/")

# Результат: 60 директорий (5 лет * 12 месяцев)
# - Каждый файл 1-10 ГБ
# - Partition pruning при фильтрации по дате
# - Быстрый listing

Комбинирование in-memory и filesystem partitioning

На практике часто нужно комбинировать оба подхода:

# 1. repartition для равномерного распределения по partition column
# 2. partitionBy для записи в директории
df.repartition("year", "month") \
  .write \
  .partitionBy("year", "month") \
  .parquet("/data/events/")

Без repartition() перед partitionBy() вы получите по одному файлу на каждый task на каждую partition value. Если у вас 200 tasks и 12 месяцев — это 2400 файлов вместо 12.

# Контроль числа файлов на партицию
df.repartition(1, "year", "month") \
  .write \
  .partitionBy("year", "month") \
  .parquet("/data/events/")
# Ровно 1 файл на каждую year/month комбинацию
TIP

Оптимальный размер файла Parquet — 128 МБ - 1 ГБ. Меньше 128 МБ — слишком много overhead от открытия/закрытия файлов и metadata. Больше 1 ГБ — читать придётся целиком даже для маленьких запросов (Parquet не поддерживает seek внутри row group эффективно для очень больших файлов).

Проверка знанийKnowledge check
У вас таблица events с 500 ГБ данных за 5 лет. Вы партиционируете по year и month. Запрос: SELECT * FROM events WHERE year = 2024 AND month = 3. Сколько данных просканирует Spark?
ОтветAnswer
Spark просканирует примерно 1/60 от общего объёма -- около 8.3 ГБ. Partition pruning пропустит все директории кроме year=2024/month=03. При равномерном распределении данных по месяцам (60 месяцев за 5 лет) каждая partition содержит ~8.3 ГБ. Ключевое условие: фильтр должен быть по partition columns (year, month), а не по произвольному столбцу.
Проверка знанийKnowledge check
Почему partitionBy('user_id') для таблицы с 10 миллионами пользователей -- анти-паттерн?
ОтветAnswer
partitionBy('user_id') создаст 10 миллионов директорий на файловой системе. Каждая будет содержать крошечные файлы (килобайты вместо мегабайт). Это приводит к: (1) listing directory занимает минуты/часы на HDFS и S3; (2) Namenode перегружен миллионами entries; (3) каждый read создаёт миллионы tasks с огромным scheduling overhead; (4) маленькие файлы не используют преимущества columnar format (compression, predicate pushdown). Правильный подход -- партиционировать по low-cardinality столбцу (year, month, region) и использовать bucketing для join optimization по user_id.

Что дальше?

В следующем уроке мы подробно разберём bucketing — механизм, который устраняет shuffle при повторяющихся join по одному и тому же ключу. Вы узнаете, как bucketBy() работает, какие ограничения существуют, и когда bucketing оправдан.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие два уровня партиционирования существуют в Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6