Apache Celeborn
Что такое Celeborn
Apache Celeborn — push-based remote shuffle service, изначально разработанный в Alibaba. Celeborn graduated как Apache Top-Level Project (TLP) в апреле 2024 года. Последняя стабильная версия — 0.6.2 (декабрь 2025).
Celeborn решает все три фундаментальных проблемы shuffle, описанные в L01: executor lifecycle coupling, disk I/O bottleneck и recomputation on failure. Данные push-атся на выделенный кластер Celeborn Workers, где хранятся, реплицируются и отдаются reduce tasks.
Архитектура
Celeborn состоит из четырёх компонентов: серверный кластер (Master + Worker) и клиентская часть (LifecycleManager + ShuffleClient):
Master cluster
Кластер Master-нод обеспечивает координацию и High Availability через Raft consensus. Active master принимает запросы на регистрацию shuffle и slot allocation — назначение partition-locations на конкретные Worker-ы. Standby master-ы готовы принять трафик при failover.
Master управляет ресурсами Worker-ов: сколько партиций назначено на каждый worker, текущая загрузка дисков, состояние здоровья.
Workers
Worker-ы — рабочие узлы, которые принимают, хранят и отдают shuffle-данные. Каждый worker работает с локальными дисками (SSD/HDD) и опционально с HDFS для дополнительной durability.
Ключевая операция worker — merge by partition: данные от разных map tasks для одной reduce partition объединяются в единый файл. Reduce task читает один merged файл вместо множества мелких блоков.
Worker-ы flush данные из memory в storage (SSD/HDFS), обеспечивая durability. Периодические disk health checks мониторят состояние дисков, изолируя failing диски от новых аллокаций.
LifecycleManager (клиент, Spark Driver)
LifecycleManager запускается на стороне Spark Driver и управляет метаданными shuffle:
- Регистрирует shuffle в Master (сколько partitions, конфигурация)
- Хранит PartitionLocation маппинг: какие партиции на каких worker-ах
- Координирует commit phase после завершения map stage
- Обрабатывает Revive запросы при сбоях worker-ов
ShuffleClient (клиент, Spark Executor)
ShuffleClient — клиентская библиотека в каждом Spark Executor. Она заменяет стандартный shuffle writer/reader:
- Push data: асинхронный push через DataPusher (non-blocking). Данные буферизуются в 64KB буфере и отправляются batch-ами
- Fetch data: получение merged partition-данных от worker-ов
- Compression: LZ4 или ZSTD сжатие перед push (настраивается)
Push-Based Shuffle Flow
Celeborn реализует 8-шаговый push-based shuffle flow:
- Register shuffle: Spark Driver (LifecycleManager) регистрирует shuffle в Celeborn Master — количество partitions, конфигурация
- Allocate slots: Master назначает PartitionLocation-ы на конкретные Worker-ы, балансируя нагрузку
- Push data: Map tasks через ShuffleClient асинхронно push-ат данные на назначенные Worker-ы. Данные сжимаются (LZ4/ZSTD) и включают заголовок:
mapId,attemptId,batchId,size - Buffer and batch: ShuffleClient буферизует данные в 64KB буфере и отправляет batch-ами (
PushData/PushMergedData) - Merge on Workers: Worker-ы получают данные и merge их по partition ID в PartitionLocation файлы. Данные от разных map tasks для одной reduce partition объединяются
- Flush to storage: Worker-ы flush данные из memory на SSD/HDD или HDFS
- Commit: После завершения всех map tasks LifecycleManager координирует commit — Worker-ы подтверждают, что данные записаны. Достаточно одной committed replica на PartitionLocation
- Fetch: Reduce tasks запрашивают merged partition-данные от Worker-ов
Partition Types
Celeborn поддерживает два типа организации данных:
ReducePartition (по умолчанию): данные организованы по reduce partition ID. Worker-ы merge все map outputs для одной partition в единый файл. Reduce task читает один файл — оптимально для большинства workloads.
MapPartition: данные организованы по map task. Используется credit-based shuffle read. Partition split включён по умолчанию. Полезен для сценариев с очень большим количеством reduce partitions, где merge by partition ID создаёт слишком много мелких файлов.
Fault Tolerance
Celeborn предоставляет многоуровневую fault tolerance:
Revive mechanism
Если push данных на worker завершается ошибкой, ShuffleClient запрашивает у LifecycleManager альтернативный PartitionLocation на другом worker. Данные перенаправляются на новый worker. Это прозрачно для map task — retry происходит автоматически.
Batched revive
При отказе worker все partition-locations на этом worker нужно переназначить. Вместо отдельных Revive запросов для каждой partition, Celeborn группирует их в batched revive — один RPC содержит все переназначения. Это предотвращает RPC flooding при массовом сбое.
Data replication
Данные могут быть реплицированы на несколько worker-ов. Commit считается успешным, когда хотя бы одна replica per PartitionLocation подтверждена. Конфигурируется через spark.celeborn.client.push.replica.enabled.
Exactly-once semantics
Каждый push содержит уникальный batch ID (комбинация mapId + attemptId + batchId). LifecycleManager отслеживает все committed locations. При retry дублирующие batch-и отклоняются — гарантия exactly-once.
Graceful shutdown и disk health
- Graceful shutdown: worker останавливает приём новых аллокаций и flush-ит все данные перед завершением
- Disk health checks: периодический мониторинг состояния дисков. Failing диски изолируются от новых аллокаций
- HARD_SPLIT: при низком свободном месте на диске Celeborn выполняет принудительное разделение affected partitions на другие worker-ы
Production Deployment
Конфигурация серверного кластера
Развёртывание Celeborn кластера начинается с конфигурации Master и Worker:
# celeborn-defaults.conf (Master + Worker общая конфигурация)
celeborn.master.endpoints=clb-master-1:9097,clb-master-2:9097,clb-master-3:9097
celeborn.master.port=9097
# Worker storage — укажите все доступные диски с типом
celeborn.worker.storage.dirs=/mnt/ssd1:disktype=SSD,/mnt/ssd2:disktype=SSD,\
/mnt/hdd1:disktype=HDD
celeborn.worker.flusher.buffer.size=256k
celeborn.metrics.enabled=true
celeborn.metrics.prometheus.port=9096
# Slot allocation policy
celeborn.master.slot.assign.policy=roundrobin
# celeborn-env.sh — ресурсы для Master и Worker
CELEBORN_MASTER_MEMORY=4g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=12g
# Формула: off-heap = (disk_buffer × num_disks) × 1.2
# Пример: 256k × 3 disks × 1.2 ≈ 1 MB — но для production
# буфер значительно больше, используйте 4-12 GB
CELEBORN_MASTER_JAVAOPT="-XX:+UseG1GC"
CELEBORN_WORKER_JAVAOPT="-XX:+UseG1GC -XX:MaxDirectMemorySize=12g"
Kubernetes Helm deployment
# celeborn-helm-values.yaml
master:
replicas: 3
resources:
requests:
memory: "4Gi"
cpu: "2"
env:
- name: CELEBORN_MASTER_MEMORY
value: "4g"
worker:
replicas: 10
resources:
requests:
memory: "16Gi"
cpu: "4"
volumes:
- name: shuffle-data
hostPath:
path: /mnt/ssd1
env:
- name: CELEBORN_WORKER_OFFHEAP_MEMORY
value: "12g"
metrics:
enabled: true
serviceMonitor:
enabled: true
port: 9096
Конфигурация Spark
spark-submit
spark-submit \
--conf spark.shuffle.manager=org.apache.spark.celeborn.CelebornShuffleManager \
--conf spark.celeborn.master.endpoints=clb-1:9097,clb-2:9097,clb-3:9097 \
--conf spark.celeborn.client.spark.shuffle.writer=hash \
--conf spark.celeborn.client.shuffle.compression.codec=lz4 \
--conf spark.celeborn.client.spark.shuffle.fallback.policy=AUTO \
--conf spark.celeborn.client.spark.stageRerun.enabled=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--jars celeborn-client-spark-3-shaded_2.12-0.6.2.jar \
your_app.py
SparkSession builder
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CelebornExample") \
.config("spark.shuffle.manager",
"org.apache.spark.celeborn.CelebornShuffleManager") \
.config("spark.celeborn.master.endpoints",
"clb-1:9097,clb-2:9097,clb-3:9097") \
.config("spark.celeborn.client.spark.shuffle.writer", "hash") \
.config("spark.celeborn.client.shuffle.compression.codec", "lz4") \
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
Ключевые параметры
| Параметр | Значение | Описание |
|---|---|---|
spark.shuffle.manager | CelebornShuffleManager | Замена стандартного shuffle manager |
spark.celeborn.master.endpoints | host:port,... | Адреса Celeborn Master кластера |
spark.celeborn.client.spark.shuffle.writer | hash / sort | sort — при высокой memory pressure |
spark.celeborn.client.shuffle.compression.codec | lz4 / zstd / none | LZ4 — баланс скорости и compression ratio |
spark.celeborn.client.spark.shuffle.fallback.policy | AUTO / ALWAYS / NEVER | AUTO — fallback на встроенный shuffle при недоступности Celeborn |
spark.celeborn.client.spark.stageRerun.enabled | true | Повторный запуск stage при shuffle failure |
spark.celeborn.client.push.buffer.max.size | 64k | Размер буфера для batching push данных |
Fallback policy = AUTO — рекомендация для production. Если Celeborn кластер недоступен или перегружен, Spark автоматически fallback на встроенный shuffle. Это обеспечивает resilience: job не падает при проблемах с Celeborn.
Мониторинг и администрирование
Health check и метрики
# Проверить состояние Master кластера (HTTP API)
curl http://clb-master-1:9098/metrics/prometheus
# Ключевые Celeborn метрики для Prometheus:
# celeborn_master_registered_shuffle_count — активные shuffle
# celeborn_worker_flusher_total_bytes — bytes flushed на диск
# celeborn_worker_active_connection_count — активные соединения
# celeborn_master_slot_count — распределённые слоты
# Проверить список Worker-ов через Master REST API
curl http://clb-master-1:9098/api/v1/workers
Поддержка версий
Spark4.0 Celeborn поддерживает широкий диапазон версий Spark: 2.4, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 4.0, 4.1. Это одно из ключевых преимуществ — возможность использования с новейшими версиями Spark, включая 4.x.
Помимо Spark, Celeborn поддерживает:
- Apache Flink 1.16—2.2
- Hadoop MapReduce 2/3
- Интеграция с Apache Gluten (см. М15/L03) для full native stack
Production adoption
Celeborn используется на масштабе крупнейшими технологическими компаниями:
| Компания | Контекст |
|---|---|
| Alibaba | Создатель Celeborn, EB-scale shuffle |
| ByteDance | Крупнейший deployment, conference talks |
| ApacheCon 2024 presentation | |
| Shopee | E-commerce analytics |
| Bilibili | Video platform analytics |
| BIGO | Video streaming |
| Xiaohongshu | Social commerce |
| Trip.com | Travel platform |
Итоги
- Celeborn — Apache TLP (апрель 2024), push-based remote shuffle service
- 4 компонента: Master (Raft HA), Worker (SSD/HDD + HDFS), LifecycleManager (Driver), ShuffleClient (Executor)
- Push-based flow: async push -> merge by partition -> commit -> fetch
- Fault tolerance: Revive mechanism, batched revive, data replication, exactly-once via batch IDs
- Конфигурация: замена
spark.shuffle.manager+ endpoints Master кластера - Spark 2.4—4.1 + Flink + MapReduce + Gluten integration
- В следующем уроке: Apache Uniffle и сравнение с Celeborn