Диагностика skew и shuffle
В предыдущем уроке мы зафиксировали симптомы: Stage 3 с outlier task в 62 минуты, 15 GB disk spill, нарастающая деградация после рекламных кампаний. Теперь разбираем механизм: что именно происходит внутри shuffle и почему AQE не устранил проблему.
Обработка data skew: практические подходыЧто происходит внутри shuffle при skew
Когда Spark выполняет events.join(users, "user_id"), DAG Scheduler создаёт shuffle: оба DataFrame партиционируются по user_id через HashPartitioning. Физически это означает, что каждая строка с user_id = X попадает в партицию hash(X) % numPartitions.
Проблема начинается, если распределение user_id неравномерно. В нашем случае данные событий за 12 мая включали результаты крупной рекламной кампании с одним celebrity-endorser — тысячи новых пользователей зарегистрировались под одним campaign_id, и значительная часть событий принадлежала горстке «вирусных» user_id.
# Гипотетическое распределение user_id в событиях
user_id_123456: 45_000_000 строк (5.6% всех событий)
user_id_789012: 31_000_000 строк (3.9%)
user_id_345678: 18_000_000 строк (2.2%)
остальные ~25M: 706_000_000 строк (88.3%) равномерно
После hash(user_id_123456) % 200 = partition_73 — всё 5.6% данных в одной партиции из 200. Эта партиция будет в ~112 раз больше медианы. Task, обрабатывающий partition_73, получит 6.7 GB строк вместо медианных 60 MB.
Как читать shuffle metrics в Spark UI
Spark UI предоставляет три уровня детализации для диагностики skew.
Уровень 1 — Stage summary. На странице Stage детали видим агрегированные метрики:
Stage 3: ShuffledHashJoin
Duration: 1h 2min
Input Size: 120.0 GB
Shuffle Read Size: 120.0 GB
Shuffle Write Size: 0 B (это join, не shuffle write stage)
Shuffle spill (memory): 42.0 GB
Shuffle spill (disk): 15.0 GB
GC Time: 8.3 min
Shuffle spill (memory) — сколько данных прошло через in-memory sort/hash buffer перед записью на диск. Shuffle spill (disk) — финальный объём на диске. Соотношение 42:15 означает, что данные сжимались при сбросе, но масштаб катастрофический.
Уровень 2 — Task metrics distribution. Кнопка «Show Additional Metrics» в Spark UI открывает гистограмму task metrics:
Task Duration (ms):
5th percentile: 210,000 ms (3.5 мин)
25th percentile: 220,000 ms (3.7 мин)
Median: 238,000 ms (4.0 мин)
75th percentile: 245,000 ms (4.1 мин)
Max: 3,720,000 ms (62.0 мин) <-- outlier
Shuffle Read Size/Records:
Median: 61.2 MB / 512K records
Max: 6.7 GB / 45M records <-- тот же outlier task
Видно, что 75th percentile почти совпадает с медианой — это равномерное распределение. Outlier один, и он совершенно отдельно. Это не «вся data skewed», это один конкретный хотспот.
Уровень 3 — Individual task. Кликаем на outlier task в списке:
Task ID: 45 Status: SUCCESS
Duration: 62 min 0 s
GC Time: 14 min 32 s <- 23% времени в GC!
Peak Execution Memory: 47.2 GB
Shuffle Read Metrics:
Remote Bytes Read: 6.72 GB
Remote Records Read: 45,312,441
Spill Metrics:
Memory Spill: 42.1 GB
Disk Spill: 15.0 GB
Peak Execution Memory: 47.2 GB при spark.executor.memory=96g — это 49% всей heap одного executor. GC Time: 14 мин — G1GC под давлением. Всё сходится: task получил огромную partition, построил hash table в execution memory, которая начала занимать пространство у storage pool, затем G1GC начал агрессивно собирать мусор.
AQE Skew Join Optimization: механизм и почему он не сработал
AQE skew join optimization (SPARK-29544) — это механизм обнаружения и устранения skew во время выполнения. Разберём его точный механизм через исходный код.
При завершении map phase AQE имеет полную картину shuffle partition sizes через MapOutputStatistics. Класс SkewedJoinOptimization (в пакете org.apache.spark.sql.execution.adaptive) вычисляет, является ли партиция skewed:
// Упрощённая логика из SkewedJoinOptimization.scala
def isSkewed(partitionSize: Long,
medianPartitionSize: Long,
totalPartitionSize: Long): Boolean = {
partitionSize > skewedPartitionFactor * medianPartitionSize &&
partitionSize > skewedPartitionThresholdInBytes
}
// spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5 (default)
// spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB (default)
Если partition_73 = 6.7 GB и медиана = 61 MB: 6700 > 5 * 61 = 305 MB — TRUE. 6700 > 256 MB — TRUE. AQE должен обнаружить skew.
Обнаружив skewed partition, AQE разбивает её на подпартиции размером spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB) и обрабатывает каждую подпартицию как отдельный task. Partition_73 размером 6.7 GB разбилась бы на ~105 подзадач по 64 MB каждая.
Почему это не сработало в нашем случае? Смотрим на Physical Plan ещё раз:
ShuffledHashJoin [user_id#78], [user_id#112], LeftOuter, BuildRight
Ключевое слово — BuildRight. AQE выбрал ShuffledHashJoin с build side = правая таблица (users). Skew join optimization работает для SortMergeJoin, но не для ShuffledHashJoin. Это ограничение AQE 4.0: SkewedJoinOptimization применяется только к SortMergeJoin.
// Из OptimizeSkewedJoin.scala
object OptimizeSkewedJoin extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
case j: SortMergeJoinExec if j.isSkewJoin => ...
// ShuffledHashJoinExec здесь нет
}
}
Причина, по которой AQE переключился на ShuffledHashJoin: spark.sql.adaptive.localShuffleReader.enabled=true и размер build side (users после filter = 4 GB) попал под порог spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold. AQE принял решение перейти на hash join, но это решение деактивировало skew optimization.
Дополнительный диагностический сигнал: Stage 5
Stage 5 (groupBy aggregation) показывает аналогичный паттерн: outlier task 64 минуты, shuffle write 85 GB. Почему агрегация тоже skewed?
groupBy("country", "campaign_id", "event_type") использует ту же HashPartitioning. Если в входных данных Stage 5 есть горячие комбинации (country=RU + campaign_id=X + event_type=click от того же вирусного события), они снова попадают в одну partition при hash("RU", campaign_id_X, "click") % 200.
Это важный insight: skew распространяется через pipeline. Исправить только join недостаточно — нужно также анализировать aggregation keys.
Проверяем гипотезу через statistics:
# Проверка skew в aggregation keys
enriched.groupBy("country", "campaign_id", "event_type") \
.count() \
.orderBy(col("count").desc()) \
.show(20)
# Результат (гипотетический):
# country campaign_id event_type count
# RU camp_9873 click 41_234_567 <- 5.2% всех строк в одной группе
# RU camp_9873 impression 18_432_091
# DE camp_9873 click 9_234_567
# ...
Это подтверждает: campaign_9873 (та самая вирусная кампания) генерирует непропорциональный объём событий. Один (country, campaign_id, event_type) tuple содержит 5.2% всех строк — при 200 partitions это означает одну partition размером ~4.4 GB.
Что shuffle spill говорит о UnsafeExternalSorter
Spill 15 GB на диск — это не просто медленный I/O. Это цепочка событий внутри движка, которую важно понимать.
При построении hash table в ShuffledHashJoinExec.buildHashedRelation() Spark аллоцирует память через TaskMemoryManager. Когда execution pool исчерпывается, TaskMemoryManager вызывает spill() у registered MemoryConsumer. Для hash join это LongToUnsafeRowMap или UnsafeHashedRelation.
// Из TaskMemoryManager.java (упрощённо)
long acquired = executionMemoryPool.acquireMemory(required, taskAttemptId, ...);
if (acquired < required) {
// Принудительный spill у другого consumer
for (MemoryConsumer consumer : consumers) {
consumer.spill(required - acquired, this);
}
}
UnsafeExternalSorter.spill() сериализует текущий in-memory buffer в UnsafeInMemorySorter, сортирует его и пишет в DiskBlockManager. Файлы создаются в spark.local.dir (по умолчанию /tmp). Сброс 42 GB через /tmp — это: (a) I/O на локальный диск; (b) последующее чтение при merge; (c) GC накладных расходов на управление buffers.
В нашем случае executor r6g.4xlarge имеет instance storage (NVMe SSD), поэтому disk spill не катастрофически медленный — но 15 GB записи + чтения всё равно добавляет ~8-10 минут к task duration.
Как отличить skew от других причин медленного task
Data skew — не единственная причина task outlier. Важно уметь отличить её от альтернатив:
Симптом: один task сильно медленнее остальных
Skew:
- Shuffle Read Size для outlier task >> медианы
- Peak Execution Memory >> медианы
- Возможен spill
Straggler (медленный executor/узел):
- Task Duration >> медианы
- НО Shuffle Read Size ≈ медиана
- Другие tasks на том же executor тоже медленнее
- Смотрим: Executors tab -> executor-7 все tasks медленные?
Data quality (null/bad values):
- Task Duration >> медианы
- Shuffle Read Size ≈ медиана
- Но task обрабатывает много null-значений в GROUP BY ключе
- null партиционируется в partition_0 (hash(null) % N = 0)
Network bottleneck:
- Все tasks медленнее, не один outlier
- Fetch Wait Time высокое у многих tasks
- Смотрим: Stage metrics -> Shuffle Read: Remote Blocks Fetched vs Local
В нашем случае: outlier task имеет Shuffle Read Size = 6.7 GB при медиане 61 MB — это однозначно skew, не straggler и не network.
Попробуй сам
# 1. Проверить, включён ли AQE и skew join
spark.conf.get("spark.sql.adaptive.enabled")
spark.conf.get("spark.sql.adaptive.skewJoin.enabled")
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
# 2. Принудить AQE к SortMergeJoin вместо ShuffledHashJoin
# (временно для диагностики — проверить, применится ли skew optimization)
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b")
# 3. Посмотреть, обнаружил ли AQE skew в плане
result.explain("formatted")
# Ищем: "OptimizeSkewedJoin" и "skewedPartitions" в выводе
# 4. Анализ распределения join key
events.groupBy("user_id") \
.count() \
.selectExpr(
"percentile_approx(count, 0.5) as median",
"percentile_approx(count, 0.99) as p99",
"max(count) as max_count"
).show()
# Если max / median > 100 -- серьёзный skew
# 5. Оценить размер skewed partitions
from pyspark.sql.functions import spark_partition_id
events_repartitioned = events.repartition(200, "user_id")
events_repartitioned.groupBy(spark_partition_id()) \
.count() \
.orderBy("count", ascending=False) \
.show(10)
Итоги
Данные диагностики дают чёткую картину: user_id с экстремально высокой частотой создаёт partition_73 размером 6.7 GB. AQE не применил skew join optimization потому, что заменил SortMergeJoin на ShuffledHashJoin — логичное решение само по себе, но деактивирующее skew protection. Stage 5 добавляет аналогичную проблему в aggregation keys. В следующем уроке разберём вторую составляющую: почему OOM, что происходило в memory layers, и как это связано с решением AQE выбрать hash join.