Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 14 мин
Продвинутый
Skew JoinskewedPartitionFactorskewedPartitionThresholdInBytesPartition Splitting

Оптимизация skew join

Проблема: одна партиция на 80% данных

В модуле 04 мы разбирали data skew на примере городов: в таблице заказов 80% записей принадлежат Москве, 10% — Санкт-Петербургу, и оставшиеся 10% распределены между другими городами. При join по city партиция с ключом “Москва” получает в 8-16 раз больше данных, чем остальные.

# Типичный skew-сценарий: заказы по городам
df_orders = spark.read.parquet("/data/orders/")       # 100M строк
df_cities = spark.read.parquet("/data/city_info/")    # 200 строк

# 80M строк с city='Москва', 10M с city='СПб', 10M -- остальные
result = df_orders.join(df_cities, "city")

Без AQE при SortMergeJoin:

== Physical Plan ==
*(3) SortMergeJoin [city], [city]
:- *(1) Sort [city ASC]
:  +- Exchange hashpartitioning(city, 200)
:     +- *(1) FileScan parquet [orders]
+- *(2) Sort [city ASC]
   +- Exchange hashpartitioning(city, 200)
      +- *(2) FileScan parquet [city_info]

Результат: 199 tasks обрабатывают по ~100K строк за секунды. Один task (Москва) обрабатывает 80M строк в течение 40 минут. Общее время определяется самым медленным task — straggler task.

Task distribution (без AQE):
Task 0 (Москва):    ████████████████████████████████████████ 80M rows (40 min)
Task 1 (СПб):       ████ 10M rows (5 min)
Task 2 (Казань):    █ 500K rows (0.3 min)
...
Task 199 (Анадырь): ▏ 2K rows (0.01 min)

Как AQE обнаруживает skew

AQE использует два условия для определения skewed-партиции. Партиция считается skewed, если оба условия выполняются:

  1. Размер партиции > median * skewedPartitionFactor

    • skewedPartitionFactor = 5.0 (по умолчанию)
    • Медиана вычисляется из реальных размеров всех партиций данного stage
  2. Размер партиции > skewedPartitionThresholdInBytes

    • skewedPartitionThresholdInBytes = 256MB (по умолчанию)
    • Абсолютный порог — маленькие партиции не считаются skewed, даже если они больше медианы в 100 раз
Пример (100M строк, ~40 ГБ):
  Median partition size: 200 МБ
  Threshold: 256 МБ

  Москва: 32 ГБ   → 32000 МБ > 200 * 5 = 1000 МБ [OK] AND > 256 МБ [OK]  → SKEWED
  СПб:    4 ГБ    → 4000 МБ > 1000 МБ [OK] AND > 256 МБ [OK]              → SKEWED
  Казань: 200 МБ  → 200 МБ < 1000 МБ [NO]                              → normal
  Анадырь: 0.8 МБ → 0.8 МБ < 1000 МБ [NO]                              → normal

Стратегия: Split and Replicate

Когда AQE обнаруживает skewed-партицию в join, он применяет стратегию split-and-replicate:

  1. Split: Skewed-партиция разбивается на несколько под-партиций (по advisoryPartitionSizeInBytes = 64MB)
  2. Replicate: Соответствующая партиция с другой стороны join дублируется для каждой под-партиции
  3. Join: Каждая под-партиция обрабатывается отдельным task параллельно
Без AQE:
  orders[Москва] (32 ГБ) ↔ city_info[Москва] (1 строка) → 1 task, 40 мин

С AQE (split into 500 sub-partitions по 64 МБ):
  orders[Москва_0] (64 МБ) ↔ city_info[Москва] (копия) → task 0, 5 сек
  orders[Москва_1] (64 МБ) ↔ city_info[Москва] (копия) → task 1, 5 сек
  ...
  orders[Москва_499] (64 МБ) ↔ city_info[Москва] (копия) → task 499, 5 сек

В explain() это выглядит так:

