Learning Platform
Глоссарий Troubleshooting
Урок 03.05 · 24 мин
Продвинутый
SchedulerDefault SchedulerAdaptive SchedulerAdaptive BatchFLIP-160Rescaling

Scheduler в Flink — компонент, который решает, как и когда запускать Tasks. Это под-компонент JobMaster, и от его выбора зависит много production-поведения: может ли job стартовать с partial resources, может ли rescaling произойти без savepoint, как обрабатываются failures.

С Flink 1.18 в системе три main scheduler:

  1. Default Scheduler — eager allocation, all-or-nothing. Был default до 2.0.
  2. Adaptive Scheduler — lazy allocation, supports partial resources, runtime rescaling. Default для streaming с 2.0.
  3. Adaptive Batch Scheduler — per-stage parallelism для batch workloads. Default для BATCH mode.

В этом уроке разберём каждый, сравним их semantics, и обсудим, когда выбирать что.

Adaptive Query Execution в Spark Жизненный цикл задания Flink

Default Scheduler: eager all-or-nothing

Default Scheduler — это original scheduler Flink. Был default с самой первой версии до 2.0. Принцип: eager allocation, all resources at start.

Главный класс: org.apache.flink.runtime.scheduler.DefaultScheduler.

Алгоритм

  1. JobMaster инициализирует ExecutionGraph.
  2. Scheduler определяет, сколько slots нужно (sum slots per SlotSharingGroup).
  3. Scheduler делает slot requests к ResourceManager для всех needed slots одновременно.
  4. Ждёт все slots allocated.
  5. Если все slots allocated в slot.request.timeout (default 5 минут) — деплоит все Tasks одновременно.
  6. Если не все slots allocated — fails job, не запускает партиально.
  7. Job становится RUNNING.

Pros и cons

Pros:

  • Simple semantics: или работает с full parallelism, или fails.
  • Predictable performance: всё спланировано upfront.
  • Хорошо для job-ов с фиксированным parallelism и предсказуемой нагрузкой.

Cons:

  • Wastes resources при partial availability. Если кластер имеет 30 slots, а вам нужно 32 — fails. Это unfortunate.
  • No runtime rescaling. Чтобы изменить parallelism, нужно savepoint + restart с новым parallelism.
  • Slow start на больших job-ах. Все Tasks deployment одновременно — burst load на RM, на TM, на BlobServer (JAR distribution).

Когда использовать

В 2026 году Default Scheduler deprecated by default. Используется только в специфических scenarios:

  • Legacy compatibility (старые pipelines на 1.x, которые ещё не мигрировали).
  • Когда вам точно нужны predictable timings (например, для testing).
  • Batch workloads, где Adaptive Batch не подходит.

Для типичных production streaming workloads в 2026 — Adaptive Scheduler.

Adaptive Scheduler: lazy, runtime rescaling

Adaptive Scheduler был представлен в FLIP-160 (Flink 1.13, preview) и постепенно созревал. С Flink 2.0 — default scheduler для streaming jobs.

Главный класс: org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.

Принципы

  1. Lazy allocation: не требует всех slots upfront. Может стартовать с partial slots и масштабироваться позже.
  2. Runtime rescaling: если slot count меняется (TM добавлен/удалён), scheduler автоматически rescales parallelism.
  3. Job-level state machine: Adaptive Scheduler имеет explicit state machine с states: Created -> WaitingForResources -> Executing -> Stopped (и др).
  4. Reactive mode: extreme version, parallelism = available slots (auto-scale based on resources).

Алгоритм (упрощённо)

  1. JobMaster start. Scheduler в state Created.
  2. Scheduler considers slot requirements для job (min/max parallelism per JobVertex).
  3. Запрашивает enough slots для min parallelism к RM. State: WaitingForResources.
  4. После acquired enough slots, Scheduler decides parallelism dynamically (min ≤ chosen ≤ max), based on available resources.
  5. Деплоит Tasks с выбранным parallelism. State: Executing.
  6. Watch for slot changes:
    • Если new slots become available (TM added) — может rescale up. Triggers transition: Executing -> Restarting -> Executing с higher parallelism.
    • Если slots removed (TM crashed, не recovered) — rescale down. Same transition.
  7. На failure recovery, тоже через Restarting state.

