Learning Platform
Глоссарий Troubleshooting
Урок 03.02 · 25 мин
Продвинутый
TaskManagerSlotsSlot sharingCo-locationParallelismResource model

TaskManager — это JVM-процесс, который исполняет Task-и. Внутри TM есть slots — конкретные runtime-units, которые могут принять Task. Связь между TM и slots — это центральная концепция Flink resource model, и она нетривиальная: один TM может иметь несколько slots, в одном slot может выполняться несколько Task-ов (через slot sharing), а парадигма “1 Task = 1 thread” остаётся при этом верной.

Без понимания slot model вы не сможете правильно конфигурировать parallelism, не сможете объяснить, почему taskmanager.numberOfTaskSlots: 4 и parallelism 16 требует 4 TM (а не 16), не сможете предсказать, какие операторы попадут в один slot и почему. В этом уроке — детальный разбор.

Поток vs процесс: что общего, что разного

Структура TaskManager-процесса

TaskManager — это JVM-процесс. Главный класс: org.apache.flink.runtime.taskexecutor.TaskExecutor. Он содержит несколько ключевых подсистем:

Структура TaskManager-процесса
TaskExecutor (main)Главный класс. Регистрируется с ResourceManager, принимает slot allocations, исполняет Tasks. Метод submitTask — entry point для Task deployment.
TaskSlotTableУправляет slots: какой slot allocated какому JobMaster-у, какие Tasks в нём активны. Class: TaskSlotTableImpl.
JobLeaderServiceDiscovery JobMaster-ов для текущего job-а через HA. Используется для connection и heartbeating.
MemoryManagerУправление managed memory (отдельно от JVM heap). Allocates MemorySegments из off-heap pool для batch operators (sort, hashtable) и для RocksDB.
NetworkEnvironmentNetwork subsystem: ConnectionManager, ResultPartitionManager, BufferPools, Netty server. Здесь живёт credit-based flow control (модуль 03).
ShuffleEnvironmentShuffle layer: NettyShuffleEnvironment (default). Управляет ResultPartitions (output) и InputGates (input). Используется для data transfer между Tasks.
IOManagerBackground disk IO: BufferFileChannelManager, асинхронные writes/reads. Используется state backends и spilling в batch.
BlobCacheЛокальный кеш загруженных JAR-ов от BlobServer. Скачивает JAR один раз per JobID, переиспользует.
StateBackend instanceОдин state backend per Task (через TaskStateManager). RocksDB / HashMap / ForStDB. Каждый task имеет access к своему slice.
Task threads (по 1 на Task)Каждый running Task — отдельный JVM thread. invoke() метод исполняется в этом thread, всё processing операторов — в нём же.
RPC endpointsTaskExecutorGateway — RPC, через который JobMaster и ResourceManager общаются с TM. Heartbeats, submitTask, cancelTask, triggerCheckpoint.

Когда вы видите в логах TM “Starting TaskExecutor with: total memory 4096 MB, …”, это инициализация всех этих подсистем. На typical production TM может быть 16 GiB heap + 16 GiB managed memory + 2 GiB network buffers + 1 GiB framework + 2 GiB JVM metaspace — всё это конфигурируется в taskmanager.memory.* properties (детально — модуль 04).

Что такое slot

Slot — это единица memory allocation в TM. Не CPU. Главное mental model:

Slot — это часть TM-памяти, выделенная под исполнение Task-ов.

Если у TM 16 GiB heap и taskmanager.numberOfTaskSlots: 4, то каждый slot получает ~4 GiB heap (плюс пропорциональная часть managed memory, network buffers).

# flink-conf.yaml
taskmanager.memory.process.size: 16g     # total process memory
taskmanager.numberOfTaskSlots: 4         # 4 slots в TM

Каждый slot:

  • Получает proportional share heap memory (heap / numberOfTaskSlots).
  • Получает proportional share managed memory.
  • Получает proportional share network buffers.
  • НЕ получает dedicated CPU. CPU shared между slots через JVM thread scheduler.

Это означает: если у TM 4 vCPU и 4 slots, каждый slot effectively получает ~1 vCPU. Но это не enforced — slot не лимитирован к 1 CPU, он просто statistically получит около 1, если все 4 slots активны.

Why slots are memory-based, not CPU-based?

Flink решает, что memory — это hard limit (OOM = crash), а CPU — это soft constraint (медленнее, но работает). Поэтому isolation между slots — на memory. CPU isolation — это уже задача OS/container scheduler (cgroup в Linux / K8s pod limits).

