Learning Platform
Глоссарий Troubleshooting
Урок 12.01 · 25 мин
Продвинутый
Adaptive SchedulerFLIP-160Resource AllocationParallelismJobManagerSlot Pool

Adaptive Scheduler — flexibility вместо fixed parallelism

Default Scheduler в Flink требует, чтобы все запрошенные slots были выделены до start обработки. Если запросили parallelism=32, но cluster может дать только 24 — job не запускается. Это блокирует гибкое использование ресурсов: при capacity changes job либо ждёт, либо fails.

Adaptive Scheduler (FLIP-160, GA в Flink 1.15) меняет parallel-это: job announces required resources, начинает с тем, что есть, потом dynamically increase/decrease parallelism. Это foundation для Reactive Mode и autoscaling. В этом уроке разбираемся, как работает Adaptive Scheduler внутри, какие state machines использует, и почему это foundational change в Flink runtime.

Adaptive Query Execution в Spark: runtime адаптация

Default Scheduler: fixed model

Default Scheduler workflow:

1. SQL/code требует parallelism = 32
2. JobGraph submitted в JobManager
3. JobManager строит ExecutionGraph
   - 32 ExecutionVertex per JobVertex
4. SlotPool.requestSlots(32)
5. ResourceManager allocates 32 slots
6. Если cluster может дать только 24:
   a. Wait для resource manager (минуты)
   b. Если timeout — fail job
7. Когда 32 slots available -> deploy
8. Job runs

Problems:
  - All-or-nothing: или 32 slots, или fail
  - Slow start при cluster contention
  - Rescaling требует stop + restart с новым parallelism
  - No reaction к changing capacity

Adaptive Scheduler: flexible model

Adaptive Scheduler workflow:

1. SQL/code requests parallelism = 32 (или "max")
2. JobGraph submitted в JobManager
3. AdaptiveScheduler анонсирует:
   - Required slots: 32 (preferred)
   - Minimum slots: 4 (start threshold)
4. ResourceManager allocates slots по мере доступности
5. Когда minimum reached (4 slots) -> start job at parallelism 4
6. Если new slots arrive (cluster scales up):
   - Trigger rescaling (без full restart)
   - Job continues at new parallelism
7. Если slots fail (TaskManager dies):
   - Continue at reduced parallelism
   - Wait для replacement slots
   - Rescale up когда available

Advantages:
  - Job starts при minimum resources
  - Adapts к available capacity
  - No fail при cluster contention
  - Foundation для autoscaling

Архитектура: State Machine

Adaptive Scheduler — finite state machine. Каждое state представляет phase job lifecycle:

States:

Created             -- начальное, после submission
Waiting4Resources   -- ждём minimum slots
Executing           -- job actively running
Restarting          -- recovery после failure
Failing             -- terminal: job failed
Finished            -- terminal: job complete
Cancelling          -- user requested cancel
StopWithSavepoint   -- graceful stop

Transitions:
  Created           -> Waiting4Resources
  Waiting4Resources -> Executing (when minimum slots available)
  Executing         -> Restarting (on failure)
  Restarting        -> Executing (after recovery)
  Executing         -> Finished (when job done)
  Executing         -> Cancelling (user cancel)
  Cancelling        -> Finished

Each state handles specific events differently:
  - JobStatusChange
  - SlotAvailable / SlotLost
  - CheckpointComplete
  - RescaleRequest

State pattern позволяет clear separation logic per phase. State machine лежит в AdaptiveScheduler Java class.

Slot allocation

Slot pool — главный resource component:

SlotPool работает с двумя concepts:

1. DeclarativeSlotPool (Adaptive Scheduler)
   - Declares required resources к ResourceManager
   - Receives slots по мере allocation
   - No timeouts (waits indefinitely)
   - Adapts к changes в available resources

vs Default SlotPool:
   - Requests specific number of slots
   - Strict timeout
   - All-or-nothing semantics

Slot lifecycle в Adaptive:

ResourceManager has 100 slots in pool
Job requires preferred=32, min=4

t=0: AdaptiveScheduler.declareResourceRequirements({preferred: 32, min: 4})
t=1: 4 slots arrive -> start job at parallelism 4
t=10: 8 more slots arrive -> rescale to parallelism 12
t=30: 20 more slots arrive -> rescale to parallelism 32 (reached preferred)
t=100: 2 TaskManagers fail (lose 16 slots) -> rescale to parallelism 16
t=110: ResourceManager allocates 8 replacement slots -> rescale to 24
t=120: 8 more slots -> rescale to 32

All this without job restart или manual intervention.

Rescaling механика

Rescaling Adaptive Scheduler — это NOT full restart. Это in-flight операция:

Rescaling process:

1. Trigger detected
   - New slots available
   - Lost slots
   - User RescaleRequest

2. AdaptiveScheduler determines new parallelism
   - Based on available slots
   - Respects min/max bounds
   - Considers parallelism guidance per JobVertex

3. Triggers savepoint
   - State backend creates savepoint
   - Все running tasks contribute
   - savepoint в S3 (cleaner) или ForstDB (faster)

4. Cancel current execution
   - Stop tasks gracefully
   - Release slots

5. Build new ExecutionGraph
   - New parallelism per JobVertex
   - Key groups redistributed

6. Request new slots
   - Use available pool

7. Restore from savepoint
   - State distributed по key groups в new parallelism

8. Resume processing

Time: 30 sec - 5 min в зависимости от state size и backend

С ForstDB rescaling drastically faster — state в S3 lazy loaded, no download phase.

