Learning Platform
Глоссарий Troubleshooting
Урок 04.05 · 16 мин
Продвинутый
Data SkewSaltingSkew JoinAQE Skew HandlingStraggler Tasks

Data Skew: диагностика и решения

Data skew — это неравномерное распределение данных по ключу. Один ключ содержит непропорционально много записей, и executor, обрабатывающий этот ключ, становится bottleneck всего job. 99 executors закончили за 30 секунд, один — работает 45 минут.

Визуализация skew

Partition Skew: До и после AQE
Москва
800K
Санкт-Петербург
80K
Казань
60K
Новосибирск
60K
Макс. партиция800K строк
Макс. время задачи42 сек
Skew factor10x
Skewed партиция -- обрабатывается в 10x дольше остальных

Реальный пример: Москва

Представьте таблицу заказов по городам России:

# Распределение данных
cities = spark.createDataFrame([
    ("Москва", 800000),
    ("Санкт-Петербург", 80000),
    ("Казань", 60000),
    ("Новосибирск", 60000),
], ["city", "order_count"])

При groupBy("city").agg(...) Spark распределяет данные по hash(city). Москва — 800K записей, остальные города — 60-80K. Партиция с Москвой в 10 раз больше.

# GroupBy по city с skew
orders.groupBy("city").agg(
    count("*").alias("total"),
    avg("amount").alias("avg_amount")
).show()
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1), avg(amount)])
+- Exchange hashpartitioning(city, 200)  ← Москва = 800K в одну партицию
   +- *(1) HashAggregate(keys=[city], functions=[partial_count(1), partial_avg(amount)])
      +- *(1) FileScan parquet [orders]

Диагностика skew через Spark UI

Симптомы в Spark UI

  1. Stages tab: одна stage выполняется значительно дольше остальных
  2. Tasks tab внутри stage: один task с duration 45 мин, остальные — 30 сек
  3. Summary Metrics: огромная разница между Min, Median и Max для:
    • Shuffle Read Size: median 50 МБ, max 5 ГБ
    • Duration: median 30 sec, max 2700 sec
    • GC Time: straggler task показывает высокий GC time

Анти-паттерн: игнорирование straggler tasks

Spark UI -> Stages -> Stage 2 -> Tasks

Задача | Duration | Shuffle Read | GC Time
-------|----------|-------------|--------
Task 0 |   28 sec |      48 МБ  |   1 sec
Task 1 |   31 sec |      52 МБ  |   1 sec
...
Task 47|   29 sec |      45 МБ  |   1 sec
Task 48| 42 мин   |    4.8 ГБ   |  3 мин  ← STRAGGLER!

Если вы видите одну задачу с duration в 50-100 раз больше median — это data skew. Не спекулятивное выполнение, не медленный node, а неравномерные данные.

Решение 1: Salting

Salting разбивает горячий ключ на N под-ключей, распределяя нагрузку:

from pyspark.sql.functions import col, concat, lit, floor, rand

# Шаг 1: Добавляем salt (0-3) к ключу
SALT_BUCKETS = 4
orders_salted = orders.withColumn(
    "salted_city",
    concat(col("city"), lit("_"), floor(rand() * SALT_BUCKETS).cast("int"))
)

# Было: city = "Москва" (800K записей)
# Стало: salted_city = "Москва_0" (~200K), "Москва_1" (~200K),
#         "Москва_2" (~200K), "Москва_3" (~200K)

Salting для GroupBy

# Шаг 2: Первая агрегация по salted key
partial = orders_salted.groupBy("salted_city").agg(
    count("*").alias("partial_count"),
    sum("amount").alias("partial_sum")
)

# Шаг 3: Убираем salt и финальная агрегация
from pyspark.sql.functions import split

result = partial.withColumn(
    "city", split(col("salted_city"), "_")[0]
).groupBy("city").agg(
    sum("partial_count").alias("total"),
    sum("partial_sum").alias("total_amount")
)

Теперь вместо одной партиции с 800K записей Москвы — 4 партиции по ~200K. Двухпроходная агрегация: partial aggregate с salt, затем final aggregate без salt.

Salting для Join

# Таблица фактов (skewed)
orders_salted = orders.withColumn(
    "salt", floor(rand() * SALT_BUCKETS).cast("int")
)

# Dimension таблица: реплицируем для каждого salt value
from pyspark.sql.functions import explode, array, lit

cities_replicated = cities.withColumn(
    "salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
)

# Join по составному ключу
result = orders_salted.join(
    cities_replicated,
    (orders_salted.city == cities_replicated.city) &
    (orders_salted.salt == cities_replicated.salt)
)

Dimension таблица реплицируется SALT_BUCKETS раз (4 копии), но каждая partition обрабатывает ~1/4 данных Москвы. Компромисс: больше данных dimension table в обмен на равномерную нагрузку.

TIP

Выбор числа salt buckets. Число buckets зависит от степени skew. Если горячий ключ в 10 раз больше среднего — 4-8 buckets. Если в 100 раз — 16-32 buckets. Формула: buckets = ceil(skewed_key_size / median_key_size). Слишком много buckets увеличивает overhead dimension replication.

Решение 2: AQE Skew Join Optimization

Adaptive Query Execution (AQE) автоматически обнаруживает и обрабатывает skew в runtime:

