Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 18 мин
Средний
JobManagerTaskManagerSlotsArchitectureDispatcher

JobManager и TaskManager: основная архитектура

Flink-кластер состоит из двух типов процессов: JobManager (координатор) и TaskManager (worker). Этот урок — обзор того, как они устроены и взаимодействуют. Без deep dive в internals — для production-инженера достаточно понимать модель, чтобы читать логи, конфигурировать ресурсы, и интерпретировать поведение в Web UI.

К концу урока вы будете знать, кто за что отвечает, и сможете правильно отвечать на вопросы типа “почему JobManager OOM, а TaskManager в порядке” или “сколько slots мне нужно для job с parallelism=20”.


JobManager: мозг кластера

JobManager — это master-процесс. В кластере типично 1 активный JobManager (с опциональным standby в HA-режиме). Он не обрабатывает данные сам — он координирует TaskManager’ов.

Внутри JobManager — три логических компонента:

Внутренняя структура JobManager
JobManager (single JVM process)Один JVM-процесс на JobManager. В Application mode каждый job имеет свой JobManager (изолированно). В Session mode — один JobManager обслуживает несколько jobs.

Dispatcher (REST + WebUI)

Dispatcher: REST API + WebUI. Принимает submit job, возвращает JobID, обслуживает запросы статуса. Это интерфейс, через который клиенты взаимодействуют с кластером.

ResourceManager (slot allocation)

ResourceManager: управляет TaskManager slots. Решает, на каком TaskManager разместить subtask. В Native Kubernetes mode общается с K8s API server для запроса pods.

JobMaster (per-job coordinator)

JobMaster: per-job координатор. Один JobMaster на job. Управляет ExecutionGraph, scheduling subtasks, checkpoints, failover. В Application mode — один JobMaster на JobManager.

Dispatcher

Dispatcher — это REST API и Web UI endpoint. Когда вы делаете flink run job.jar, CLI отправляет HTTP-запрос на Dispatcher (POST /jobs). Dispatcher принимает JobGraph, сохраняет JAR в blob store, и запускает JobMaster для этого job.

Также Dispatcher отдаёт Web UI — то, что вы видите на http://localhost:8081. Все ваши клики в UI — это REST-запросы к Dispatcher.

ResourceManager

ResourceManager управляет slot’ами — ресурсами для выполнения subtasks. Когда JobMaster нужны slots для развёртывания job (например, 4 slot для parallelism=4), он просит ResourceManager.

В разных режимах ResourceManager работает по-разному:

  • Standalone (наш docker compose): TaskManager’ы зарегистрированы статически. ResourceManager выделяет slots из уже доступных.
  • Native Kubernetes: ResourceManager общается с Kubernetes API server. Когда нужны slots, создаёт новые TaskManager pods через K8s API. Это позволяет автоматический скейлинг.
  • YARN: то же самое, но через YARN ResourceManager.

JobMaster

JobMaster — это per-job компонент. Один JobMaster координирует один job: строит ExecutionGraph, размещает subtasks на TaskManager’ах, триггерит checkpoints, обрабатывает failover при падении TaskManager.

В Application mode у вас один job на JobManager — следовательно, один JobMaster. В Session mode на одном JobManager могут быть несколько JobMaster’ов (по числу запущенных jobs).

Внутренности Dispatcher и JobMaster
NOTE

Эти три компонента живут в одном JVM-процессе. Вы НЕ видите их как отдельные сущности в логах — они логически разделены внутри одного JobManager. Когда говорят “JobManager упал” — упал весь JVM, включая все три компонента.


TaskManager: рабочая лошадка

TaskManager — это worker-процесс. В кластере типично много TaskManager’ов (от 2-3 для маленьких задач до сотен для production). Каждый TaskManager — отдельный JVM-процесс с настроенным количеством slots.

Slots

Slot — единица выделения ресурсов в TaskManager’е. Если TaskManager имеет 4 slots, он может выполнять до 4 subtasks одновременно. Slots — это static configuration: taskmanager.numberOfTaskSlots: 4.

Каждый slot получает фиксированную долю memory TaskManager’а. Если TaskManager имеет 4 GB heap и 4 slots, каждый slot — примерно 1 GB. (Точнее, есть network memory, off-heap, managed memory — но общий принцип такой.)

Распределение slots по job parallelism:

  • Job с parallelism=10 нужно 10 slots ВСЕГО (с slot sharing — по одному slot на параллельный pipeline).
  • Эти 10 slots могут быть распределены: 2 TaskManager’а по 5 slots, или 5 TaskManager’ов по 2 slots, и так далее.
  • Если slots не хватает (например, кластер имеет 8 slots total) — job в RESCALING / SUSPENDED state, ждёт ресурсов.

