Основы shuffle и ограничения ESS
Shuffle как самая дорогая операция Spark
В М04/L01 мы разобрали shuffle как ключевую операцию перемещения данных между partitions: каждый Exchange в physical plan означает сетевой transfer, disk I/O и сериализацию. На кластерах с десятками executor-ов и терабайтами данных shuffle становится доминирующим bottleneck — до 70-80% времени job может уходить на shuffle.
Но проблема глубже, чем просто сетевой overhead. Архитектура shuffle в Spark имеет три фундаментальных ограничения, которые не решаются настройкой параметров. Разберём их подробно.
Архитектура встроенного shuffle
Стандартный shuffle в Spark работает по pull-based модели: map tasks записывают данные на локальный диск executor, а reduce tasks запрашивают (pull) нужные partition-блоки по сети:
Problem: executor dies → shuffle files LOST → recompute
Map task вычисляет результат, партиционирует его по ключу (hash partitioning) и записывает shuffle files на локальный диск executor. Reduce task обращается к каждому map executor, запрашивает свою partition — это классическая all-to-all коммуникация.
Три фундаментальные проблемы
1. Executor lifecycle coupling
Shuffle-данные привязаны к жизненному циклу executor. Пока reduce tasks не забрали shuffle output, executor не может быть освобождён — даже если все его map tasks завершились. На кластере с dynamic allocation это означает, что executor-ы удерживают ресурсы (CPU, memory) дольше, чем нужно для вычислений.
В облачных средах (Kubernetes, YARN) это напрямую влияет на стоимость: executor держит node занятым, пока reduce tasks не завершат fetch. При длительных stage-ах это может означать часы простаивающих ресурсов.
2. Disk I/O bottleneck
Все shuffle-данные записываются на локальный диск executor. При большом shuffle (десятки GB на executor) это создаёт:
- Random read pattern: reduce tasks запрашивают разные partition-блоки, создавая random I/O на диске executor
- Конкуренция за I/O: несколько reduce tasks одновременно читают с одного executor, конкурируя за disk bandwidth
- Нет распределения нагрузки: если один executor обрабатывает skewed partition, его диск становится hotspot для всех reduce tasks, которым нужна эта partition
3. Recomputation on failure
Если executor умирает (crash, OOM, preemption в spot/preemptible instances) — все shuffle файлы на его локальном диске теряются безвозвратно. Reduce tasks не могут получить данные, и Spark вынужден пересчитать весь upstream map stage на другом executor.
Для длительных ETL pipeline это катастрофично: потеря одного executor может запустить каскадный recompute, удваивая или утраивая время job. На кластерах со spot instances (AWS) или preemptible VMs (GCP) потеря executor — не исключение, а норма.
External Shuffle Service (ESS)
Spark предоставляет встроенное решение — External Shuffle Service. ESS — это отдельный daemon-процесс на каждой node, который обслуживает shuffle-данные вместо executor:
# Включение External Shuffle Service
spark.shuffle.service.enabled = true
# ESS запускается как YARN auxiliary service или standalone daemon
# Executor регистрирует shuffle файлы в ESS
# ESS обслуживает fetch-запросы от reduce tasks
Полная конфигурация ESS с диагностикой:
# ESS на YARN — yarn-site.xml
yarn.nodemanager.aux-services=spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=\
org.apache.spark.network.yarn.YarnShuffleService
# Spark-side ESS configuration
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=100
spark.dynamicAllocation.executorIdleTimeout=60s
ESS решает проблему lifecycle coupling: executor может быть освобождён сразу после завершения map tasks, потому что ESS продолжает обслуживать shuffle-данные. Dynamic allocation работает эффективнее.
Но ESS не решает остальные проблемы:
- Shuffle-данные по-прежнему на локальном диске node — disk I/O bottleneck остаётся
- При потере node (не executor, а всей машины) shuffle-данные теряются — recomputation всё ещё необходим
- Нет распределения shuffle-нагрузки между node-ами — hotspot на skewed partition сохраняется
Push-Based Shuffle
Spark3.5 Push-based shuffle (экспериментально с Spark 3.2, production-ready в 3.5) — оптимизация, при которой mapper push-ит shuffle-блоки на reducer-ноды, вместо того чтобы reducer pull-ил их:
# Включение push-based shuffle (Spark 3.2+)
spark.shuffle.push.enabled = true
spark.shuffle.push.maxBlockSizeToPush = 1m
spark.shuffle.push.maxBlockBatchSize = 3m
# Требует External Shuffle Service
spark.shuffle.service.enabled = true
Полная spark-submit конфигурация с push-based shuffle:
spark-submit \
--name "push-shuffle-job" \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.push.enabled=true \
--conf spark.shuffle.push.maxBlockSizeToPush=1m \
--conf spark.shuffle.push.maxBlockBatchSize=3m \
--conf spark.shuffle.push.mergePushMinSize=4m \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
your_shuffle_heavy_app.py
Push-based shuffle создаёт merged shuffle файлы на ESS node-ах: вместо множества мелких блоков от разных mapper-ов, reduce task читает один крупный merged файл. Это снижает количество fetch-запросов и уменьшает random I/O.
Улучшения:
- Меньше fetch-запросов (1 merged файл вместо N блоков)
- Sequential read вместо random I/O
- Merge на ESS node снижает network traffic
Но push-based shuffle всё ещё работает на локальных дисках node — фундаментальная проблема потери данных при crash node остаётся нерешённой.
Remote Shuffle Services: решение
Remote shuffle services полностью отделяют shuffle-данные от compute-кластера. Вместо записи на локальный диск executor или node, map tasks push-ат данные на отдельный выделенный кластер shuffle-серверов:
Benefit: executor dies → shuffle data SAFE on remote servers
Преимущества disaggregated shuffle
- Elastic scaling: compute и shuffle масштабируются независимо. При пиковой shuffle-нагрузке можно добавить shuffle-серверов без увеличения compute
- Нет recomputation: shuffle-данные реплицированы на shuffle-серверах. Потеря executor или даже целой compute-ноды не требует пересчёта
- Независимый lifecycle: executor освобождается сразу после push данных. Shuffle-серверы управляют данными до завершения job
- Оптимизация I/O: shuffle-серверы оптимизированы для хранения и выдачи shuffle-данных (SSD, memory caching, merge by partition)
- Снижение стоимости: в облачных средах compute instances могут быть spot/preemptible (дешёвые, но могут быть отозваны), потому что потеря executor не приводит к потере shuffle-данных
Apache-проекты
Два ведущих open-source remote shuffle service — оба являются Apache Top-Level Projects:
- Apache Celeborn (TLP с апреля 2024, v0.6.2) — push-based remote shuffle от Alibaba. Архитектура Master/Worker/LifecycleManager. Поддерживает Spark 2.4-4.1, Flink, MapReduce. В production у ByteDance, Alibaba, Pinterest, Shopee
- Apache Uniffle (TLP с февраля 2025, v0.9.1) — remote shuffle от Tencent. Архитектура Coordinator/Shuffle Server с 3-tier storage (Memory + Local + HDFS). Поддерживает Spark 2.3-3.5, MapReduce. В production у Tencent, iQiyi, Didi, Bilibili
В следующих двух уроках мы подробно разберём архитектуру, конфигурацию и fault tolerance обоих проектов, а затем сравним их для выбора в production.
Итоги
- Встроенный shuffle Spark имеет три фундаментальных ограничения: executor lifecycle coupling, disk I/O bottleneck, recomputation on failure
- External Shuffle Service (ESS) решает lifecycle coupling, но не disk I/O и не recomputation при потере node
- Push-based shuffle (Spark 3.2+) оптимизирует I/O через merge, но данные остаются на локальных дисках
- Remote shuffle services (Celeborn, Uniffle) полностью отделяют shuffle от compute: независимый lifecycle, нет recomputation, elastic scaling
- Далее: deep-dive в Apache Celeborn (L02) и Apache Uniffle + сравнение (L03)
Диагностика shuffle bottleneck
Для определения, является ли shuffle узким местом вашего workload:
-- Spark SQL: анализ shuffle метрик через Spark UI REST API
-- или через SparkListener в коде:
-- GET /api/v1/applications/{appId}/stages
-- В PySpark — программный анализ shuffle size
spark.sql("""
SELECT
stage_id,
shuffle_read_bytes / 1024 / 1024 AS shuffle_read_mb,
shuffle_write_bytes / 1024 / 1024 AS shuffle_write_mb,
shuffle_read_records,
executor_run_time / 1000 AS exec_time_sec
FROM spark_stage_metrics
WHERE shuffle_read_bytes > 0
ORDER BY shuffle_read_bytes DESC
""")
# Ключевые метрики для мониторинга shuffle health
spark.sql.shuffle.partitions=200
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# Увеличение fetch retry при нестабильной сети
spark.shuffle.io.maxRetries=5
spark.shuffle.io.retryWait=10s
spark.reducer.maxBlocksInFlightPerAddress=5