# AQE включён по умолчанию с Spark 3.2
spark.conf.get("spark.sql.adaptive.enabled")  # 'true'
spark.conf.get("spark.sql.adaptive.skewJoin.enabled")  # 'true'

Как AQE обнаруживает skew

AQE считает партицию skewed, если выполняются оба условия:

  1. Размер партиции > skewedPartitionFactor (5.0) * median размер всех партиций
  2. Размер партиции > skewedPartitionThresholdInBytes (256 МБ)
# Конфигурация AQE skew handling
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5.0")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

Пример: median partition = 50 МБ, Москва partition = 5 ГБ:

  • 5 ГБ > 5.0 * 50 МБ = 250 МБ — условие 1 выполняется
  • 5 ГБ > 256 МБ — условие 2 выполняется
  • AQE автоматически разбивает эту партицию

Что делает AQE с skewed partition

AQE разбивает skewed partition на sub-partitions размером ~advisoryPartitionSizeInBytes (64 МБ):

# 5 ГБ partition / 64 МБ = ~78 sub-partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
Before AQE:
Partition 0: 50 МБ  ← normal
Partition 1: 5 ГБ   ← skewed (Москва)
Partition 2: 48 МБ  ← normal

After AQE:
Partition 0: 50 МБ    ← untouched
Partition 1a: 64 МБ   ← split from Москва
Partition 1b: 64 МБ   ← split from Москва
...
Partition 1x: 64 МБ   ← split from Москва (78 sub-partitions)
Partition 2: 48 МБ     ← untouched

Для join AQE реплицирует соответствующую партицию другой таблицы для каждого sub-partition. Это аналог salting, но полностью автоматический.

TIP

AQE vs ручной salting. AQE обрабатывает skew только для join, не для groupBy. Для skewed groupBy по-прежнему нужен ручной salting с двухпроходной агрегацией. Подробнее об AQE — в модуле M05 (AQE Deep-Dive).

Решение 3: Двухпроходная агрегация

Для skewed groupBy, когда salting неудобен, можно использовать двухпроходную агрегацию:

# Шаг 1: Partial aggregate с увеличенным числом партиций
# Это распределяет данные Москвы по нескольким tasks
partial = orders.repartition(1000, "city") \
    .groupBy("city") \
    .agg(count("*").alias("cnt"), sum("amount").alias("total"))

# Шаг 2: Final aggregate (малый объём данных)
result = partial.groupBy("city") \
    .agg(sum("cnt").alias("total_count"), sum("total").alias("total_amount"))

Этот подход работает, но salting обычно эффективнее, потому что repartition(1000) не гарантирует равномерное распределение горячего ключа.

Explain план со skew join optimization

== Physical Plan ==
*(2) SortMergeJoin [city], [city], Inner
   isSkew: true                                    ← AQE marker
:- *(1) Sort [city ASC]
:  +- Exchange hashpartitioning(city, 200)
:     +- *(1) FileScan parquet [orders]
+- *(1) Sort [city ASC]
   +- Exchange hashpartitioning(city, 200)
      +- *(1) FileScan parquet [dim_cities]

Маркер isSkew: true в explain() показывает, что AQE обнаружил и обработал skew.

Проверка знанийKnowledge check
В Spark UI вы видите, что 199 tasks завершились за 30 секунд, а 1 task выполняется 40 минут. Как вы определите, что это data skew, а не медленный node?
ОтветAnswer
Проверьте Shuffle Read Size в Summary Metrics: если median ~50 МБ, а max ~5 ГБ -- это data skew (один ключ доминирует). Для медленного node: (1) Shuffle Read Size будет одинаковым у всех tasks, но duration стагнера будет больше; (2) executor на том же node будет медленным для всех задач. Ключевое отличие: при data skew straggler task обрабатывает в 100 раз больше данных; при медленном node -- столько же данных, но дольше. Также проверьте GC Time: при skew straggler показывает высокий GC из-за большого объёма данных.
Проверка знанийKnowledge check
AQE обнаруживает partition с 5 ГБ при median 50 МБ. Какие два условия проверяет AQE и что он делает?
ОтветAnswer
AQE проверяет два условия: (1) размер партиции > skewedPartitionFactor (5.0) * median всех партиций (5 ГБ > 5 * 50 МБ = 250 МБ -- да); (2) размер партиции > skewedPartitionThresholdInBytes (256 МБ) (5 ГБ > 256 МБ -- да). Оба выполняются, поэтому AQE разбивает партицию на sub-partitions размером ~advisoryPartitionSizeInBytes (64 МБ). 5 ГБ / 64 МБ = ~78 sub-partitions. Для join AQE реплицирует соответствующую партицию другой таблицы для каждого sub-partition.

Как AQE разбивает перекошенные партиции в рантайме — на уровне исходников — в курсе Apache Spark Internals:

Spark Internals: AQE skew join

Что дальше?

В последнем уроке модуля разберём кэширование и persistence — как cache() и persist() помогают избежать повторного вычисления, какие StorageLevel существуют, и когда кэширование вредит производительности.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. В Spark UI вы видите: 199 tasks завершились за 30 секунд, 1 task выполняется 40 минут. Shuffle Read median = 50 МБ, max = 5 ГБ. Что это?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6