Push-based shuffle (Magnet): архитектура и production-конфиги
Классический Spark shuffle работает по pull-based модели: reduce tasks активно тянут данные с executor-ов, где они были записаны map tasks. Это создаёт два больших узких места на масштабе: тысячи мелких HTTP-запросов от каждого reduce task к каждому map executor, и жёсткую зависимость от того, живы ли executor-ы, написавшие данные.
Проект Magnet (SPARK-30602), разработанный в LinkedIn и вошедший в Spark 3.2, меняет парадигму: map tasks проталкивают (push) свои данные на External Shuffle Service ещё до начала reduce-фазы. Там блоки мержатся по partition, и reduce tasks читают уже крупные объединённые файлы вместо тысяч мелких.
Проблемы классического pull-based shuffle на масштабе
Чтобы понять ценность Magnet, нужно чётко сформулировать, что именно не работает при масштабировании до 10 000+ executor-ов.
Проблема 1: O(M * R) мелких чтений
При M map tasks и R reduce tasks каждый reduce task посылает M запросов — по одному на каждый map executor. При M = 5000 и R = 2000 это 10 миллионов отдельных запросов. Каждый запрос: TCP setup (или повторное использование соединения), поиск по index-файлу, чтение нескольких КБ. При мелких shuffle-блоках накладные расходы на установление соединения и метаданные доминируют над временем реальной передачи данных.
Проблема 2: executor lifecycle coupling
Shuffle-файлы живут на дисках executor-ов. Если executor умирает после завершения map tasks, но до того как все reduce tasks успели прочитать данные — DAGScheduler вынужден перезапустить и map stage (для восстановления файлов), и reduce stage. Это особенно болезненно на кластерах с spot/preemptible instance-ами.
Проблема 3: read amplification при spill
Если map task создал несколько spill-файлов (из-за нехватки памяти), reduce task получает фрагментированные блоки и вынужден читать их раздельно. Это усиливает I/O amplification.
Архитектура Magnet: три участника
Magnet добавляет новую роль: Merge Pusher на стороне map task и MergedShuffleFileManager на стороне External Shuffle Service (ESS).
Классический pull:
map executor --[записал .data + .index]--> (ждёт)
reduce task --[fetch каждый блок]--> map executor (по одному)
Push-based (Magnet):
map executor --[push blocks]--> ESS (во время map-фазы)
ESS --[merge по partition]--> merged files (параллельно с push)
reduce task --[один запрос]--> ESS (читает merged block)
reduce task --[fallback, если merge неполный]--> map executor
Три ключевых класса:
ShuffleBlockPusher— на стороне executor. После того как map task записал.data + .index,ShuffleBlockPusherчитает блоки partition-за-partition и pushes их на ESS-узлы, выбранные driver-ом какmergerLocations.MergedShuffleFileManager(интерфейс) /RemoteBlockPushResolver(реализация) — на стороне ESS. Принимает push-запросы, открываетAppShuffleMergeManager, мержит входящие байты вmerged_shuffle_{shuffleId}_{shuffleMergeId}_{reduceId}.dataфайлы.MergeStatuses— результат merge phase. Driver получаетMergeStatusesот каждого ESS-узла и использует их при планировании reduce tasks: если для partitionpесть merged блок на ESS-узле X, reduce task назначается по возможности на узел X (data locality).
Фаза 1: map tasks пишут локальные .data/.index и параллельно push'ат блоки на ESS. Фаза 2: ESS мержит. Фаза 3: reduce tasks читают merged блоки с ESS. Стрелки показывают направление движения данных.
Merge протокол: как ESS мержит блоки
RemoteBlockPushResolver (реализация MergedShuffleFileManager) поддерживает для каждого (shuffleId, shuffleMergeId) набор AppShuffleMergePartitionsInfo. Для каждой partition это:
- Открытый
FileChannelв append-режиме на файлmerged_shuffle_{shuffleId}_{shuffleMergeId}_{partition}.data RoaringBitmap— bitmap map task-ов, блоки которых уже смержены (для отслеживания completeness)- Текущий offset в файле (для построения merged .index)
Когда приходит push-запрос (OpenDag + PushBlock):
1. Executor shouts: PUSH partition_block(shuffleId=0, mapId=42, reduceId=5, data=bytes)
2. ESS RemoteBlockPushResolver:
a. Найти AppShuffleMergePartitionsInfo для (shuffleId=0, reduceId=5)
b. Если bitmap уже содержит mapId=42 -- дубликат, отклонить
c. Append bytes в merged data file
d. bitmap.add(42); update offset
3. После finalizeShuffleMerge (сигнал от driver):
a. Записать merged .index файл
b. Вернуть MergeStatus(bitmap, sizes)
Роль bitmap для reduce task
Bitmap — это ключевой механизм для reduce task. Когда reduce task получает MergeStatus для partition p, bitmap говорит: «merged блок на ESS содержит данные от map tasks {0, 1, 3, 5, ...}». Если какие-то map tasks отсутствуют в bitmap (push не успел или упал), reduce task дочитывает недостающие блоки напрямую с executor-ов в обычном pull-режиме.
Это делает push-based shuffle best-effort: даже если push полностью провалится, корректность не нарушается — просто нет выигрыша по производительности.
Конфиги spark.shuffle.push.*
В Spark 3.2-4.0 push-based shuffle управляется следующими конфигами:
| Конфиг | По умолчанию | Описание |
|---|---|---|
spark.shuffle.push.enabled | false | Включить push-based shuffle |
spark.shuffle.push.finalize.timeout | 10s | Сколько ждать завершения push перед finalize |
spark.shuffle.push.maxBlockSizeToPush | 1m | Блоки крупнее этого не push’аются, читаются напрямую |
spark.shuffle.push.maxBlockBatchSize | 3m | Макс. суммарный размер одного batch push-запроса |
spark.shuffle.push.numPushThreads | (все ядра executor) | Потоков для push в пуле ShuffleBlockPusher |
spark.shuffle.push.minShuffleSizeToWait | 500m | Ниже этого размера finalize не ждёт |
spark.shuffle.push.minCompletedPushRatio | 1.0 | Доля map tasks, завершивших push, до finalize |
spark.shuffle.push.mergersMinThresholdRatio | 0.05 | Мин. отношение merger-узлов к shuffle partitions |
spark.shuffle.push.mergersMinStaticThreshold | 5 | Мин. абсолютное число merger-узлов |
spark.shuffle.push.server.mergedShuffleFileManagerImpl | org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager | Реализация MergedShuffleFileManager на ESS |
Для включения push-based shuffle недостаточно spark.shuffle.push.enabled = true. Обязательно:
- Запущен External Shuffle Service (YARN:
spark.shuffle.service.enabled = true) - ESS настроен с
spark.shuffle.push.server.mergedShuffleFileManagerImpl = org.apache.spark.network.shuffle.RemoteBlockPushResolver - Отключено IO-шифрование (
spark.io.encryption.enabled = false— несовместимо с push) - Сериализатор поддерживает relocation (UnsafeRowSerializer или Kryo с auto-reset)
Как настроить minCompletedPushRatio
spark.shuffle.push.minCompletedPushRatio — один из самых важных параметров. Он определяет: после какой доли завершённых push driver отправляет FinalizeShuffleMerge. Значение 1.0 (по умолчанию) означает «ждать, пока все map tasks завершат push». Это максимальная completeness, но может добавить latency.
В production с spot-инстансами разумно снизить до 0.8-0.9: reduce tasks, не получившие merged блок для некоторых map tasks, прозрачно fallback’ают на прямое чтение.
spark.conf.set("spark.shuffle.push.enabled", "true")
spark.conf.set("spark.shuffle.push.minCompletedPushRatio", "0.9")
spark.conf.set("spark.shuffle.push.finalize.timeout", "30s")
spark.conf.set("spark.shuffle.push.maxBlockSizeToPush", "4m")
Ограничения push-based shuffle
Push-based shuffle не является серебряной пулей. Важно понимать, когда он не поможет или навредит:
1. Только с YARN ESS. В Spark 4.0 push поддерживается только с YARN External Shuffle Service. Standalone и Kubernetes не поддерживаются (на момент Spark 4.0 — проверяйте release notes для вашей версии).
2. Нет выигрыша при большом числе мелких приложений. ESS должен обрабатывать push-запросы от всех приложений кластера. При высокой конкуренции ESS становится bottleneck.
3. Большие блоки не push’аются. Блоки размером более spark.shuffle.push.maxBlockSizeToPush (1 МБ по умолчанию) всегда читаются напрямую. Если shuffle partition большая (десятки МБ), push не поможет с такими блоками.
4. Дополнительная нагрузка на диски ESS. Merged файлы пишутся на диски ESS-узлов. На очень большом shuffle это может создать I/O bottleneck на ESS.
5. Push overhead при маленьком shuffle. При shuffle < spark.shuffle.push.minShuffleSizeToWait (500 МБ по умолчанию) push происходит, но finalize не ждёт — это снижает overhead для маленьких jobs.
Что смотреть в Spark UI
Для диагностики push-based shuffle в Spark UI добавлены новые метрики:
- Stages -> Task Metrics -> “Shuffle Push Time”: время, потраченное на push блоков. Если большое — executor тратит много времени на push (возможно, ESS перегружен).
- “Remote Merged Blocks Fetched” vs “Remote Blocks Fetched”: сколько merged блоков прочитано vs обычных. Высокая доля merged говорит о том, что push работает.
- “Corrupt Merged Block Chunks”: если > 0 — проблемы с integrity на ESS, возможно нехватка диска или race condition.
В логах ESS ищите:
INFO RemoteBlockPushResolver: Finalize shuffle 0, mergeId 0:
numPartitions=200, mergedBlocks=198, missingBlocks=2
missingBlocks=2 — это 2 партиции без merged блока (все reduce tasks для них пойдут в fallback режим).
Production: когда включать Magnet
На основе данных LinkedIn (5 PB/день, 30K+ приложений) и публичных бенчмарков:
- Рекомендуется: широкие shuffle (много map tasks, много reduce tasks), YARN кластер с ESS, DataFrame-workloads, долгоживущие приложения с повторяющимися shuffle.
- Не рекомендуется: Kubernetes-кластеры (нет поддержки), streaming приложения (shuffle pattern другой), shuffle с очень крупными partition-блоками (более 10 МБ — push не поможет).
- Измеримый эффект: снижение end-to-end latency на 10-30% для широких shuffle, снижение числа сетевых соединений на порядок.
Попробуй сам
# Симуляция push-based shuffle (для реальной работы нужен YARN ESS)
# Ниже -- код для диагностики push на уже настроенном кластере
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("push-shuffle-diagnostics")
# Предполагаем, что ESS настроен с RemoteBlockPushResolver
.config("spark.shuffle.push.enabled", "true")
.config("spark.shuffle.push.minCompletedPushRatio", "0.8")
.config("spark.shuffle.push.finalize.timeout", "30s")
.config("spark.shuffle.push.maxBlockSizeToPush", "4m")
.getOrCreate())
# Большой join -- хороший кандидат для push-based shuffle
orders = spark.read.parquet("/data/orders") # 100 GB
customers = spark.read.parquet("/data/customers") # 5 GB
result = (orders
.join(customers, "customer_id")
.groupBy("region", "product_category")
.agg(F.sum("amount").alias("total"), F.count("*").alias("cnt")))
result.write.parquet("/output/regional_sales")
# После выполнения проверяем в UI:
# Stages -> join stage -> Task Metrics:
# Remote Merged Blocks Fetched: должно быть близко к numReduceTasks * numMapTasks
# Remote Blocks Fetched: должно быть маленьким (fallback only)
# Shuffle Push Time: накладные расходы push
Для локального тестирования без ESS можно изучить MergedShuffleFileManager через unit-тесты Spark:
# Запустить тесты push-based shuffle (требует исходники Spark)
./build/mvn -pl core -Dtest=RemoteBlockPushResolverSuite test
External Shuffle Service: архитектура и ограничения