Slot sharing

По умолчанию slot sharing включён: subtasks разных операторов одного job могут делить slot. Это резко уменьшает количество нужных slots.

Пример: job с операторами Source/parallelism=4, Map/parallelism=4, Aggregate/parallelism=4, Sink/parallelism=4. Без slot sharing нужно 16 slots. С slot sharing — 4 slots (по одному pipeline в каждом slot).

Когда отключать slot sharing:

  • Когда оператор очень тяжёлый (большой state, много CPU) и мешает соседям в slot.
  • Когда нужно изолировать failure domain — если subtask упадёт в shared slot, другие subtasks в этом slot тоже остановятся.

Disabling через slot sharing groups: оператор может быть в named group, и его slots делятся только с операторами этой же group.

stream.flatMap(...).slotSharingGroup("heavy");

Связь JobManager - TaskManager

Communication: JobManager и TaskManager'ы

JobManager

JobManager — центральный координатор. Получает запросы от клиентов, командует TaskManager'ам, обрабатывает heartbeat'ы.
RPC: deploy subtasks, trigger checkpoints

TaskManager 1

TaskManager 1: выполняет назначенные subtasks. Регулярно отправляет heartbeat в JobManager и metric reports.

TaskManager 2

TaskManager 2: то же самое — независимый worker. TaskManager'ы общаются друг с другом через сетевые channels (shuffle data).

TaskManager N

TaskManager N: горизонтально масштабируется. В Native K8s mode TaskManager'ы создаются динамически через K8s API когда нужны slots.
heartbeat: я живой, мои метрики

Что передаётся:

  • JobManager -> TaskManager: команды (deploy subtask, trigger checkpoint, cancel job).
  • TaskManager -> JobManager: heartbeats (каждые ~10 секунд), метрики, статус subtasks, checkpoint acknowledgements.
  • TaskManager -> TaskManager: данные между операторами (shuffle, keyBy). Это бо́льшая часть network traffic в кластере.

Что происходит при сбое

TaskManager падает (OOM, kill, hardware): JobManager обнаруживает через missed heartbeat. JobMaster инициирует failover: помечает все subtasks этого TaskManager’а как FAILED, восстанавливает их state из последнего checkpoint, и размещает на других TaskManager’ах. Если slots на других TaskManager’ах не хватает — job ждёт ресурсов (в Native K8s — новый TaskManager pod создаётся автоматически).

JobManager падает (без HA): job полностью останавливается. После рестарта JobManager — job нужно вручную submit заново (с savepoint, если он есть).

JobManager падает (с HA): standby JobManager берёт лидерство. JobMaster восстанавливает state из ZooKeeper / Kubernetes ConfigMap, и продолжает координацию. TaskManager’ы обнаруживают нового лидера, переподключаются. Это покрывается в модуле 15.

WARNING

В Standalone mode (как наш docker compose) HA для JobManager не настроена. Если JobManager упадёт — job останавливается. Для production обязательно используйте Native Kubernetes mode с Flink Kubernetes Operator (модуль 15).


Memory model: что внутри JVM

Каждый процесс Flink (и JobManager, и TaskManager) имеет сложную memory model. Понимание её — критично для troubleshooting OOM.

TaskManager memory layout (для одного TaskManager):

СегментНазначение
JVM Heap (Framework)Внутренний код Flink, фреймворковые объекты
JVM Heap (Task)Ваш код, user objects, on-heap state
Off-heap (Direct)Network buffers, на сегменты для shuffle
Off-heap (Managed)RocksDB state, сортировка, hash tables
JVM MetaspaceClass metadata
JVM OverheadThreads, native libraries

Total memory TaskManager = сумма всех сегментов. Управляется через taskmanager.memory.process.size (общая) или детально через taskmanager.memory.task.heap.size, taskmanager.memory.managed.size и так далее.

JobManager memory layout — проще:

СегментНазначение
JVM HeapDispatcher, ResourceManager, JobMaster, in-memory ExecutionGraph
Off-heapNetwork для RPC с TaskManager’ами
JVM OverheadThreads, native libraries

Управляется через jobmanager.memory.process.size или детально.

Когда происходит OOM

  • TaskManager OOM Heap: ваш код держит слишком много в памяти (state в HashMapStateBackend, аккумуляторы, кеши). Решение: увеличить heap или использовать RocksDB state backend.
  • TaskManager OOM Direct: network buffers недостаточно для высокой throughput. Решение: увеличить taskmanager.memory.network.fraction или absolute size.
  • TaskManager OOM Managed: RocksDB state не помещается. Решение: увеличить managed memory или переход на ForSt (disaggregated state).
