Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 26 мин
Продвинутый
JobManagerDispatcherResourceManagerJobMasterJob submissionArchitecture

JobManager в Flink — это не один компонент, а процесс, содержащий три главных под-компонента. Они работают вместе, но имеют чёткие обязанности и общаются между собой через RpcGateway (Pekko/Akka actor-style messaging). Понимание разделения этих обязанностей — фундамент для дебаггинга JM-логов: при ошибке “Could not allocate slot” вы знаете, что копать в ResourceManager + SlotManager, а не в JobMaster или Dispatcher.

В этом уроке препарируем JM до этих трёх компонентов, проследим путь job submission от kubectl/CLI до состояния RUNNING, и разберём, какие классы за что отвечают.

JobManager и TaskManager: основная архитектура (практика) Анатомия процесса: PID, address space, fd-таблица

Один процесс, три компонента

Сначала важно понять: в одном JM-процессе физически живут три actor-like компонента, плюс ещё несколько вспомогательных.

Под-компоненты JobManager-процесса
Dispatcherorg.apache.flink.runtime.dispatcher.Dispatcher. Принимает job submissions через REST API. Создаёт JobMaster instance per job. Маршрутизирует REST requests конкретному JobMaster.
ResourceManagerorg.apache.flink.runtime.resourcemanager.ResourceManager. Управляет lifecycle TaskManager-ов. Outsourced slot management — SlotManager как inner component. Имеет implementations: Standalone, K8s, YARN.
JobMaster (N штук)org.apache.flink.runtime.jobmaster.JobMaster. Один instance per job. Владеет ExecutionGraph, общается с TaskExecutor-ами, инициирует чекпоинты. Создаётся Dispatcher-ом при submission.
RestEndpointHTTP/HTTPS endpoint. Принимает REST calls от kubectl, flink CLI, Web UI. Делегирует к Dispatcher.
WebMonitorWeb UI на :8081. Использует тот же REST endpoint, но обрабатывает запросы из браузера.
MetricRegistryСобирает метрики из всех под-компонентов. Экспортирует в Prometheus или другие reporters.
HA Servicesorg.apache.flink.runtime.highavailability.HighAvailabilityServices. Leader election (через ZK или K8s), JobGraphStore (persistent), CompletedCheckpointStore.
BlobServerHTTP server для distribution job JAR-ов и shipped files в TM-ы. JM хранит uploaded JAR'ы, TM их скачивают перед запуском Task.
HeartbeatServicesHeartbeat management между JM и TM, между JM и RM. TM пингует RM каждые 10s (default), потеря 3 в ряд = TM считается dead.

Класс org.apache.flink.runtime.entrypoint.ClusterEntrypoint — это main класс, который запускается при старте JM-процесса. Он инстанцирует и запускает все эти компоненты. Конкретный entrypoint зависит от deployment:

  • StandaloneSessionClusterEntrypoint — standalone cluster, session mode.
  • KubernetesSessionClusterEntrypoint — K8s session.
  • KubernetesJobClusterEntrypoint — K8s application mode (один job = один cluster).
  • YarnSessionClusterEntrypoint — YARN.

Они немного отличаются (Standalone vs K8s vs YARN), но все включают в себя одни и те же три главные подсистемы: Dispatcher, ResourceManager, JobMaster.

Dispatcher: входная дверь

Dispatcher — это первый компонент, к которому приходит любой запрос на job submission. Его обязанности:

  1. Принимать REST-запросы на submission, cancellation, status query.
  2. Создавать JobMaster instances при successful submission.
  3. Хранить ссылки на active JobMaster-ы (через JobManagerRunner).
  4. Маршрутизировать запросы к конкретному JobMaster по jobID.
  5. Восстанавливать job-ы из JobGraphStore при failover (в HA).

Главный класс: org.apache.flink.runtime.dispatcher.Dispatcher (это abstract — конкретные impl: StandaloneDispatcher, MiniDispatcher).

Ключевые методы:

public CompletableFuture<Acknowledge> submitJob(
        JobGraph jobGraph,
        Time timeout) {
    // 1. Persist JobGraph в JobGraphStore (для HA)
    // 2. Create JobManagerRunner
    // 3. Start JobMaster
    // 4. Return Acknowledge
}

public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
    // 1. Find JobMaster by jobId
    // 2. Delegate cancel to JobMaster
    // 3. Wait for completion
    // 4. Remove from JobGraphStore
}