Configuration

# Adaptive Scheduler config
jobmanager.scheduler: adaptive    # explicitly (default в 2.0+)

# Min/max parallelism per job (можно tune через REST API):
parallelism.default: 4
jobmanager.adaptive-scheduler.min-parallelism-increase: 1
jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10 s

resource-stabilization-timeout — это debouncing: если slots колеблются (TM комплекс starts/stops), wait this duration before triggering rescale. Default 10s — чтобы не делать rescale на каждый минор изменение.

Reactive mode: extreme version

Adaptive Scheduler имеет специальный mode — Reactive Mode. В нём parallelism = automatic based on available slots. Это полезно для:

  • Cloud-native auto-scaling: K8s HPA scales TM deployment вверх/вниз, Flink job автоматически использует все available slots.
  • Cost optimization: scale down ночью когда traffic low, scale up днём.
jobmanager.scheduler: adaptive
scheduler-mode: reactive

В Reactive Mode parallelism всегда = total slots (assuming default slot sharing). Если TM count = 4 (8 slots каждый), parallelism = 32. Если K8s HPA scales TM count to 8, parallelism rebalances to 64.

Pros и cons

Pros:

  • Partial resource start: можно стартовать с минимальными slots, no all-or-nothing.
  • Runtime rescaling без savepoint — это огромная фича для cloud-native deployments.
  • Auto-scale через Reactive Mode.
  • Better fault tolerance: при TM failure не нужно restart job с full parallelism — только tasks affected by TM crash.

Cons:

  • Slightly more complex. State machine более сложная, debug сложнее.
  • Rescaling triggers state redistribution (key groups reassigned, что может pause job на seconds-minutes для large state).
  • Не подходит для всех scenarios — например, если у вас strict latency SLO и rescaling pauses unacceptable.

Когда использовать

В 2026 году — default для streaming. Используйте если:

  • Cloud-native deployment (K8s).
  • Хотите elastic scaling под нагрузку.
  • Хотите быстрый partial recovery после TM crash.
  • Не критично short pauses при rescaling.

Когда не использовать:

  • Strict latency SLO (e.g., trading systems с p99 менее 10ms): pauses при rescaling unacceptable.
  • Тестирование, где предсказуемые timings critical.
  • Legacy jobs которые ещё не tested under Adaptive.

Adaptive Batch Scheduler: per-stage parallelism

Третий scheduler — специально для batch workloads (RuntimeExecutionMode.BATCH). Представлен в FLIP-187 (Flink 1.16+), стал default для BATCH в 2.0.

Главный класс: org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.

Принципы

Batch workloads имеют специфику:

  • Stage-based execution: job разбит на stages (separated by blocking shuffles).
  • Intermediate data known after stage: после завершения stage можем знать exact size produced data.
  • Optimal parallelism depends on data size: 1 GiB -> parallelism 4, 1 TiB -> parallelism 256.

Adaptive Batch Scheduler использует эту information: выбирает parallelism per-stage dynamically.

Алгоритм

  1. Стартует job со stage 1 (source stage).
  2. Использует fixed parallelism для source (или derived from source split count).
  3. После stage 1 completion, scheduler знает actual data size produced.
  4. Для stage 2 — calculates optimal parallelism (based on data size).
  5. Allocates slots, deploys stage 2.
  6. Repeat для каждой следующей stage.

Configuration

execution.batch.adaptive.auto-parallelism.enabled: true
execution.batch.adaptive.auto-parallelism.min-parallelism: 1
execution.batch.adaptive.auto-parallelism.max-parallelism: 256
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 32mb

avg-data-volume-per-task — target size data per parallel task. Если data 1 GiB, parallelism = 1024 MiB / 32 MiB = 32. Если data 10 GiB, parallelism = 320 (capped to max 256).

Pros

  • Optimal parallelism per stage без manual tuning.
  • No wasted resources: не запускает 256 tasks for stage that processes 100 MiB.
  • Adapts to data skew: можно scale up stages с heavy data, scale down с light data.

