Learning Platform
Глоссарий Troubleshooting
Урок 13.01 · 15 мин
Средний
ShuffleExternal Shuffle ServicePush-Based ShuffleRemote ShuffleCelebornUniffle

Основы shuffle и ограничения ESS

Shuffle как самая дорогая операция Spark

В М04/L01 мы разобрали shuffle как ключевую операцию перемещения данных между partitions: каждый Exchange в physical plan означает сетевой transfer, disk I/O и сериализацию. На кластерах с десятками executor-ов и терабайтами данных shuffle становится доминирующим bottleneck — до 70-80% времени job может уходить на shuffle.

Но проблема глубже, чем просто сетевой overhead. Архитектура shuffle в Spark имеет три фундаментальных ограничения, которые не решаются настройкой параметров. Разберём их подробно.

Архитектура встроенного shuffle

Стандартный shuffle в Spark работает по pull-based модели: map tasks записывают данные на локальный диск executor, а reduce tasks запрашивают (pull) нужные partition-блоки по сети:

Built-in Shuffle (pull-based)

Problem: executor dies → shuffle files LOST → recompute

Executor 1 (Map)
Map Task↓ Local Diskshuffle files
pull
Executor 3 (Reduce)
Reduce Task↑ Fetch

Map task вычисляет результат, партиционирует его по ключу (hash partitioning) и записывает shuffle files на локальный диск executor. Reduce task обращается к каждому map executor, запрашивает свою partition — это классическая all-to-all коммуникация.

Три фундаментальные проблемы

1. Executor lifecycle coupling

Shuffle-данные привязаны к жизненному циклу executor. Пока reduce tasks не забрали shuffle output, executor не может быть освобождён — даже если все его map tasks завершились. На кластере с dynamic allocation это означает, что executor-ы удерживают ресурсы (CPU, memory) дольше, чем нужно для вычислений.

В облачных средах (Kubernetes, YARN) это напрямую влияет на стоимость: executor держит node занятым, пока reduce tasks не завершат fetch. При длительных stage-ах это может означать часы простаивающих ресурсов.

2. Disk I/O bottleneck

Все shuffle-данные записываются на локальный диск executor. При большом shuffle (десятки GB на executor) это создаёт:

  • Random read pattern: reduce tasks запрашивают разные partition-блоки, создавая random I/O на диске executor
  • Конкуренция за I/O: несколько reduce tasks одновременно читают с одного executor, конкурируя за disk bandwidth
  • Нет распределения нагрузки: если один executor обрабатывает skewed partition, его диск становится hotspot для всех reduce tasks, которым нужна эта partition

3. Recomputation on failure

Если executor умирает (crash, OOM, preemption в spot/preemptible instances) — все shuffle файлы на его локальном диске теряются безвозвратно. Reduce tasks не могут получить данные, и Spark вынужден пересчитать весь upstream map stage на другом executor.

Для длительных ETL pipeline это катастрофично: потеря одного executor может запустить каскадный recompute, удваивая или утраивая время job. На кластерах со spot instances (AWS) или preemptible VMs (GCP) потеря executor — не исключение, а норма.

Проверка знанийKnowledge check
Почему потеря executor в стандартном Spark приводит к пересчёту upstream stages?
ОтветAnswer
Shuffle-данные хранятся на локальном диске executor. При crash executor файлы теряются, и reduce tasks не могут получить данные -- Spark вынужден пересчитать map stages. Remote shuffle сервисы решают это, храня данные на отдельном кластере.

External Shuffle Service (ESS)

Spark предоставляет встроенное решение — External Shuffle Service. ESS — это отдельный daemon-процесс на каждой node, который обслуживает shuffle-данные вместо executor:

# Включение External Shuffle Service
spark.shuffle.service.enabled = true

# ESS запускается как YARN auxiliary service или standalone daemon
# Executor регистрирует shuffle файлы в ESS
# ESS обслуживает fetch-запросы от reduce tasks

Полная конфигурация ESS с диагностикой:

# ESS на YARN — yarn-site.xml
yarn.nodemanager.aux-services=spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=\
  org.apache.spark.network.yarn.YarnShuffleService

# Spark-side ESS configuration
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=100
spark.dynamicAllocation.executorIdleTimeout=60s

ESS решает проблему lifecycle coupling: executor может быть освобождён сразу после завершения map tasks, потому что ESS продолжает обслуживать shuffle-данные. Dynamic allocation работает эффективнее.

Но ESS не решает остальные проблемы:

  • Shuffle-данные по-прежнему на локальном диске node — disk I/O bottleneck остаётся
  • При потере node (не executor, а всей машины) shuffle-данные теряются — recomputation всё ещё необходим
  • Нет распределения shuffle-нагрузки между node-ами — hotspot на skewed partition сохраняется

Push-Based Shuffle

Spark3.5 Push-based shuffle (экспериментально с Spark 3.2, production-ready в 3.5) — оптимизация, при которой mapper push-ит shuffle-блоки на reducer-ноды, вместо того чтобы reducer pull-ил их:

# Включение push-based shuffle (Spark 3.2+)
spark.shuffle.push.enabled = true
spark.shuffle.push.maxBlockSizeToPush = 1m
spark.shuffle.push.maxBlockBatchSize = 3m

# Требует External Shuffle Service
spark.shuffle.service.enabled = true

Полная spark-submit конфигурация с push-based shuffle:

spark-submit \
  --name "push-shuffle-job" \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.push.enabled=true \
  --conf spark.shuffle.push.maxBlockSizeToPush=1m \
  --conf spark.shuffle.push.maxBlockBatchSize=3m \
  --conf spark.shuffle.push.mergePushMinSize=4m \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  your_shuffle_heavy_app.py

