Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 20 мин
Продвинутый
CelebornRemote ShufflePush-Based ShuffleLifecycleManagerFault ToleranceRaft

Apache Celeborn

Что такое Celeborn

Apache Celeborn — push-based remote shuffle service, изначально разработанный в Alibaba. Celeborn graduated как Apache Top-Level Project (TLP) в апреле 2024 года. Последняя стабильная версия — 0.6.2 (декабрь 2025).

Celeborn решает все три фундаментальных проблемы shuffle, описанные в L01: executor lifecycle coupling, disk I/O bottleneck и recomputation on failure. Данные push-атся на выделенный кластер Celeborn Workers, где хранятся, реплицируются и отдаются reduce tasks.

Архитектура

Celeborn состоит из четырёх компонентов: серверный кластер (Master + Worker) и клиентская часть (LifecycleManager + ShuffleClient):

Celeborn Cluster
Master (active)
Master (standby)
Master (standby)
Raft HA
slot allocation
Workers
Worker 1SSD/HDD + HDFS
Worker 2SSD/HDD + HDFS
...
push data
Spark ExecutorShuffleClient (Map)
fetch merged
Spark ExecutorShuffleClient (Reduce)
metadata
Spark DriverLifecycleManager

Master cluster

Кластер Master-нод обеспечивает координацию и High Availability через Raft consensus. Active master принимает запросы на регистрацию shuffle и slot allocation — назначение partition-locations на конкретные Worker-ы. Standby master-ы готовы принять трафик при failover.

Master управляет ресурсами Worker-ов: сколько партиций назначено на каждый worker, текущая загрузка дисков, состояние здоровья.

Workers

Worker-ы — рабочие узлы, которые принимают, хранят и отдают shuffle-данные. Каждый worker работает с локальными дисками (SSD/HDD) и опционально с HDFS для дополнительной durability.

Ключевая операция worker — merge by partition: данные от разных map tasks для одной reduce partition объединяются в единый файл. Reduce task читает один merged файл вместо множества мелких блоков.

Worker-ы flush данные из memory в storage (SSD/HDFS), обеспечивая durability. Периодические disk health checks мониторят состояние дисков, изолируя failing диски от новых аллокаций.

LifecycleManager (клиент, Spark Driver)

LifecycleManager запускается на стороне Spark Driver и управляет метаданными shuffle:

  • Регистрирует shuffle в Master (сколько partitions, конфигурация)
  • Хранит PartitionLocation маппинг: какие партиции на каких worker-ах
  • Координирует commit phase после завершения map stage
  • Обрабатывает Revive запросы при сбоях worker-ов

ShuffleClient (клиент, Spark Executor)

ShuffleClient — клиентская библиотека в каждом Spark Executor. Она заменяет стандартный shuffle writer/reader:

  • Push data: асинхронный push через DataPusher (non-blocking). Данные буферизуются в 64KB буфере и отправляются batch-ами
  • Fetch data: получение merged partition-данных от worker-ов
  • Compression: LZ4 или ZSTD сжатие перед push (настраивается)
Проверка знанийKnowledge check
Почему LifecycleManager запускается на стороне Spark Driver, а не на Celeborn Master?
ОтветAnswer
LifecycleManager управляет метаданными конкретного Spark application (маппинг партиций, координация commit). Размещение на Driver позволяет: (1) избежать нагрузки на Master от per-application metadata; (2) обеспечить прямую связь с ShuffleClient в executor-ах; (3) использовать lifecycle Driver -- metadata живёт ровно столько, сколько application. Master остаётся stateless относительно конкретных application-ов.

Push-Based Shuffle Flow

Celeborn реализует 8-шаговый push-based shuffle flow:

  1. Register shuffle: Spark Driver (LifecycleManager) регистрирует shuffle в Celeborn Master — количество partitions, конфигурация
  2. Allocate slots: Master назначает PartitionLocation-ы на конкретные Worker-ы, балансируя нагрузку
  3. Push data: Map tasks через ShuffleClient асинхронно push-ат данные на назначенные Worker-ы. Данные сжимаются (LZ4/ZSTD) и включают заголовок: mapId, attemptId, batchId, size
  4. Buffer and batch: ShuffleClient буферизует данные в 64KB буфере и отправляет batch-ами (PushData / PushMergedData)
  5. Merge on Workers: Worker-ы получают данные и merge их по partition ID в PartitionLocation файлы. Данные от разных map tasks для одной reduce partition объединяются
  6. Flush to storage: Worker-ы flush данные из memory на SSD/HDD или HDFS
  7. Commit: После завершения всех map tasks LifecycleManager координирует commit — Worker-ы подтверждают, что данные записаны. Достаточно одной committed replica на PartitionLocation
  8. Fetch: Reduce tasks запрашивают merged partition-данные от Worker-ов