Cons

  • Sequential execution между stages (waits for previous stage to finish). Это inherent for batch, но в streaming это was not the case.
  • Не работает для streaming (только BATCH mode).
  • Complexity: harder to predict resource usage upfront.

Когда использовать

  • All BATCH workloads в 2026. Default для RuntimeExecutionMode.BATCH с 2.0.

Сравнительная таблица

Default vs Adaptive vs Adaptive Batch
Default SchedulerEager, all-or-nothing. Был default до 2.0. Сейчас deprecated by default.
Adaptive SchedulerDefault для streaming с 2.0. Lazy, supports partial resources, runtime rescaling.
Adaptive Batch SchedulerDefault для BATCH mode с 2.0. Per-stage parallelism, data-driven decisions.
Allocation: eagerВсе slots requested upfront. Если не все available — fails.
Allocation: lazyСтартует с min parallelism slots, scales позже по availability.
Allocation: per-stageКаждая stage запрашивает slots по своему предполагаемому parallelism.
Runtime rescaling: NOЧтобы изменить parallelism — savepoint + restart.
Runtime rescaling: YESSlot count changes -> automatic rescale через Restarting state.
Runtime rescaling: per-stageКаждая новая stage может иметь different parallelism. Уже completed stages — нет.
Mode: STREAMING + BATCHУниверсален, поддерживает оба execution mode.
Mode: STREAMING (only)Только для streaming. Для BATCH использует Adaptive Batch (или fallback to Default).
Mode: BATCH (only)Только для BATCH execution mode.
Use case: legacyMostly legacy compatibility в 2026.
Use case: cloud-native streamingK8s deployments, elastic scaling, fault recovery без savepoint.
Use case: ETL и analytics batchHive-like ETL, periodic batch jobs, data warehouse refresh.

Rescaling механика (Adaptive Scheduler)

Когда Adaptive Scheduler решает rescale, что происходит под капотом?

Rescaling sequence
1. Trigger eventTM added/removed, или manual request через REST API (PUT /jobs/{id}/rescale).
2. DebouncingWait resource-stabilization-timeout (default 10s) чтобы избежать частых rescale на noisy slot changes.
3. Trigger checkpointAdaptive Scheduler triggers savepoint-like checkpoint (использует существующий CheckpointCoordinator). Это даёт consistent state snapshot.
4. Cancel tasksПосле successful checkpoint — cancel все running Tasks. Job в state Restarting.
5. Recalculate parallelismBased на available slots, choose new parallelism для каждого JobVertex.
6. Build new ExecutionGraphExecutionGraph rebuilt с new parallelism. Key groups redistributed.
7. Restore from checkpointTasks deployed с restore from just-taken checkpoint. State разделён по new key group distribution.
8. Resume processingState machine: Restarting -> Executing. Job RUNNING с новым parallelism.

Time for rescaling: depends on state size. Small state (менее 1 GiB) — seconds. Large state (100s GiB) — minutes (limited by checkpoint duration + state restore time).

Key group redistribution

When parallelism changes, state нужно redistribute между new parallel instances. Flink использует key group concept:

  • При creation job-а, total key space разбит на maxParallelism key groups (default 128, configurable до 32768).
  • Каждая parallel instance owns subset of key groups.
  • При rescale, key groups redistributed между new instances.

State per key group serialized в checkpoint, и каждая new instance reads только её assigned key groups.

Это NOT cheap: redistribution = read state from S3, redistribute по new partitioning. Для 100 GiB state может занять минуты.

Code: org.apache.flink.runtime.state.KeyGroupRangeAssignment — distribution logic.

Default Scheduler vs Adaptive Scheduler: failover поведение

Ключевое отличие в failure recovery:

Default Scheduler

  • На task failure -> restart all tasks (full job restart).
  • Использует region-based recovery (FLIP-1, FLIP-184): tasks из failure region restarted, не вся pipeline. Но это is still pipeline-region scope, не fine-grained.

