Learning Platform
Глоссарий Troubleshooting
Урок 05.04 · 30 мин
Продвинутый
Push-Based ShuffleMagnetExternal Shuffle ServiceMergedShuffleFileManagerSPARK-30602

Push-based shuffle (Magnet): архитектура и production-конфиги

Классический Spark shuffle работает по pull-based модели: reduce tasks активно тянут данные с executor-ов, где они были записаны map tasks. Это создаёт два больших узких места на масштабе: тысячи мелких HTTP-запросов от каждого reduce task к каждому map executor, и жёсткую зависимость от того, живы ли executor-ы, написавшие данные.

Проект Magnet (SPARK-30602), разработанный в LinkedIn и вошедший в Spark 3.2, меняет парадигму: map tasks проталкивают (push) свои данные на External Shuffle Service ещё до начала reduce-фазы. Там блоки мержатся по partition, и reduce tasks читают уже крупные объединённые файлы вместо тысяч мелких.

Проблемы классического pull-based shuffle на масштабе

Чтобы понять ценность Magnet, нужно чётко сформулировать, что именно не работает при масштабировании до 10 000+ executor-ов.

Проблема 1: O(M * R) мелких чтений

При M map tasks и R reduce tasks каждый reduce task посылает M запросов — по одному на каждый map executor. При M = 5000 и R = 2000 это 10 миллионов отдельных запросов. Каждый запрос: TCP setup (или повторное использование соединения), поиск по index-файлу, чтение нескольких КБ. При мелких shuffle-блоках накладные расходы на установление соединения и метаданные доминируют над временем реальной передачи данных.

Проблема 2: executor lifecycle coupling

Shuffle-файлы живут на дисках executor-ов. Если executor умирает после завершения map tasks, но до того как все reduce tasks успели прочитать данные — DAGScheduler вынужден перезапустить и map stage (для восстановления файлов), и reduce stage. Это особенно болезненно на кластерах с spot/preemptible instance-ами.

Проблема 3: read amplification при spill

Если map task создал несколько spill-файлов (из-за нехватки памяти), reduce task получает фрагментированные блоки и вынужден читать их раздельно. Это усиливает I/O amplification.

Архитектура Magnet: три участника

Magnet добавляет новую роль: Merge Pusher на стороне map task и MergedShuffleFileManager на стороне External Shuffle Service (ESS).

Классический pull:
  map executor --[записал .data + .index]--> (ждёт)
  reduce task --[fetch каждый блок]--> map executor (по одному)

Push-based (Magnet):
  map executor --[push blocks]--> ESS (во время map-фазы)
  ESS --[merge по partition]--> merged files (параллельно с push)
  reduce task --[один запрос]--> ESS (читает merged block)
  reduce task --[fallback, если merge неполный]--> map executor

Три ключевых класса:

  • ShuffleBlockPusher — на стороне executor. После того как map task записал .data + .index, ShuffleBlockPusher читает блоки partition-за-partition и pushes их на ESS-узлы, выбранные driver-ом как mergerLocations.
  • MergedShuffleFileManager (интерфейс) / RemoteBlockPushResolver (реализация) — на стороне ESS. Принимает push-запросы, открывает AppShuffleMergeManager, мержит входящие байты в merged_shuffle_{shuffleId}_{shuffleMergeId}_{reduceId}.data файлы.
  • MergeStatuses — результат merge phase. Driver получает MergeStatuses от каждого ESS-узла и использует их при планировании reduce tasks: если для partition p есть merged блок на ESS-узле X, reduce task назначается по возможности на узел X (data locality).
Push-based shuffle: три фазы

Фаза 1: map tasks пишут локальные .data/.index и параллельно push'ат блоки на ESS. Фаза 2: ESS мержит. Фаза 3: reduce tasks читают merged блоки с ESS. Стрелки показывают направление движения данных.

ФАЗА 1: Map + Push (параллельно)map task пишет .data/.index ЛОКАЛЬНО, затем ShuffleBlockPusher push'ает блоки на ESSPush происходит в фоновых потоках executor-а (spark.shuffle.push.numPushThreads). Основной поток map task уже завершён.
Executor 0map task 0 done; pusher -> ESS_A: partition[0,1,2]; ESS_B: partition[3,4]mergerLocations выбираются driver-ом: случайный subset из ESS-узлов кластера, по одному ESS на partition.
Executor 1map task 1 done; pusher -> ESS_A: partition[0,1,2]; ESS_B: partition[3,4]Все map tasks push'ают одинаковые partition-блоки на те же ESS-узлы. ESS_A собирает partition 0 от всех map tasks.
Executor N...N map tasks
ФАЗА 2: ESS MergeRemoteBlockPushResolver: append-merge входящих байт в merged_shuffle файлы по partitionMerge не ждёт всех map tasks. Как только приходит блок от очередного map task, он append'ится в merged файл. Bitmap отслеживает, от каких map tasks уже получены блоки.
ФАЗА 3: Reduce Readreduce task -> getMapSizesByExecutorId -> предпочитает merged блок на ESS; fallback на оригинальные .data файлы если блок не смердженBlockStoreShuffleReader проверяет MergeStatus для каждой partition. Если merged блок доступен и размер > 0, читает его. Иначе fallback.