Adaptive Scheduler flow
Job Submit (p=32, min=4)Job submitted с preferred parallelism = 32, min = 4. AdaptiveScheduler начинает FSM в state Created.
declare
Waiting4ResourcesState Waiting4Resources. AdaptiveScheduler announce требований к ResourceManager. Job не работает ещё.
4 slots -> start p=44 slots arrive — достигнут минимум. AdaptiveScheduler transitions в Executing с parallelism 4.
more slots
+20 slots -> rescale p=2020 slots arrive — rescale без restart. Trigger savepoint, redeploy в parallelism 20.
reach preferred
+12 slots -> p=32 (preferred)Preferred 32 reached. Job stable, runs at maximum throughput.
2 TM dead -> rescale p=162 TaskManagers die, lose 16 slots. AdaptiveScheduler rescales down к p=16, continues processing degraded.
replacement
16 replacement -> p=32ResourceManager allocates 16 replacement slots. Rescale up к p=32. Total downtime ~30 sec.
continue
Self-healing loopAdaptive cycle: scale up/down based on cluster capacity, без user intervention. Foundation для autoscaling.

Configuration

# flink-conf.yaml

# Включить Adaptive Scheduler
jobmanager.scheduler: adaptive
# (Default: default, который Default Scheduler)

# Опционально: Reactive Mode (use ALL available resources)
scheduler-mode: reactive
# (когда reactive, все slots cluster used для one job)

# Parallelism bounds
jobmanager.adaptive-scheduler.min-parallelism-increase: 1
jobmanager.adaptive-scheduler.resource-wait-timeout: 5min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10s

# Per-vertex parallelism (если нужно)
pipeline.max-parallelism: 32

Когда использовать

Use Adaptive Scheduler when:

1. Dynamic cluster capacity
   - K8s с HPA (horizontal pod autoscaler)
   - YARN с queue resources changing
   - Need flexible start

2. Reactive Mode required
   - Autoscaling по нагрузке
   - Single-job-per-cluster
   - Подробнее в уроке 2

3. Faster startup desired
   - Не ждать all slots
   - Start partial, scale up

4. Cluster contention common
   - Не fail когда не enough slots
   - Adapt к available

Use Default Scheduler when:

1. Strict parallelism requirement
   - Critical SLA на throughput
   - Need exactly N tasks для downstream coordination

2. Mature production stability
   - Default Scheduler battle-tested
   - Less moving parts

3. Don't need rescaling
   - Static cluster
   - Predictable load

4. Compatibility с legacy
   - Some custom operators assume fixed parallelism
   - Some sinks не handle rescaling gracefully

Implementation в коде

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/

  AdaptiveScheduler.java         -- main entry point, holds state
  ExecutionGraphHandler.java     -- manages ExecutionGraph rebuilds
  StateTransitions.java          -- state machine transitions
  
  states/
    Created.java                  -- initial state
    WaitingForResources.java
    Executing.java
    Restarting.java
    Failing.java
    Finished.java
    Cancelling.java
    StopWithSavepoint.java

  SlotAllocator.java              -- determines parallelism from slots
  RescaleManager.java             -- triggers rescaling decisions
  
  declarative/
    DeclarativeSlotPool.java      -- declarative resource model
    DefaultDeclarativeSlotPool.java

Comparison metrics

Aspect                     Default Scheduler          Adaptive Scheduler

Startup time               Wait для all slots         Start с min slots
                          (potentially min)           (immediately)

Rescaling                  Stop + manual restart      In-flight rescale
                          (manual интervention)       (automatic)

Capacity changes           Job stuck or fails         Job adapts
                          
Resource utilization       100% if available          100% with reactive
                          fail if not                 grace if not

Recovery from TM loss      Full restart               Partial degradation
                                                      (continue or restart)

State preservation         Through checkpoint         Through savepoint
                          (limited)                   (cleaner)

Compatibility              All operators              Limited (multi-input,
                                                      some sinks)

Use case fit               Static deployments         Dynamic, cloud-native
                          critical SLA                autoscaling

Limitations

Не все features compatible с Adaptive Scheduler:

1. Не поддерживается:
   - Streaming + Batch unified mode (Flink 1.14-1.17)
   - Some legacy connectors
   - Multi-input operators (некоторые)
   - Async I/O operators (с Flink 1.15)

2. Rescaling overhead:
   - Savepoint creates +1-5 min latency
   - State backend dependent
   - ForstDB significantly reduces overhead

3. Watermark coordination:
   - Rescale могут cause watermark regression
   - Tools to handle: pipeline.watermark-alignment

4. Sink coordination:
   - Some sinks (Kafka TX) не handle rescale gracefully
   - Need to use exactly-once aware sinks
WARNING

Adaptive Scheduler не silver bullet. Для critical jobs с strict SLA лучше остаться на Default Scheduler. Для exploratory или autoscaling workloads — Adaptive преимущественнее. Лучшее решение: test specific job на staging до production switch.

Чтение source

FLIP документы:
  FLIP-160: Declarative Resource Management
  FLIP-159: Flink-defaults для Reactive Mode

Source:
  flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/
  flink-runtime/src/main/java/org/apache/flink/runtime/slots/

Configuration documentation:
  flink-docs: Adaptive Scheduler configuration page
  
Blog posts:
  "Adaptive Batch Scheduler" — Flink blog
  "Reactive Mode in Flink" — Flink blog
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём ключевое отличие Adaptive Scheduler от Default Scheduler?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4