Adaptive Scheduler

  • Сначала пытается local recovery (если slot still available, just retry task).
  • Если slot lost (TM crashed), request new slot, deploy task на нём.
  • Это more granular, less disruptive than full job restart.

В практике Adaptive Scheduler значительно лучше для high-availability requirements.

REST API для Adaptive Scheduler

С Adaptive Scheduler доступны новые REST endpoints:

# Request rescale to specific parallelism
PUT /jobs/{job-id}/rescale?newParallelism=16

# Set min/max parallelism
PATCH /jobs/{job-id}/resources
{
  "minParallelism": 4,
  "maxParallelism": 64
}

# Get current parallelism
GET /jobs/{job-id} | jq '.tasks[]'

Это позволяет дать manual rescale (или через external automation) without savepoint + restart.

Code references

  • org.apache.flink.runtime.scheduler.SchedulerNG — top-level scheduler interface.
  • org.apache.flink.runtime.scheduler.DefaultScheduler — Default Scheduler implementation.
  • org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler — Adaptive Scheduler.
  • org.apache.flink.runtime.scheduler.adaptive.State — base class состояний state machine (Created, WaitingForResources, Executing, Restarting, Stopped).
  • org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler — Adaptive Batch Scheduler.

State machine Adaptive Scheduler — это central abstraction. Каждое state имеет transitions, и они encoded в код. Прочитайте org.apache.flink.runtime.scheduler.adaptive.State и его implementations — это даст clear mental model.

Что дальше

Модуль 02 завершён. Дальше — модуль 03: Network stack и credit-based flow control. Разберём, как байты передаются между Tasks, что такое credit, как diagnose backpressure через метрики.

После модуля 03 — memory model (04), state backends (05), checkpoint internals (06), savepoints (07), watermarks (08). Это фундаментальный блок, обязательный для всех.

Проверка знанийKnowledge check
У вас Flink streaming job на K8s, 16 TM × 4 slots = 64 slots total, parallelism 32. Используется Adaptive Scheduler. Сценарий: один TM crashed (вы видите это в K8s logs — OOMKilled), pod рестартует через 30 секунд. Что произойдёт с job в течение этих 30 секунд и затем? Сравните с тем, что было бы с Default Scheduler.
ОтветAnswer
Adaptive Scheduler behavior: T=0 TM crash. 4 Tasks running в crashed TM становятся unreachable. Heartbeat lost после 30s (default heartbeat.timeout). T~30s ResourceManager removes crashed TM, 4 slots disappear. Total slots: 60 (а нам нужно 32 для parallelism). Adaptive Scheduler triggers checkpoint, cancels affected tasks (или все, depending on recovery scope), rebuilds ExecutionGraph. Поскольку slots всё ещё достаточно (60 > 32), может deploy с same parallelism 32. Restore from checkpoint, resume. Total downtime: 30s heartbeat timeout + 5-15s rescaling overhead + state restore time. State 200 GiB / 32 parallel = ~6 GiB per task, restore ~30-60s в зависимости от S3 throughput. Total job downtime: ~60-90s. Если K8s рестартует pod за 30s, новый TM становится available, total slots return to 64. Adaptive Scheduler debounces (resource-stabilization-timeout 10s), потом может rescale back to higher parallelism если configured min/max позволяет. С Default Scheduler: T=0 same crash. T~30s heartbeat lost, slots disappeared. Default Scheduler attempts restart: requests 32 slots, но только 60 available — fine, 32 OK. Triggers restart all tasks (or region-based если configured), restore from checkpoint, resume. Подобная total downtime, но с two main differences: (a) Default Scheduler не может handle сценарий "только 30 slots available", он бы fail если parallelism > available; (b) после TM recovery Default Scheduler ничего не делает — parallelism остаётся 32, новые 4 slots simply unused. Adaptive Scheduler может scale up или down dynamically. В этом сценарии Adaptive Scheduler superior because: (1) more graceful handling slot fluctuations, (2) can opportunistically rescale, (3) recovery automation lower-friction.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Три scheduler в Flink 2.x — какой default для streaming, какой для batch, и какой legacy?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5