RocksDB internals: как устроено managed memory
  • JobManager OOM Heap: ExecutionGraph слишком большой (тысячи subtasks), или хранится много метаданных. Решение: увеличить jobmanager heap.

Топология примера: production кластер

Реальный production Flink кластер для CDC pipeline:

Production кластер: 1 JobManager + 4 TaskManager

JobManager (1 pod, 2 GB)

JobManager: 1 pod в Kubernetes Deployment. Memory: 2 GB. Heap: 1.5 GB. Стандарт для не очень больших jobs (sub-1000 subtasks). HA через K8s API server (high availability config).
управляет

TM 1: 4 slots, 8 GB

TaskManager 1: 4 slots. Memory: 8 GB total. Heap: 2 GB, Managed (RocksDB): 4 GB, Network: 1 GB. Запускает 4 subtasks.

TM 2: 4 slots, 8 GB

TaskManager 2: те же ресурсы и конфигурация. Кластер однородный — облегчает scheduling и capacity planning.

TM 3: 4 slots, 8 GB

TaskManager 3: тоже 4 slots. Job с parallelism=16 поместится точно в кластер 4 TM x 4 slots = 16 slots.

TM 4: 4 slots, 8 GB

TaskManager 4: завершает кластер. Total: 16 slots. С slot sharing — может выполнять job с parallelism=16 даже если у него много операторов.

Это типичный размер. Большие production кластеры — десятки TaskManager’ов с 8-16 slots каждый, JobManager с 4-8 GB heap.


Slot calculation: сколько мне нужно

Вопрос практический: “У меня job с parallelism=20 на самом тяжёлом операторе. Сколько slots мне нужно?”

С slot sharing (по умолчанию): 20 slots. Это max parallelism среди операторов.

С slot sharing groups (если оператор выделен в свою group): больше, в зависимости от структуры.

Без slot sharing (явно отключено): сумма parallelism всех операторов. Для job с 5 операторами по parallelism=20 каждый — 100 slots. Это очень редкий случай.

Memory на slot: зависит от backend. Для HashMapStateBackend — heap memory растёт с размером state. Для RocksDB — managed memory. Capacity planning — это модуль 16, но базовый принцип такой:

  • Stateless операторы: 100-500 MB heap на slot достаточно.
  • Stateful с маленьким state (меньше 100 MB на ключ): 1-2 GB heap или 2-4 GB managed memory.
  • Stateful с большим state (windows, joins): 4-8 GB managed memory + RocksDB.
  • Stateful с очень большим state (терабайты): ForSt disaggregated state с S3 backing.

Попробуй сам

Откройте Web UI вашего локального кластера:

  1. JobManager: найдите в левом меню “Job Manager”. Какая Memory configuration? Сколько Heap, сколько Off-heap?
  2. TaskManagers: список всех TaskManager’ов в кластере. Сколько у каждого slots? Сколько available (свободно) vs allocated (занято)?
  3. Запустите job (WordCount из урока 00.3). Посмотрите Job Graph — какие subtasks на каком TaskManager? Используется ли slot sharing?
  4. Symuliruйте сбой: остановите один TaskManager (docker stop flink-tm-2). Что произошло в Web UI? Видны ли в JobManager логах сообщения о потере slots?

Опыт реальной работы с кластером в маленьком масштабе — единственный способ понять модель.

Проверка знанийKnowledge check
У вас Flink job с операторами Source/parallelism=4, KeyedAggregate/parallelism=8, Sink/parallelism=4. Slot sharing включён (по умолчанию). Сколько slot вам нужно для размещения job, и какая логика?
ОтветAnswer
8 slots. Логика: с slot sharing subtasks разных операторов одного job могут делить slot. Минимальное количество slots = максимальный parallelism среди операторов. В нашем случае max(4, 8, 4) = 8. Каждый из 8 slots будет выполнять: 1 subtask Source (в 4 из 8 slot, остальные 4 slot работают только с Aggregate), 1 subtask KeyedAggregate (во всех 8 slot), 1 subtask Sink (в 4 из 8 slot). Это pipelined parallelism — каждый slot обрабатывает свою долю данных через всю цепочку. Без slot sharing нужно было бы 4+8+4=16 slots. Включённый slot sharing — главная оптимизация ресурсов в Flink, и её отключение оправдано редко.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие три логических компонента живут внутри JobManager-процесса?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4