Стратегии партиционирования
Партиционирование — один из ключевых инструментов оптимизации в Spark. Правильное партиционирование может сократить объём сканируемых данных в десятки раз и устранить shuffle при join. Неправильное — создаёт тысячи мелких файлов и убивает производительность.
Два уровня партиционирования
В Spark существует два принципиально разных типа партиционирования:
| Тип | Где работает | Когда применяется |
|---|---|---|
| In-memory partitioning | Внутри Spark (DataFrame) | repartition(), coalesce(), после shuffle |
| Filesystem partitioning | На диске (Parquet/ORC) | partitionBy() при записи |
Эти два механизма независимы друг от друга. In-memory партиционирование определяет, как данные распределены между tasks. Filesystem партиционирование определяет структуру директорий на диске.
Hash Partitioning
Hash partitioning распределяет данные по формуле hash(key) % numPartitions:
# In-memory: repartition по столбцу использует hash
df.repartition(16, "customer_id")
# Все записи с одинаковым customer_id гарантированно
# попадают в одну партицию
Свойства:
- Равномерное распределение при высокой кардинальности ключа
- Гарантия: одинаковые ключи — одна партиция
- Используется в SortMergeJoin и GroupBy (Exchange hashpartitioning в explain())
Проблема: если один ключ имеет непропорционально много записей (data skew), одна партиция будет значительно больше остальных. Об этом подробнее — в уроке о Data Skew.
Range Partitioning
Range partitioning сортирует данные по ключу и делит на диапазоны:
# Range partitioning: данные сортируются и делятся на диапазоны
df.repartitionByRange(16, "order_date")
# Partition 0: 2024-01-01 .. 2024-01-23
# Partition 1: 2024-01-23 .. 2024-02-15
# ...
# Partition 15: 2024-11-10 .. 2024-12-31
Свойства:
- Данные внутри партиции отсортированы
- Границы определяются sampling (Spark берёт выборку для определения диапазонов)
- Полезен для range-запросов (
WHERE date BETWEEN ... AND ...)
Когда использовать:
- Данные часто фильтруются по диапазону (даты, числовые ID)
- Нужна глобальная сортировка (orderBy + write)
Filesystem Partitioning (partitionBy)
partitionBy() при записи создаёт директорию на каждое уникальное значение:
# Запись с filesystem partitioning
df.write.partitionBy("year", "month").parquet("/data/events/")
# Создаёт структуру:
# /data/events/year=2024/month=01/part-00000.parquet
# /data/events/year=2024/month=02/part-00000.parquet
# /data/events/year=2024/month=12/part-00000.parquet
# /data/events/year=2025/month=01/part-00000.parquet
Это не то же самое, что in-memory repartition. Filesystem partitioning определяет физическую структуру файлов на диске и включает partition pruning при чтении.
Partition Pruning
Partition pruning — это оптимизация, при которой Spark пропускает целые директории при фильтрации по partition column:
# Чтение partitioned таблицы с фильтром
df = spark.read.parquet("/data/events/")
result = df.filter(col("year") == 2024).filter(col("month") == 3)
result.explain()
== Physical Plan ==
*(1) FileScan parquet [event_id, event_type, ...]
PushedFilters: []
PartitionFilters: [year = 2024, month = 3]
ReadSchema: struct<event_id:long, event_type:string, ...>
PartitionFilters показывает, что Spark читает только /data/events/year=2024/month=03/. Если у вас 5 лет данных (60 месяцев), partition pruning пропускает 59 из 60 директорий — сканирование сокращается в 60 раз.
Before (без partitioning):
FileScan parquet [events] -- 500 ГБ, полное сканирование
Duration: 12 минут
After (с partitionBy year, month):
FileScan parquet [events] -- 8.3 ГБ (1/60), partition pruning
Duration: 12 секунд
Partition pruning работает только с точными предикатами. Фильтры типа col("year") == 2024 или col("month").isin(1, 2, 3) активируют pruning. Но col("event_date") > "2024-03-01" по не-partition столбцу event_date не активирует pruning, даже если данные partitioned по year/month. Фильтруйте именно по partition columns.
Выбор Partition Key: руководство по кардинальности
Выбор partition key — критическое решение. Неправильный выбор создаёт проблемы, которые сложно исправить без полной перезаписи данных.
| Кардинальность | Примеры | Результат partitionBy | Рекомендация |
|---|---|---|---|
| Очень низкая (2-10) | status, region | 2-10 директорий, файлы по 50+ ГБ | Хорошо как второй уровень |
| Низкая (10-100) | month, department | 10-100 директорий, файлы 1-10 ГБ | Идеально для partitionBy |
| Средняя (100-10K) | day, city | Сотни-тысячи директорий, файлы 10-100 МБ | Допустимо, следите за размером файлов |
| Высокая (10K-1M) | user_id, order_id | Миллионы мелких файлов | Не используйте для partitionBy |
Анти-паттерн: партиционирование по высокой кардинальности
# ОПАСНО: user_id имеет 10 миллионов уникальных значений
df.write.partitionBy("user_id").parquet("/data/events/")
# Результат: 10 миллионов директорий, каждая с файлами по 1 КБ
# - Listing этих директорий занимает 20+ минут
# - Namenode HDFS/S3 listing timeout
# - Каждый read создаёт миллионы tasks
# ПРАВИЛЬНО: партиционирование по году + месяцу
df.write.partitionBy("year", "month").parquet("/data/events/")
# Результат: 60 директорий (5 лет * 12 месяцев)
# - Каждый файл 1-10 ГБ
# - Partition pruning при фильтрации по дате
# - Быстрый listing
Комбинирование in-memory и filesystem partitioning
На практике часто нужно комбинировать оба подхода:
# 1. repartition для равномерного распределения по partition column
# 2. partitionBy для записи в директории
df.repartition("year", "month") \
.write \
.partitionBy("year", "month") \
.parquet("/data/events/")
Без repartition() перед partitionBy() вы получите по одному файлу на каждый task на каждую partition value. Если у вас 200 tasks и 12 месяцев — это 2400 файлов вместо 12.
# Контроль числа файлов на партицию
df.repartition(1, "year", "month") \
.write \
.partitionBy("year", "month") \
.parquet("/data/events/")
# Ровно 1 файл на каждую year/month комбинацию
Оптимальный размер файла Parquet — 128 МБ - 1 ГБ. Меньше 128 МБ — слишком много overhead от открытия/закрытия файлов и metadata. Больше 1 ГБ — читать придётся целиком даже для маленьких запросов (Parquet не поддерживает seek внутри row group эффективно для очень больших файлов).
Что дальше?
В следующем уроке мы подробно разберём bucketing — механизм, который устраняет shuffle при повторяющихся join по одному и тому же ключу. Вы узнаете, как bucketBy() работает, какие ограничения существуют, и когда bucketing оправдан.