Когда вы делаете flink run -m localhost:8081 my-job.jar, под капотом происходит:

  1. CLI uploads JAR к BlobServer.
  2. CLI converts main(String[] args) -> StreamGraph -> JobGraph (на client side).
  3. CLI делает HTTP POST к /jars/upload (JAR) и /jars/{jar-id}/run (job submission).
  4. REST endpoint конвертирует request в Dispatcher#submitJob call.
  5. Dispatcher создаёт JobManagerRunner и стартует JobMaster.
  6. Возвращается JobID клиенту.

После этого Dispatcher держит ссылку на этот JobMaster в Map<JobID, JobManagerRunner>. Любые последующие requests (/jobs/{job-id}/...) маршрутизируются туда.

ResourceManager: управление слотами

ResourceManager — компонент, который знает, какие TaskManager-ы существуют, сколько у них слотов, и выдаёт слоты JobMaster-ам по запросу.

Главный класс: org.apache.flink.runtime.resourcemanager.ResourceManager (abstract — конкретные impl: StandaloneResourceManager, KubernetesResourceManager, YarnResourceManager).

Inner component: SlotManager (SlotManagerImpl) — отвечает за match-making между slot requests от JobMaster-ов и available slots на TaskManager-ах.

Жизненный цикл TaskManager в ResourceManager:

TaskManager registration flow
TM startTaskManager-процесс стартует (JVM с TaskManagerRunner). Загружает Configuration, инициализирует MemoryManager, NetworkEnvironment.
TM -> RM: RegisterTaskExecutor RPCTaskExecutor вызывает ResourceManagerGateway#registerTaskExecutor(taskExecutorAddress, resourceID, dataPort, jmxPort, hardwareDescription, ...).
RM accepts: TaskExecutorConnectionResourceManager создаёт WorkerRegistration с информацией о TM. SlotManager делает register у себя — учитывает slots TM-а как доступные.
RM -> TM: heartbeat начатHeartbeatServices начинают периодический ping (10s default). Потеря heartbeats -> TM считается dead, slots removed.
TM ready for SlotRequestsПосле successful registration, slots TM-а доступны для allocation. SlotManager может ответить на slot requests от JobMaster-ов.

Slot allocation: JobMaster ← ResourceManager

Когда JobMaster нужны слоты для запуска job-а, он делает request к ResourceManager. ResourceManager делегирует к SlotManager, который match-ит request с available slots.

// JobMaster, упрощённо:
SlotPool slotPool = ...;
slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, allocationID, timeout);
// SlotPool делает RPC к ResourceManager:
//   resourceManager.requestSlot(jobMasterID, slotRequest, timeout);

ResourceManager -> SlotManager#registerSlotRequest. SlotManager пытается найти available slot:

  • Если есть свободный slot, который удовлетворяет ResourceProfile (cpu, memory), и принадлежит TM, доступному JobMaster-у — выделяет.
  • Если нет, и cluster может масштабироваться (K8s или YARN) — запрашивает новый TM от Cluster Manager (создаётся новый pod, новый container).
  • Возвращает slot offer.

JobMaster получает slot, и может разместить там Task.

Standalone vs K8s vs YARN: где отличия

  • StandaloneResourceManager: TM-ы предзапущены вручную или через скрипт. RM не может запросить новые TM-ы — фиксированный набор. Если slots не хватает, request fails по timeout.
  • KubernetesResourceManager: RM может через K8s API создать новые TM pods. Pool TM-ов может расти под нагрузкой и shrink-ить при бездействии.
  • YarnResourceManager: similar, но через YARN ResourceManager allocates containers.

Это критично для понимания production behavior: если у вас “stuck Adaptive Scheduler” или slot request timeout, в Standalone deployment решение — увеличить TM count вручную, в K8s — проверить configuration K8s namespace (есть ли квоты, доступны ли nodes).

JobMaster: per-job orchestrator

JobMaster — это душа конкретного job-а. Каждый running job имеет свой JobMaster instance внутри JM-процесса. JobMaster:

  1. Владеет ExecutionGraph — runtime-граф job-а.
  2. Общается с TaskExecutor-ами через RPC: deploy Task, trigger checkpoint, cancel Task.
  3. Управляет slot allocation через SlotPool (заказывает слоты у ResourceManager).
  4. Координирует checkpoints через CheckpointCoordinator (модуль 06).
  5. Обрабатывает failures — рестартит Task-и, инициирует partial или full job restart.