Merge протокол: как ESS мержит блоки

RemoteBlockPushResolver (реализация MergedShuffleFileManager) поддерживает для каждого (shuffleId, shuffleMergeId) набор AppShuffleMergePartitionsInfo. Для каждой partition это:

  • Открытый FileChannel в append-режиме на файл merged_shuffle_{shuffleId}_{shuffleMergeId}_{partition}.data
  • RoaringBitmap — bitmap map task-ов, блоки которых уже смержены (для отслеживания completeness)
  • Текущий offset в файле (для построения merged .index)

Когда приходит push-запрос (OpenDag + PushBlock):

1. Executor shouts: PUSH partition_block(shuffleId=0, mapId=42, reduceId=5, data=bytes)
2. ESS RemoteBlockPushResolver:
   a. Найти AppShuffleMergePartitionsInfo для (shuffleId=0, reduceId=5)
   b. Если bitmap уже содержит mapId=42 -- дубликат, отклонить
   c. Append bytes в merged data file
   d. bitmap.add(42); update offset
3. После finalizeShuffleMerge (сигнал от driver):
   a. Записать merged .index файл
   b. Вернуть MergeStatus(bitmap, sizes)

Роль bitmap для reduce task

Bitmap — это ключевой механизм для reduce task. Когда reduce task получает MergeStatus для partition p, bitmap говорит: «merged блок на ESS содержит данные от map tasks {0, 1, 3, 5, ...}». Если какие-то map tasks отсутствуют в bitmap (push не успел или упал), reduce task дочитывает недостающие блоки напрямую с executor-ов в обычном pull-режиме.

Это делает push-based shuffle best-effort: даже если push полностью провалится, корректность не нарушается — просто нет выигрыша по производительности.

Конфиги spark.shuffle.push.*

В Spark 3.2-4.0 push-based shuffle управляется следующими конфигами:

КонфигПо умолчаниюОписание
spark.shuffle.push.enabledfalseВключить push-based shuffle
spark.shuffle.push.finalize.timeout10sСколько ждать завершения push перед finalize
spark.shuffle.push.maxBlockSizeToPush1mБлоки крупнее этого не push’аются, читаются напрямую
spark.shuffle.push.maxBlockBatchSize3mМакс. суммарный размер одного batch push-запроса
spark.shuffle.push.numPushThreads(все ядра executor)Потоков для push в пуле ShuffleBlockPusher
spark.shuffle.push.minShuffleSizeToWait500mНиже этого размера finalize не ждёт
spark.shuffle.push.minCompletedPushRatio1.0Доля map tasks, завершивших push, до finalize
spark.shuffle.push.mergersMinThresholdRatio0.05Мин. отношение merger-узлов к shuffle partitions
spark.shuffle.push.mergersMinStaticThreshold5Мин. абсолютное число merger-узлов
spark.shuffle.push.server.mergedShuffleFileManagerImplorg.apache.spark.network.shuffle.NoOpMergedShuffleFileManagerРеализация MergedShuffleFileManager на ESS
WARNING

Для включения push-based shuffle недостаточно spark.shuffle.push.enabled = true. Обязательно:

  1. Запущен External Shuffle Service (YARN: spark.shuffle.service.enabled = true)
  2. ESS настроен с spark.shuffle.push.server.mergedShuffleFileManagerImpl = org.apache.spark.network.shuffle.RemoteBlockPushResolver
  3. Отключено IO-шифрование (spark.io.encryption.enabled = false — несовместимо с push)
  4. Сериализатор поддерживает relocation (UnsafeRowSerializer или Kryo с auto-reset)

Как настроить minCompletedPushRatio

spark.shuffle.push.minCompletedPushRatio — один из самых важных параметров. Он определяет: после какой доли завершённых push driver отправляет FinalizeShuffleMerge. Значение 1.0 (по умолчанию) означает «ждать, пока все map tasks завершат push». Это максимальная completeness, но может добавить latency.

В production с spot-инстансами разумно снизить до 0.8-0.9: reduce tasks, не получившие merged блок для некоторых map tasks, прозрачно fallback’ают на прямое чтение.

spark.conf.set("spark.shuffle.push.enabled", "true")
spark.conf.set("spark.shuffle.push.minCompletedPushRatio", "0.9")
spark.conf.set("spark.shuffle.push.finalize.timeout", "30s")
spark.conf.set("spark.shuffle.push.maxBlockSizeToPush", "4m")

Ограничения push-based shuffle

Push-based shuffle не является серебряной пулей. Важно понимать, когда он не поможет или навредит:

1. Только с YARN ESS. В Spark 4.0 push поддерживается только с YARN External Shuffle Service. Standalone и Kubernetes не поддерживаются (на момент Spark 4.0 — проверяйте release notes для вашей версии).

2. Нет выигрыша при большом числе мелких приложений. ESS должен обрабатывать push-запросы от всех приложений кластера. При высокой конкуренции ESS становится bottleneck.

