Reactive Mode: an autoscaling mode for Flink
Reactive Mode is a special mode of operation that turns a Flink cluster into a fully elastic single-job system. Building on the Adaptive Scheduler as its foundation, Reactive Mode adds two rules: one job per cluster, and the job always uses ALL available resources. This makes Flink a system that an external autoscaler can drive: add nodes and the job grows; remove them and the job shrinks.
In this lesson we cover Reactive Mode internals, its configuration, and a real production example: ML-based autoscaling at Grab (one of the largest Flink users in Asia).
What is Reactive Mode
Default Flink cluster:
- Multiple jobs can run concurrently
- Each job requires a specific parallelism
- Resource allocation is shared
- Scaling requires manual intervention
Reactive Mode cluster:
- SINGLE job per cluster
- Job parallelism = total available slots
- Any added slots -> used automatically
- Any removed slots -> the job rescales down
- An external component (autoscaler) controls cluster size
Enabling it:
jobmanager.scheduler: adaptive
scheduler-mode: reactive
This simplification of the compute model (cluster = one job) allows a clean separation: scheduling logic lives in Flink, scaling decisions live in an external system.
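The slot rule can be illustrated with a short sketch. It assumes that the job's max parallelism acts as an upper bound on the result; the function name and the example numbers are illustrative only and are not part of any Flink API.

```python
def reactive_parallelism(task_managers: int, slots_per_tm: int, max_parallelism: int) -> int:
    """Parallelism a Reactive Mode job would run at under the stated assumptions:
    every available slot is used, capped by the job's max parallelism."""
    if task_managers < 0 or slots_per_tm < 1 or max_parallelism < 1:
        raise ValueError("task_managers must be >= 0; slots_per_tm and max_parallelism must be >= 1")
    total_slots = task_managers * slots_per_tm
    return min(total_slots, max_parallelism)


if __name__ == "__main__":
    # Adding TaskManagers grows the job until the max parallelism cap is reached.
    for tms in (2, 5, 40):
        print(tms, "TMs ->", reactive_parallelism(tms, slots_per_tm=4, max_parallelism=128))
```

The external autoscaler only changes the first argument (the number of TaskManagers); everything else follows from the cluster configuration.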
Architectural change
Standard Flink:
JobManager (multiple jobs)
+-- Job A (parallelism=20, fixed)
+-- Job B (parallelism=10, fixed)
+-- Job C (parallelism=30, fixed)
Total slots = 60+
External: difficult to right-size, jobs contend
Reactive Mode:
JobManager (single job)
+-- Job (parallelism=current_slots)
Total slots = changes dynamically
External autoscaler:
1. Monitor metrics (CPU, lag, throughput)
2. Decide scale up/down
3. Add/remove TaskManagers
Flink reacts automatically
Configuration
# flink-conf.yaml
# Enable the Adaptive Scheduler (required)
jobmanager.scheduler: adaptive
# Reactive Mode
scheduler-mode: reactive
# Job submission
# In Reactive Mode you set the max parallelism per JobVertex
# This is an upper bound; actual parallelism = available slots
# Resource limits
taskmanager.numberOfTaskSlots: 4 # slots per TM
taskmanager.memory.flink.size: 8gb
# Adaptive Scheduler tuning
jobmanager.adaptive-scheduler.resource-wait-timeout: 5min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10s
jobmanager.adaptive-scheduler.scaling-interval.min: 30s
# How the job reacts when resources change
# Default: trigger a rescale immediately
# Tunable: the rescale can be delayed via resource-stabilization-timeout
External autoscaler: what to monitor
Automatic scaling is a decision based on metrics. Which metrics should you use?
1. CPU utilization
- The simplest metric
- Easy to interpret
- But: in Flink, CPU is not always the bottleneck
- State I/O, network, or GC can be the bottleneck instead
- OK for CPU-bound jobs
2. Backpressure metrics
- busyTimeMsPerSecond, backPressuredTimeMsPerSecond
- Direct indicator of overload
- Backpressure above a threshold (for example, 500 ms/sec) -> scale up
- Backpressure at 0 for a sustained period -> scaling down can be considered
3. Source lag (Kafka, Pulsar)
- records-lag-max, fetch-lag
- Direct measurement of processing capacity
- Growing lag -> the job is falling behind, scale up
- Lag = 0 -> caught up, may scale down
4. Throughput vs target
- records-in-rate vs target rate
- When records-out-rate < records-in-rate for a sustained period -> backpressure will follow
- Predictive metric for scaling
5. State size growth
- Growing state -> may require more memory/CPU
- On state explosion -> scale up for capacity
6. Custom business metrics
- Per-event processing time
- Specific SLA (event freshness)
- Domain-specific signals
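A "growing lag" signal like the one in item 3 has to be derived from raw samples. A minimal sketch, assuming lag is sampled as (timestamp in seconds, lag in records) pairs and that a least-squares slope above a chosen threshold counts as growth; the function names and the threshold are hypothetical, not Flink or Kafka APIs.

```python
from statistics import mean


def lag_slope(samples: list[tuple[float, float]]) -> float:
    """Least-squares slope of (timestamp_sec, lag_records) samples, in records per second."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    t_mean = mean(t for t, _ in samples)
    lag_mean = mean(lag for _, lag in samples)
    numerator = sum((t - t_mean) * (lag - lag_mean) for t, lag in samples)
    denominator = sum((t - t_mean) ** 2 for t, _ in samples)
    if denominator == 0:
        raise ValueError("timestamps must not all be equal")
    return numerator / denominator


def lag_is_growing(samples: list[tuple[float, float]], min_slope: float = 10.0) -> bool:
    """True when lag rises faster than min_slope records/sec (threshold is an assumption)."""
    return lag_slope(samples) > min_slope


if __name__ == "__main__":
    window = [(0, 1_000), (30, 4_000), (60, 7_000)]  # lag sampled every 30 s
    print(lag_slope(window), lag_is_growing(window))
```

Fitting a slope over a window, rather than comparing two consecutive samples, makes the signal less sensitive to a single noisy reading.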
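Simple autoscaler: pseudo-code
# Sketch of an external autoscaler for Reactive Mode.
# get_metrics() and the k8s_api wrapper are placeholders supplied by the caller.
import math
import time

SLOTS_PER_TM = 4  # must match taskmanager.numberOfTaskSlots


class FlinkAutoscaler:
    def __init__(self, cluster_id, k8s_api, min_slots=4, max_slots=128):
        self.cluster_id = cluster_id
        self.k8s_api = k8s_api  # hypothetical wrapper exposing scale_deployment()
        self.min_slots = min_slots
        self.max_slots = max_slots
        self.scale_cooldown = 60  # seconds between decisions
        self.last_scale_ts = float("-inf")

    def get_metrics(self):
        # Placeholder: return a dict with total_slots, busy_time_ms_per_sec,
        # kafka_lag and kafka_lag_growing (for example, from a metrics store)
        raise NotImplementedError

    def evaluate(self):
        # Respect the cooldown so consecutive decisions do not thrash
        if time.monotonic() - self.last_scale_ts < self.scale_cooldown:
            return
        metrics = self.get_metrics()
        current_slots = metrics['total_slots']
        # Decision rules
        if metrics['busy_time_ms_per_sec'] > 800:
            # Heavy backpressure -> scale up
            target = int(current_slots * 1.5)
        elif metrics['kafka_lag_growing']:
            # Lag accumulating -> scale up
            target = int(current_slots * 1.3)
        elif (metrics['busy_time_ms_per_sec'] < 100
              and metrics['kafka_lag'] == 0):
            # Idle -> scale down
            target = int(current_slots * 0.75)
        else:
            # Stable -> no change
            return
        # Apply bounds
        target = max(self.min_slots, min(self.max_slots, target))
        if target != current_slots:
            self.scale_cluster(target)
            self.last_scale_ts = time.monotonic()

    def scale_cluster(self, target_slots):
        target_tms = math.ceil(target_slots / SLOTS_PER_TM)
        # K8s: scale the TaskManager Deployment
        self.k8s_api.scale_deployment(
            namespace="flink",
            name=f"taskmanager-{self.cluster_id}",
            replicas=target_tms,
        )
        # Flink's Adaptive Scheduler then rescales the job automatically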
Real production: Grab ML-based autoscaling
Grab (Southeast Asia super-app) uses Flink for real-time analytics, fraud detection, and ETA prediction. The team described its ML-based autoscaler in a 2024 blog post.
Background:
- Flink workloads are imbalanced throughout the day
- Asia traffic: morning rush, evening commute, midnight quiet
- Demand varies 10x between peak and trough
- Static sizing is either wasteful (peak provisioning) or risky (average sizing)
- Manual scaling is slow and labor-intensive
Solution: ML-based predictive autoscaler
Architecture:
1. Telemetry collection
- Flink metrics (CPU, throughput, lag, backpressure)
- Business metrics (transaction rate, geographic distribution)
- External signals (weather, events, holidays)
2. ML model
- LSTM-based time series prediction
- Trained on 6 months historical data
- Predicts traffic in next 30 minutes
- Updated daily
3. Capacity planner
- Given predicted traffic, computes required slots
- Pre-scales before traffic arrives (vs reactive)
- Considers cold start time (5-10 min for new TMs)
4. K8s integration
- Scales Flink Deployment via K8s API
- Sometimes also scales node pool (cluster autoscaler)
- Reactive Mode handles job rescaling
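The capacity planner in step 3 can be sketched as a small calculation. This is a minimal sketch, not Grab's implementation: the per-slot throughput, the headroom factor, and the TaskManager bounds are all assumed values that would have to be measured or chosen for a real job.

```python
import math


def required_task_managers(
    predicted_records_per_sec: float,
    records_per_sec_per_slot: float,  # assumed: measured sustainable throughput of one slot
    slots_per_tm: int = 4,
    headroom: float = 1.25,           # assumed safety margin on top of the forecast
    min_tms: int = 1,
    max_tms: int = 32,
) -> int:
    """Translate a traffic forecast into a TaskManager count, clamped to [min_tms, max_tms]."""
    if records_per_sec_per_slot <= 0 or slots_per_tm < 1:
        raise ValueError("records_per_sec_per_slot must be > 0 and slots_per_tm >= 1")
    slots = math.ceil(predicted_records_per_sec * headroom / records_per_sec_per_slot)
    tms = math.ceil(slots / slots_per_tm)
    return max(min_tms, min(max_tms, tms))


if __name__ == "__main__":
    # Forecast of 50k records/sec, each slot assumed to sustain 2k records/sec.
    print(required_task_managers(50_000, 2_000))
```

Because new TaskManagers need time to start, the planner's output would be applied ahead of the predicted traffic rather than when it arrives.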
Results:
- 40% cost reduction (over-provisioning eliminated)
- 99.9% SLA maintenance (predicted scaling)
- 5 minute advance warning for capacity changes
- Reduced manual ops by 80%
ML model architecture (Grab approach)
Input features (per minute):
- Last 60 minutes Flink metrics
* records-in-rate
* records-out-rate
* busy_time_ms
* kafka_lag
- Last 60 minutes business metrics
* transaction count
* unique users
* geographic distribution heatmap
- Time features
* Hour of day, day of week, day of month
* Holiday indicator, special event indicator
* Weather features (for outdoor workloads)
Model:
- LSTM (Long Short-Term Memory)
- 2 LSTM layers (128 units each)
- Dropout 0.2
- Dense output: predicted records-in-rate next 30 min
Training:
- 6 months historical data
- Train/validation/test 70/20/10 split
- Loss: MAE
- Optimizer: Adam
- Update model weekly
Inference:
- Every 5 minutes
- Get latest 60 min features
- Predict next 30 min
- If predicted load > current capacity * 1.2 -> scale up
- If predicted load < current capacity * 0.6 -> scale down
Decision smoothing:
- Confidence interval
- Hysteresis (avoid flapping)
- Cool-down period (min 5 min between scales)
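The inference thresholds and smoothing rules above can be combined into one decision function. A minimal sketch, assuming the 1.2 and 0.6 factors and the 5 minute cool-down from the text; the class and method names are hypothetical, and the confidence-interval check is left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScalingDecider:
    up_factor: float = 1.2       # scale up if predicted load > capacity * 1.2
    down_factor: float = 0.6     # scale down if predicted load < capacity * 0.6
    cooldown_sec: float = 300.0  # minimum 5 min between scaling actions
    last_scale_ts: Optional[float] = None

    def decide(self, predicted_load: float, current_capacity: float, now: float) -> str:
        """Return 'scale_up', 'scale_down' or 'hold' for one inference tick."""
        # Cool-down: ignore signals that arrive too soon after the last action.
        if self.last_scale_ts is not None and now - self.last_scale_ts < self.cooldown_sec:
            return "hold"
        if predicted_load > current_capacity * self.up_factor:
            action = "scale_up"
        elif predicted_load < current_capacity * self.down_factor:
            action = "scale_down"
        else:
            # Inside the band between the two thresholds: do nothing.
            return "hold"
        self.last_scale_ts = now
        return action


if __name__ == "__main__":
    decider = ScalingDecider()
    print(decider.decide(predicted_load=130, current_capacity=100, now=0))
    print(decider.decide(predicted_load=10, current_capacity=100, now=60))
```

The gap between the two factors acts as the hysteresis band: a load hovering around current capacity never triggers an action, which avoids flapping.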
Cold start considerations
Cold start time for a new TaskManager:
K8s pod creation: ~30 sec
TaskManager JVM startup: ~30 sec
Class loading: ~5 sec
RegisterTaskExecutor: ~5 sec
First slot allocation: ~10 sec
Total cold start: ~80 sec
With a traffic spike predicted 5 min ahead:
- Decision now (t=0)
- Pre-scale: provision new TMs
- Warm-up: cold start (~80 sec)
- Ready by t=80 sec
- Spike arrives at t=5 min: system is ready
vs Reactive-only (no prediction):
- Spike arrives t=0
- Backpressure starts
- Reactive autoscaler detects it (~30 sec)
- Scale up decision
- Cold start 80 sec
- Total response time: ~110 sec from spike start
- Lag accumulates during this time
A predictive ML model provides advance warning, which is critical for streaming jobs where staying caught up matters.
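The cost of the ~110 sec response time can be estimated with simple arithmetic. A minimal sketch, assuming constant input and processing rates; the rates below are made-up illustration values, and only the 110 sec figure comes from the timeline above.

```python
def backlog_after_spike(spike_rate: float, capacity_rate: float, response_time_sec: float) -> float:
    """Records queued while input exceeds capacity for response_time_sec seconds."""
    deficit = max(0.0, spike_rate - capacity_rate)
    return deficit * response_time_sec


def catch_up_time(backlog: float, new_capacity_rate: float, spike_rate: float) -> float:
    """Seconds needed to drain the backlog once the extra capacity is online."""
    surplus = new_capacity_rate - spike_rate
    if surplus <= 0:
        return float("inf")  # the job never catches up at this capacity
    return backlog / surplus


if __name__ == "__main__":
    # Assumed rates: 15k records/sec spike, 10k before scaling, 20k after scaling.
    reactive = backlog_after_spike(15_000, 10_000, response_time_sec=110)
    predictive = backlog_after_spike(15_000, 20_000, response_time_sec=110)
    print("reactive backlog:", reactive, "catch-up sec:", catch_up_time(reactive, 20_000, 15_000))
    print("predictive backlog:", predictive)
```

With pre-scaling, capacity already exceeds the spike when it arrives, so no backlog forms; in the reactive case the job must first drain what accumulated during detection and cold start.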
Sins of Reactive Mode
Reactive Mode is not perfect:
1. Single job per cluster
- Can be wasteful if jobs are small
- Multi-tenancy is harder
- Extra cost from running multiple clusters
2. Rescaling overhead
- Each rescale = job restart + restore from the latest completed checkpoint
- With local RocksDB: minutes
- With ForStDB: seconds
- Frequent scaling can delay processing
3. State management
- Skewed data after rescale
- Hot key handling is complicated
- Some sinks (Kafka TX) handle rescales poorly
4. Latency spikes during rescale
- Brief halt in processing
- Watermarks may regress
- Recovery period after rescale
5. Autoscaler complexity
- Requires sophisticated monitoring
- Tuning thresholds is non-trivial
- Failures can cascade (overly aggressive scale down -> backpressure -> scale up)
When to use Reactive Mode
Good fit:
1. Variable load patterns
- Daily/weekly cycles
- Event-driven spikes
- Burst workloads
2. Cost optimization priority
- Cloud deployments
- Pay-per-use model
- Spot/preemptible instances
3. Single primary job per cluster
- Main analytics pipeline
- One large fraud detector
- Not many small jobs
4. ForStDB or Reactive-friendly state
- Fast rescaling
- Recovery in seconds
Bad fit:
1. Predictable steady load
- Static sizing is efficient
- No need for adaptivity
2. Latency-critical (sub-ms)
- Rescale spikes are unacceptable
- Static parallelism is better
3. Multi-tenant requirement
- Not one job per cluster
4. Complex coordination requirements
- Tightly coupled jobs
- Cross-job dependencies
Operational gotchas
1. Scaling thrashing
- Thresholds too aggressive
- No time to stabilize between scales
- Mitigation: cooldown period, hysteresis
2. Cluster autoscaler interaction
- The K8s cluster autoscaler provisions new nodes
- Latency: 2-5 min for new nodes
- Needs integration with the Flink autoscaler
3. State backend choice
- RocksDB rescale = minutes
- Frequent rescales in Reactive Mode = slow
- ForStDB strongly recommended
4. Sink coordination
- Kafka transactional sink: rescales must stay in sync with open transactions
- Some sinks do not pause/resume gracefully
- Test your specific sinks under rescale
5. Watermark coordination
- After rescale: watermarks may regress
- Pipeline.watermark-alignment helps
- May require waiting out a recovery period
The Apache Flink Kubernetes Operator includes a built-in autoscaler, the Flink Autoscaler. It is a production-ready solution, so you do not need to write your own. It supports CPU-, throughput-, and lag-based decisions. See lesson 4 of this module for details.
Reading the source
Source:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/
AdaptiveScheduler.java
-- supports Reactive Mode when scheduler-mode: reactive
flink-kubernetes-operator/
src/main/java/org/apache/flink/kubernetes/operator/autoscaler/
AutoScaler.java
-- production K8s autoscaler
Production case studies:
Grab blog: "Machine learning-driven autoscaling for Apache Flink"
Netflix blog: "Cell-based architecture for Flink"
Pinterest blog: "Real-time pipelines with auto-scaling Flink"