TaskManager — это JVM-процесс, который исполняет Task-и. Внутри TM есть slots — конкретные runtime-units, которые могут принять Task. Связь между TM и slots — это центральная концепция Flink resource model, и она нетривиальная: один TM может иметь несколько slots, в одном slot может выполняться несколько Task-ов (через slot sharing), а парадигма “1 Task = 1 thread” остаётся при этом верной.
Без понимания slot model вы не сможете правильно конфигурировать parallelism, не сможете объяснить, почему taskmanager.numberOfTaskSlots: 4 и parallelism 16 требует 4 TM (а не 16), не сможете предсказать, какие операторы попадут в один slot и почему. В этом уроке — детальный разбор.
Структура TaskManager-процесса
TaskManager — это JVM-процесс. Главный класс: org.apache.flink.runtime.taskexecutor.TaskExecutor. Он содержит несколько ключевых подсистем:
Когда вы видите в логах TM “Starting TaskExecutor with: total memory 4096 MB, …”, это инициализация всех этих подсистем. На typical production TM может быть 16 GiB heap + 16 GiB managed memory + 2 GiB network buffers + 1 GiB framework + 2 GiB JVM metaspace — всё это конфигурируется в taskmanager.memory.* properties (детально — модуль 04).
Что такое slot
Slot — это единица memory allocation в TM. Не CPU. Главное mental model:
Slot — это часть TM-памяти, выделенная под исполнение Task-ов.
Если у TM 16 GiB heap и taskmanager.numberOfTaskSlots: 4, то каждый slot получает ~4 GiB heap (плюс пропорциональная часть managed memory, network buffers).
# flink-conf.yaml
taskmanager.memory.process.size: 16g # total process memory
taskmanager.numberOfTaskSlots: 4 # 4 slots в TM
Каждый slot:
- Получает proportional share heap memory (heap / numberOfTaskSlots).
- Получает proportional share managed memory.
- Получает proportional share network buffers.
- НЕ получает dedicated CPU. CPU shared между slots через JVM thread scheduler.
Это означает: если у TM 4 vCPU и 4 slots, каждый slot effectively получает ~1 vCPU. Но это не enforced — slot не лимитирован к 1 CPU, он просто statistically получит около 1, если все 4 slots активны.
Why slots are memory-based, not CPU-based?
Flink решает, что memory — это hard limit (OOM = crash), а CPU — это soft constraint (медленнее, но работает). Поэтому isolation между slots — на memory. CPU isolation — это уже задача OS/container scheduler (cgroup в Linux / K8s pod limits).
Slot = 1 Task ИЛИ несколько Tasks?
Это central confusion для начинающих с Flink. Ответ: зависит от slot sharing group.
По умолчанию (default), slot sharing включён, и в одном slot могут выполняться несколько Tasks из разных операторов. Конкретно: в одном slot выполняется по одной параллельной инстанции каждого оператора из одной slot sharing group.
Давайте на примере. Job:
Source -> Map -> KeyBy -> Filter -> Sink
parallelism: 4 для всех
С default slot sharing все эти операторы — в одной group. Для исполнения job-а нужно:
- 4 slots (parallelism = 4).
- В каждом slot: 1 Source task + 1 Map task + 1 Filter task + 1 Sink task.
- 4 slots × 4 tasks = 16 tasks total.
- Если slots-per-TM = 4, нужен 1 TM.
Это elegant и эффективно: операторы из одной pipeline живут в одном slot, что даёт data locality и минимизирует network shuffle.
Что если slot sharing отключить?
Можно отключить slot sharing для конкретного оператора:
stream
.map(...).slotSharingGroup("group-a") // отдельная sharing group
.keyBy(...)
.process(...).slotSharingGroup("group-b");
Если все operators в разных sharing groups, то в один slot попадёт только один task. Тогда для parallelism 4 четырёх-оператор pipeline нужно 4 × 4 = 16 slots.
Когда это полезно:
- Heavy operators, которые конкурируют за ресурсы. Например, source делает много network IO, sink тоже — лучше разнести по slots.
- Memory-heavy operators. Если один оператор требует 8 GiB heap, а остальные 1 GiB, разделение по группам позволяет TM allocate slots с разными resource profiles.
Когда не полезно:
- Default case. Slot sharing — это performance optimization. Отключение without reason = больше slots = больше TM = больше money.
Slot sharing group: when explicit
API:
stream.map(...).slotSharingGroup("heavy-processing");
По умолчанию все operators — в group “default”. Если хотите изоляции, явно указываете group name.
В одной slot sharing group:
- Операторы могут share slot.
- Все параллельные instances этих операторов распределяются по slots round-robin.
Между группами:
- Операторы не могут share slot.
- Slots для разных групп — независимые.
Tuning slot allocation per group
Можно настроить, сколько resources получит каждая sharing group:
SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("group-a")
.setCpuCores(2.0)
.setTaskHeapMemory(MemorySize.ofMebiBytes(2048))
.build();
stream.map(...).slotSharingGroup(ssgA);
В Adaptive Scheduler это позволяет request slots specific size. ResourceManager должен иметь TM с slots, удовлетворяющими этим ResourceProfile.
Co-location constraint
Co-location — это специфический slot sharing, который гарантирует parallel instances двух operators быть в одном slot, чтобы соответствующие parallel instances (instance 0, 1, 2, …) ровно сошлись.
DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;
// Связать через co-location:
stream1.coLocate("ssg-1");
stream2.coLocate("ssg-1");
Это редко используется в типичных приложениях. Главный use case — iterations (deprecated в 2.0) и legacy patterns. В 2026 году большинство применений — на уровне SlotSharingGroup.
Реальный пример: parallelism, slots, TM count
Job:
env.setParallelism(8);
env.fromSource(kafkaSource, ...)
.map(...)
.keyBy(...)
.window(...)
.process(...)
.sinkTo(kafkaSink);
Default slot sharing group для всех operators.
| Конфигурация | Slots TM | TM count | Total tasks |
|---|---|---|---|
numberOfTaskSlots: 1 | 1 | 8 | 8 × 5 = 40 |
numberOfTaskSlots: 2 | 2 | 4 | 8 × 5 = 40 |
numberOfTaskSlots: 4 | 4 | 2 | 8 × 5 = 40 |
numberOfTaskSlots: 8 | 8 | 1 | 8 × 5 = 40 |
Заметьте: total tasks (40) — то же. Меняется только, как они распределены по slots/TMs.
Какой выбрать?
- 1 slot per TM: max isolation, max resource per slot. Если operator требует много heap. Минус: высокий overhead (много JVM-процессов).
- 2-4 slots per TM: компромисс. Хороший resource utilization, разумная isolation.
- 8+ slots per TM: high density. Меньше JVM overhead, но больше contention между slots за CPU/IO.
Типичные production-значения: 2-4 slots per TM для streaming workloads, 8-16 slots per TM для batch (где CPU bottleneck меньше).
Operator chaining + slots: ещё один уровень
Внутри одного slot, операторы могут быть chained в один Task. Это значит: вместо отдельных threads на каждый оператор, они исполняются sequentially в одном thread, и data между ними передаётся через method call вместо serialization.
Source -> Map -> Filter -> Sink (4 operators)
↓
Chained Task: 1 thread исполняет все 4 (если они в одной slot sharing group и chaining не отключен)
Когда chaining работает:
- Оба operators в одном slot (одна sharing group).
- Forwarding partitioning (rebalance, hash, broadcast — break chain).
- Same parallelism (если parallelism разный — break chain, нужен network shuffle).
Когда chaining ломается:
- После
.keyBy()— partitioning меняется. - После
.rebalance(),.shuffle(),.broadcast()— partitioning меняется. - Если разные slot sharing groups.
- Если parallelism разный.
- Если operators в разных JobVertex.
Эффект chaining: огромное increase throughput, потому что нет serialization между operators. На typical pipeline разница 5-10x.
Code: org.apache.flink.streaming.api.graph.StreamGraphHasherV2 и org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#connect — здесь логика chaining.
Visualization: реальная job в Web UI
Когда вы открываете JM Web UI и видите job graph, вы видите JobGraph (после chaining, до parallelism expansion). Каждая “box” — это JobVertex, который объединяет один или несколько chained operators.
Когда вы кликнете на vertex, увидите subtasks — параллельные instances. Каждая subtask — это один Task, исполняющийся в одном slot.
Например, JobGraph выглядит так:
[Source -> Map -> Filter] -> [keyBy.window -> process] -> [Sink]
JobVertex 1 JobVertex 2 JobVertex 3
Если parallelism = 4:
- JobVertex 1 имеет 4 subtasks (Source-Map-Filter chain).
- JobVertex 2 имеет 4 subtasks (window-process chain).
- JobVertex 3 имеет 4 subtasks (Sink).
Total: 12 subtasks. Со slot sharing default — нужно 4 slots.
Slot allocation: реальный flow
Sequence: как JobMaster получает slots для запуска tasks.
Этот flow — для Default Scheduler (eager). Для Adaptive Scheduler (модуль 02.05) flow слегка другой — он может начать с partial slots и rescale dynamically.
Tuning: сколько slots per TM ставить
Это прагматический вопрос, и ответ зависит от workload.
Streaming, IO-bound (Kafka source, JDBC sink, HTTP enrichment)
В IO-bound workloads операторы много времени блокируются на IO. CPU underutilized. Можно ставить много slots per TM (8-16) — они не конкурируют сильно за CPU.
Caveat: каждый slot требует proportional memory. Если TM 32 GiB и вы хотите 16 slots, каждый slot получит 2 GiB — может быть мало для state-heavy operators.
Streaming, CPU-bound (heavy parse, ML inference, complex aggregations)
CPU bottleneck. Slots конкурируют за CPU thread time. Лучше меньше slots, по одному на vCPU (4 slots на 4 vCPU TM).
Streaming, state-heavy (RocksDB)
RocksDB сам потребляет много memory (block cache, write buffers) и делает много disk IO. Если у вас 4 slots на TM, каждый имеет свой RocksDB instance, и они competing за disk. Better: меньше slots (2 на 4 vCPU), больше memory per slot, RocksDB меньше contention.
Batch (BATCH mode)
CPU-bound почти всегда (heavy sort, hash join). 1 slot на 1 vCPU — оптимально.
Recommendation
Для typical streaming production на 2026 — 2 slots per TM на 4 vCPU TM (или 4 slots на 8 vCPU). Это даёт хороший balance memory per slot vs JVM overhead.
Не оптимизируйте slot count premature. Запустите job с default (1 slot per TM), измерьте CPU и memory utilization, потом тюньте. Большинство production проблем с slots — от cargo-culting “у нас всегда 8 slots per TM”.
TaskExecutor lifecycle: registration -> heartbeat -> unregistration
После старта TM, его lifecycle:
- Registration: TM делает RPC к RM (
ResourceManagerGateway#registerTaskExecutor). - HeartbeatTarget: устанавливается двусторонний heartbeat (RM ↔ TM каждые 10s default).
- Slot allocation: TM ждёт slot allocation requests от RM.
- Task submission: после allocation, JobMaster шлёт submitTask RPC.
- Task execution: TM создаёт Task, исполняет.
- Heartbeat lost: если 3 heartbeats подряд lost, RM marks TM as dead, removes slots.
- Unregistration: при graceful shutdown TM делает RPC к RM для unregister.
Class: org.apache.flink.runtime.taskexecutor.TaskExecutor, методы connectToResourceManager, registerWithResourceManager, disconnectResourceManager.
Что дальше
Следующий урок — про graph transformations: StreamGraph -> JobGraph -> ExecutionGraph. Что происходит на каждом уровне, какие optimizations applied, как DataStream-программа становится runtime execution.
После — HA и leader election, scheduler types.