Slot = 1 Task ИЛИ несколько Tasks?

Это central confusion для начинающих с Flink. Ответ: зависит от slot sharing group.

По умолчанию (default), slot sharing включён, и в одном slot могут выполняться несколько Tasks из разных операторов. Конкретно: в одном slot выполняется по одной параллельной инстанции каждого оператора из одной slot sharing group.

Давайте на примере. Job:

Source -> Map -> KeyBy -> Filter -> Sink
parallelism: 4 для всех

С default slot sharing все эти операторы — в одной group. Для исполнения job-а нужно:

  • 4 slots (parallelism = 4).
  • В каждом slot: 1 Source task + 1 Map task + 1 Filter task + 1 Sink task.
  • 4 slots × 4 tasks = 16 tasks total.
  • Если slots-per-TM = 4, нужен 1 TM.

Это elegant и эффективно: операторы из одной pipeline живут в одном slot, что даёт data locality и минимизирует network shuffle.

Slot sharing: 4 параллельных pipeline в 4 slots
Slot 0Один slot: 1 Source task + 1 Map task + 1 Filter task + 1 Sink task. Все они работают над одним parallel sub-pipeline. Internal data flow без network.
Slot 1Параллельный sub-pipeline #1.
Slot 2Параллельный sub-pipeline #2.
Slot 3Параллельный sub-pipeline #3.

Что если slot sharing отключить?

Можно отключить slot sharing для конкретного оператора:

stream
    .map(...).slotSharingGroup("group-a")  // отдельная sharing group
    .keyBy(...)
    .process(...).slotSharingGroup("group-b");

Если все operators в разных sharing groups, то в один slot попадёт только один task. Тогда для parallelism 4 четырёх-оператор pipeline нужно 4 × 4 = 16 slots.

Когда это полезно:

  • Heavy operators, которые конкурируют за ресурсы. Например, source делает много network IO, sink тоже — лучше разнести по slots.
  • Memory-heavy operators. Если один оператор требует 8 GiB heap, а остальные 1 GiB, разделение по группам позволяет TM allocate slots с разными resource profiles.

Когда не полезно:

  • Default case. Slot sharing — это performance optimization. Отключение without reason = больше slots = больше TM = больше money.

Slot sharing group: when explicit

API:

stream.map(...).slotSharingGroup("heavy-processing");

По умолчанию все operators — в group “default”. Если хотите изоляции, явно указываете group name.

В одной slot sharing group:

  • Операторы могут share slot.
  • Все параллельные instances этих операторов распределяются по slots round-robin.

Между группами:

  • Операторы не могут share slot.
  • Slots для разных групп — независимые.

Tuning slot allocation per group

Можно настроить, сколько resources получит каждая sharing group:

SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("group-a")
    .setCpuCores(2.0)
    .setTaskHeapMemory(MemorySize.ofMebiBytes(2048))
    .build();

stream.map(...).slotSharingGroup(ssgA);

В Adaptive Scheduler это позволяет request slots specific size. ResourceManager должен иметь TM с slots, удовлетворяющими этим ResourceProfile.

Co-location constraint

Co-location — это специфический slot sharing, который гарантирует parallel instances двух operators быть в одном slot, чтобы соответствующие parallel instances (instance 0, 1, 2, …) ровно сошлись.

DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;

// Связать через co-location:
stream1.coLocate("ssg-1");
stream2.coLocate("ssg-1");

Это редко используется в типичных приложениях. Главный use case — iterations (deprecated в 2.0) и legacy patterns. В 2026 году большинство применений — на уровне SlotSharingGroup.

Реальный пример: parallelism, slots, TM count

Job:

env.setParallelism(8);
env.fromSource(kafkaSource, ...)
   .map(...)
   .keyBy(...)
   .window(...)
   .process(...)
   .sinkTo(kafkaSink);

Default slot sharing group для всех operators.

КонфигурацияSlots TMTM countTotal tasks
numberOfTaskSlots: 1188 × 5 = 40
numberOfTaskSlots: 2248 × 5 = 40
numberOfTaskSlots: 4428 × 5 = 40
numberOfTaskSlots: 8818 × 5 = 40

Заметьте: total tasks (40) — то же. Меняется только, как они распределены по slots/TMs.

