Data Skew: диагностика и решения
Data skew — это неравномерное распределение данных по ключу. Один ключ содержит непропорционально много записей, и executor, обрабатывающий этот ключ, становится bottleneck всего job. 99 executors закончили за 30 секунд, один — работает 45 минут.
Визуализация skew
Реальный пример: Москва
Представьте таблицу заказов по городам России:
# Распределение данных
cities = spark.createDataFrame([
("Москва", 800000),
("Санкт-Петербург", 80000),
("Казань", 60000),
("Новосибирск", 60000),
], ["city", "order_count"])
При groupBy("city").agg(...) Spark распределяет данные по hash(city). Москва — 800K записей, остальные города — 60-80K. Партиция с Москвой в 10 раз больше.
# GroupBy по city с skew
orders.groupBy("city").agg(
count("*").alias("total"),
avg("amount").alias("avg_amount")
).show()
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1), avg(amount)])
+- Exchange hashpartitioning(city, 200) ← Москва = 800K в одну партицию
+- *(1) HashAggregate(keys=[city], functions=[partial_count(1), partial_avg(amount)])
+- *(1) FileScan parquet [orders]
Диагностика skew через Spark UI
Симптомы в Spark UI
- Stages tab: одна stage выполняется значительно дольше остальных
- Tasks tab внутри stage: один task с duration 45 мин, остальные — 30 сек
- Summary Metrics: огромная разница между Min, Median и Max для:
- Shuffle Read Size: median 50 МБ, max 5 ГБ
- Duration: median 30 sec, max 2700 sec
- GC Time: straggler task показывает высокий GC time
Анти-паттерн: игнорирование straggler tasks
Spark UI -> Stages -> Stage 2 -> Tasks
Задача | Duration | Shuffle Read | GC Time
-------|----------|-------------|--------
Task 0 | 28 sec | 48 МБ | 1 sec
Task 1 | 31 sec | 52 МБ | 1 sec
...
Task 47| 29 sec | 45 МБ | 1 sec
Task 48| 42 мин | 4.8 ГБ | 3 мин ← STRAGGLER!
Если вы видите одну задачу с duration в 50-100 раз больше median — это data skew. Не спекулятивное выполнение, не медленный node, а неравномерные данные.
Решение 1: Salting
Salting разбивает горячий ключ на N под-ключей, распределяя нагрузку:
from pyspark.sql.functions import col, concat, lit, floor, rand
# Шаг 1: Добавляем salt (0-3) к ключу
SALT_BUCKETS = 4
orders_salted = orders.withColumn(
"salted_city",
concat(col("city"), lit("_"), floor(rand() * SALT_BUCKETS).cast("int"))
)
# Было: city = "Москва" (800K записей)
# Стало: salted_city = "Москва_0" (~200K), "Москва_1" (~200K),
# "Москва_2" (~200K), "Москва_3" (~200K)
Salting для GroupBy
# Шаг 2: Первая агрегация по salted key
partial = orders_salted.groupBy("salted_city").agg(
count("*").alias("partial_count"),
sum("amount").alias("partial_sum")
)
# Шаг 3: Убираем salt и финальная агрегация
from pyspark.sql.functions import split
result = partial.withColumn(
"city", split(col("salted_city"), "_")[0]
).groupBy("city").agg(
sum("partial_count").alias("total"),
sum("partial_sum").alias("total_amount")
)
Теперь вместо одной партиции с 800K записей Москвы — 4 партиции по ~200K. Двухпроходная агрегация: partial aggregate с salt, затем final aggregate без salt.
Salting для Join
# Таблица фактов (skewed)
orders_salted = orders.withColumn(
"salt", floor(rand() * SALT_BUCKETS).cast("int")
)
# Dimension таблица: реплицируем для каждого salt value
from pyspark.sql.functions import explode, array, lit
cities_replicated = cities.withColumn(
"salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
)
# Join по составному ключу
result = orders_salted.join(
cities_replicated,
(orders_salted.city == cities_replicated.city) &
(orders_salted.salt == cities_replicated.salt)
)
Dimension таблица реплицируется SALT_BUCKETS раз (4 копии), но каждая partition обрабатывает ~1/4 данных Москвы. Компромисс: больше данных dimension table в обмен на равномерную нагрузку.
Выбор числа salt buckets. Число buckets зависит от степени skew. Если горячий ключ в 10 раз больше среднего — 4-8 buckets. Если в 100 раз — 16-32 buckets. Формула: buckets = ceil(skewed_key_size / median_key_size). Слишком много buckets увеличивает overhead dimension replication.
Решение 2: AQE Skew Join Optimization
Adaptive Query Execution (AQE) автоматически обнаруживает и обрабатывает skew в runtime:
# AQE включён по умолчанию с Spark 3.2
spark.conf.get("spark.sql.adaptive.enabled") # 'true'
spark.conf.get("spark.sql.adaptive.skewJoin.enabled") # 'true'
Как AQE обнаруживает skew
AQE считает партицию skewed, если выполняются оба условия:
- Размер партиции >
skewedPartitionFactor(5.0) * median размер всех партиций - Размер партиции >
skewedPartitionThresholdInBytes(256 МБ)
# Конфигурация AQE skew handling
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5.0")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Пример: median partition = 50 МБ, Москва partition = 5 ГБ:
- 5 ГБ > 5.0 * 50 МБ = 250 МБ — условие 1 выполняется
- 5 ГБ > 256 МБ — условие 2 выполняется
- AQE автоматически разбивает эту партицию
Что делает AQE с skewed partition
AQE разбивает skewed partition на sub-partitions размером ~advisoryPartitionSizeInBytes (64 МБ):
# 5 ГБ partition / 64 МБ = ~78 sub-partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
Before AQE:
Partition 0: 50 МБ ← normal
Partition 1: 5 ГБ ← skewed (Москва)
Partition 2: 48 МБ ← normal
After AQE:
Partition 0: 50 МБ ← untouched
Partition 1a: 64 МБ ← split from Москва
Partition 1b: 64 МБ ← split from Москва
...
Partition 1x: 64 МБ ← split from Москва (78 sub-partitions)
Partition 2: 48 МБ ← untouched
Для join AQE реплицирует соответствующую партицию другой таблицы для каждого sub-partition. Это аналог salting, но полностью автоматический.
AQE vs ручной salting. AQE обрабатывает skew только для join, не для groupBy. Для skewed groupBy по-прежнему нужен ручной salting с двухпроходной агрегацией. Подробнее об AQE — в модуле M05 (AQE Deep-Dive).
Решение 3: Двухпроходная агрегация
Для skewed groupBy, когда salting неудобен, можно использовать двухпроходную агрегацию:
# Шаг 1: Partial aggregate с увеличенным числом партиций
# Это распределяет данные Москвы по нескольким tasks
partial = orders.repartition(1000, "city") \
.groupBy("city") \
.agg(count("*").alias("cnt"), sum("amount").alias("total"))
# Шаг 2: Final aggregate (малый объём данных)
result = partial.groupBy("city") \
.agg(sum("cnt").alias("total_count"), sum("total").alias("total_amount"))
Этот подход работает, но salting обычно эффективнее, потому что repartition(1000) не гарантирует равномерное распределение горячего ключа.
Explain план со skew join optimization
== Physical Plan ==
*(2) SortMergeJoin [city], [city], Inner
isSkew: true ← AQE marker
:- *(1) Sort [city ASC]
: +- Exchange hashpartitioning(city, 200)
: +- *(1) FileScan parquet [orders]
+- *(1) Sort [city ASC]
+- Exchange hashpartitioning(city, 200)
+- *(1) FileScan parquet [dim_cities]
Маркер isSkew: true в explain() показывает, что AQE обнаружил и обработал skew.
Как AQE разбивает перекошенные партиции в рантайме — на уровне исходников — в курсе Apache Spark Internals:
Spark Internals: AQE skew joinЧто дальше?
В последнем уроке модуля разберём кэширование и persistence — как cache() и persist() помогают избежать повторного вычисления, какие StorageLevel существуют, и когда кэширование вредит производительности.