Push-based shuffle создаёт merged shuffle файлы на ESS node-ах: вместо множества мелких блоков от разных mapper-ов, reduce task читает один крупный merged файл. Это снижает количество fetch-запросов и уменьшает random I/O.

Улучшения:

  • Меньше fetch-запросов (1 merged файл вместо N блоков)
  • Sequential read вместо random I/O
  • Merge на ESS node снижает network traffic

Но push-based shuffle всё ещё работает на локальных дисках node — фундаментальная проблема потери данных при crash node остаётся нерешённой.

Проверка знанийKnowledge check
Какую из трёх фундаментальных проблем shuffle решает ESS, а какие остаются?
ОтветAnswer
ESS решает executor lifecycle coupling -- executor может быть освобождён после завершения map tasks, потому что ESS обслуживает shuffle-данные. Но ESS НЕ решает: (1) disk I/O bottleneck -- данные по-прежнему на локальном диске node; (2) recomputation on failure -- при потере node (crash machine) shuffle-данные теряются.

Remote Shuffle Services: решение

Remote shuffle services полностью отделяют shuffle-данные от compute-кластера. Вместо записи на локальный диск executor или node, map tasks push-ат данные на отдельный выделенный кластер shuffle-серверов:

Remote Shuffle (push-based)

Benefit: executor dies → shuffle data SAFE on remote servers

Executor 1 (Map)
Map TaskShuffleClient (async)
push
Shuffle Service
Merged databy partitionReplicated storage
Executor 3 (Reduce)
Reduce TaskFetch merged data

Преимущества disaggregated shuffle

  1. Elastic scaling: compute и shuffle масштабируются независимо. При пиковой shuffle-нагрузке можно добавить shuffle-серверов без увеличения compute
  2. Нет recomputation: shuffle-данные реплицированы на shuffle-серверах. Потеря executor или даже целой compute-ноды не требует пересчёта
  3. Независимый lifecycle: executor освобождается сразу после push данных. Shuffle-серверы управляют данными до завершения job
  4. Оптимизация I/O: shuffle-серверы оптимизированы для хранения и выдачи shuffle-данных (SSD, memory caching, merge by partition)
  5. Снижение стоимости: в облачных средах compute instances могут быть spot/preemptible (дешёвые, но могут быть отозваны), потому что потеря executor не приводит к потере shuffle-данных

Apache-проекты

Два ведущих open-source remote shuffle service — оба являются Apache Top-Level Projects:

  • Apache Celeborn (TLP с апреля 2024, v0.6.2) — push-based remote shuffle от Alibaba. Архитектура Master/Worker/LifecycleManager. Поддерживает Spark 2.4-4.1, Flink, MapReduce. В production у ByteDance, Alibaba, Pinterest, Shopee
  • Apache Uniffle (TLP с февраля 2025, v0.9.1) — remote shuffle от Tencent. Архитектура Coordinator/Shuffle Server с 3-tier storage (Memory + Local + HDFS). Поддерживает Spark 2.3-3.5, MapReduce. В production у Tencent, iQiyi, Didi, Bilibili

В следующих двух уроках мы подробно разберём архитектуру, конфигурацию и fault tolerance обоих проектов, а затем сравним их для выбора в production.

Проверка знанийKnowledge check
Почему remote shuffle особенно выгоден в облачных средах со spot instances?
ОтветAnswer
Spot instances могут быть отозваны cloud provider в любой момент. В стандартном Spark потеря executor на spot instance означает потерю shuffle-данных и recomputation upstream stages. С remote shuffle сервисом shuffle-данные хранятся на отдельных серверах -- потеря spot instance не приводит к потере данных. Это позволяет использовать дешёвые spot instances для compute без риска дорогого recomputation.

Итоги

  • Встроенный shuffle Spark имеет три фундаментальных ограничения: executor lifecycle coupling, disk I/O bottleneck, recomputation on failure
  • External Shuffle Service (ESS) решает lifecycle coupling, но не disk I/O и не recomputation при потере node
  • Push-based shuffle (Spark 3.2+) оптимизирует I/O через merge, но данные остаются на локальных дисках
  • Remote shuffle services (Celeborn, Uniffle) полностью отделяют shuffle от compute: независимый lifecycle, нет recomputation, elastic scaling
  • Далее: deep-dive в Apache Celeborn (L02) и Apache Uniffle + сравнение (L03)

Диагностика shuffle bottleneck

Для определения, является ли shuffle узким местом вашего workload:

-- Spark SQL: анализ shuffle метрик через Spark UI REST API
-- или через SparkListener в коде:
-- GET /api/v1/applications/{appId}/stages

-- В PySpark — программный анализ shuffle size
spark.sql("""
  SELECT
    stage_id,
    shuffle_read_bytes / 1024 / 1024 AS shuffle_read_mb,
    shuffle_write_bytes / 1024 / 1024 AS shuffle_write_mb,
    shuffle_read_records,
    executor_run_time / 1000 AS exec_time_sec
  FROM spark_stage_metrics
  WHERE shuffle_read_bytes > 0
  ORDER BY shuffle_read_bytes DESC
""")
# Ключевые метрики для мониторинга shuffle health
spark.sql.shuffle.partitions=200
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true

# Увеличение fetch retry при нестабильной сети
spark.shuffle.io.maxRetries=5
spark.shuffle.io.retryWait=10s
spark.reducer.maxBlocksInFlightPerAddress=5

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 8. При crash executor в стандартном Spark shuffle-данные на его локальном диске теряются безвозвратно, и Spark вынужден пересчитать весь upstream map stage.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3