Оптимизация skew join
Проблема: одна партиция на 80% данных
В модуле 04 мы разбирали data skew на примере городов: в таблице заказов 80% записей принадлежат Москве, 10% — Санкт-Петербургу, и оставшиеся 10% распределены между другими городами. При join по city партиция с ключом “Москва” получает в 8-16 раз больше данных, чем остальные.
# Типичный skew-сценарий: заказы по городам
df_orders = spark.read.parquet("/data/orders/") # 100M строк
df_cities = spark.read.parquet("/data/city_info/") # 200 строк
# 80M строк с city='Москва', 10M с city='СПб', 10M -- остальные
result = df_orders.join(df_cities, "city")
Без AQE при SortMergeJoin:
== Physical Plan ==
*(3) SortMergeJoin [city], [city]
:- *(1) Sort [city ASC]
: +- Exchange hashpartitioning(city, 200)
: +- *(1) FileScan parquet [orders]
+- *(2) Sort [city ASC]
+- Exchange hashpartitioning(city, 200)
+- *(2) FileScan parquet [city_info]
Результат: 199 tasks обрабатывают по ~100K строк за секунды. Один task (Москва) обрабатывает 80M строк в течение 40 минут. Общее время определяется самым медленным task — straggler task.
Task distribution (без AQE):
Task 0 (Москва): ████████████████████████████████████████ 80M rows (40 min)
Task 1 (СПб): ████ 10M rows (5 min)
Task 2 (Казань): █ 500K rows (0.3 min)
...
Task 199 (Анадырь): ▏ 2K rows (0.01 min)
Как AQE обнаруживает skew
AQE использует два условия для определения skewed-партиции. Партиция считается skewed, если оба условия выполняются:
-
Размер партиции > median * skewedPartitionFactor
skewedPartitionFactor = 5.0(по умолчанию)- Медиана вычисляется из реальных размеров всех партиций данного stage
-
Размер партиции > skewedPartitionThresholdInBytes
skewedPartitionThresholdInBytes = 256MB(по умолчанию)- Абсолютный порог — маленькие партиции не считаются skewed, даже если они больше медианы в 100 раз
Пример (100M строк, ~40 ГБ):
Median partition size: 200 МБ
Threshold: 256 МБ
Москва: 32 ГБ → 32000 МБ > 200 * 5 = 1000 МБ [OK] AND > 256 МБ [OK] → SKEWED
СПб: 4 ГБ → 4000 МБ > 1000 МБ [OK] AND > 256 МБ [OK] → SKEWED
Казань: 200 МБ → 200 МБ < 1000 МБ [NO] → normal
Анадырь: 0.8 МБ → 0.8 МБ < 1000 МБ [NO] → normal
Стратегия: Split and Replicate
Когда AQE обнаруживает skewed-партицию в join, он применяет стратегию split-and-replicate:
- Split: Skewed-партиция разбивается на несколько под-партиций (по
advisoryPartitionSizeInBytes = 64MB) - Replicate: Соответствующая партиция с другой стороны join дублируется для каждой под-партиции
- Join: Каждая под-партиция обрабатывается отдельным task параллельно
Без AQE:
orders[Москва] (32 ГБ) ↔ city_info[Москва] (1 строка) → 1 task, 40 мин
С AQE (split into 500 sub-partitions по 64 МБ):
orders[Москва_0] (64 МБ) ↔ city_info[Москва] (копия) → task 0, 5 сек
orders[Москва_1] (64 МБ) ↔ city_info[Москва] (копия) → task 1, 5 сек
...
orders[Москва_499] (64 МБ) ↔ city_info[Москва] (копия) → task 499, 5 сек
В explain() это выглядит так:
== Physical Plan == (с AQE)
*(3) SortMergeJoin [city], [city], SkewJoin # <-- SkewJoin маркер!
:- AQEShuffleRead
: +- Exchange hashpartitioning(city, 200)
: +- *(1) FileScan parquet [orders]
+- AQEShuffleRead
+- Exchange hashpartitioning(city, 200)
+- *(2) FileScan parquet [city_info]
Обратите внимание на SkewJoin маркер в SortMergeJoin — он указывает, что AQE применил skew-оптимизацию.
Before/After: московский пример
| Метрика | Без AQE skew join | С AQE skew join |
|---|---|---|
| Самый большой task | 32 ГБ (Москва, 1 task) | 64 МБ (Москва sub-partition) |
| Время самого медленного task | ~40 мин | ~5 сек |
| Параллелизм для Москвы | 1 task | 500 tasks |
| Overhead (репликация) | 0 | ~200 строк * 500 = 100K копий (ничтожно) |
| Общее время stage | ~40 мин | ~30 сек |
| Ускорение | — | ~80x |
Стоимость репликации минимальна: маленькая сторона join (city_info) дублируется для каждой под-партиции, но это всего 200 строк — ничтожный объём по сравнению с выигрышем от параллелизма.
Важно: AQE skew join работает только с SortMergeJoin. Если Catalyst выбрал BroadcastHashJoin, skew не проблема — маленькая таблица уже целиком на каждом executor. ShuffleHashJoin также не поддерживает skew-оптимизацию через AQE.
Конфигурация skew join
| Параметр | Значение | Рекомендация |
|---|---|---|
skewJoin.enabled | true | Оставьте включённым |
skewedPartitionFactor | 5.0 | Увеличьте до 10, если AQE слишком агрессивно разбивает партиции |
skewedPartitionThresholdInBytes | 256MB | Уменьшите до 64MB для маленьких данных |
forceOptimizeSkewedJoin | false | Включите true для принудительной оптимизации (игнорирует threshold) |
# Для данных с умеренным skew -- увеличьте factor
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10.0")
# Для маленьких данных с skew -- уменьшите threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "64MB")
# Принудительная оптимизация -- для отладки
spark.conf.set("spark.sql.adaptive.forceOptimizeSkewedJoin", "true")
Анти-паттерн: слишком низкий skewedPartitionFactor
# ОПАСНО: factor = 1.5 -- почти любая партиция больше медианы в 1.5x считается skewed
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1.5")
Проблема: с factor = 1.5 AQE будет разбивать партиции, которые лишь немного больше медианы. Каждое разбиение — это дополнительная репликация данных с другой стороны join. Для нормально распределённых данных это создаёт больше overhead, чем экономит.
Правило: skewedPartitionFactor = 5.0 — хороший баланс. Уменьшайте только если вы точно знаете, что skew существует, но не достигает порога 5x.
Что дальше?
В следующем уроке мы разберём Dynamic Partition Pruning (DPP) — механизм, который позволяет AQE фильтровать партиции fact-таблицы на основе результатов join с dimension-таблицей, значительно сокращая объём сканируемых данных.