Adaptive Scheduler: flexibility instead of fixed parallelism
The Default Scheduler in Flink requires all requested slots to be allocated before processing starts. If you request parallelism=32 but the cluster can only provide 24, the job does not start. This blocks flexible use of resources: when capacity changes, the job either waits or fails.
The Adaptive Scheduler (FLIP-160, introduced in Flink 1.13) changes this model: the job declares the resources it requires, starts with whatever is available, and then dynamically increases or decreases its parallelism. It is the foundation for Reactive Mode and autoscaling. In this lesson we look at how the Adaptive Scheduler works internally, which state machine it uses, and why it is a foundational change in the Flink runtime.
Default Scheduler: fixed model
Default Scheduler workflow:
1. SQL/code requests parallelism = 32
2. JobGraph submitted to the JobManager
3. JobManager builds the ExecutionGraph
- 32 ExecutionVertices per JobVertex
4. SlotPool requests 32 slots
5. ResourceManager allocates 32 slots
6. If the cluster can only provide 24:
a. Wait for the ResourceManager (minutes; bounded by slot.request.timeout)
b. If the timeout expires, the job fails
7. When 32 slots are available -> deploy
8. Job runs
Problems:
- All-or-nothing: either 32 slots or failure
- Slow start under cluster contention
- Rescaling requires stop + restart with a new parallelism
- No reaction to changing capacity
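To make the all-or-nothing behaviour concrete, here is a minimal Java model. It is not Flink code: `AllOrNothingStart`, `startMinute` and the per-minute availability array are hypothetical, and `timeoutMinutes` only stands in for a slot request timeout.

```java
import java.util.Arrays;

/** Hypothetical model of the Default Scheduler's all-or-nothing start; not Flink code. */
final class AllOrNothingStart {

    /**
     * Returns the first minute at which all required slots are available,
     * or -1 if that does not happen within the timeout (the job would fail).
     */
    static int startMinute(int requiredSlots, int[] availableAtMinute, int timeoutMinutes) {
        for (int minute = 0; minute < availableAtMinute.length && minute <= timeoutMinutes; minute++) {
            // Deployment only happens once the full parallelism can be served.
            if (availableAtMinute[minute] >= requiredSlots) {
                return minute;
            }
        }
        // Never saw enough slots in time: no partial start is attempted.
        return -1;
    }

    public static void main(String[] args) {
        int[] contended = {24, 24, 24, 24, 24, 24};
        int[] recovering = {24, 24, 32, 32};
        System.out.println(Arrays.toString(contended) + " -> " + startMinute(32, contended, 5));
        System.out.println(Arrays.toString(recovering) + " -> " + startMinute(32, recovering, 5));
    }
}
```

With a constant 24 slots the model returns -1 (failure at the timeout); with the second trace it deploys at minute 2, never earlier, even though 24 slots were free from the start.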
Adaptive Scheduler: flexible model
Adaptive Scheduler workflow:
1. SQL/code requests parallelism = 32 (or "max")
2. JobGraph submitted to the JobManager
3. AdaptiveScheduler declares:
- Required slots: 32 (preferred)
- Minimum slots: 4 (start threshold)
4. ResourceManager allocates slots as they become available
5. When the minimum is reached (4 slots) -> start the job at parallelism 4
6. If new slots arrive (cluster scales up):
- Trigger rescaling (automatic restart from the latest checkpoint, no job resubmission)
- Job continues at the new parallelism
7. If slots are lost (TaskManager dies):
- Continue at reduced parallelism
- Wait for replacement slots
- Rescale up when they become available
Advantages:
- Job starts with minimum resources
- Adapts to available capacity
- Does not fail under cluster contention
- Foundation for autoscaling
Architecture: state machine
The Adaptive Scheduler is a finite state machine. Each state represents a phase of the job lifecycle:
States:
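Created -- initial state, after submission
WaitingForResources -- waiting for enough slots
CreatingExecutionGraph -- building the ExecutionGraph for the chosen parallelism
Executing -- job actively running
Restarting -- cancelling the current execution (after a failure or for a rescale)
Failing -- unrecoverable failure, tasks are being cancelled; ends in Finished
Finished -- terminal: job finished, failed or was cancelled
Canceling -- user requested cancel
StopWithSavepoint -- graceful stop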
Transitions:
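Created -> WaitingForResources
WaitingForResources -> CreatingExecutionGraph -> Executing (when enough slots are available)
Executing -> Restarting (on a recoverable failure or a rescale)
Restarting -> WaitingForResources (then on to Executing, after recovery)
Executing -> Failing -> Finished (unrecoverable failure)
Executing -> Finished (when the job is done)
Executing -> Canceling (user cancel)
Canceling -> Finished
Executing -> StopWithSavepoint -> Finished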
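These transitions can be encoded as a lookup table. The following is a simplified sketch that assumes only the transitions listed above exist. The enum constants and methods are hypothetical, and Flink's real state classes handle more edge cases.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified transition table for the Adaptive Scheduler states; not Flink code. */
final class SchedulerTransitions {

    enum State {
        CREATED, WAITING_FOR_RESOURCES, CREATING_EXECUTION_GRAPH, EXECUTING,
        RESTARTING, FAILING, CANCELING, STOP_WITH_SAVEPOINT, FINISHED
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.CREATED, EnumSet.of(State.WAITING_FOR_RESOURCES));
        ALLOWED.put(State.WAITING_FOR_RESOURCES, EnumSet.of(State.CREATING_EXECUTION_GRAPH));
        ALLOWED.put(State.CREATING_EXECUTION_GRAPH, EnumSet.of(State.EXECUTING));
        ALLOWED.put(State.EXECUTING, EnumSet.of(State.RESTARTING, State.FAILING,
                State.CANCELING, State.STOP_WITH_SAVEPOINT, State.FINISHED));
        ALLOWED.put(State.RESTARTING, EnumSet.of(State.WAITING_FOR_RESOURCES));
        ALLOWED.put(State.FAILING, EnumSet.of(State.FINISHED));
        ALLOWED.put(State.CANCELING, EnumSet.of(State.FINISHED));
        ALLOWED.put(State.STOP_WITH_SAVEPOINT, EnumSet.of(State.FINISHED));
        ALLOWED.put(State.FINISHED, EnumSet.noneOf(State.class)); // terminal
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Checks that every consecutive pair in the path is an allowed transition. */
    static boolean isValidPath(State... path) {
        for (int i = 1; i < path.length; i++) {
            if (!canTransition(path[i - 1], path[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A rescale: cancel, wait for slots, rebuild the graph, run again.
        System.out.println(isValidPath(State.EXECUTING, State.RESTARTING,
                State.WAITING_FOR_RESOURCES, State.CREATING_EXECUTION_GRAPH, State.EXECUTING)); // true
        System.out.println(canTransition(State.FINISHED, State.EXECUTING)); // false
    }
}
```

Keeping the table explicit makes illegal jumps (for example from Finished back to Executing) easy to reject in one place instead of scattering checks across the states.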
Each state handles the same events differently (descriptive event names, not Flink class names):
- JobStatusChange
- SlotAvailable / SlotLost
- CheckpointComplete
- RescaleRequest
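The State pattern cleanly separates the logic of each phase: AdaptiveScheduler holds the current state and delegates events to it, while each state (Created, Executing, Restarting, ...) is implemented as its own class.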
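The following minimal State-pattern sketch in Java shows what "each state handles the same events differently" means. The classes only borrow the names of Flink's states. The event methods, `MIN_SLOTS = 4`, and the `Scheduler` holder are hypothetical simplifications (for instance, a scale-up here goes through Restarting straight back to WaitingForResources).

```java
/** Hypothetical State-pattern sketch; the classes mimic, but are not, Flink's state classes. */
final class StatePatternSketch {

    static final int MIN_SLOTS = 4; // assumed start threshold

    /** Every state reacts to the same events; by default an event is ignored. */
    interface State {
        String describe();
        default State onSlotsAvailable(int availableSlots) { return this; }
        default State onTaskFailure() { return this; }
        default State onCancellationComplete() { return this; }
    }

    static final class WaitingForResources implements State {
        public String describe() { return "WaitingForResources"; }
        public State onSlotsAvailable(int available) {
            return available >= MIN_SLOTS ? new Executing(available) : this;
        }
    }

    static final class Executing implements State {
        private final int parallelism;
        Executing(int parallelism) { this.parallelism = parallelism; }
        public String describe() { return "Executing(p=" + parallelism + ")"; }
        public State onSlotsAvailable(int available) {
            // More slots than the current parallelism: restart to scale up.
            return available > parallelism ? new Restarting() : this;
        }
        public State onTaskFailure() { return new Restarting(); } // e.g. TaskManager lost
    }

    static final class Restarting implements State {
        public String describe() { return "Restarting"; }
        public State onCancellationComplete() { return new WaitingForResources(); }
    }

    /** Plays the AdaptiveScheduler role: it only holds the current state and swaps it. */
    static final class Scheduler {
        State current = new WaitingForResources();
        void slotsAvailable(int n) { current = current.onSlotsAvailable(n); }
        void taskFailure() { current = current.onTaskFailure(); }
        void cancellationComplete() { current = current.onCancellationComplete(); }
    }

    public static void main(String[] args) {
        Scheduler s = new Scheduler();
        s.slotsAvailable(2);      // below threshold: keep waiting
        System.out.println(s.current.describe());
        s.slotsAvailable(4);      // start at parallelism 4
        System.out.println(s.current.describe());
        s.slotsAvailable(12);     // scale-up trigger
        System.out.println(s.current.describe());
        s.cancellationComplete(); // old execution cancelled
        s.slotsAvailable(12);     // start again at parallelism 12
        System.out.println(s.current.describe());
    }
}
```

The holder never branches on "which phase am I in"; adding a new phase means adding a class, not editing a growing switch statement.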
Slot allocation
The slot pool is the main resource component.
The SlotPool works with two concepts:
1. DeclarativeSlotPool (Adaptive Scheduler)
- Declares required resources to the ResourceManager
- Receives slots as they are allocated
- No per-request timeout; waits up to jobmanager.adaptive-scheduler.resource-wait-timeout (indefinitely in Reactive Mode)
- Adapts to changes in available resources
vs the Default Scheduler's request-based semantics:
- Requests a specific number of slots
- Strict timeout (slot.request.timeout)
- All-or-nothing semantics
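A tiny bookkeeping class can sketch the difference between declaring and requesting. This is a hypothetical model, not the `DeclarativeSlotPool` interface. The job keeps re-declaring one total, and the ResourceManager works on the gap between that total and what the job holds, with no per-request deadline.

```java
/** Hypothetical model of declarative slot management; not the DeclarativeSlotPool API. */
final class DeclarativeRequirements {

    private int declared; // desired slot count, always re-declared as a whole
    private int held;     // slots currently offered to the job

    /** Replaces the previous requirement instead of issuing N separate requests. */
    void declare(int slots) { declared = slots; }

    void slotsOffered(int count) { held += count; }

    void slotsLost(int count) { held = Math.max(0, held - count); }

    /** Slots the ResourceManager should still provide; there is no per-request deadline. */
    int missing() { return Math.max(0, declared - held); }

    /** Slots held beyond the declaration, which could be given back. */
    int excess() { return Math.max(0, held - declared); }

    int held() { return held; }

    public static void main(String[] args) {
        DeclarativeRequirements req = new DeclarativeRequirements();
        req.declare(32);
        req.slotsOffered(4);
        System.out.println("held=" + req.held() + " missing=" + req.missing()); // held=4 missing=28
        req.slotsOffered(28);
        req.slotsLost(16);
        System.out.println("held=" + req.held() + " missing=" + req.missing()); // held=16 missing=16
        req.declare(8); // a lower requirement
        System.out.println("excess=" + req.excess()); // excess=8
    }
}
```

Losing slots never invalidates an outstanding request; it simply widens the gap, which is why this model has nothing to time out per slot.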
Slot lifecycle in the Adaptive Scheduler (example):
ResourceManager has 100 slots in pool
Job requires preferred=32, min=4
t=0: AdaptiveScheduler declares resource requirements {preferred: 32, min: 4} to the ResourceManager
t=1: 4 slots arrive -> start job at parallelism 4
t=10: 8 more slots arrive -> rescale to parallelism 12
t=30: 20 more slots arrive -> rescale to parallelism 32 (reached preferred)
t=100: 2 TaskManagers fail (lose 16 slots) -> rescale to parallelism 16
t=110: ResourceManager allocates 8 replacement slots -> rescale to 24
t=120: 8 more slots -> rescale to 32
All of this happens without job resubmission or manual intervention; each change in parallelism is an automatic restart from the latest checkpoint.
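The timeline above can be replayed in code. This minimal sketch assumes one slot per parallel subtask (slot sharing, all operators at the same parallelism) and a start threshold of 4. `SlotTimeline.replay` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Replays the slot timeline above under a hypothetical model: one slot per parallel subtask. */
final class SlotTimeline {

    /**
     * Applies each slot delta (+arrived / -lost) and records the resulting parallelism.
     * 0 means the job is waiting because fewer than minSlots are held.
     */
    static List<Integer> replay(int minSlots, int preferred, int[] slotDeltas) {
        List<Integer> parallelism = new ArrayList<>();
        int held = 0;
        for (int delta : slotDeltas) {
            held = Math.max(0, held + delta);
            // Start once the threshold is met; never go beyond the preferred parallelism.
            parallelism.add(held >= minSlots ? Math.min(held, preferred) : 0);
        }
        return parallelism;
    }

    public static void main(String[] args) {
        // t=1: +4, t=10: +8, t=30: +20, t=100: -16, t=110: +8, t=120: +8
        int[] deltas = {4, 8, 20, -16, 8, 8};
        System.out.println(replay(4, 32, deltas)); // [4, 12, 32, 16, 24, 32]
    }
}
```

Every entry that differs from the previous one corresponds to one rescale, i.e. one restart from the latest checkpoint in Flink.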
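Rescaling mechanics
Rescaling in the Adaptive Scheduler is not a manual stop and resubmit, but it is still a restart: the scheduler itself cancels the running ExecutionGraph and redeploys it at the new parallelism, without resubmitting the job: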
Rescaling process:
1. Trigger detected
- New slots available
- Lost slots
- User rescale request
2. AdaptiveScheduler determines the new parallelism
- Based on available slots
- Respects min/max bounds
- Considers parallelism guidance per JobVertex
3. Cancel the current execution
- Stop tasks
- Freed slots stay with the job for reuse
- No savepoint is taken
4. Build a new ExecutionGraph
- New parallelism per JobVertex
5. Assign slots
- From the slots the job already holds
6. Restore from the latest completed checkpoint
- Keyed state is redistributed by key groups to the new parallelism
7. Resume processing
Time: typically 30 sec - 5 min, depending on state size and state backend
With ForStDB (disaggregated state), rescaling can be much faster: state stays in S3 and is loaded lazily, so there is no full download phase.
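The key-group redistribution mentioned above is plain integer arithmetic. As far as we know, the sketch below follows the formulas of Flink's `KeyGroupRangeAssignment`. The class `KeyGroupRanges` is a standalone illustration, not the Flink class. Each subtask owns a contiguous range of key groups, so a rescale only moves range boundaries, and the state of each key group moves as a unit.

```java
/**
 * Standalone copy of the key-group arithmetic used when keyed state is redistributed
 * after a parallelism change (mirrors Flink's KeyGroupRangeAssignment formulas).
 */
final class KeyGroupRanges {

    /** Inclusive [start, end] key-group range owned by one subtask. */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism; // ceil(i * M / p)
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Index of the subtask that owns a key group at the given parallelism. */
    static int subtaskFor(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int p : new int[] {4, 3}) {
            StringBuilder line = new StringBuilder("p=" + p + ":");
            for (int i = 0; i < p; i++) {
                int[] r = rangeFor(maxParallelism, p, i);
                line.append(" [").append(r[0]).append('-').append(r[1]).append(']');
            }
            System.out.println(line);
        }
        // Scaling 4 -> 3 moves key group 40 from subtask 1 to subtask 0.
        System.out.println(subtaskFor(maxParallelism, 4, 40) + " -> " + subtaskFor(maxParallelism, 3, 40));
    }
}
```

For max parallelism 128 this prints `p=4: [0-31] [32-63] [64-95] [96-127]` and `p=3: [0-42] [43-85] [86-127]`. It also shows why max parallelism caps rescaling: with more subtasks than key groups, some subtasks would own no state.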
Configuration
# flink-conf.yaml
# Enable the Adaptive Scheduler
jobmanager.scheduler: adaptive
# (default value: default, i.e. the Default Scheduler)
# Optional: Reactive Mode (use ALL available resources)
scheduler-mode: reactive
# (in Reactive Mode one job uses all slots of the cluster; standalone application mode only)
# Scale-up threshold and resource timeouts
jobmanager.adaptive-scheduler.min-parallelism-increase: 1
jobmanager.adaptive-scheduler.resource-wait-timeout: 5min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10s
# Upper bound for rescaling (max parallelism = number of key groups)
pipeline.max-parallelism: 32
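The `jobmanager.adaptive-scheduler.*` options can be read together as a single decision made while the job waits for slots. The following is a hypothetical Java model, not Flink's implementation. In it, `minSlots` stands for "sufficient resources" (enough to run the job at all), `desiredSlots` stands for the declared requirement, and a negative wait timeout means waiting indefinitely, as in Reactive Mode.

```java
import java.time.Duration;

/** Hypothetical model of how the timeout options interact while waiting for slots; not Flink code. */
final class ResourceWaitModel {

    enum Decision { WAIT, START, FAIL }

    /**
     * @param sinceSubmission      time since the job was submitted or restarted
     * @param sinceSufficient      time since at least minSlots were available (ZERO if not yet)
     * @param resourceWaitTimeout  like resource-wait-timeout; negative = wait indefinitely
     * @param stabilizationTimeout like resource-stabilization-timeout
     */
    static Decision decide(int available, int minSlots, int desiredSlots,
                           Duration sinceSubmission, Duration sinceSufficient,
                           Duration resourceWaitTimeout, Duration stabilizationTimeout) {
        if (available >= desiredSlots) {
            return Decision.START; // desired resources: no reason to wait
        }
        boolean sufficient = available >= minSlots;
        if (sufficient && sinceSufficient.compareTo(stabilizationTimeout) >= 0) {
            return Decision.START; // enough to run and stable for a while: go with fewer slots
        }
        boolean waitTimedOut = !resourceWaitTimeout.isNegative()
                && sinceSubmission.compareTo(resourceWaitTimeout) >= 0;
        if (waitTimedOut) {
            return sufficient ? Decision.START : Decision.FAIL;
        }
        return Decision.WAIT;
    }

    /** min-parallelism-increase: skip scale-ups that would add too little parallelism. */
    static boolean worthScalingUp(int currentParallelism, int possibleParallelism, int minIncrease) {
        return possibleParallelism - currentParallelism >= minIncrease;
    }

    public static void main(String[] args) {
        Duration wait = Duration.ofMinutes(5);
        Duration stable = Duration.ofSeconds(10);
        System.out.println(decide(8, 4, 32, Duration.ofSeconds(5), Duration.ofSeconds(5), wait, stable));   // WAIT
        System.out.println(decide(8, 4, 32, Duration.ofSeconds(20), Duration.ofSeconds(12), wait, stable)); // START
        System.out.println(decide(2, 4, 32, Duration.ofMinutes(6), Duration.ZERO, wait, stable));           // FAIL
        System.out.println(worthScalingUp(12, 13, 4)); // false
    }
}
```

A short stabilization timeout favours fast starts at lower parallelism. A long one trades startup latency for fewer immediate rescales once more slots trickle in.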
When to use
Use Adaptive Scheduler when:
1. Dynamic cluster capacity
- K8s with HPA (Horizontal Pod Autoscaler)
- YARN with changing queue resources
- Need a flexible start
2. Reactive Mode required
- Load-based autoscaling
- Single job per cluster
- More in lesson 2
3. Faster startup desired
- Don't wait for all slots
- Start partially, scale up
4. Cluster contention is common
- Don't fail when there are not enough slots
- Adapt to what is available
Use Default Scheduler when:
1. Strict parallelism requirement
- Critical SLA на throughput
- Need exactly N tasks for downstream coordination
2. Mature production stability
- The Default Scheduler is battle-tested
- Fewer moving parts
3. No need for rescaling
- Static cluster
- Predictable load
4. Compatibility with legacy code
- Some custom operators assume fixed parallelism
- Some sinks do not handle rescaling gracefully
Implementation in the code
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/
  AdaptiveScheduler.java -- main entry point, holds the current state
  ExecutionGraphHandler.java -- manages ExecutionGraph rebuilds
  StateTransitions.java -- state machine transitions
  Created.java -- initial state
  WaitingForResources.java
  CreatingExecutionGraph.java
  Executing.java
  Restarting.java
  Failing.java
  Finished.java
  Canceling.java
  StopWithSavepoint.java
  RescaleManager.java -- triggers rescaling decisions (newer releases)
  allocator/
    SlotAllocator.java -- determines parallelism from slots
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/
  DeclarativeSlotPool.java -- declarative resource model
  DefaultDeclarativeSlotPool.java
Comparison
Aspect | Default Scheduler | Adaptive Scheduler
Startup time | Waits for all slots (potentially minutes) | Starts with min slots (immediately)
Rescaling | Stop + manual restart (manual intervention) | Automatic restart at the new parallelism
Capacity changes | Job stuck or fails | Job adapts
Resource utilization | 100% if available, fails if not | 100% with Reactive Mode, degrades gracefully if not
Recovery from TM loss | Restart at the same parallelism, waits for replacement slots | Partial degradation: restart at reduced parallelism, rescale up later
State on rescale | Manual savepoint, then restore | Latest completed checkpoint, restored automatically
Compatibility | All operators | Limited (some multi-input operators, some sinks)
Use case fit | Static deployments, critical SLA | Dynamic, cloud-native, autoscaling
Limitations
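Not all features are compatible with the Adaptive Scheduler:
1. Not supported:
- Batch jobs: the Adaptive Scheduler runs streaming jobs only; batch jobs use the Adaptive Batch Scheduler
- Partial (region) failover: any failure restarts the whole job
- Some legacy connectors
- Multi-input operators (some)
- Async I/O operators (with Flink 1.15)
2. Rescaling overhead:
- Every rescale is a job restart from the latest checkpoint: +1-5 min latency, and records since that checkpoint are reprocessed
- State backend dependent
- ForStDB significantly reduces the overhead
3. Watermark coordination:
- A rescale can cause watermark regression
- Tool to mitigate: watermark alignment (WatermarkStrategy#withWatermarkAlignment)
4. Sink coordination:
- Some sinks (e.g. transactional Kafka) need care when parallelism changes
- Use sinks whose exactly-once implementation supports rescaling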
The Adaptive Scheduler is not a silver bullet. For critical jobs with strict SLAs it is better to stay on the Default Scheduler. For exploratory or autoscaling workloads the Adaptive Scheduler is the better choice. Best practice: test the specific job on staging before switching in production.
Reading the source
FLIP documents:
FLIP-138: Declarative Resource Management
FLIP-159: Reactive Mode
FLIP-160: Adaptive Scheduler
Source:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/
flink-runtime/src/main/java/org/apache/flink/runtime/slots/
Configuration documentation:
flink-docs: Adaptive Scheduler configuration page
Blog posts:
"Adaptive Batch Scheduler" — Flink blog
"Reactive Mode in Flink" — Flink blog