Partition Types

Celeborn поддерживает два типа организации данных:

ReducePartition (по умолчанию): данные организованы по reduce partition ID. Worker-ы merge все map outputs для одной partition в единый файл. Reduce task читает один файл — оптимально для большинства workloads.

MapPartition: данные организованы по map task. Используется credit-based shuffle read. Partition split включён по умолчанию. Полезен для сценариев с очень большим количеством reduce partitions, где merge by partition ID создаёт слишком много мелких файлов.

Fault Tolerance

Celeborn предоставляет многоуровневую fault tolerance:

Revive mechanism

Если push данных на worker завершается ошибкой, ShuffleClient запрашивает у LifecycleManager альтернативный PartitionLocation на другом worker. Данные перенаправляются на новый worker. Это прозрачно для map task — retry происходит автоматически.

Batched revive

При отказе worker все partition-locations на этом worker нужно переназначить. Вместо отдельных Revive запросов для каждой partition, Celeborn группирует их в batched revive — один RPC содержит все переназначения. Это предотвращает RPC flooding при массовом сбое.

Data replication

Данные могут быть реплицированы на несколько worker-ов. Commit считается успешным, когда хотя бы одна replica per PartitionLocation подтверждена. Конфигурируется через spark.celeborn.client.push.replica.enabled.

Exactly-once semantics

Каждый push содержит уникальный batch ID (комбинация mapId + attemptId + batchId). LifecycleManager отслеживает все committed locations. При retry дублирующие batch-и отклоняются — гарантия exactly-once.

Graceful shutdown и disk health

  • Graceful shutdown: worker останавливает приём новых аллокаций и flush-ит все данные перед завершением
  • Disk health checks: периодический мониторинг состояния дисков. Failing диски изолируются от новых аллокаций
  • HARD_SPLIT: при низком свободном месте на диске Celeborn выполняет принудительное разделение affected partitions на другие worker-ы
Проверка знанийKnowledge check
Что происходит при Revive в Celeborn, когда push данных на worker завершается ошибкой?
ОтветAnswer
ShuffleClient запрашивает у LifecycleManager альтернативный PartitionLocation на другом worker. Данные перенаправляются на новый worker. При массовом отказе worker запросы группируются (batched revive) для предотвращения RPC flooding. Exactly-once гарантируется через уникальные batch ID.

Production Deployment

Конфигурация серверного кластера

Развёртывание Celeborn кластера начинается с конфигурации Master и Worker:

# celeborn-defaults.conf (Master + Worker общая конфигурация)
celeborn.master.endpoints=clb-master-1:9097,clb-master-2:9097,clb-master-3:9097
celeborn.master.port=9097

# Worker storage — укажите все доступные диски с типом
celeborn.worker.storage.dirs=/mnt/ssd1:disktype=SSD,/mnt/ssd2:disktype=SSD,\
  /mnt/hdd1:disktype=HDD
celeborn.worker.flusher.buffer.size=256k
celeborn.metrics.enabled=true
celeborn.metrics.prometheus.port=9096

# Slot allocation policy
celeborn.master.slot.assign.policy=roundrobin
# celeborn-env.sh — ресурсы для Master и Worker
CELEBORN_MASTER_MEMORY=4g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=12g
# Формула: off-heap = (disk_buffer × num_disks) × 1.2
# Пример: 256k × 3 disks × 1.2 ≈ 1 MB — но для production
# буфер значительно больше, используйте 4-12 GB

CELEBORN_MASTER_JAVAOPT="-XX:+UseG1GC"
CELEBORN_WORKER_JAVAOPT="-XX:+UseG1GC -XX:MaxDirectMemorySize=12g"

Kubernetes Helm deployment

# celeborn-helm-values.yaml
master:
  replicas: 3
  resources:
    requests:
      memory: "4Gi"
      cpu: "2"
  env:
    - name: CELEBORN_MASTER_MEMORY
      value: "4g"

worker:
  replicas: 10
  resources:
    requests:
      memory: "16Gi"
      cpu: "4"
  volumes:
    - name: shuffle-data
      hostPath:
        path: /mnt/ssd1
  env:
    - name: CELEBORN_WORKER_OFFHEAP_MEMORY
      value: "12g"

metrics:
  enabled: true
  serviceMonitor:
    enabled: true
    port: 9096

Конфигурация Spark

spark-submit

spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.celeborn.CelebornShuffleManager \
  --conf spark.celeborn.master.endpoints=clb-1:9097,clb-2:9097,clb-3:9097 \
  --conf spark.celeborn.client.spark.shuffle.writer=hash \
  --conf spark.celeborn.client.shuffle.compression.codec=lz4 \
  --conf spark.celeborn.client.spark.shuffle.fallback.policy=AUTO \
  --conf spark.celeborn.client.spark.stageRerun.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --jars celeborn-client-spark-3-shaded_2.12-0.6.2.jar \
  your_app.py

SparkSession builder

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CelebornExample") \
    .config("spark.shuffle.manager",
            "org.apache.spark.celeborn.CelebornShuffleManager") \
    .config("spark.celeborn.master.endpoints",
            "clb-1:9097,clb-2:9097,clb-3:9097") \
    .config("spark.celeborn.client.spark.shuffle.writer", "hash") \
    .config("spark.celeborn.client.shuffle.compression.codec", "lz4") \
    .config("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

Ключевые параметры

ПараметрЗначениеОписание
spark.shuffle.managerCelebornShuffleManagerЗамена стандартного shuffle manager
spark.celeborn.master.endpointshost:port,...Адреса Celeborn Master кластера
spark.celeborn.client.spark.shuffle.writerhash / sortsort — при высокой memory pressure
spark.celeborn.client.shuffle.compression.codeclz4 / zstd / noneLZ4 — баланс скорости и compression ratio
spark.celeborn.client.spark.shuffle.fallback.policyAUTO / ALWAYS / NEVERAUTO — fallback на встроенный shuffle при недоступности Celeborn
spark.celeborn.client.spark.stageRerun.enabledtrueПовторный запуск stage при shuffle failure
spark.celeborn.client.push.buffer.max.size64kРазмер буфера для batching push данных
TIP

Fallback policy = AUTO — рекомендация для production. Если Celeborn кластер недоступен или перегружен, Spark автоматически fallback на встроенный shuffle. Это обеспечивает resilience: job не падает при проблемах с Celeborn.

Мониторинг и администрирование

Health check и метрики

# Проверить состояние Master кластера (HTTP API)
curl http://clb-master-1:9098/metrics/prometheus

# Ключевые Celeborn метрики для Prometheus:
# celeborn_master_registered_shuffle_count — активные shuffle
# celeborn_worker_flusher_total_bytes — bytes flushed на диск
# celeborn_worker_active_connection_count — активные соединения
# celeborn_master_slot_count — распределённые слоты

# Проверить список Worker-ов через Master REST API
curl http://clb-master-1:9098/api/v1/workers

Поддержка версий

Spark4.0 Celeborn поддерживает широкий диапазон версий Spark: 2.4, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 4.0, 4.1. Это одно из ключевых преимуществ — возможность использования с новейшими версиями Spark, включая 4.x.

Помимо Spark, Celeborn поддерживает:

  • Apache Flink 1.16—2.2
  • Hadoop MapReduce 2/3
  • Интеграция с Apache Gluten (см. М15/L03) для full native stack

Production adoption

Celeborn используется на масштабе крупнейшими технологическими компаниями:

КомпанияКонтекст
AlibabaСоздатель Celeborn, EB-scale shuffle
ByteDanceКрупнейший deployment, conference talks
PinterestApacheCon 2024 presentation
ShopeeE-commerce analytics
BilibiliVideo platform analytics
BIGOVideo streaming
XiaohongshuSocial commerce
Trip.comTravel platform
Проверка знанийKnowledge check
Какие преимущества даёт интеграция Celeborn с Apache Gluten?
ОтветAnswer
Celeborn + Gluten создают full native stack: Gluten ускоряет execution через нативные C++ движки (Velox/ClickHouse), а Celeborn ускоряет shuffle через remote service. Вместе они устраняют два основных bottleneck Spark: JVM overhead (Gluten) и shuffle I/O (Celeborn). При этом ColumnarShuffleManager Gluten совместим с Celeborn. Подробнее о Gluten см. М15/L03.

Итоги

  • Celeborn — Apache TLP (апрель 2024), push-based remote shuffle service
  • 4 компонента: Master (Raft HA), Worker (SSD/HDD + HDFS), LifecycleManager (Driver), ShuffleClient (Executor)
  • Push-based flow: async push -> merge by partition -> commit -> fetch
  • Fault tolerance: Revive mechanism, batched revive, data replication, exactly-once via batch IDs
  • Конфигурация: замена spark.shuffle.manager + endpoints Master кластера
  • Spark 2.4—4.1 + Flink + MapReduce + Gluten integration
  • В следующем уроке: Apache Uniffle и сравнение с Celeborn

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 8. Где запускается LifecycleManager в архитектуре Celeborn и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3