Learning Platform
Глоссарий Troubleshooting
Урок 04.03 · 12 мин
Средний
BucketingbucketBysortByShuffle-Free Join

Bucketing: устранение shuffle при join

Каждый SortMergeJoin в Spark требует shuffle — перераспределение данных обеих таблиц по join key. Для таблиц, которые часто join-ятся по одному и тому же ключу, bucketing позволяет выполнить shuffle один раз при записи и навсегда устранить его при чтении.

Как работает bucketing

bucketBy(N, column) при записи распределяет данные по N bucket-файлам, используя hash(column) % N:

# Запись с bucketing
df_orders.write \
    .bucketBy(16, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("orders_bucketed")

df_customers.write \
    .bucketBy(16, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("customers_bucketed")

После записи Spark сохраняет metadata о bucketing в каталоге (Hive metastore). При чтении он знает, что данные уже распределены по customer_id в 16 buckets.

Shuffle-free join

Когда обе таблицы bucketed по одному ключу с одинаковым числом buckets, join выполняется без shuffle:

result = spark.table("orders_bucketed").join(
    spark.table("customers_bucketed"), "customer_id"
)
result.explain()

Before (без bucketing):

== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
:  +- Exchange hashpartitioning(customer_id, 200)   ← shuffle!
:     +- *(1) FileScan parquet [orders]
+- *(2) Sort [customer_id ASC]
   +- Exchange hashpartitioning(customer_id, 200)   ← shuffle!
      +- *(2) FileScan parquet [customers]

After (с bucketing):

== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
:  +- *(1) FileScan parquet default.orders_bucketed   ← без Exchange!
+- *(2) Sort [customer_id ASC]
   +- *(2) FileScan parquet default.customers_bucketed  ← без Exchange!

Exchange исчез. Spark знает, что bucket 0 из orders содержит те же customer_id, что bucket 0 из customers. Join выполняется локально в каждом bucket — никакого сетевого обмена.

TIP

Экономия на shuffle. Для 500 ГБ orders JOIN 50 ГБ customers без bucketing Spark перемещает ~550 ГБ по сети. С bucketing — 0 ГБ. На кластере с 10 Gbit/s сетью это разница между 7+ минутами сетевого I/O и нулём.

sortBy() для merge-оптимизации

sortBy() в комбинации с bucketBy() дополнительно сортирует данные внутри каждого bucket:

df.write \
    .bucketBy(16, "customer_id") \
    .sortBy("customer_id") \        # сортировка внутри bucket
    .saveAsTable("orders_bucketed")

Без sortBy Spark при SortMergeJoin должен сортировать данные каждого bucket перед merge. С sortBy данные уже отсортированы — Sort в физическом плане становится no-op.

Выбор числа buckets

Размер таблицыРекомендуемое число bucketsРазмер bucket
1-10 ГБ8-1660-1250 МБ
10-100 ГБ16-64150-6250 МБ
100 ГБ - 1 ТБ64-256400-16000 МБ
> 1 ТБ256-1024Зависит от данных

Правила:

  1. Число buckets должно быть степенью двойки (8, 16, 32, 64…) — это не требование Spark, но хорошая практика для равномерного хеширования
  2. Каждый bucket должен содержать 128 МБ - 1 ГБ данных
  3. Число buckets у join-таблиц должно быть одинаковым (или кратным)

Анти-паттерн: несовпадение числа buckets

# ПРОБЛЕМА: разное число buckets
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_b")
customers.write.bucketBy(32, "customer_id").saveAsTable("customers_b")

# Spark НЕ может сделать shuffle-free join!
# bucket 0 из orders (hash % 16) не совпадает с bucket 0 из customers (hash % 32)
# Exchange вернётся в план
# ПРАВИЛЬНО: одинаковое число buckets
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_b")
customers.write.bucketBy(16, "customer_id").saveAsTable("customers_b")
# Shuffle-free join работает
TIP

Совет: если у вас 5 таблиц, которые часто join-ятся по customer_id, запишите все с одинаковым числом buckets (например, 32). Это устранит shuffle для любой комбинации join между ними.

Ограничения bucketing

Bucketing имеет несколько важных ограничений:

1. Только Hive tables

# РАБОТАЕТ: saveAsTable (Hive metastore)
df.write.bucketBy(16, "id").saveAsTable("bucketed_table")

# НЕ РАБОТАЕТ: write.parquet (нет metastore для metadata)
df.write.bucketBy(16, "id").parquet("/path/")  # UnsupportedOperationException

Bucketing metadata хранится в Hive metastore. Без metastore Spark не знает, что файлы уже распределены по buckets.

2. Delta Lake и Iceberg

По умолчанию Delta Lake и Iceberg не поддерживают Hive bucketing. У них свои механизмы:

  • Delta Lake: Z-ordering (zOrderBy) для data skipping
  • Iceberg: partition evolution с bucket transform

3. Перезапись при изменении

Если вы добавляете данные в bucketed таблицу через append, новые данные тоже должны быть bucketed с тем же числом buckets. Иначе metadata становится inconsistent.

4. Не помогает для groupBy

Bucketing оптимизирует join, но не groupBy. Даже если таблица bucketed по city, groupBy("city") всё равно сделает partial aggregate + Exchange.

Когда использовать bucketing

СценарийBucketing полезен?
Часто join таблицы A и B по ключу XДа — устраняет shuffle
Одноразовый joinНет — overhead записи не окупится
Данные в Delta/IcebergНет — используйте Z-ordering/partition evolution
groupBy оптимизацияНет — bucketing не помогает для агрегации
Очень большие таблицы с частым joinДа — экономия на shuffle значительна
Проверка знанийKnowledge check
Что произойдёт, если orders_bucketed имеет 16 buckets, а customers_bucketed -- 32 buckets?
ОтветAnswer
Spark не сможет выполнить shuffle-free join. При 16 buckets используется hash(key) % 16, при 32 -- hash(key) % 32. Bucket 0 первой таблицы содержит ключи с hash % 16 == 0, но bucket 0 второй таблицы содержит ключи с hash % 32 == 0 -- это разные наборы ключей. Spark будет вынужден вставить Exchange (shuffle) для перераспределения данных. Решение: всегда использовать одинаковое число buckets для таблиц, которые join-ятся друг с другом.
Проверка знанийKnowledge check
Почему bucketing не помогает оптимизировать groupBy, хотя данные уже распределены по ключу?
ОтветAnswer
Bucketing гарантирует, что одинаковые ключи находятся в одном bucket-файле. Для join это достаточно -- Spark сопоставляет bucket 0 из таблицы A с bucket 0 из таблицы B. Но для groupBy().agg() Spark выполняет partial aggregate на каждом task, а затем Exchange для финального объединения результатов из разных executors. Даже если ключи уже в одном bucket, partial aggregates из разных executors всё равно нужно объединить. Bucketing устраняет only shuffle для join, не для aggregation.

Что дальше?

В следующем уроке разберём broadcast join и хинты оптимизатору — как подсказать Catalyst правильную стратегию join, когда автоматический выбор не оптимален.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Как bucketing устраняет shuffle при join?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6