AdaptiveBatchScheduler — batch mode adaptive scheduling
In previous lessons we covered the Adaptive Scheduler for streaming (which gracefully adapts to the available resources) and Reactive Mode (autoscaling for a single job). Batch mode has its own scheduler, AdaptiveBatchScheduler (FLIP-187). It was introduced in Flink 1.15 and has been the default scheduler for batch jobs since Flink 1.17 (FLIP-283), including Flink 2.0.
This scheduler solves a different problem: in batch jobs, the input size of each stage is not known in advance. A static parallelism is either too small (slow) or too large (wasted resources). AdaptiveBatchScheduler chooses the parallelism of each stage based on its actual input size, which is only known once the upstream stage has finished.
In this lesson we look at how AdaptiveBatchScheduler works, how it decides the parallelism of each stage, and how speculative execution deals with slow tasks.
(Figure: AQE partition coalescing in Spark)
The problem with static parallelism in batch
A classic batch job:
Stage 1: Source scan (100 GB Parquet) - 100 partitions, parallelism = 100
Stage 2: Aggregation (300 GB intermediate) - parallelism = ???
Stage 3: Join (500 GB intermediate) - parallelism = ???
With static parallelism = 100 for all stages:
Stage 1: 1 GB per task, fits in memory, OK
Stage 2: 3 GB per task, OK with memory tuning
Stage 3: 5 GB per task, spills to disk, slow
With static parallelism = 500:
Stage 1: 200 MB per task, 500 small tasks, scheduling overhead dominates
Stage 2: 600 MB per task, optimal
Stage 3: 1 GB per task, optimal
No single parallelism is optimal for all stages.
This is a fundamental problem. AdaptiveBatchScheduler solves it dynamically: it chooses each stage's parallelism after measuring that stage's input size.
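The per-task numbers above are plain division of each stage's size by one shared parallelism. A small sketch (stage names and sizes copied from the example, even data distribution assumed) makes the trade-off easy to recompute for other values:

```python
# Per-task data volume when every stage uses the same static parallelism.
# Stage sizes (GB) are taken from the example above; data is assumed to be spread evenly.
STAGE_SIZES_GB = {"scan": 100, "aggregation": 300, "join": 500}


def per_task_gb(stage_sizes_gb, parallelism):
    """Return the GB each task processes in every stage for one shared parallelism."""
    return {name: size / parallelism for name, size in stage_sizes_gb.items()}


for p in (100, 500):
    print(p, per_task_gb(STAGE_SIZES_GB, p))
```

Whatever value of `p` you try, at least one stage ends up far from a comfortable per-task size, which is exactly the point of the comparison.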
AdaptiveBatchScheduler architecture
Workflow:
1. Stage 1 (Source) executes with an initial parallelism
- Can be computed statically (based on file count, partition count)
2. Stage 1 completes and writes its output to an intermediate dataset
- Flink measures the total bytes produced
- Records per-partition statistics
3. Stage 2 is about to start
- AdaptiveBatchScheduler reads the Stage 1 output statistics
- Computes the optimal parallelism for Stage 2 based on:
* Target data size per task (config: jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task, 1GB in the config below)
* Min/max parallelism bounds
* Available slots
- Adjusts the Stage 2 parallelism
4. Stage 2 executes with the new parallelism
5. Repeat for each subsequent stage
The decision is made per stage, based on actual data.
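The workflow above can be sketched as a loop in which each stage's parallelism is decided only after the previous stage has "produced" its bytes. This is a simulation, not Flink code: `Stage`, `output_ratio`, `decide` and `run_pipeline` are hypothetical names, the per-stage output ratios are made up to drive the data flow, and the decision rule is the simplified ceil-and-clamp rule from the "Parallelism calculation" section.

```python
import math
from dataclasses import dataclass

GB = 1024 ** 3


@dataclass
class Stage:
    name: str
    # Hypothetical output/input size ratio, used only to simulate how much data the stage emits.
    output_ratio: float


def decide(input_bytes, target_bytes, min_p, max_p):
    """Simplified rule: ceil(input / target), clamped to [min_p, max_p]."""
    return min(max_p, max(min_p, math.ceil(input_bytes / target_bytes)))


def run_pipeline(source_bytes, source_parallelism, stages,
                 target_bytes=1 * GB, min_p=1, max_p=128):
    """Return [(stage name, parallelism)] in execution order."""
    # Step 1: the source runs with a statically chosen parallelism.
    # Assume it emits as many bytes as it reads.
    plan = [("source", source_parallelism)]
    produced = source_bytes
    for stage in stages:
        # Steps 2-3: the upstream stage has finished, so its output size is known now.
        parallelism = decide(produced, target_bytes, min_p, max_p)
        plan.append((stage.name, parallelism))
        # Step 4: "run" the stage and record what it produces for the next one.
        produced = int(produced * stage.output_ratio)
    return plan


print(run_pipeline(100 * GB, 16, [Stage("aggregate", 0.5), Stage("join", 0.1)]))
```

With 100 GB read by the source, the aggregate stage is sized from 100 GB and the join from the 50 GB the aggregate emits, so the two downstream stages get different parallelism even though nothing was configured per stage.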
Configuration
# flink-conf.yaml for batch jobs
# Enable AdaptiveBatchScheduler
jobmanager.scheduler: AdaptiveBatch
# Default for batch jobs since Flink 1.17 (also in Flink 2.0+)
# Per-task data target
jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task: 1GB
# Higher: fewer tasks and less overhead, but more data (and memory) per task
# Parallelism bounds
jobmanager.adaptive-batch-scheduler.min-parallelism: 1
jobmanager.adaptive-batch-scheduler.max-parallelism: 128
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 16
# Speculative execution
jobmanager.adaptive-batch-scheduler.speculative.enabled: true
jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions: 2
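Before deploying, it can help to read these values back and check that they are consistent. The sketch below works on a plain dict copied from the config above and does not use any Flink API; `parse_size`, `read_settings` and `EXAMPLE_CONF` are hypothetical names. It assumes binary units (1 GB = 1024^3 bytes), which is how Flink's `MemorySize` interprets "gb", and it only understands a small subset of size strings.

```python
import re

# Values copied from the config above, as a plain dict (no Flink API involved).
PREFIX = "jobmanager.adaptive-batch-scheduler."
EXAMPLE_CONF = {
    PREFIX + "avg-data-volume-per-task": "1GB",
    PREFIX + "min-parallelism": "1",
    PREFIX + "max-parallelism": "128",
    PREFIX + "default-source-parallelism": "16",
}

# Binary multipliers: 1 kb = 1024 bytes, 1 gb = 1024^3 bytes.
_UNITS = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3, "tb": 1024 ** 4}


def parse_size(text):
    """Parse size strings such as '1GB' or '512 mb' into bytes (subset only)."""
    match = re.fullmatch(r"\s*(\d+)\s*([kmgt]?b)\s*", text.lower())
    if match is None:
        raise ValueError(f"unsupported size string: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


def read_settings(conf):
    """Return (target_bytes, min_p, max_p, source_p); reject inconsistent bounds."""
    target = parse_size(conf[PREFIX + "avg-data-volume-per-task"])
    min_p = int(conf[PREFIX + "min-parallelism"])
    max_p = int(conf[PREFIX + "max-parallelism"])
    source_p = int(conf[PREFIX + "default-source-parallelism"])
    if not 1 <= min_p <= max_p:
        raise ValueError(f"need 1 <= min-parallelism <= max-parallelism, got {min_p}, {max_p}")
    return target, min_p, max_p, source_p


print(read_settings(EXAMPLE_CONF))
```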
Parallelism calculation
Algorithm:
input_size = bytes produced by upstream stage (from statistics)
target_size = jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task (1GB in the config above)
parallelism = ceil(input_size / target_size)
parallelism = max(min_parallelism, parallelism)
parallelism = min(max_parallelism, parallelism)
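Example (using the bounds from the config above, max-parallelism = 128):
Stage 2 input = 300 GB
target = 1 GB
parallelism = ceil(300 / 1) = 300, clamped to max-parallelism -> 128
Stage 3 input = 50 GB
target = 1 GB
parallelism = 50
Peak slots needed = max(128, 50, ...) = the largest stage parallelism (assuming the stages run one after another)
This is the key idea: each stage gets a parallelism that matches its size. Each downstream stage's parallelism is roughly input_size / 1GB, within the min/max bounds.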
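The algorithm and the example can be checked with a few lines of Python. This is a sketch of the simplified rule as written above (ceil, then clamp), not Flink's `DefaultVertexParallelismAndInputInfosDecider`, which handles further cases; `decide_parallelism` is a hypothetical name and the defaults mirror the example config.

```python
import math

GB = 1024 ** 3


def decide_parallelism(input_bytes, target_bytes=1 * GB, min_p=1, max_p=128):
    """Simplified rule from the pseudocode: ceil(input / target), clamped to [min_p, max_p]."""
    parallelism = math.ceil(input_bytes / target_bytes)
    parallelism = max(min_p, parallelism)
    return min(max_p, parallelism)


stage_inputs = {"stage2": 300 * GB, "stage3": 50 * GB}
decided = {name: decide_parallelism(size) for name, size in stage_inputs.items()}

# Peak slot demand, assuming stages run one after another and never overlap.
peak_slots = max(decided.values())
print(decided, peak_slots)
```

Raising `max_p` (for example `decide_parallelism(300 * GB, max_p=1000)`) lets Stage 2 reach the unclamped value of 300; an empty input still gets `min_p` tasks because of the lower bound.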
Speculative execution
Large batch jobs suffer from "straggler" tasks: slow tasks that hold up the whole job. Causes:
- Degraded hardware
- Data skew (one task gets more data)
- GC pause on a particular machine
- Noisy neighbour
AdaptiveBatchScheduler supports speculative execution: it launches a copy of the slow task on another node, and the first attempt to finish wins.
Speculative execution flow:
1. Stage running, monitoring per-task progress
2. Identify straggler:
- Task running significantly longer than its siblings
- Threshold: 1.5x the median completion time
- Confirmed: the task is slow, not just processing more data
3. Spawn speculative copy:
- Same input partition
- Different TaskExecutor (different node)
- Same task code
4. Both copies run in parallel
5. First to finish:
- Wins
- The other attempt is cancelled
- The result is the same only if the task is deterministic (same input, same processing)
6. Execution continues
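Steps 2 and 5 of the flow can be sketched as two small functions: one flags running tasks whose elapsed time exceeds 1.5x the median duration of finished siblings, the other picks the first attempt to finish. This is a simplified illustration under stated assumptions: `find_stragglers` and `first_finisher` are hypothetical names, durations are plain seconds, and the "not just processing more data" check from step 2 is not modelled. Flink's actual slow-task detection has more settings than this.

```python
from statistics import median


def find_stragglers(finished_durations, running_elapsed, multiplier=1.5):
    """Return ids of running tasks whose elapsed time exceeds multiplier x median.

    finished_durations: seconds taken by sibling tasks that already finished.
    running_elapsed: {task_id: seconds elapsed so far} for tasks still running.
    """
    if not finished_durations:
        return []  # no baseline yet, so nothing can be called slow
    threshold = multiplier * median(finished_durations)
    return sorted(t for t, elapsed in running_elapsed.items() if elapsed > threshold)


def first_finisher(attempts):
    """attempts: {attempt_id: finish_time}. Earliest attempt wins; the others are cancelled."""
    winner = min(attempts, key=attempts.get)
    cancelled = sorted(a for a in attempts if a != winner)
    return winner, cancelled


# Median of finished tasks is 60.5 s, so the threshold is 90.75 s.
print(find_stragglers([60, 62, 58, 61], {"t7": 95, "t8": 70}))
print(first_finisher({"t7-original": 140, "t7-speculative": 110}))
```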
Configuration:
jobmanager.adaptive-batch-scheduler.speculative.enabled: true
jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration: 1min
# A node detected as slow is blocked for this duration, so no new attempts are scheduled on it
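The effect of block-slow-node-duration can be pictured as a blocklist whose entries expire. The sketch below is hypothetical (`SlowNodeBlocklist` and its methods are not Flink classes) and uses plain numeric timestamps in seconds. The `exclude` argument models the rule that a speculative copy must run on a different node than the original attempt.

```python
class SlowNodeBlocklist:
    """Hypothetical sketch: a node flagged as slow is blocked for a fixed duration."""

    def __init__(self, block_duration_s=60.0):
        self.block_duration_s = block_duration_s
        self._blocked_until = {}  # node -> time at which its block expires

    def block(self, node, now):
        self._blocked_until[node] = now + self.block_duration_s

    def is_blocked(self, node, now):
        expires = self._blocked_until.get(node)
        if expires is None:
            return False
        if now >= expires:
            del self._blocked_until[node]  # block has expired
            return False
        return True

    def pick_node(self, candidates, now, exclude=()):
        """First candidate that is neither blocked nor excluded, else None."""
        for node in candidates:
            if node not in exclude and not self.is_blocked(node, now):
                return node
        return None


blocklist = SlowNodeBlocklist(block_duration_s=60)
blocklist.block("node-a", now=0)
print(blocklist.pick_node(["node-a", "node-b"], now=10))  # node-a is still blocked
print(blocklist.pick_node(["node-a", "node-b"], now=61))  # the block has expired
```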
Performance impact
Before AdaptiveBatchScheduler (static parallelism):
TPC-H query (1 TB data):
- Static parallelism = 200
- Stage 1 (scan): 200 tasks × 5 GB each = OK
- Stage 2 (filter+project): 200 tasks × 5 GB each = OK
- Stage 3 (aggregate): 200 tasks × 0.1 GB each = under-utilized
- Stage 4 (final sort): 200 tasks × 1 KB each = waste
Total time: 45 min
Idle time: 30%
Tasks created: 800 (4 stages × 200)
After AdaptiveBatchScheduler:
TPC-H query (1 TB data):
- Stage 1 (scan): 200 tasks × 5 GB = OK
- Stage 2 (filter+project): 100 tasks × 10 GB (adapted)
- Stage 3 (aggregate): 50 tasks × 0.4 GB (adapted)
- Stage 4 (final sort): 4 tasks × 50 KB (adapted)
Total time: 32 min (-30%)
Idle time: 10%
Tasks created: 354 (less overhead)
With speculative execution:
- Stragglers eliminated
- Total time: an additional -15% (on some queries)
- Overall: 45 min -> 27 min
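The numbers in this comparison can be re-derived from the per-stage task counts and sizes. The sketch below copies them from the tables above (taking 1 KB as 1e-6 GB), checks that each stage processes the same total volume before and after adaptation, and recomputes the task counts and time reductions.

```python
import math

# (tasks, GB per task) per stage, copied from the example above; 1 KB taken as 1e-6 GB.
before = {"scan": (200, 5.0), "filter_project": (200, 5.0),
          "aggregate": (200, 0.1), "final_sort": (200, 1e-6)}
after = {"scan": (200, 5.0), "filter_project": (100, 10.0),
         "aggregate": (50, 0.4), "final_sort": (4, 50e-6)}


def total_tasks(plan):
    return sum(tasks for tasks, _ in plan.values())


def volume_gb(plan, stage):
    tasks, per_task = plan[stage]
    return tasks * per_task


# Each stage processes the same data before and after; only the split into tasks changes.
same_volume = all(math.isclose(volume_gb(before, s), volume_gb(after, s)) for s in before)

reduction = 1 - 32 / 45             # static 45 min -> adaptive 32 min
with_speculation = 32 * (1 - 0.15)  # additional -15% from speculative execution
print(total_tasks(before), total_tasks(after), same_volume,
      round(reduction, 3), round(with_speculation, 1))
```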
Differences from streaming AdaptiveScheduler
AdaptiveScheduler (streaming):
Single job state machine
Job-wide parallelism (uniform for the job)
Reacts to changes in slot availability
Rescaling = stop + restart
Reactive Mode integration
Focus: graceful resource utilization
AdaptiveBatchScheduler (batch):
Per-stage state
Stage-specific parallelism (varies)
Computes based on input statistics
No rescaling (stages execute once)
Speculative execution
Focus: optimal parallelism per stage
These are different components for different needs. Streaming: a uniform job that reacts to external changes. Batch: staged processing with a known data flow, optimized per stage.
Limitations
1. Initial source parallelism
- Stage 1 cannot be adapted (there is no prior stage)
- Static configuration based on partition count
- Tune via default-source-parallelism
2. Statistics overhead
- Tracking per-stage output requires bookkeeping
- Minor overhead but real
- Negligible for large jobs, noticeable for small ones
3. Memory planning
- Variable parallelism = variable memory needs
- The cluster must be provisioned for the largest stage
- Memory may sit unused during smaller stages
4. Skewed data
- Per-task data size can vary within a single stage
- Speculative execution helps with stragglers
- But it does not fully solve skew
5. Not all operators benefit
- Some operators have a fixed parallelism (e.g., a single global aggregate)
- These are not adapted
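Limitation 4 is easy to see numerically: even when the parallelism is right for the total volume, one hot key can make a single task far larger than the average. The sketch routes integer keys with `key % parallelism`, a deliberate simplification (Flink assigns keys via key groups), and the key sizes and the 50 MB per-task target are made up for illustration; `per_task_load` is a hypothetical name.

```python
import math
from collections import Counter


def per_task_load(key_sizes, parallelism):
    """MB per task when integer keys are routed by key % parallelism (simplified hashing)."""
    load = Counter()
    for key, size in key_sizes.items():
        load[key % parallelism] += size
    return [load[task] for task in range(parallelism)]


# 100 keys: key 0 is "hot" (100 MB); the other 99 keys carry 1 MB each.
key_sizes_mb = {key: 1 for key in range(1, 100)}
key_sizes_mb[0] = 100

# Parallelism sized from the total, with a hypothetical 50 MB target per task.
parallelism = math.ceil(sum(key_sizes_mb.values()) / 50)
loads = per_task_load(key_sizes_mb, parallelism)
average = sum(loads) / parallelism
print(parallelism, loads, average, max(loads) / average)
```

The total-based decision gives 4 tasks averaging about 50 MB, yet the task holding the hot key receives roughly 2.5x the average; a speculative copy of that task would process the same oversized partition.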
Use cases
Best fit for AdaptiveBatchScheduler:
1. ETL pipelines with variable stage sizes
- Source scan -> filter -> join -> aggregate -> output
- Stage sizes differ by orders of magnitude
2. Analytical queries (TPC-DS, TPC-H style)
- Multi-stage queries
- Selectivity varies stage to stage
3. Iceberg/Paimon batch reads
- Variable input size
- Adaptive scaling matches data
4. Mixed workloads
- Daily ETL: large data
- Hourly queries: smaller data
- Same cluster, different optimal parallelism
5. Backfills and one-time processing
- Volume can vary widely
- Adaptive handles edge cases
Sink considerations
Sinks can also benefit from adaptive parallelism:
CSV/Parquet sink:
- Adapted parallelism = number of output files
- Smaller stages = fewer files (less metadata overhead)
- Larger stages = more files (better parallel write)
Iceberg sink:
- Adapted parallelism affects file size in Iceberg
- Target file size is configured per table
- Needs to be coordinated with adaptive parallelism
Database sinks (JDBC, ClickHouse):
- Per-task = per-connection
- Less parallelism = less DB load
- Important for DBs with connection limits
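A rough model of the sink effects above: with data spread evenly over sink tasks, each task writing at least one file and rolling to a new file at a target size, the file count follows from the parallelism; for database sinks, connections grow with parallelism. Everything here is an assumption for illustration (the rolling model, the 512 MB target, one connection per task as stated above); `estimate_output_files` and `fits_connection_limit` are hypothetical names, not a sink API.

```python
import math

KB, MB, GB = 1024, 1024 ** 2, 1024 ** 3


def estimate_output_files(total_bytes, parallelism, target_file_bytes):
    """Hypothetical model: even split over sink tasks; each task rolls a new file
    every target_file_bytes and writes at least one file."""
    per_task = total_bytes / parallelism
    files_per_task = max(1, math.ceil(per_task / target_file_bytes))
    return parallelism * files_per_task


def fits_connection_limit(parallelism, max_connections, connections_per_task=1):
    """Database sinks: one connection per sink task, as described above."""
    return parallelism * connections_per_task <= max_connections


TARGET = 512 * MB  # hypothetical target file size
print(estimate_output_files(200 * KB, 200, TARGET))    # tiny stage at a static p=200
print(estimate_output_files(200 * KB, 4, TARGET))      # same data at an adapted p=4
print(estimate_output_files(1024 * GB, 128, TARGET))   # large stage: 8 GB per task
print(fits_connection_limit(128, max_connections=100))
```

In this model the tiny final stage drops from 200 files to 4 once its parallelism is adapted, while the large stage still gets enough files for parallel writes.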
Real production cases
Case 1: Daily TPC-DS-like analytics (Pinterest blog)
Before AdaptiveBatchScheduler:
Static p=512 for a huge query
Half stages over-parallelized
Job time: 6 hours
After AdaptiveBatchScheduler:
Each stage right-sized
Job time: 3.5 hours (-42%)
Resource utilization +60%
Case 2: ETL pipeline with skewed joins (internal Alibaba blog)
Before:
Stragglers caused job to take 4x median time
Manual intervention needed
After, with speculative execution:
Stragglers automatically replaced
Job time: predictable
Operational burden -80%
Case 3: Iceberg backfill (Netflix blog)
Before:
Static parallelism did not fit the variable backfill sizes
Manual tuning per backfill
After AdaptiveBatchScheduler:
Each backfill automatically right-sized
No manual tuning needed
Backfills faster overall
AdaptiveBatchScheduler is enabled by default for batch mode since Flink 1.17, including Flink 2.0+. If you have legacy batch jobs that expect static parallelism, you can switch back to the default scheduler with jobmanager.scheduler: Default. For most new workloads, though, AdaptiveBatchScheduler gives better performance out of the box.
Reading the source
Source:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/
AdaptiveBatchScheduler.java -- main class
SpeculativeExecutionHandler.java -- speculative tasks logic
DefaultVertexParallelismAndInputInfosDecider.java -- parallelism algorithm
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistTracker.java -- tracks blocked (slow) nodes
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/
SchedulerNG.java -- scheduler interface
FLIP documents:
FLIP-187: Adaptive Batch Job Scheduler
FLIP-168: Speculative Execution
FLIP-283: Use AdaptiveBatchScheduler as the default scheduler for batch jobs
Production blogs:
Apache Flink blog: "Adaptive Batch Scheduler" deep dive
Pinterest engineering: "Speeding up batch jobs"