Какой выбрать?

  • 1 slot per TM: max isolation, max resource per slot. Если operator требует много heap. Минус: высокий overhead (много JVM-процессов).
  • 2-4 slots per TM: компромисс. Хороший resource utilization, разумная isolation.
  • 8+ slots per TM: high density. Меньше JVM overhead, но больше contention между slots за CPU/IO.

Типичные production-значения: 2-4 slots per TM для streaming workloads, 8-16 slots per TM для batch (где CPU bottleneck меньше).

Operator chaining + slots: ещё один уровень

Внутри одного slot, операторы могут быть chained в один Task. Это значит: вместо отдельных threads на каждый оператор, они исполняются sequentially в одном thread, и data между ними передаётся через method call вместо serialization.

Source -> Map -> Filter -> Sink   (4 operators)

   Chained Task: 1 thread исполняет все 4 (если они в одной slot sharing group и chaining не отключен)

Когда chaining работает:

  • Оба operators в одном slot (одна sharing group).
  • Forwarding partitioning (rebalance, hash, broadcast — break chain).
  • Same parallelism (если parallelism разный — break chain, нужен network shuffle).

Когда chaining ломается:

  • После .keyBy() — partitioning меняется.
  • После .rebalance(), .shuffle(), .broadcast() — partitioning меняется.
  • Если разные slot sharing groups.
  • Если parallelism разный.
  • Если operators в разных JobVertex.

Эффект chaining: огромное increase throughput, потому что нет serialization между operators. На typical pipeline разница 5-10x.

Code: org.apache.flink.streaming.api.graph.StreamGraphHasherV2 и org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#connect — здесь логика chaining.

Visualization: реальная job в Web UI

Когда вы открываете JM Web UI и видите job graph, вы видите JobGraph (после chaining, до parallelism expansion). Каждая “box” — это JobVertex, который объединяет один или несколько chained operators.

Когда вы кликнете на vertex, увидите subtasks — параллельные instances. Каждая subtask — это один Task, исполняющийся в одном slot.

Например, JobGraph выглядит так:

[Source -> Map -> Filter] -> [keyBy.window -> process] -> [Sink]
         JobVertex 1            JobVertex 2          JobVertex 3

Если parallelism = 4:

  • JobVertex 1 имеет 4 subtasks (Source-Map-Filter chain).
  • JobVertex 2 имеет 4 subtasks (window-process chain).
  • JobVertex 3 имеет 4 subtasks (Sink).

Total: 12 subtasks. Со slot sharing default — нужно 4 slots.

Slot allocation: реальный flow

Sequence: как JobMaster получает slots для запуска tasks.

Slot allocation flow
1. JobMaster: ExecutionGraph createJobMaster инстанцирует ExecutionGraph из JobGraph. Каждая ExecutionVertex — параллельная инстанция оператора, ожидает deployment.
2. Scheduler: декларирует slot requirementsScheduler анализирует ExecutionGraph, группирует ExecutionVertices по SlotSharingGroup, считает required slot count. Для каждой group — request к SlotPool.
3. SlotPool -> ResourceManager: requestSlotSlotPool (per-job cache) делает RPC к RM для каждого нужного slot. Включает ResourceProfile (memory, cpu).
4. SlotManager: match с availableSlotManager scans registered TM-ы, ищет free slot с matching ResourceProfile. Если найден — instructs TM allocate slot конкретному JobMaster.
5. TaskExecutor: allocateSlotTM марк slot как allocated к JobMaster, через RPC offer-ит slot. JobMaster добавляет slot в SlotPool.
6. Scheduler: assign tasks to slotsScheduler решает, какой ExecutionVertex деплоится в какой slot, учитывая slot sharing groups и co-location constraints.
7. JobMaster: submitTask к TaskExecutorДля каждой назначенной ExecutionVertex — RPC submitTask с TaskDeploymentDescriptor. TM создаёт Task, запускает thread, начинает invoke().

Этот flow — для Default Scheduler (eager). Для Adaptive Scheduler (модуль 02.05) flow слегка другой — он может начать с partial slots и rescale dynamically.

Tuning: сколько slots per TM ставить

Это прагматический вопрос, и ответ зависит от workload.

Streaming, IO-bound (Kafka source, JDBC sink, HTTP enrichment)

В IO-bound workloads операторы много времени блокируются на IO. CPU underutilized. Можно ставить много slots per TM (8-16) — они не конкурируют сильно за CPU.