Главный класс: org.apache.flink.runtime.jobmaster.JobMaster.

Структура JobMaster:

Под-компоненты JobMaster
ExecutionGraphRuntime-граф job-а: каждая ExecutionVertex = одна параллельная инстанция оператора. State machine: CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED/FAILED.
SchedulerDefaultScheduler / AdaptiveScheduler / AdaptiveBatchScheduler. Решает, когда и как запускать ExecutionVertices.
SlotPoolorg.apache.flink.runtime.jobmaster.slotpool.SlotPool. Кеш available slots для этого job-а. Делает slot requests к ResourceManager.
CheckpointCoordinatorТриггерит checkpoints периодически, собирает ACK от TaskExecutor-ов, сохраняет CompletedCheckpoint metadata.
OperatorCoordinatorPer-operator coordinator (для Source V2 — SourceCoordinator; для CEP — CEP-coordinator). Управляет split discovery, dynamic patterns.
RestartBackoffStrategyСтратегия restart: FixedDelay, ExponentialDelay, FailureRate. Решает, когда рестартовать после fail.
JobMasterGatewayRPC gateway, через который other components (Dispatcher, TaskExecutor) общаются с JobMaster. Methods: requestJob, declineCheckpoint, triggerSavepoint, и т.д.
HeartbeatManagerHeartbeat tracking для всех зарегистрированных TaskExecutor-ов этого job-а. Потеря heartbeat -> mark TM как unreachable.

JobMaster — самый “толстый” компонент. Большинство interesting things в JM-логах происходят именно здесь.

Sequence flow: от submission до RUNNING

Покажем полный путь job-а от flink run до состояния RUNNING.

CLI
REST
Dispatcher
JobMaster
ResourceManager
TaskExecutor
POST /jars/uploadPOST /jars/{id}/runsubmitJob(JobGraph)create JobMaster + startconnect, register as JobMasterrequestSlot per vertexrequestSlot RPCofferSlotsubmitTask (для каждого vertex)load JAR from BlobServercreate Task + Thread + invoke()updateTaskExecutionState: RUNNINGjob status: RUNNINGGET /jobs/{id}/status

Это happy path. В реальности на каждом шаге может произойти ошибка:

  • На m3: если JobGraph невалиден (deserialization error), Dispatcher отвергает.
  • На m5: если RM недоступен, JobMaster fails с registration timeout.
  • На m6-m8: если slots не хватает, slot request fails с timeout, scheduler делает restart с backoff.
  • На m9: если TM упал между offerSlot и submitTask, операция fails.
  • На m10: если BlobServer недоступен или JAR corrupted, Task fails to deploy.
  • На m11: если operator.open() throws, Task fails и эскалирует к JobMaster.

Каждая из этих ошибок генерирует специфические log messages в JM/TM. Знание sequence помогает диагностировать.

Application mode vs session mode

Дополнительная деталь, которую важно знать: Flink поддерживает два режима запуска cluster.

Session mode

JobManager + TaskManagers стартуют отдельно от job-а. Это persistent cluster, к которому job-ы submit-ятся по одному.

./bin/start-cluster.sh                       # запустил session cluster
./bin/flink run -m localhost:8081 job1.jar   # submit job 1
./bin/flink run -m localhost:8081 job2.jar   # submit job 2 в тот же cluster

Преимущество: shared resources, dispatcher reuses тот же JM-процесс для multiple jobs. Недостаток: один job может влиять на другой (memory pressure, classloader issues).

Application mode

JobManager и job стартуют вместе, один JM = один job. Это рекомендуемый mode для production.

./bin/flink run-application -t kubernetes-application \
    -c MyMainClass my-job.jar

Каждый job получает свой JM-процесс. Isolation полная, но overhead больше (один JM на job).

В K8s Operator это типично выглядит как один FlinkDeployment CRD = один application mode cluster.

Code references: что читать

Если хотите видеть это всё в коде:

  • org.apache.flink.runtime.dispatcher.Dispatcher — главный класс Dispatcher. Метод submitJob — главный.
  • org.apache.flink.runtime.jobmanager.JobManagerRunner — wrapper around JobMaster. Отвечает за leader election (HA), создание JobMaster.
  • org.apache.flink.runtime.jobmaster.JobMaster — JobMaster main class. Метод start инициализирует ExecutionGraph и Scheduler.
  • org.apache.flink.runtime.resourcemanager.ResourceManager — abstract base для RM implementations.
  • org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl — slot match-making logic.
  • org.apache.flink.runtime.taskexecutor.TaskExecutor — TaskManager-side main class. Метод submitTask принимает Task deployment.
  • org.apache.flink.runtime.entrypoint.ClusterEntrypoint — main entry для JM-процесса.

