Learning Platform
Глоссарий Troubleshooting
Урок 16.02 · 35 мин
Продвинутый
Data SkewShuffle InternalsAQESpark UIДиагностика

Диагностика skew и shuffle

В предыдущем уроке мы зафиксировали симптомы: Stage 3 с outlier task в 62 минуты, 15 GB disk spill, нарастающая деградация после рекламных кампаний. Теперь разбираем механизм: что именно происходит внутри shuffle и почему AQE не устранил проблему.

Обработка data skew: практические подходы

Что происходит внутри shuffle при skew

Когда Spark выполняет events.join(users, "user_id"), DAG Scheduler создаёт shuffle: оба DataFrame партиционируются по user_id через HashPartitioning. Физически это означает, что каждая строка с user_id = X попадает в партицию hash(X) % numPartitions.

Проблема начинается, если распределение user_id неравномерно. В нашем случае данные событий за 12 мая включали результаты крупной рекламной кампании с одним celebrity-endorser — тысячи новых пользователей зарегистрировались под одним campaign_id, и значительная часть событий принадлежала горстке «вирусных» user_id.

# Гипотетическое распределение user_id в событиях
user_id_123456:  45_000_000 строк  (5.6% всех событий)
user_id_789012:  31_000_000 строк  (3.9%)
user_id_345678:  18_000_000 строк  (2.2%)
остальные ~25M:  706_000_000 строк (88.3%) равномерно

После hash(user_id_123456) % 200 = partition_73 — всё 5.6% данных в одной партиции из 200. Эта партиция будет в ~112 раз больше медианы. Task, обрабатывающий partition_73, получит 6.7 GB строк вместо медианных 60 MB.

Shuffle skew: распределение данных по партициям
Медианная партиция~60 MB200 партиций равномерно делят 12 GB (после filter). Нормальный task обрабатывает 60 MB за 4 мин.
Skewed partition_73~6.7 GBuser_id_123456 попадает в одну партицию. Размер в 112 раз больше медианы. Task работает 62 мин вместо 4 мин.
Spill partition_7315 GB diskHash table для 6.7 GB events + соответствующих users не влезает в execution memory. UnsafeExternalSorter сбрасывает на диск через DiskBlockManager.

Как читать shuffle metrics в Spark UI

Spark UI предоставляет три уровня детализации для диагностики skew.

Уровень 1 — Stage summary. На странице Stage детали видим агрегированные метрики:

Stage 3: ShuffledHashJoin
  Duration: 1h 2min
  Input Size: 120.0 GB
  Shuffle Read Size: 120.0 GB
  Shuffle Write Size: 0 B     (это join, не shuffle write stage)
  Shuffle spill (memory): 42.0 GB
  Shuffle spill (disk): 15.0 GB
  GC Time: 8.3 min

Shuffle spill (memory) — сколько данных прошло через in-memory sort/hash buffer перед записью на диск. Shuffle spill (disk) — финальный объём на диске. Соотношение 42:15 означает, что данные сжимались при сбросе, но масштаб катастрофический.

Уровень 2 — Task metrics distribution. Кнопка «Show Additional Metrics» в Spark UI открывает гистограмму task metrics:

Task Duration (ms):
  5th percentile:   210,000  ms  (3.5 мин)
  25th percentile:  220,000  ms  (3.7 мин)
  Median:           238,000  ms  (4.0 мин)
  75th percentile:  245,000  ms  (4.1 мин)
  Max:            3,720,000  ms  (62.0 мин)   <-- outlier

Shuffle Read Size/Records:
  Median:    61.2 MB / 512K records
  Max:       6.7 GB / 45M records             <-- тот же outlier task

Видно, что 75th percentile почти совпадает с медианой — это равномерное распределение. Outlier один, и он совершенно отдельно. Это не «вся data skewed», это один конкретный хотспот.

Уровень 3 — Individual task. Кликаем на outlier task в списке:

Task ID: 45  Status: SUCCESS
  Duration: 62 min 0 s
  GC Time: 14 min 32 s       <- 23% времени в GC!
  Peak Execution Memory: 47.2 GB
  Shuffle Read Metrics:
    Remote Bytes Read: 6.72 GB
    Remote Records Read: 45,312,441
  Spill Metrics:
    Memory Spill: 42.1 GB
    Disk Spill: 15.0 GB

Peak Execution Memory: 47.2 GB при spark.executor.memory=96g — это 49% всей heap одного executor. GC Time: 14 мин — G1GC под давлением. Всё сходится: task получил огромную partition, построил hash table в execution memory, которая начала занимать пространство у storage pool, затем G1GC начал агрессивно собирать мусор.


AQE Skew Join Optimization: механизм и почему он не сработал

AQE skew join optimization (SPARK-29544) — это механизм обнаружения и устранения skew во время выполнения. Разберём его точный механизм через исходный код.

