Scheduler в Flink — компонент, который решает, как и когда запускать Tasks. Это под-компонент JobMaster, и от его выбора зависит много production-поведения: может ли job стартовать с partial resources, может ли rescaling произойти без savepoint, как обрабатываются failures.
С Flink 1.18 в системе три main scheduler:
- Default Scheduler — eager allocation, all-or-nothing. Был default до 2.0.
- Adaptive Scheduler — lazy allocation, supports partial resources, runtime rescaling. Default для streaming с 2.0.
- Adaptive Batch Scheduler — per-stage parallelism для batch workloads. Default для BATCH mode.
В этом уроке разберём каждый, сравним их semantics, и обсудим, когда выбирать что.
Adaptive Query Execution в Spark Жизненный цикл задания FlinkDefault Scheduler: eager all-or-nothing
Default Scheduler — это original scheduler Flink. Был default с самой первой версии до 2.0. Принцип: eager allocation, all resources at start.
Главный класс: org.apache.flink.runtime.scheduler.DefaultScheduler.
Алгоритм
- JobMaster инициализирует ExecutionGraph.
- Scheduler определяет, сколько slots нужно (sum slots per SlotSharingGroup).
- Scheduler делает slot requests к ResourceManager для всех needed slots одновременно.
- Ждёт все slots allocated.
- Если все slots allocated в
slot.request.timeout(default 5 минут) — деплоит все Tasks одновременно. - Если не все slots allocated — fails job, не запускает партиально.
- Job становится RUNNING.
Pros и cons
Pros:
- Simple semantics: или работает с full parallelism, или fails.
- Predictable performance: всё спланировано upfront.
- Хорошо для job-ов с фиксированным parallelism и предсказуемой нагрузкой.
Cons:
- Wastes resources при partial availability. Если кластер имеет 30 slots, а вам нужно 32 — fails. Это unfortunate.
- No runtime rescaling. Чтобы изменить parallelism, нужно savepoint + restart с новым parallelism.
- Slow start на больших job-ах. Все Tasks deployment одновременно — burst load на RM, на TM, на BlobServer (JAR distribution).
Когда использовать
В 2026 году Default Scheduler deprecated by default. Используется только в специфических scenarios:
- Legacy compatibility (старые pipelines на 1.x, которые ещё не мигрировали).
- Когда вам точно нужны predictable timings (например, для testing).
- Batch workloads, где Adaptive Batch не подходит.
Для типичных production streaming workloads в 2026 — Adaptive Scheduler.
Adaptive Scheduler: lazy, runtime rescaling
Adaptive Scheduler был представлен в FLIP-160 (Flink 1.13, preview) и постепенно созревал. С Flink 2.0 — default scheduler для streaming jobs.
Главный класс: org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.
Принципы
- Lazy allocation: не требует всех slots upfront. Может стартовать с partial slots и масштабироваться позже.
- Runtime rescaling: если slot count меняется (TM добавлен/удалён), scheduler автоматически rescales parallelism.
- Job-level state machine: Adaptive Scheduler имеет explicit state machine с states: Created -> WaitingForResources -> Executing -> Stopped (и др).
- Reactive mode: extreme version, parallelism = available slots (auto-scale based on resources).
Алгоритм (упрощённо)
- JobMaster start. Scheduler в state
Created. - Scheduler considers slot requirements для job (min/max parallelism per JobVertex).
- Запрашивает enough slots для min parallelism к RM. State:
WaitingForResources. - После acquired enough slots, Scheduler decides parallelism dynamically (min ≤ chosen ≤ max), based on available resources.
- Деплоит Tasks с выбранным parallelism. State:
Executing. - Watch for slot changes:
- Если new slots become available (TM added) — может rescale up. Triggers transition:
Executing -> Restarting -> Executingс higher parallelism. - Если slots removed (TM crashed, не recovered) — rescale down. Same transition.
- Если new slots become available (TM added) — может rescale up. Triggers transition:
- На failure recovery, тоже через
Restartingstate.
Configuration
# Adaptive Scheduler config
jobmanager.scheduler: adaptive # explicitly (default в 2.0+)
# Min/max parallelism per job (можно tune через REST API):
parallelism.default: 4
jobmanager.adaptive-scheduler.min-parallelism-increase: 1
jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10 s
resource-stabilization-timeout — это debouncing: если slots колеблются (TM комплекс starts/stops), wait this duration before triggering rescale. Default 10s — чтобы не делать rescale на каждый минор изменение.
Reactive mode: extreme version
Adaptive Scheduler имеет специальный mode — Reactive Mode. В нём parallelism = automatic based on available slots. Это полезно для:
- Cloud-native auto-scaling: K8s HPA scales TM deployment вверх/вниз, Flink job автоматически использует все available slots.
- Cost optimization: scale down ночью когда traffic low, scale up днём.
jobmanager.scheduler: adaptive
scheduler-mode: reactive
В Reactive Mode parallelism всегда = total slots (assuming default slot sharing). Если TM count = 4 (8 slots каждый), parallelism = 32. Если K8s HPA scales TM count to 8, parallelism rebalances to 64.
Pros и cons
Pros:
- Partial resource start: можно стартовать с минимальными slots, no all-or-nothing.
- Runtime rescaling без savepoint — это огромная фича для cloud-native deployments.
- Auto-scale через Reactive Mode.
- Better fault tolerance: при TM failure не нужно restart job с full parallelism — только tasks affected by TM crash.
Cons:
- Slightly more complex. State machine более сложная, debug сложнее.
- Rescaling triggers state redistribution (key groups reassigned, что может pause job на seconds-minutes для large state).
- Не подходит для всех scenarios — например, если у вас strict latency SLO и rescaling pauses unacceptable.
Когда использовать
В 2026 году — default для streaming. Используйте если:
- Cloud-native deployment (K8s).
- Хотите elastic scaling под нагрузку.
- Хотите быстрый partial recovery после TM crash.
- Не критично short pauses при rescaling.
Когда не использовать:
- Strict latency SLO (e.g., trading systems с p99 менее 10ms): pauses при rescaling unacceptable.
- Тестирование, где предсказуемые timings critical.
- Legacy jobs которые ещё не tested under Adaptive.
Adaptive Batch Scheduler: per-stage parallelism
Третий scheduler — специально для batch workloads (RuntimeExecutionMode.BATCH). Представлен в FLIP-187 (Flink 1.16+), стал default для BATCH в 2.0.
Главный класс: org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.
Принципы
Batch workloads имеют специфику:
- Stage-based execution: job разбит на stages (separated by blocking shuffles).
- Intermediate data known after stage: после завершения stage можем знать exact size produced data.
- Optimal parallelism depends on data size: 1 GiB -> parallelism 4, 1 TiB -> parallelism 256.
Adaptive Batch Scheduler использует эту information: выбирает parallelism per-stage dynamically.
Алгоритм
- Стартует job со stage 1 (source stage).
- Использует fixed parallelism для source (или derived from source split count).
- После stage 1 completion, scheduler знает actual data size produced.
- Для stage 2 — calculates optimal parallelism (based on data size).
- Allocates slots, deploys stage 2.
- Repeat для каждой следующей stage.
Configuration
execution.batch.adaptive.auto-parallelism.enabled: true
execution.batch.adaptive.auto-parallelism.min-parallelism: 1
execution.batch.adaptive.auto-parallelism.max-parallelism: 256
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 32mb
avg-data-volume-per-task — target size data per parallel task. Если data 1 GiB, parallelism = 1024 MiB / 32 MiB = 32. Если data 10 GiB, parallelism = 320 (capped to max 256).
Pros
- Optimal parallelism per stage без manual tuning.
- No wasted resources: не запускает 256 tasks for stage that processes 100 MiB.
- Adapts to data skew: можно scale up stages с heavy data, scale down с light data.
Cons
- Sequential execution между stages (waits for previous stage to finish). Это inherent for batch, но в streaming это was not the case.
- Не работает для streaming (только BATCH mode).
- Complexity: harder to predict resource usage upfront.
Когда использовать
- All BATCH workloads в 2026. Default для RuntimeExecutionMode.BATCH с 2.0.
Сравнительная таблица
Rescaling механика (Adaptive Scheduler)
Когда Adaptive Scheduler решает rescale, что происходит под капотом?
Time for rescaling: depends on state size. Small state (менее 1 GiB) — seconds. Large state (100s GiB) — minutes (limited by checkpoint duration + state restore time).
Key group redistribution
When parallelism changes, state нужно redistribute между new parallel instances. Flink использует key group concept:
- При creation job-а, total key space разбит на
maxParallelismkey groups (default 128, configurable до 32768). - Каждая parallel instance owns subset of key groups.
- При rescale, key groups redistributed между new instances.
State per key group serialized в checkpoint, и каждая new instance reads только её assigned key groups.
Это NOT cheap: redistribution = read state from S3, redistribute по new partitioning. Для 100 GiB state может занять минуты.
Code: org.apache.flink.runtime.state.KeyGroupRangeAssignment — distribution logic.
Default Scheduler vs Adaptive Scheduler: failover поведение
Ключевое отличие в failure recovery:
Default Scheduler
- На task failure -> restart all tasks (full job restart).
- Использует region-based recovery (FLIP-1, FLIP-184): tasks из failure region restarted, не вся pipeline. Но это is still pipeline-region scope, не fine-grained.
Adaptive Scheduler
- Сначала пытается local recovery (если slot still available, just retry task).
- Если slot lost (TM crashed), request new slot, deploy task на нём.
- Это more granular, less disruptive than full job restart.
В практике Adaptive Scheduler значительно лучше для high-availability requirements.
REST API для Adaptive Scheduler
С Adaptive Scheduler доступны новые REST endpoints:
# Request rescale to specific parallelism
PUT /jobs/{job-id}/rescale?newParallelism=16
# Set min/max parallelism
PATCH /jobs/{job-id}/resources
{
"minParallelism": 4,
"maxParallelism": 64
}
# Get current parallelism
GET /jobs/{job-id} | jq '.tasks[]'
Это позволяет дать manual rescale (или через external automation) without savepoint + restart.
Code references
org.apache.flink.runtime.scheduler.SchedulerNG— top-level scheduler interface.org.apache.flink.runtime.scheduler.DefaultScheduler— Default Scheduler implementation.org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler— Adaptive Scheduler.org.apache.flink.runtime.scheduler.adaptive.State— base class состояний state machine (Created, WaitingForResources, Executing, Restarting, Stopped).org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler— Adaptive Batch Scheduler.
State machine Adaptive Scheduler — это central abstraction. Каждое state имеет transitions, и они encoded в код. Прочитайте org.apache.flink.runtime.scheduler.adaptive.State и его implementations — это даст clear mental model.
Что дальше
Модуль 02 завершён. Дальше — модуль 03: Network stack и credit-based flow control. Разберём, как байты передаются между Tasks, что такое credit, как diagnose backpressure через метрики.
После модуля 03 — memory model (04), state backends (05), checkpoint internals (06), savepoints (07), watermarks (08). Это фундаментальный блок, обязательный для всех.