Главные RPC gateway interfaces:

  • DispatcherGateway — что Dispatcher exposes.
  • JobMasterGateway — что JobMaster exposes (TaskExecutor -> JobMaster calls).
  • ResourceManagerGateway — что RM exposes (TaskExecutor -> RM calls).
  • TaskExecutorGateway — что TM exposes (JobMaster -> TaskExecutor calls).

Эти interfaces — публичный API между under-components. Если хотите понять, что один компонент может попросить у другого — читайте gateway interface.

TIP

Когда дебажите JM-логи, ищите эти keywords: “Dispatcher” — это про submission/cancellation, “ResourceManager” — про slot management и TM registration, “JobMaster” — про конкретный job. По ним можно быстро локализовать, в каком компоненте проблема.

Чтение source code

Конкретный exercise. Откройте в IDE:

  1. org.apache.flink.runtime.dispatcher.Dispatcher — найдите метод submitJob. Прочитайте его. Заметьте, что он async (возвращает CompletableFuture).
  2. Прыгните на JobManagerRunner (через “Find usages” в IDE) — найдёте, как Dispatcher создаёт JobManagerRunner.
  3. Зайдите в JobManagerRunner#startJobManagerRunnerServices — увидите создание JobMaster через JobMasterServiceLeadershipRunnerFactory.
  4. Откройте JobMaster#start — увидите инициализацию ExecutionGraph и Scheduler.

Этот path даёт хорошее представление, как реально устроен flow submission в коде. На первый раз может быть много прыжков, но через 2-3 чтения это станет привычным.

Что дальше

Следующий урок — про TaskManager и slot model. Разберём, что такое slot, как slots работают как memory unit, что такое slot sharing groups и co-location constraints.

После — три урока модуля 02: graph transformations (StreamGraph -> JobGraph -> ExecutionGraph), HA + leader election, scheduler types.

Проверка знанийKnowledge check
Production-инцидент: вы submit-ите job в Flink K8s session cluster через flink CLI. Job submission "проходит" (CLI возвращает JobID), но job застревает в состоянии CREATED, никогда не переходя в RUNNING. JM-логи показывают: "Requesting 32 slots for job ID xxx", потом тишина. Через 5 минут: "Could not allocate slots within slot request timeout". В каких компонентах JM искать root cause, и какие следующие шаги для диагностики?
ОтветAnswer
Root cause лежит в interaction между JobMaster#SlotPool и ResourceManager#SlotManager. Шаги диагностики: (1) Проверить ResourceManager view: GET /taskmanagers через REST — видны ли вообще registered TM-ы? Если 0 TM-ов registered — проблема в TM startup (K8s проблемы с pods: image pull, resource quotas, scheduling). (2) Если TM-ы есть, проверить total available slots vs requested 32. Сначала на JM Web UI (/taskmanagers tab), потом в логах RM искать: "Registering TaskExecutor ... 4 slots". Подсчитать total. Если total меньше 32 — нужно scale up TM count (в K8s Operator: увеличить spec.taskManager.replicas). (3) Если slots достаточно, проверить slot allocation logs RM: SlotManager должен распределять slots между requesters. Искать в логах "Allocating slot" — есть ли они? Если нет — возможно, есть mismatching ResourceProfile (например, JobMaster запрашивает slot с GPU, а в кластере все TM CPU-only). (4) Проверить heartbeats: TM ↔ RM heartbeat должен быть alive каждые 10s. В логах TM искать "ResourceManager heartbeat lost" — если есть, TM не доступен RM, его slots невидимы. (5) Если ничего не помогает — включить DEBUG logging для org.apache.flink.runtime.resourcemanager.slotmanager и пересубмить. Должны увидеть detailed accept/reject decisions в SlotManager. Глубинная причина в 90% случаев: либо TM не зарегистрированы, либо slot count не хватает, либо ResourceProfile mismatch. Не лечится "увеличить timeout" — это симптом, не cause.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие три главных под-компонента живут в JobManager-процессе, и каковы их обязанности?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5