JobManager в Flink — это не один компонент, а процесс, содержащий три главных под-компонента. Они работают вместе, но имеют чёткие обязанности и общаются между собой через RpcGateway (Pekko/Akka actor-style messaging). Понимание разделения этих обязанностей — фундамент для дебаггинга JM-логов: при ошибке “Could not allocate slot” вы знаете, что копать в ResourceManager + SlotManager, а не в JobMaster или Dispatcher.
В этом уроке препарируем JM до этих трёх компонентов, проследим путь job submission от kubectl/CLI до состояния RUNNING, и разберём, какие классы за что отвечают.
JobManager и TaskManager: основная архитектура (практика) Анатомия процесса: PID, address space, fd-таблицаОдин процесс, три компонента
Сначала важно понять: в одном JM-процессе физически живут три actor-like компонента, плюс ещё несколько вспомогательных.
Класс org.apache.flink.runtime.entrypoint.ClusterEntrypoint — это main класс, который запускается при старте JM-процесса. Он инстанцирует и запускает все эти компоненты. Конкретный entrypoint зависит от deployment:
StandaloneSessionClusterEntrypoint— standalone cluster, session mode.KubernetesSessionClusterEntrypoint— K8s session.KubernetesJobClusterEntrypoint— K8s application mode (один job = один cluster).YarnSessionClusterEntrypoint— YARN.
Они немного отличаются (Standalone vs K8s vs YARN), но все включают в себя одни и те же три главные подсистемы: Dispatcher, ResourceManager, JobMaster.
Dispatcher: входная дверь
Dispatcher — это первый компонент, к которому приходит любой запрос на job submission. Его обязанности:
- Принимать REST-запросы на submission, cancellation, status query.
- Создавать JobMaster instances при successful submission.
- Хранить ссылки на active JobMaster-ы (через
JobManagerRunner). - Маршрутизировать запросы к конкретному JobMaster по jobID.
- Восстанавливать job-ы из JobGraphStore при failover (в HA).
Главный класс: org.apache.flink.runtime.dispatcher.Dispatcher (это abstract — конкретные impl: StandaloneDispatcher, MiniDispatcher).
Ключевые методы:
public CompletableFuture<Acknowledge> submitJob(
JobGraph jobGraph,
Time timeout) {
// 1. Persist JobGraph в JobGraphStore (для HA)
// 2. Create JobManagerRunner
// 3. Start JobMaster
// 4. Return Acknowledge
}
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
// 1. Find JobMaster by jobId
// 2. Delegate cancel to JobMaster
// 3. Wait for completion
// 4. Remove from JobGraphStore
}
Когда вы делаете flink run -m localhost:8081 my-job.jar, под капотом происходит:
- CLI uploads JAR к BlobServer.
- CLI converts main(String[] args) -> StreamGraph -> JobGraph (на client side).
- CLI делает HTTP POST к
/jars/upload(JAR) и/jars/{jar-id}/run(job submission). - REST endpoint конвертирует request в Dispatcher#submitJob call.
- Dispatcher создаёт JobManagerRunner и стартует JobMaster.
- Возвращается JobID клиенту.
После этого Dispatcher держит ссылку на этот JobMaster в Map<JobID, JobManagerRunner>. Любые последующие requests (/jobs/{job-id}/...) маршрутизируются туда.
ResourceManager: управление слотами
ResourceManager — компонент, который знает, какие TaskManager-ы существуют, сколько у них слотов, и выдаёт слоты JobMaster-ам по запросу.
Главный класс: org.apache.flink.runtime.resourcemanager.ResourceManager (abstract — конкретные impl: StandaloneResourceManager, KubernetesResourceManager, YarnResourceManager).
Inner component: SlotManager (SlotManagerImpl) — отвечает за match-making между slot requests от JobMaster-ов и available slots на TaskManager-ах.
Жизненный цикл TaskManager в ResourceManager:
Slot allocation: JobMaster ← ResourceManager
Когда JobMaster нужны слоты для запуска job-а, он делает request к ResourceManager. ResourceManager делегирует к SlotManager, который match-ит request с available slots.
// JobMaster, упрощённо:
SlotPool slotPool = ...;
slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, allocationID, timeout);
// SlotPool делает RPC к ResourceManager:
// resourceManager.requestSlot(jobMasterID, slotRequest, timeout);
ResourceManager -> SlotManager#registerSlotRequest. SlotManager пытается найти available slot:
- Если есть свободный slot, который удовлетворяет ResourceProfile (cpu, memory), и принадлежит TM, доступному JobMaster-у — выделяет.
- Если нет, и cluster может масштабироваться (K8s или YARN) — запрашивает новый TM от Cluster Manager (создаётся новый pod, новый container).
- Возвращает slot offer.
JobMaster получает slot, и может разместить там Task.
Standalone vs K8s vs YARN: где отличия
- StandaloneResourceManager: TM-ы предзапущены вручную или через скрипт. RM не может запросить новые TM-ы — фиксированный набор. Если slots не хватает, request fails по timeout.
- KubernetesResourceManager: RM может через K8s API создать новые TM pods. Pool TM-ов может расти под нагрузкой и shrink-ить при бездействии.
- YarnResourceManager: similar, но через YARN ResourceManager allocates containers.
Это критично для понимания production behavior: если у вас “stuck Adaptive Scheduler” или slot request timeout, в Standalone deployment решение — увеличить TM count вручную, в K8s — проверить configuration K8s namespace (есть ли квоты, доступны ли nodes).
JobMaster: per-job orchestrator
JobMaster — это душа конкретного job-а. Каждый running job имеет свой JobMaster instance внутри JM-процесса. JobMaster:
- Владеет ExecutionGraph — runtime-граф job-а.
- Общается с TaskExecutor-ами через RPC: deploy Task, trigger checkpoint, cancel Task.
- Управляет slot allocation через SlotPool (заказывает слоты у ResourceManager).
- Координирует checkpoints через CheckpointCoordinator (модуль 06).
- Обрабатывает failures — рестартит Task-и, инициирует partial или full job restart.
Главный класс: org.apache.flink.runtime.jobmaster.JobMaster.
Структура JobMaster:
JobMaster — самый “толстый” компонент. Большинство interesting things в JM-логах происходят именно здесь.
Sequence flow: от submission до RUNNING
Покажем полный путь job-а от flink run до состояния RUNNING.
Это happy path. В реальности на каждом шаге может произойти ошибка:
- На m3: если JobGraph невалиден (deserialization error), Dispatcher отвергает.
- На m5: если RM недоступен, JobMaster fails с registration timeout.
- На m6-m8: если slots не хватает, slot request fails с timeout, scheduler делает restart с backoff.
- На m9: если TM упал между offerSlot и submitTask, операция fails.
- На m10: если BlobServer недоступен или JAR corrupted, Task fails to deploy.
- На m11: если operator.open() throws, Task fails и эскалирует к JobMaster.
Каждая из этих ошибок генерирует специфические log messages в JM/TM. Знание sequence помогает диагностировать.
Application mode vs session mode
Дополнительная деталь, которую важно знать: Flink поддерживает два режима запуска cluster.
Session mode
JobManager + TaskManagers стартуют отдельно от job-а. Это persistent cluster, к которому job-ы submit-ятся по одному.
./bin/start-cluster.sh # запустил session cluster
./bin/flink run -m localhost:8081 job1.jar # submit job 1
./bin/flink run -m localhost:8081 job2.jar # submit job 2 в тот же cluster
Преимущество: shared resources, dispatcher reuses тот же JM-процесс для multiple jobs. Недостаток: один job может влиять на другой (memory pressure, classloader issues).
Application mode
JobManager и job стартуют вместе, один JM = один job. Это рекомендуемый mode для production.
./bin/flink run-application -t kubernetes-application \
-c MyMainClass my-job.jar
Каждый job получает свой JM-процесс. Isolation полная, но overhead больше (один JM на job).
В K8s Operator это типично выглядит как один FlinkDeployment CRD = один application mode cluster.
Code references: что читать
Если хотите видеть это всё в коде:
org.apache.flink.runtime.dispatcher.Dispatcher— главный класс Dispatcher. МетодsubmitJob— главный.org.apache.flink.runtime.jobmanager.JobManagerRunner— wrapper around JobMaster. Отвечает за leader election (HA), создание JobMaster.org.apache.flink.runtime.jobmaster.JobMaster— JobMaster main class. Методstartинициализирует ExecutionGraph и Scheduler.org.apache.flink.runtime.resourcemanager.ResourceManager— abstract base для RM implementations.org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl— slot match-making logic.org.apache.flink.runtime.taskexecutor.TaskExecutor— TaskManager-side main class. МетодsubmitTaskпринимает Task deployment.org.apache.flink.runtime.entrypoint.ClusterEntrypoint— main entry для JM-процесса.
Главные RPC gateway interfaces:
DispatcherGateway— что Dispatcher exposes.JobMasterGateway— что JobMaster exposes (TaskExecutor -> JobMaster calls).ResourceManagerGateway— что RM exposes (TaskExecutor -> RM calls).TaskExecutorGateway— что TM exposes (JobMaster -> TaskExecutor calls).
Эти interfaces — публичный API между under-components. Если хотите понять, что один компонент может попросить у другого — читайте gateway interface.
Когда дебажите JM-логи, ищите эти keywords: “Dispatcher” — это про submission/cancellation, “ResourceManager” — про slot management и TM registration, “JobMaster” — про конкретный job. По ним можно быстро локализовать, в каком компоненте проблема.
Чтение source code
Конкретный exercise. Откройте в IDE:
org.apache.flink.runtime.dispatcher.Dispatcher— найдите методsubmitJob. Прочитайте его. Заметьте, что он async (возвращает CompletableFuture).- Прыгните на
JobManagerRunner(через “Find usages” в IDE) — найдёте, как Dispatcher создаёт JobManagerRunner. - Зайдите в
JobManagerRunner#startJobManagerRunnerServices— увидите создание JobMaster черезJobMasterServiceLeadershipRunnerFactory. - Откройте
JobMaster#start— увидите инициализацию ExecutionGraph и Scheduler.
Этот path даёт хорошее представление, как реально устроен flow submission в коде. На первый раз может быть много прыжков, но через 2-3 чтения это станет привычным.
Что дальше
Следующий урок — про TaskManager и slot model. Разберём, что такое slot, как slots работают как memory unit, что такое slot sharing groups и co-location constraints.
После — три урока модуля 02: graph transformations (StreamGraph -> JobGraph -> ExecutionGraph), HA + leader election, scheduler types.