Lesson 12.03 · 24 min
Advanced
AdaptiveBatchScheduler · Batch Mode · Per-stage Parallelism · Speculative Execution · Statistics

AdaptiveBatchScheduler — batch mode adaptive scheduling

In previous lessons we looked at the Adaptive Scheduler for streaming (which gracefully adapts to the available resources) and at Reactive Mode (autoscaling for a single job). Batch mode has a separate scheduler, AdaptiveBatchScheduler (FLIP-187, introduced in Flink 1.15; the default scheduler for batch jobs since Flink 1.17 via FLIP-283).

This scheduler solves a different problem: in batch jobs, the input size of each stage is not known in advance. A static parallelism is either too small (slow) or too large (wasted resources). AdaptiveBatchScheduler chooses the parallelism of each stage based on its actual input size, which becomes known only after the upstream stage has finished.

In this lesson we look at how AdaptiveBatchScheduler works, how the per-stage parallelism decision is made, and how speculative execution deals with slow tasks.

Related: Spark's AQE (Adaptive Query Execution) tackles a similar problem by coalescing shuffle partitions at runtime.

The problem with static parallelism in batch

Classic batch job:
  Stage 1: Source scan (100 GB Parquet) - 100 partitions, parallelism = 100
  Stage 2: Aggregation (300 GB intermediate) - parallelism = ???
  Stage 3: Join (500 GB intermediate) - parallelism = ???

With static parallelism = 100 for all stages:
  Stage 1: 1 GB per task, fits in memory, OK
  Stage 2: 3 GB per task, OK with memory tuning
  Stage 3: 5 GB per task, spills to disk, slow

With static parallelism = 500:
  Stage 1: 200 MB per task, many small tasks, scheduling overhead dominates
  Stage 2: 600 MB per task, optimal
  Stage 3: 1 GB per task, optimal

No single parallelism is optimal for all stages.

This is a fundamental problem. AdaptiveBatchScheduler solves it dynamically: it chooses each stage's parallelism after measuring that stage's input size.
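The trade-off is easy to see by computing the average input per task for each stage under one static parallelism. This Python sketch uses the stage sizes from the example above and assumes data is spread evenly over tasks; `STAGES`, `per_task_gb` and `report` are illustrative names.

```python
# Stage input sizes from the example above, in GB (assumed to be spread evenly over tasks).
STAGES = {"source scan": 100, "aggregation": 300, "join": 500}


def per_task_gb(stage_gb: float, parallelism: int) -> float:
    """Average input per task when a stage runs with a fixed parallelism."""
    return stage_gb / parallelism


def report(parallelism: int) -> dict:
    """Average per-task input (GB) for every stage under one static parallelism."""
    return {name: per_task_gb(size, parallelism) for name, size in STAGES.items()}


if __name__ == "__main__":
    for p in (100, 500):
        print(f"p={p}:", report(p))
```

With p=100 the join gets 5 GB per task; with p=500 the source scan drops to 0.2 GB per task. No single value keeps all three stages near 1 GB.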

AdaptiveBatchScheduler architecture

Workflow:

1. Stage 1 (Source) executes with an initial parallelism
   - Can be computed statically (based on file count or partition count)

2. Stage 1 completes and writes its output to an intermediate dataset
   - Flink measures the total bytes produced
   - Records per-partition statistics

3. Stage 2 is about to start
   - AdaptiveBatchScheduler reads the Stage 1 output statistics
   - Computes the optimal parallelism for Stage 2 based on:
     * Target per-task data size (config: jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task, e.g. 1GB)
     * Min/max parallelism bounds
     * Available slots
   - Sets the Stage 2 parallelism accordingly

4. Stage 2 executes with the new parallelism

5. Repeat for each subsequent stage

Each stage's parallelism is decided from the data it will actually receive.
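The loop above can be sketched in Python. This is a simplified model, not Flink's implementation: it assumes a linear pipeline, uses ceil(input / target) clamped to the configured bounds as the sizing rule, and replaces real statistics with hypothetical per-stage output ratios. The numbers follow the lesson's example: the source writes 300 GB and Stage 2 shrinks it to 50 GB.

```python
import math
from fractions import Fraction

GB = 1024 ** 3


def decide_parallelism(input_bytes: int, target_bytes: int, min_p: int, max_p: int) -> int:
    """Simplified sizing rule: about target_bytes of input per task, clamped to [min_p, max_p]."""
    p = math.ceil(input_bytes / target_bytes)
    return max(min_p, min(max_p, p))


def run_pipeline(source_parallelism, source_output_bytes, stages, target_bytes, min_p=1, max_p=1000):
    """Walk a linear pipeline, sizing each stage only after its upstream output is known.

    `stages` is a list of (name, output_ratio) pairs. output_ratio is a hypothetical
    stand-in for the statistics a finished stage reports (output bytes / input bytes).
    """
    plan = [("source", source_parallelism)]  # step 1: source parallelism is static
    upstream_bytes = source_output_bytes     # step 2: measured once the source finishes
    for name, output_ratio in stages:
        # Step 3: upstream has finished, so its output size is known; size this stage.
        p = decide_parallelism(upstream_bytes, target_bytes, min_p, max_p)
        plan.append((name, p))
        # Step 4: the stage "runs"; its measured output drives the next decision (step 5).
        upstream_bytes = int(upstream_bytes * output_ratio)
    return plan


if __name__ == "__main__":
    stages = [("stage 2", Fraction(1, 6)), ("stage 3", Fraction(1, 1))]
    print(run_pipeline(100, 300 * GB, stages, target_bytes=GB))
```

With max_p=128, matching the max-parallelism: 128 setting in the configuration section, Stage 2 would be clamped to 128 tasks instead of 300.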

Configuration

# flink-conf.yaml for batch jobs

# Enable AdaptiveBatchScheduler
jobmanager.scheduler: AdaptiveBatch
# Default for batch jobs since Flink 1.17

# Per-task data target
jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task: 1GB
# Higher: fewer tasks and less overhead, but more memory per task

# Parallelism bounds
jobmanager.adaptive-batch-scheduler.min-parallelism: 1
jobmanager.adaptive-batch-scheduler.max-parallelism: 128
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 16

# Speculative execution
jobmanager.adaptive-batch-scheduler.speculative.enabled: true
jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions: 2
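To get a feel for avg-data-volume-per-task, the following Python sketch turns size strings into bytes and computes the resulting task count under min-parallelism 1 and max-parallelism 128. `parse_size` and `tasks_for` are hypothetical helpers; the parser handles only a few units and assumes binary multiples (1 GB = 1024 MB), as Flink's MemorySize does.

```python
import math
import re

# Binary multiples, like Flink's MemorySize (1 GB = 1024 MB). Only a few units are handled.
_UNITS = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3, "tb": 1024 ** 4}


def parse_size(text: str) -> int:
    """Parse strings such as '1GB' or '256 mb' into bytes (hypothetical, simplified helper)."""
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]+)\s*", text)
    if match is None or match.group(2).lower() not in _UNITS:
        raise ValueError(f"unsupported size: {text!r}")
    return int(match.group(1)) * _UNITS[match.group(2).lower()]


def tasks_for(input_size: str, avg_per_task: str, min_p: int = 1, max_p: int = 128) -> int:
    """Task count for one stage, using the min/max bounds from the configuration above."""
    p = math.ceil(parse_size(input_size) / parse_size(avg_per_task))
    return max(min_p, min(max_p, p))


if __name__ == "__main__":
    for target in ("256MB", "1GB", "4GB"):
        print(target, tasks_for("20GB", target))
```

For a 20 GB stage, raising the target from 256MB to 4GB cuts the stage from 80 tasks to 5, with 16x more data per task. For a 300 GB stage the 1GB target would ask for 300 tasks, but max-parallelism: 128 caps it at 128.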

Parallelism calculation

Algorithm (simplified):

input_size = bytes produced by upstream stage (from statistics)
target_size = jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task (1GB in this lesson; the default depends on the Flink version)
parallelism = ceil(input_size / target_size)
parallelism = max(min_parallelism, parallelism)
parallelism = min(max_parallelism, parallelism)

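Example (assuming max-parallelism is at least 300; with max-parallelism: 128 from the configuration above, Stage 2 would be clamped to 128):
  Stage 2 input = 300 GB
  target = 1 GB
  parallelism = 300

  Stage 3 input = 50 GB
  target = 1 GB
  parallelism = 50

  Peak slots needed = max(300, 50, ...) = the largest stage parallelism
  (with fewer slots, a stage's tasks run in several waves)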

This is the key benefit: each stage gets a parallelism that fits its size. Each downstream stage's parallelism is roughly input_size / 1GB.
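The pseudocode translates directly into Python. The sketch also adds two assumptions that are not part of the scheduler's algorithm: the pipeline is linear with blocking exchanges, so only one stage runs at a time, and tasks are uniform, so a stage with more tasks than free slots simply runs in several waves. `peak_slots` and `waves` are illustrative helpers.

```python
import math
from typing import Sequence


def stage_parallelism(input_bytes: int, target_bytes: int, min_p: int, max_p: int) -> int:
    """The pseudocode above: ceil(input / target), then clamped to [min_p, max_p]."""
    p = math.ceil(input_bytes / target_bytes)
    p = max(min_p, p)
    return min(max_p, p)


def peak_slots(parallelisms: Sequence[int]) -> int:
    """Linear pipeline, one stage at a time: peak slot demand is the largest stage."""
    return max(parallelisms)


def waves(parallelism: int, slots: int) -> int:
    """Rounds of execution when a stage has more tasks than free slots (uniform tasks assumed)."""
    return math.ceil(parallelism / slots)


if __name__ == "__main__":
    GB = 1024 ** 3
    p2 = stage_parallelism(300 * GB, GB, min_p=1, max_p=1000)
    p3 = stage_parallelism(50 * GB, GB, min_p=1, max_p=1000)
    print(p2, p3, peak_slots([p2, p3]), waves(p2, slots=100))
```

In a DAG, independent branches (for example, both inputs of a join) can run at the same time, so peak demand can exceed the largest single stage.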

Speculative execution

Large batch jobs suffer from "straggler" tasks: slow tasks that delay the whole job. Causes:

  • Degraded hardware
  • Data skew (one task gets much more data)
  • A GC pause on a particular machine
  • Noisy neighbour

AdaptiveBatchScheduler supports speculative execution: it launches a copy of a slow task on another node, and whichever attempt finishes first wins.

Speculative execution flow:

1. While a stage runs, per-task progress is monitored

2. Identify a straggler:
   - The task runs significantly longer than its siblings
   - Threshold: 1.5x the median completion time
   - Confirmed: the task is slow, not just processing more data

3. Spawn a speculative copy:
   - Same input partition
   - Different TaskExecutor (different node)
   - Same task code

4. Both copies run in parallel

5. The first to finish:
   - Wins
   - The other attempt is cancelled
   - The result does not depend on which attempt wins (same input, same processing)

6. Execution continues

Configuration:
  jobmanager.adaptive-batch-scheduler.speculative.enabled: true
  jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration: 1min
  -- A node with slow tasks is blocklisted for this duration, so no new tasks are placed on it
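The block-slow-node-duration setting can be pictured as a blocklist with an expiry time. This hypothetical Python class (not Flink's blocklist API) only affects where new attempts are placed; time is passed in explicitly to keep the example deterministic.

```python
from typing import Dict, Iterable, List


class SlowNodeBlocklist:
    """Hypothetical sketch (not Flink's API): keep slow nodes out of placement for a while."""

    def __init__(self, block_duration_s: float = 60.0) -> None:  # 1min, as configured above
        self.block_duration_s = block_duration_s
        self._blocked_until: Dict[str, float] = {}

    def block(self, node: str, now: float) -> None:
        """Called when a slow task is detected on `node`."""
        self._blocked_until[node] = now + self.block_duration_s

    def is_blocked(self, node: str, now: float) -> bool:
        until = self._blocked_until.get(node)
        if until is None:
            return False
        if now >= until:
            del self._blocked_until[node]  # the block has expired
            return False
        return True

    def eligible(self, nodes: Iterable[str], now: float) -> List[str]:
        """Nodes where a new attempt (e.g. a speculative copy) may be placed."""
        return [n for n in nodes if not self.is_blocked(n, now)]


if __name__ == "__main__":
    blocklist = SlowNodeBlocklist()
    blocklist.block("node-a", now=0.0)
    print(blocklist.eligible(["node-a", "node-b"], now=10.0))   # ['node-b']
    print(blocklist.eligible(["node-a", "node-b"], now=61.0))   # ['node-a', 'node-b']
```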
AdaptiveBatchScheduler workflow (diagram):
  1. Stage 1 (p=100): source scan. Initial parallelism is computed from the file/partition count (default-source-parallelism): 100 partitions, p=100.
  2. Stats: 300 GB output. Stage 1 completes and output statistics are collected: total bytes, per-partition sizes, key distribution.
  3. Compute Stage 2 p=300. AdaptiveBatchScheduler computes the Stage 2 parallelism: 300 GB / 1 GB target = 300 tasks, bounded by max-parallelism (so this assumes max-parallelism is at least 300).
  4. Stage 2 (p=300) executes with the adapted parallelism. Each task gets ~1 GB of input, so per-task memory is sized correctly.
  5. Detect straggler. Per-task progress is monitored: the median time is 60 s, and task #42 is still running at 120 s, so it is flagged as a straggler.
  6. Speculative copy of #42. Task #42-spec is launched on another node; both attempts run in parallel.
  7. Speculative copy wins, original cancelled. The copy finishes first (30 s on a healthy node), the original is cancelled, and stage progress resumes.

Performance impact

Before AdaptiveBatchScheduler (static parallelism):

TPC-H query (1 TB data):
  - Static parallelism = 200
  - Stage 1 (scan): 200 tasks × 5 GB each = OK
  - Stage 2 (filter+project): 200 tasks × 5 GB each = OK
  - Stage 3 (aggregate): 200 tasks × 0.1 GB each = under-utilized
  - Stage 4 (final sort): 200 tasks × 1 KB each = waste
  Total time: 45 min
  Idle time: 30%
  Tasks created: 1000+

After AdaptiveBatchScheduler:

TPC-H query (1 TB data):
  - Stage 1 (scan): 200 tasks × 5 GB = OK
  - Stage 2 (filter+project): 100 tasks × 10 GB (adapted)
  - Stage 3 (aggregate): 50 tasks × 0.4 GB (adapted)
  - Stage 4 (final sort): 4 tasks × 50 KB (adapted)
  Total time: 32 min (-30%)
  Idle time: 10%
  Tasks created: 354 (less overhead)

With speculative execution:
  - Stragglers eliminated
  - A further ~15% reduction in total time (for some queries)
  - Overall: 45 min -> 27 min
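The figures above can be cross-checked with a few lines of Python. The sketch only redoes the arithmetic on the numbers quoted in the example; it does not measure anything.

```python
import math

GB_PER_KB = 1 / 1024 ** 2  # 1 KB expressed in GB (binary units)

# (tasks, GB per task) per stage, copied from the example above.
STATIC = {"scan": (200, 5), "filter+project": (200, 5),
          "aggregate": (200, 0.1), "final sort": (200, 1 * GB_PER_KB)}
ADAPTED = {"scan": (200, 5), "filter+project": (100, 10),
           "aggregate": (50, 0.4), "final sort": (4, 50 * GB_PER_KB)}


def stage_volume_gb(plan):
    """Total data per stage: adaptation changes how it is split, not how much there is."""
    return {name: tasks * gb for name, (tasks, gb) in plan.items()}


def same_volume(a, b) -> bool:
    """True if both plans move the same amount of data through every stage."""
    va, vb = stage_volume_gb(a), stage_volume_gb(b)
    return all(math.isclose(va[k], vb[k]) for k in va)


def pct_change(old: float, new: float) -> float:
    return (new - old) / old * 100


if __name__ == "__main__":
    print(same_volume(STATIC, ADAPTED))          # True
    print(sum(t for t, _ in ADAPTED.values()))   # 354
    print(round(pct_change(45, 32), 1))          # -28.9
    print(round(32 * 0.85, 1))                   # 27.2
```

The four stages listed account for 800 tasks in the static plan. Note also that the adapted stage 2 uses 10 GB per task, so this example does not follow the 1 GB per-task target used earlier in the lesson.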

Differences from the streaming AdaptiveScheduler

AdaptiveScheduler (streaming):
  Single job state machine
  Parallelism decided for the job as a whole, not per stage
  Reacts to changes in slot availability
  Rescaling = stop + restart
  Reactive Mode integration
  Focus: graceful resource utilization

AdaptiveBatchScheduler (batch):
  Per-stage state
  Stage-specific parallelism (varies)
  Computes based on input statistics
  No rescaling (stages execute once)
  Speculative execution
  Focus: optimal parallelism per stage

These are different components for different needs. The streaming scheduler manages a job that runs as one unit and reacts to external changes. The batch scheduler handles staged processing with a known data flow and optimizes each stage separately.

Limitations

1. Initial source parallelism
   - Stage 1 cannot be adapted (there is no prior stage to measure)
   - It is configured statically, based on the partition count
   - Tune it via default-source-parallelism

2. Statistics overhead
   - Tracking per-stage output requires bookkeeping
   - The overhead is minor but real
   - Negligible for large jobs, noticeable for small ones

3. Memory planning
   - Variable parallelism means variable memory needs
   - The cluster has to be provisioned for the largest stage
   - Memory may sit unused while smaller stages run

4. Skewed data
   - Per-task data size can vary within one stage
   - Speculative execution helps with stragglers
   - But it does not fully solve skew: a speculative copy processes the same oversized partition

5. Not all operators benefit
   - Some operators have a forced parallelism (e.g. a single global aggregate)
   - Their parallelism is not adapted
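A small Python model shows why sizing by total volume hides skew. It assumes a hypothetical hash-style assignment where key group i goes to task i % parallelism, with one oversized key group; the parallelism is still chosen as ceil(total / target).

```python
import math
from typing import List


def parallelism_for(total: int, target: int) -> int:
    """Sizing by total volume only (ceil(total / target)), ignoring how it is distributed."""
    return math.ceil(total / target)


def per_task_load(key_group_sizes: List[int], parallelism: int) -> List[int]:
    """Hypothetical hash-style assignment: key group i goes to task i % parallelism."""
    loads = [0] * parallelism
    for i, size in enumerate(key_group_sizes):
        loads[i % parallelism] += size
    return loads


def skew_ratio(loads: List[int]) -> float:
    """Largest task load divided by the average load (1.0 means perfectly balanced)."""
    return max(loads) / (sum(loads) / len(loads))


if __name__ == "__main__":
    sizes_gb = [1] * 12 + [13]          # one hot key group holds over half the data
    p = parallelism_for(sum(sizes_gb), target=5)
    loads = per_task_load(sizes_gb, p)
    print(p, loads, skew_ratio(loads))  # 5 [3, 3, 15, 2, 2] 3.0
```

The task holding the hot key group gets 15 GB against a 5 GB target. A speculative copy of that task would read the same 15 GB, which is why speculation alone does not fix skew.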

Use cases

Best fit for AdaptiveBatchScheduler:

1. ETL pipelines with variable stage sizes
   - Source scan -> filter -> join -> aggregate -> output
   - Stage sizes differ by orders of magnitude

2. Analytical queries (TPC-DS, TPC-H style)
   - Multi-stage queries
   - Selectivity varies stage to stage

3. Iceberg/Paimon batch reads
   - Variable input size
   - Adaptive scaling matches data

4. Mixed workloads
   - Daily ETL: large data
   - Hourly queries: smaller data
   - Same cluster, different optimal parallelism

5. Backfills and one-time processing
   - Volume can vary widely
   - Adaptive sizing handles the outliers at both ends

Sink considerations

Sinks can also benefit from adaptive parallelism:

CSV/Parquet sink:
  - Adapted parallelism = number of output files
  - Smaller stages = fewer files (less metadata overhead)
  - Larger stages = more files (better parallel write)

Iceberg sink:
  - Adapted parallelism affects file sizes in Iceberg
  - The target file size is configured per table
  - It has to be coordinated with adaptive parallelism

Database sinks (JDBC, ClickHouse):
  - One task = one connection
  - Lower parallelism = less load on the database
  - Important for databases with connection limits

Real production cases

Case 1: Daily TPC-DS-like analytics (Pinterest blog)
  Before AdaptiveBatchScheduler:
    Static p=512 for a huge query
    Half stages over-parallelized
    Job time: 6 hours

  After AdaptiveBatchScheduler:
    Each stage right-sized
    Job time: 3.5 hours (-42%)
    Resource utilization +60%

Case 2: ETL pipeline with skewed joins (internal Alibaba blog)
  Before:
    Stragglers caused job to take 4x median time
    Manual intervention needed

  After, with speculative execution:
    Stragglers automatically replaced
    Job time: predictable
    Operational burden -80%

Case 3: Iceberg backfill (Netflix blog)
  Before:
    Static parallelism did not fit the varying backfill sizes
    Manual tuning per backfill

  After AdaptiveBatchScheduler:
    Each backfill is automatically sized correctly
    No manual tuning needed
    Backfills faster overall
TIP

AdaptiveBatchScheduler is the default scheduler for batch mode since Flink 1.17. If you have legacy batch jobs that expect static parallelism, you can switch back to the Default Scheduler with jobmanager.scheduler: Default. For most new workloads, however, AdaptiveBatchScheduler gives better performance out of the box.

Reading the source

Source:
  flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/
    AdaptiveBatchScheduler.java         -- main class
    SpeculativeExecutionHandler.java    -- speculative tasks logic
    DefaultVertexParallelismAndInputInfosDecider.java -- parallelism algorithm

  flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/
    BlocklistTracker.java               -- tracks blocked (slow) nodes

  flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/
    SchedulerNG.java                    -- scheduler interface

FLIP documents:
  FLIP-187: Adaptive Batch Job Scheduler
  FLIP-168: Speculative Execution
  FLIP-283: Use AdaptiveBatchScheduler as the default scheduler for batch jobs

Production blogs:
  Apache Flink blog: "Adaptive Batch Scheduler" deep dive
  Pinterest engineering: "Speeding up batch jobs"