При завершении map phase AQE имеет полную картину shuffle partition sizes через MapOutputStatistics. Класс SkewedJoinOptimization (в пакете org.apache.spark.sql.execution.adaptive) вычисляет, является ли партиция skewed:

// Упрощённая логика из SkewedJoinOptimization.scala
def isSkewed(partitionSize: Long, 
             medianPartitionSize: Long,
             totalPartitionSize: Long): Boolean = {
  partitionSize > skewedPartitionFactor * medianPartitionSize &&
  partitionSize > skewedPartitionThresholdInBytes
}
// spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5 (default)
// spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB (default)

Если partition_73 = 6.7 GB и медиана = 61 MB: 6700 > 5 * 61 = 305 MB — TRUE. 6700 > 256 MB — TRUE. AQE должен обнаружить skew.

Обнаружив skewed partition, AQE разбивает её на подпартиции размером spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB) и обрабатывает каждую подпартицию как отдельный task. Partition_73 размером 6.7 GB разбилась бы на ~105 подзадач по 64 MB каждая.

Почему это не сработало в нашем случае? Смотрим на Physical Plan ещё раз:

ShuffledHashJoin [user_id#78], [user_id#112], LeftOuter, BuildRight

Ключевое слово — BuildRight. AQE выбрал ShuffledHashJoin с build side = правая таблица (users). Skew join optimization работает для SortMergeJoin, но не для ShuffledHashJoin. Это ограничение AQE 4.0: SkewedJoinOptimization применяется только к SortMergeJoin.

// Из OptimizeSkewedJoin.scala
object OptimizeSkewedJoin extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
    case j: SortMergeJoinExec if j.isSkewJoin => ...
    // ShuffledHashJoinExec здесь нет
  }
}

Причина, по которой AQE переключился на ShuffledHashJoin: spark.sql.adaptive.localShuffleReader.enabled=true и размер build side (users после filter = 4 GB) попал под порог spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold. AQE принял решение перейти на hash join, но это решение деактивировало skew optimization.

AQE decision flow: SortMergeJoin vs ShuffledHashJoin
MapOutputStatistics readyВходные данные: events 120 GB, users 4 GB. AQE AdaptiveSparkPlanExec готовится к re-planning при завершении map stage.
AQE re-plan
Выбор ShuffledHashJoinOptimizeShuffleWithLocalRead: если build side < maxShuffledHashJoinLocalMapThreshold, Spark предпочитает ShuffledHashJoin для избежания sort overhead. users 4 GB попал под этот порог.
OptimizeSkewedJoin: skipOptimizeSkewedJoin: ищет SortMergeJoinExec в плане. Находит ShuffledHashJoinExec — пропускает. Skew optimization не применяется.
результат
Outlier task 62 минPartition_73 = 6.7 GB идёт в один task ShuffledHashJoin. Build hash table в execution memory: 47 GB peak. GC: 14 мин. Итог: 62 мин один task.

Дополнительный диагностический сигнал: Stage 5

Stage 5 (groupBy aggregation) показывает аналогичный паттерн: outlier task 64 минуты, shuffle write 85 GB. Почему агрегация тоже skewed?

groupBy("country", "campaign_id", "event_type") использует ту же HashPartitioning. Если в входных данных Stage 5 есть горячие комбинации (country=RU + campaign_id=X + event_type=click от того же вирусного события), они снова попадают в одну partition при hash("RU", campaign_id_X, "click") % 200.

Это важный insight: skew распространяется через pipeline. Исправить только join недостаточно — нужно также анализировать aggregation keys.

Проверяем гипотезу через statistics:

# Проверка skew в aggregation keys
enriched.groupBy("country", "campaign_id", "event_type") \
        .count() \
        .orderBy(col("count").desc()) \
        .show(20)

# Результат (гипотетический):
# country  campaign_id  event_type  count
# RU       camp_9873    click       41_234_567   <- 5.2% всех строк в одной группе
# RU       camp_9873    impression  18_432_091
# DE       camp_9873    click       9_234_567
# ...

Это подтверждает: campaign_9873 (та самая вирусная кампания) генерирует непропорциональный объём событий. Один (country, campaign_id, event_type) tuple содержит 5.2% всех строк — при 200 partitions это означает одну partition размером ~4.4 GB.


Что shuffle spill говорит о UnsafeExternalSorter

Spill 15 GB на диск — это не просто медленный I/O. Это цепочка событий внутри движка, которую важно понимать.

При построении hash table в ShuffledHashJoinExec.buildHashedRelation() Spark аллоцирует память через TaskMemoryManager. Когда execution pool исчерпывается, TaskMemoryManager вызывает spill() у registered MemoryConsumer. Для hash join это LongToUnsafeRowMap или UnsafeHashedRelation.

// Из TaskMemoryManager.java (упрощённо)
long acquired = executionMemoryPool.acquireMemory(required, taskAttemptId, ...);
if (acquired < required) {
  // Принудительный spill у другого consumer
  for (MemoryConsumer consumer : consumers) {
    consumer.spill(required - acquired, this);
  }
}