== Physical Plan == (с AQE)
*(3) SortMergeJoin [city], [city], SkewJoin        # <-- SkewJoin маркер!
:- AQEShuffleRead
:  +- Exchange hashpartitioning(city, 200)
:     +- *(1) FileScan parquet [orders]
+- AQEShuffleRead
   +- Exchange hashpartitioning(city, 200)
      +- *(2) FileScan parquet [city_info]

Обратите внимание на SkewJoin маркер в SortMergeJoin — он указывает, что AQE применил skew-оптимизацию.

Before/After: московский пример

МетрикаБез AQE skew joinС AQE skew join
Самый большой task32 ГБ (Москва, 1 task)64 МБ (Москва sub-partition)
Время самого медленного task~40 мин~5 сек
Параллелизм для Москвы1 task500 tasks
Overhead (репликация)0~200 строк * 500 = 100K копий (ничтожно)
Общее время stage~40 мин~30 сек
Ускорение~80x

Стоимость репликации минимальна: маленькая сторона join (city_info) дублируется для каждой под-партиции, но это всего 200 строк — ничтожный объём по сравнению с выигрышем от параллелизма.

WARNING

Важно: AQE skew join работает только с SortMergeJoin. Если Catalyst выбрал BroadcastHashJoin, skew не проблема — маленькая таблица уже целиком на каждом executor. ShuffleHashJoin также не поддерживает skew-оптимизацию через AQE.

Конфигурация skew join

ПараметрЗначениеРекомендация
skewJoin.enabledtrueОставьте включённым
skewedPartitionFactor5.0Увеличьте до 10, если AQE слишком агрессивно разбивает партиции
skewedPartitionThresholdInBytes256MBУменьшите до 64MB для маленьких данных
forceOptimizeSkewedJoinfalseВключите true для принудительной оптимизации (игнорирует threshold)
# Для данных с умеренным skew -- увеличьте factor
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10.0")

# Для маленьких данных с skew -- уменьшите threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "64MB")

# Принудительная оптимизация -- для отладки
spark.conf.set("spark.sql.adaptive.forceOptimizeSkewedJoin", "true")

Анти-паттерн: слишком низкий skewedPartitionFactor

# ОПАСНО: factor = 1.5 -- почти любая партиция больше медианы в 1.5x считается skewed
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1.5")

Проблема: с factor = 1.5 AQE будет разбивать партиции, которые лишь немного больше медианы. Каждое разбиение — это дополнительная репликация данных с другой стороны join. Для нормально распределённых данных это создаёт больше overhead, чем экономит.

Правило: skewedPartitionFactor = 5.0 — хороший баланс. Уменьшайте только если вы точно знаете, что skew существует, но не достигает порога 5x.

Проверка знанийKnowledge check
Почему для определения skew используются два условия (factor И threshold), а не одно?
ОтветAnswer
Factor (5x медианы) обнаруживает относительный перекос, но при маленьких данных медиана может быть 1 КБ, и партиция в 5 КБ будет считаться skewed — разбивать такие партиции бессмысленно, overhead от split превысит выигрыш. Absolute threshold (256 МБ) гарантирует, что AQE разбивает только реально большие партиции, где straggler task действительно замедляет весь stage. Оба условия должны выполняться одновременно.
Проверка знанийKnowledge check
Что произойдёт, если обе стороны join имеют skew по одному и тому же ключу?
ОтветAnswer
AQE разбивает skewed-партицию на одной стороне и реплицирует соответствующую партицию с другой стороны. Если обе стороны skewed по ключу 'Москва', AQE разбивает бОльшую сторону и реплицирует меньшую. В худшем случае (обе стороны одинаково большие) AQE разбивает обе и выполняет cartesian product между под-партициями — это всё ещё лучше, чем один straggler task, но overhead от репликации растёт.

Что дальше?

В следующем уроке мы разберём Dynamic Partition Pruning (DPP) — механизм, который позволяет AQE фильтровать партиции fact-таблицы на основе результатов join с dimension-таблицей, значительно сокращая объём сканируемых данных.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какую стратегию AQE применяет для skewed-партиции при join?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5