Caveat: каждый slot требует proportional memory. Если TM 32 GiB и вы хотите 16 slots, каждый slot получит 2 GiB — может быть мало для state-heavy operators.

Streaming, CPU-bound (heavy parse, ML inference, complex aggregations)

CPU bottleneck. Slots конкурируют за CPU thread time. Лучше меньше slots, по одному на vCPU (4 slots на 4 vCPU TM).

Streaming, state-heavy (RocksDB)

RocksDB сам потребляет много memory (block cache, write buffers) и делает много disk IO. Если у вас 4 slots на TM, каждый имеет свой RocksDB instance, и они competing за disk. Better: меньше slots (2 на 4 vCPU), больше memory per slot, RocksDB меньше contention.

Batch (BATCH mode)

CPU-bound почти всегда (heavy sort, hash join). 1 slot на 1 vCPU — оптимально.

Recommendation

Для typical streaming production на 2026 — 2 slots per TM на 4 vCPU TM (или 4 slots на 8 vCPU). Это даёт хороший balance memory per slot vs JVM overhead.

TIP

Не оптимизируйте slot count premature. Запустите job с default (1 slot per TM), измерьте CPU и memory utilization, потом тюньте. Большинство production проблем с slots — от cargo-culting “у нас всегда 8 slots per TM”.

TaskExecutor lifecycle: registration -> heartbeat -> unregistration

После старта TM, его lifecycle:

  1. Registration: TM делает RPC к RM (ResourceManagerGateway#registerTaskExecutor).
  2. HeartbeatTarget: устанавливается двусторонний heartbeat (RM ↔ TM каждые 10s default).
  3. Slot allocation: TM ждёт slot allocation requests от RM.
  4. Task submission: после allocation, JobMaster шлёт submitTask RPC.
  5. Task execution: TM создаёт Task, исполняет.
  6. Heartbeat lost: если 3 heartbeats подряд lost, RM marks TM as dead, removes slots.
  7. Unregistration: при graceful shutdown TM делает RPC к RM для unregister.

Class: org.apache.flink.runtime.taskexecutor.TaskExecutor, методы connectToResourceManager, registerWithResourceManager, disconnectResourceManager.

Что дальше

Следующий урок — про graph transformations: StreamGraph -> JobGraph -> ExecutionGraph. Что происходит на каждом уровне, какие optimizations applied, как DataStream-программа становится runtime execution.

После — HA и leader election, scheduler types.

Проверка знанийKnowledge check
У вас Flink job: 8 операторов, parallelism 32, state ~500 GiB (RocksDB), throughput 200k events/sec. Текущая конфигурация: 16 TM × 4 slots = 64 slots (избыток для parallelism 32). Каждый TM: 8 vCPU, 64 GiB memory. CPU utilization 25%, RocksDB latency p99 = 12ms. Backpressure 20% времени. Какие изменения slot model вы бы предложили и почему?
ОтветAnswer
Текущая ситуация неоптимальна по нескольким причинам: (1) 64 slots при parallelism 32 — половина slots простаивает, это waste of memory. Каждый идущий slot получает 16 GiB heap (64/4), но половина не используется. (2) RocksDB latency 12ms — высокий, для state 500 GiB. С 4 slots per TM каждый имеет свой RocksDB instance (4 RocksDB на TM), они competing за disk IO. (3) CPU 25% означает либо IO-bound (RocksDB waits), либо underutilization. Рекомендация: уменьшить slots per TM до 2. Новая конфигурация: 16 TM × 2 slots = 32 slots, ровно соответствует parallelism. Каждый slot теперь получает 32 GiB heap + proportional managed memory — RocksDB block cache можно расширить с (например) 2 GiB до 6-8 GiB, что улучшит read latency дramatically. Меньше RocksDB instances per TM (2 вместо 4) — меньше disk contention, latency должна упасть до 4-6ms. Backpressure уменьшится. Альтернатива: если bottleneck реально CPU-bound на конкретных operators, сделать slot sharing groups (выделить heavy operator в отдельную group), чтобы он получил dedicated slot. Но это complex; начинать с уменьшения slots/TM. Validation: после изменения измерить CPU (должно вырасти к ~40-50%, что нормально для streaming), RocksDB latency (должно упасть), backpressure (должно уменьшиться). Если CPU shoots выше 80% — нужно scale out (больше TM с тем же количеством slots).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что такое slot в Flink, и почему он считается 'memory unit', а не 'CPU unit'?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5