JobManager и TaskManager: основная архитектура
Flink-кластер состоит из двух типов процессов: JobManager (координатор) и TaskManager (worker). Этот урок — обзор того, как они устроены и взаимодействуют. Без deep dive в internals — для production-инженера достаточно понимать модель, чтобы читать логи, конфигурировать ресурсы, и интерпретировать поведение в Web UI.
К концу урока вы будете знать, кто за что отвечает, и сможете правильно отвечать на вопросы типа “почему JobManager OOM, а TaskManager в порядке” или “сколько slots мне нужно для job с parallelism=20”.
JobManager: мозг кластера
JobManager — это master-процесс. В кластере типично 1 активный JobManager (с опциональным standby в HA-режиме). Он не обрабатывает данные сам — он координирует TaskManager’ов.
Внутри JobManager — три логических компонента:
Dispatcher (REST + WebUI)
Dispatcher: REST API + WebUI. Принимает submit job, возвращает JobID, обслуживает запросы статуса. Это интерфейс, через который клиенты взаимодействуют с кластером.ResourceManager (slot allocation)
ResourceManager: управляет TaskManager slots. Решает, на каком TaskManager разместить subtask. В Native Kubernetes mode общается с K8s API server для запроса pods.JobMaster (per-job coordinator)
JobMaster: per-job координатор. Один JobMaster на job. Управляет ExecutionGraph, scheduling subtasks, checkpoints, failover. В Application mode — один JobMaster на JobManager.Dispatcher
Dispatcher — это REST API и Web UI endpoint. Когда вы делаете flink run job.jar, CLI отправляет HTTP-запрос на Dispatcher (POST /jobs). Dispatcher принимает JobGraph, сохраняет JAR в blob store, и запускает JobMaster для этого job.
Также Dispatcher отдаёт Web UI — то, что вы видите на http://localhost:8081. Все ваши клики в UI — это REST-запросы к Dispatcher.
ResourceManager
ResourceManager управляет slot’ами — ресурсами для выполнения subtasks. Когда JobMaster нужны slots для развёртывания job (например, 4 slot для parallelism=4), он просит ResourceManager.
В разных режимах ResourceManager работает по-разному:
- Standalone (наш docker compose): TaskManager’ы зарегистрированы статически. ResourceManager выделяет slots из уже доступных.
- Native Kubernetes: ResourceManager общается с Kubernetes API server. Когда нужны slots, создаёт новые TaskManager pods через K8s API. Это позволяет автоматический скейлинг.
- YARN: то же самое, но через YARN ResourceManager.
JobMaster
JobMaster — это per-job компонент. Один JobMaster координирует один job: строит ExecutionGraph, размещает subtasks на TaskManager’ах, триггерит checkpoints, обрабатывает failover при падении TaskManager.
В Application mode у вас один job на JobManager — следовательно, один JobMaster. В Session mode на одном JobManager могут быть несколько JobMaster’ов (по числу запущенных jobs).
Внутренности Dispatcher и JobMasterЭти три компонента живут в одном JVM-процессе. Вы НЕ видите их как отдельные сущности в логах — они логически разделены внутри одного JobManager. Когда говорят “JobManager упал” — упал весь JVM, включая все три компонента.
TaskManager: рабочая лошадка
TaskManager — это worker-процесс. В кластере типично много TaskManager’ов (от 2-3 для маленьких задач до сотен для production). Каждый TaskManager — отдельный JVM-процесс с настроенным количеством slots.
Slots
Slot — единица выделения ресурсов в TaskManager’е. Если TaskManager имеет 4 slots, он может выполнять до 4 subtasks одновременно. Slots — это static configuration: taskmanager.numberOfTaskSlots: 4.
Каждый slot получает фиксированную долю memory TaskManager’а. Если TaskManager имеет 4 GB heap и 4 slots, каждый slot — примерно 1 GB. (Точнее, есть network memory, off-heap, managed memory — но общий принцип такой.)
Распределение slots по job parallelism:
- Job с parallelism=10 нужно 10 slots ВСЕГО (с slot sharing — по одному slot на параллельный pipeline).
- Эти 10 slots могут быть распределены: 2 TaskManager’а по 5 slots, или 5 TaskManager’ов по 2 slots, и так далее.
- Если slots не хватает (например, кластер имеет 8 slots total) — job в RESCALING / SUSPENDED state, ждёт ресурсов.
Slot sharing
По умолчанию slot sharing включён: subtasks разных операторов одного job могут делить slot. Это резко уменьшает количество нужных slots.
Пример: job с операторами Source/parallelism=4, Map/parallelism=4, Aggregate/parallelism=4, Sink/parallelism=4. Без slot sharing нужно 16 slots. С slot sharing — 4 slots (по одному pipeline в каждом slot).
Когда отключать slot sharing:
- Когда оператор очень тяжёлый (большой state, много CPU) и мешает соседям в slot.
- Когда нужно изолировать failure domain — если subtask упадёт в shared slot, другие subtasks в этом slot тоже остановятся.
Disabling через slot sharing groups: оператор может быть в named group, и его slots делятся только с операторами этой же group.
stream.flatMap(...).slotSharingGroup("heavy");
Связь JobManager - TaskManager
JobManager
JobManager — центральный координатор. Получает запросы от клиентов, командует TaskManager'ам, обрабатывает heartbeat'ы.TaskManager 1
TaskManager 1: выполняет назначенные subtasks. Регулярно отправляет heartbeat в JobManager и metric reports.TaskManager 2
TaskManager 2: то же самое — независимый worker. TaskManager'ы общаются друг с другом через сетевые channels (shuffle data).TaskManager N
TaskManager N: горизонтально масштабируется. В Native K8s mode TaskManager'ы создаются динамически через K8s API когда нужны slots.Что передаётся:
- JobManager -> TaskManager: команды (deploy subtask, trigger checkpoint, cancel job).
- TaskManager -> JobManager: heartbeats (каждые ~10 секунд), метрики, статус subtasks, checkpoint acknowledgements.
- TaskManager -> TaskManager: данные между операторами (shuffle, keyBy). Это бо́льшая часть network traffic в кластере.
Что происходит при сбое
TaskManager падает (OOM, kill, hardware): JobManager обнаруживает через missed heartbeat. JobMaster инициирует failover: помечает все subtasks этого TaskManager’а как FAILED, восстанавливает их state из последнего checkpoint, и размещает на других TaskManager’ах. Если slots на других TaskManager’ах не хватает — job ждёт ресурсов (в Native K8s — новый TaskManager pod создаётся автоматически).
JobManager падает (без HA): job полностью останавливается. После рестарта JobManager — job нужно вручную submit заново (с savepoint, если он есть).
JobManager падает (с HA): standby JobManager берёт лидерство. JobMaster восстанавливает state из ZooKeeper / Kubernetes ConfigMap, и продолжает координацию. TaskManager’ы обнаруживают нового лидера, переподключаются. Это покрывается в модуле 15.
В Standalone mode (как наш docker compose) HA для JobManager не настроена. Если JobManager упадёт — job останавливается. Для production обязательно используйте Native Kubernetes mode с Flink Kubernetes Operator (модуль 15).
Memory model: что внутри JVM
Каждый процесс Flink (и JobManager, и TaskManager) имеет сложную memory model. Понимание её — критично для troubleshooting OOM.
TaskManager memory layout (для одного TaskManager):
| Сегмент | Назначение |
|---|---|
| JVM Heap (Framework) | Внутренний код Flink, фреймворковые объекты |
| JVM Heap (Task) | Ваш код, user objects, on-heap state |
| Off-heap (Direct) | Network buffers, на сегменты для shuffle |
| Off-heap (Managed) | RocksDB state, сортировка, hash tables |
| JVM Metaspace | Class metadata |
| JVM Overhead | Threads, native libraries |
Total memory TaskManager = сумма всех сегментов. Управляется через taskmanager.memory.process.size (общая) или детально через taskmanager.memory.task.heap.size, taskmanager.memory.managed.size и так далее.
JobManager memory layout — проще:
| Сегмент | Назначение |
|---|---|
| JVM Heap | Dispatcher, ResourceManager, JobMaster, in-memory ExecutionGraph |
| Off-heap | Network для RPC с TaskManager’ами |
| JVM Overhead | Threads, native libraries |
Управляется через jobmanager.memory.process.size или детально.
Когда происходит OOM
- TaskManager OOM Heap: ваш код держит слишком много в памяти (state в HashMapStateBackend, аккумуляторы, кеши). Решение: увеличить heap или использовать RocksDB state backend.
- TaskManager OOM Direct: network buffers недостаточно для высокой throughput. Решение: увеличить
taskmanager.memory.network.fractionили absolute size. - TaskManager OOM Managed: RocksDB state не помещается. Решение: увеличить managed memory или переход на ForSt (disaggregated state).
- JobManager OOM Heap: ExecutionGraph слишком большой (тысячи subtasks), или хранится много метаданных. Решение: увеличить jobmanager heap.
Топология примера: production кластер
Реальный production Flink кластер для CDC pipeline:
JobManager (1 pod, 2 GB)
JobManager: 1 pod в Kubernetes Deployment. Memory: 2 GB. Heap: 1.5 GB. Стандарт для не очень больших jobs (sub-1000 subtasks). HA через K8s API server (high availability config).TM 1: 4 slots, 8 GB
TaskManager 1: 4 slots. Memory: 8 GB total. Heap: 2 GB, Managed (RocksDB): 4 GB, Network: 1 GB. Запускает 4 subtasks.TM 2: 4 slots, 8 GB
TaskManager 2: те же ресурсы и конфигурация. Кластер однородный — облегчает scheduling и capacity planning.TM 3: 4 slots, 8 GB
TaskManager 3: тоже 4 slots. Job с parallelism=16 поместится точно в кластер 4 TM x 4 slots = 16 slots.TM 4: 4 slots, 8 GB
TaskManager 4: завершает кластер. Total: 16 slots. С slot sharing — может выполнять job с parallelism=16 даже если у него много операторов.Это типичный размер. Большие production кластеры — десятки TaskManager’ов с 8-16 slots каждый, JobManager с 4-8 GB heap.
Slot calculation: сколько мне нужно
Вопрос практический: “У меня job с parallelism=20 на самом тяжёлом операторе. Сколько slots мне нужно?”
С slot sharing (по умолчанию): 20 slots. Это max parallelism среди операторов.
С slot sharing groups (если оператор выделен в свою group): больше, в зависимости от структуры.
Без slot sharing (явно отключено): сумма parallelism всех операторов. Для job с 5 операторами по parallelism=20 каждый — 100 slots. Это очень редкий случай.
Memory на slot: зависит от backend. Для HashMapStateBackend — heap memory растёт с размером state. Для RocksDB — managed memory. Capacity planning — это модуль 16, но базовый принцип такой:
- Stateless операторы: 100-500 MB heap на slot достаточно.
- Stateful с маленьким state (меньше 100 MB на ключ): 1-2 GB heap или 2-4 GB managed memory.
- Stateful с большим state (windows, joins): 4-8 GB managed memory + RocksDB.
- Stateful с очень большим state (терабайты): ForSt disaggregated state с S3 backing.
Попробуй сам
Откройте Web UI вашего локального кластера:
- JobManager: найдите в левом меню “Job Manager”. Какая Memory configuration? Сколько Heap, сколько Off-heap?
- TaskManagers: список всех TaskManager’ов в кластере. Сколько у каждого slots? Сколько available (свободно) vs allocated (занято)?
- Запустите job (WordCount из урока 00.3). Посмотрите Job Graph — какие subtasks на каком TaskManager? Используется ли slot sharing?
- Symuliruйте сбой: остановите один TaskManager (
docker stop flink-tm-2). Что произошло в Web UI? Видны ли в JobManager логах сообщения о потере slots?
Опыт реальной работы с кластером в маленьком масштабе — единственный способ понять модель.