Bucketing: устранение shuffle при join
Каждый SortMergeJoin в Spark требует shuffle — перераспределение данных обеих таблиц по join key. Для таблиц, которые часто join-ятся по одному и тому же ключу, bucketing позволяет выполнить shuffle один раз при записи и навсегда устранить его при чтении.
Как работает bucketing
bucketBy(N, column) при записи распределяет данные по N bucket-файлам, используя hash(column) % N:
# Запись с bucketing
df_orders.write \
.bucketBy(16, "customer_id") \
.sortBy("customer_id") \
.saveAsTable("orders_bucketed")
df_customers.write \
.bucketBy(16, "customer_id") \
.sortBy("customer_id") \
.saveAsTable("customers_bucketed")
После записи Spark сохраняет metadata о bucketing в каталоге (Hive metastore). При чтении он знает, что данные уже распределены по customer_id в 16 buckets.
Shuffle-free join
Когда обе таблицы bucketed по одному ключу с одинаковым числом buckets, join выполняется без shuffle:
result = spark.table("orders_bucketed").join(
spark.table("customers_bucketed"), "customer_id"
)
result.explain()
Before (без bucketing):
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
: +- Exchange hashpartitioning(customer_id, 200) ← shuffle!
: +- *(1) FileScan parquet [orders]
+- *(2) Sort [customer_id ASC]
+- Exchange hashpartitioning(customer_id, 200) ← shuffle!
+- *(2) FileScan parquet [customers]
After (с bucketing):
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
: +- *(1) FileScan parquet default.orders_bucketed ← без Exchange!
+- *(2) Sort [customer_id ASC]
+- *(2) FileScan parquet default.customers_bucketed ← без Exchange!
Exchange исчез. Spark знает, что bucket 0 из orders содержит те же customer_id, что bucket 0 из customers. Join выполняется локально в каждом bucket — никакого сетевого обмена.
Экономия на shuffle. Для 500 ГБ orders JOIN 50 ГБ customers без bucketing Spark перемещает ~550 ГБ по сети. С bucketing — 0 ГБ. На кластере с 10 Gbit/s сетью это разница между 7+ минутами сетевого I/O и нулём.
sortBy() для merge-оптимизации
sortBy() в комбинации с bucketBy() дополнительно сортирует данные внутри каждого bucket:
df.write \
.bucketBy(16, "customer_id") \
.sortBy("customer_id") \ # сортировка внутри bucket
.saveAsTable("orders_bucketed")
Без sortBy Spark при SortMergeJoin должен сортировать данные каждого bucket перед merge. С sortBy данные уже отсортированы — Sort в физическом плане становится no-op.
Выбор числа buckets
| Размер таблицы | Рекомендуемое число buckets | Размер bucket |
|---|---|---|
| 1-10 ГБ | 8-16 | 60-1250 МБ |
| 10-100 ГБ | 16-64 | 150-6250 МБ |
| 100 ГБ - 1 ТБ | 64-256 | 400-16000 МБ |
| > 1 ТБ | 256-1024 | Зависит от данных |
Правила:
- Число buckets должно быть степенью двойки (8, 16, 32, 64…) — это не требование Spark, но хорошая практика для равномерного хеширования
- Каждый bucket должен содержать 128 МБ - 1 ГБ данных
- Число buckets у join-таблиц должно быть одинаковым (или кратным)
Анти-паттерн: несовпадение числа buckets
# ПРОБЛЕМА: разное число buckets
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_b")
customers.write.bucketBy(32, "customer_id").saveAsTable("customers_b")
# Spark НЕ может сделать shuffle-free join!
# bucket 0 из orders (hash % 16) не совпадает с bucket 0 из customers (hash % 32)
# Exchange вернётся в план
# ПРАВИЛЬНО: одинаковое число buckets
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_b")
customers.write.bucketBy(16, "customer_id").saveAsTable("customers_b")
# Shuffle-free join работает
Совет: если у вас 5 таблиц, которые часто join-ятся по customer_id, запишите все с одинаковым числом buckets (например, 32). Это устранит shuffle для любой комбинации join между ними.
Ограничения bucketing
Bucketing имеет несколько важных ограничений:
1. Только Hive tables
# РАБОТАЕТ: saveAsTable (Hive metastore)
df.write.bucketBy(16, "id").saveAsTable("bucketed_table")
# НЕ РАБОТАЕТ: write.parquet (нет metastore для metadata)
df.write.bucketBy(16, "id").parquet("/path/") # UnsupportedOperationException
Bucketing metadata хранится в Hive metastore. Без metastore Spark не знает, что файлы уже распределены по buckets.
2. Delta Lake и Iceberg
По умолчанию Delta Lake и Iceberg не поддерживают Hive bucketing. У них свои механизмы:
- Delta Lake: Z-ordering (zOrderBy) для data skipping
- Iceberg: partition evolution с bucket transform
3. Перезапись при изменении
Если вы добавляете данные в bucketed таблицу через append, новые данные тоже должны быть bucketed с тем же числом buckets. Иначе metadata становится inconsistent.
4. Не помогает для groupBy
Bucketing оптимизирует join, но не groupBy. Даже если таблица bucketed по city, groupBy("city") всё равно сделает partial aggregate + Exchange.
Когда использовать bucketing
| Сценарий | Bucketing полезен? |
|---|---|
| Часто join таблицы A и B по ключу X | Да — устраняет shuffle |
| Одноразовый join | Нет — overhead записи не окупится |
| Данные в Delta/Iceberg | Нет — используйте Z-ordering/partition evolution |
| groupBy оптимизация | Нет — bucketing не помогает для агрегации |
| Очень большие таблицы с частым join | Да — экономия на shuffle значительна |
Что дальше?
В следующем уроке разберём broadcast join и хинты оптимизатору — как подсказать Catalyst правильную стратегию join, когда автоматический выбор не оптимален.