UnsafeExternalSorter.spill() сериализует текущий in-memory buffer в UnsafeInMemorySorter, сортирует его и пишет в DiskBlockManager. Файлы создаются в spark.local.dir (по умолчанию /tmp). Сброс 42 GB через /tmp — это: (a) I/O на локальный диск; (b) последующее чтение при merge; (c) GC накладных расходов на управление buffers.

В нашем случае executor r6g.4xlarge имеет instance storage (NVMe SSD), поэтому disk spill не катастрофически медленный — но 15 GB записи + чтения всё равно добавляет ~8-10 минут к task duration.


Как отличить skew от других причин медленного task

Data skew — не единственная причина task outlier. Важно уметь отличить её от альтернатив:

Симптом: один task сильно медленнее остальных

Skew:
  - Shuffle Read Size для outlier task >> медианы
  - Peak Execution Memory >> медианы
  - Возможен spill

Straggler (медленный executor/узел):
  - Task Duration >> медианы
  - НО Shuffle Read Size ≈ медиана
  - Другие tasks на том же executor тоже медленнее
  - Смотрим: Executors tab -> executor-7 все tasks медленные?

Data quality (null/bad values):
  - Task Duration >> медианы
  - Shuffle Read Size ≈ медиана
  - Но task обрабатывает много null-значений в GROUP BY ключе
  - null партиционируется в partition_0 (hash(null) % N = 0)

Network bottleneck:
  - Все tasks медленнее, не один outlier
  - Fetch Wait Time высокое у многих tasks
  - Смотрим: Stage metrics -> Shuffle Read: Remote Blocks Fetched vs Local

В нашем случае: outlier task имеет Shuffle Read Size = 6.7 GB при медиане 61 MB — это однозначно skew, не straggler и не network.


Попробуй сам

# 1. Проверить, включён ли AQE и skew join
spark.conf.get("spark.sql.adaptive.enabled")
spark.conf.get("spark.sql.adaptive.skewJoin.enabled")
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")

# 2. Принудить AQE к SortMergeJoin вместо ShuffledHashJoin
# (временно для диагностики — проверить, применится ли skew optimization)
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b")

# 3. Посмотреть, обнаружил ли AQE skew в плане
result.explain("formatted")
# Ищем: "OptimizeSkewedJoin" и "skewedPartitions" в выводе

# 4. Анализ распределения join key
events.groupBy("user_id") \
      .count() \
      .selectExpr(
          "percentile_approx(count, 0.5) as median",
          "percentile_approx(count, 0.99) as p99",
          "max(count) as max_count"
      ).show()
# Если max / median > 100 -- серьёзный skew

# 5. Оценить размер skewed partitions
from pyspark.sql.functions import spark_partition_id
events_repartitioned = events.repartition(200, "user_id")
events_repartitioned.groupBy(spark_partition_id()) \
                    .count() \
                    .orderBy("count", ascending=False) \
                    .show(10)

Итоги

Данные диагностики дают чёткую картину: user_id с экстремально высокой частотой создаёт partition_73 размером 6.7 GB. AQE не применил skew join optimization потому, что заменил SortMergeJoin на ShuffledHashJoin — логичное решение само по себе, но деактивирующее skew protection. Stage 5 добавляет аналогичную проблему в aggregation keys. В следующем уроке разберём вторую составляющую: почему OOM, что происходило в memory layers, и как это связано с решением AQE выбрать hash join.

Проверка знанийKnowledge check
AQE включён, spark.sql.adaptive.skewJoin.enabled=true, skewedPartitionFactor=5. Stage 3 имеет medianPartitionSize=61 MB, partition_73=6.7 GB. Условие isSkewed() выполнено. Почему OptimizeSkewedJoin не применился, и как это исправить конфигурацией?
ОтветAnswer
OptimizeSkewedJoin.apply() ищет в плане узлы типа SortMergeJoinExec. Но AQE до этого применил OptimizeShuffleWithLocalRead и заменил SortMergeJoin на ShuffledHashJoin (потому что build side users = 4 GB попал под spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold). В плане нет SortMergeJoinExec — skew optimization не применяется. Исправление: установить spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=0b (или очень маленькое значение), чтобы запретить замену на ShuffledHashJoin. Тогда AQE сохранит SortMergeJoin и сможет применить OptimizeSkewedJoin — разобьёт partition_73 на ~105 подпартиций по 64 MB. Альтернатива: spark.sql.adaptive.forceApplySkewedJoinOptimization=true (появился в Spark 4.0 SPARK-45567) — применяет skew optimization даже при ShuffledHashJoin.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. AQE включён, skewJoin.enabled=true. Partition_73 = 6.7 GB, медиана = 61 MB. Условие isSkewed() выполнено. Но OptimizeSkewedJoin не применился, и в плане остаётся ShuffledHashJoinExec. Какое именно правило AQE вызвало эту ситуацию?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5