3. Большие блоки не push’аются. Блоки размером более spark.shuffle.push.maxBlockSizeToPush (1 МБ по умолчанию) всегда читаются напрямую. Если shuffle partition большая (десятки МБ), push не поможет с такими блоками.

4. Дополнительная нагрузка на диски ESS. Merged файлы пишутся на диски ESS-узлов. На очень большом shuffle это может создать I/O bottleneck на ESS.

5. Push overhead при маленьком shuffle. При shuffle < spark.shuffle.push.minShuffleSizeToWait (500 МБ по умолчанию) push происходит, но finalize не ждёт — это снижает overhead для маленьких jobs.

Что смотреть в Spark UI

Для диагностики push-based shuffle в Spark UI добавлены новые метрики:

  • Stages -> Task Metrics -> “Shuffle Push Time”: время, потраченное на push блоков. Если большое — executor тратит много времени на push (возможно, ESS перегружен).
  • “Remote Merged Blocks Fetched” vs “Remote Blocks Fetched”: сколько merged блоков прочитано vs обычных. Высокая доля merged говорит о том, что push работает.
  • “Corrupt Merged Block Chunks”: если > 0 — проблемы с integrity на ESS, возможно нехватка диска или race condition.

В логах ESS ищите:

INFO RemoteBlockPushResolver: Finalize shuffle 0, mergeId 0:
  numPartitions=200, mergedBlocks=198, missingBlocks=2

missingBlocks=2 — это 2 партиции без merged блока (все reduce tasks для них пойдут в fallback режим).

Production: когда включать Magnet

На основе данных LinkedIn (5 PB/день, 30K+ приложений) и публичных бенчмарков:

  • Рекомендуется: широкие shuffle (много map tasks, много reduce tasks), YARN кластер с ESS, DataFrame-workloads, долгоживущие приложения с повторяющимися shuffle.
  • Не рекомендуется: Kubernetes-кластеры (нет поддержки), streaming приложения (shuffle pattern другой), shuffle с очень крупными partition-блоками (более 10 МБ — push не поможет).
  • Измеримый эффект: снижение end-to-end latency на 10-30% для широких shuffle, снижение числа сетевых соединений на порядок.

Попробуй сам

# Симуляция push-based shuffle (для реальной работы нужен YARN ESS)
# Ниже -- код для диагностики push на уже настроенном кластере

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
  .appName("push-shuffle-diagnostics")
  # Предполагаем, что ESS настроен с RemoteBlockPushResolver
  .config("spark.shuffle.push.enabled", "true")
  .config("spark.shuffle.push.minCompletedPushRatio", "0.8")
  .config("spark.shuffle.push.finalize.timeout", "30s")
  .config("spark.shuffle.push.maxBlockSizeToPush", "4m")
  .getOrCreate())

# Большой join -- хороший кандидат для push-based shuffle
orders = spark.read.parquet("/data/orders")         # 100 GB
customers = spark.read.parquet("/data/customers")   # 5 GB

result = (orders
  .join(customers, "customer_id")
  .groupBy("region", "product_category")
  .agg(F.sum("amount").alias("total"), F.count("*").alias("cnt")))

result.write.parquet("/output/regional_sales")

# После выполнения проверяем в UI:
# Stages -> join stage -> Task Metrics:
#   Remote Merged Blocks Fetched: должно быть близко к numReduceTasks * numMapTasks
#   Remote Blocks Fetched: должно быть маленьким (fallback only)
#   Shuffle Push Time: накладные расходы push

Для локального тестирования без ESS можно изучить MergedShuffleFileManager через unit-тесты Spark:

# Запустить тесты push-based shuffle (требует исходники Spark)
./build/mvn -pl core -Dtest=RemoteBlockPushResolverSuite test
External Shuffle Service: архитектура и ограничения
Проверка знанийKnowledge check
Кластер использует push-based shuffle с spark.shuffle.push.minCompletedPushRatio=1.0. Один из 500 map tasks завис (stuck) и никогда не завершает push. Что произойдёт с reduce-фазой?
ОтветAnswer
Driver ждёт завершения push от всех map tasks (ratio=1.0), но один task завис. Driver ждёт spark.shuffle.push.finalize.timeout (по умолчанию 10 секунд с момента завершения других push). По истечению timeout driver отправляет FinalizeShuffleMerge без завершённого push от stuck task. ESS строит MergeStatuses с bitmap'ами, где stuck mapId отсутствует. Reduce tasks при чтении видят, что merged блок не содержит данных от mapId hung task, и делают fallback -- читают блок напрямую с executor-а, где выполняется hung task. Если executor тоже завис -- DAGScheduler в итоге убьёт hung task по timeout и перезапустит его на другом executor, после чего reduce tasks дочитают недостающий блок. Корректность сохраняется, но reduce-фаза откладывается до завершения hung task.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Push-based shuffle (Magnet) помечен как best-effort. Что это означает для корректности результатов, если 20% map tasks не успели завершить push до finalize?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4