Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 24 мин
Продвинутый
Reactive ModeAutoscalingSingle JobExternal AutoscalerML AutoscalingGrab

Reactive Mode — autoscaling режим для Flink

Reactive Mode — это специальный mode operation, который превращает Flink cluster в полностью elastic single-job system. На фоне Adaptive Scheduler (foundation), Reactive Mode добавляет правила: один job per cluster, всегда использует ВСЕ available resources. Это превращает Flink в систему, которой может управлять external autoscaler — добавляешь nodes, job растёт; удаляешь — job уменьшается.

В этом уроке мы разбираем Reactive Mode внутренности, конфиги, и реальный production example: ML-based autoscaling в Grab (one of largest Flink users in Asia).

KEDA и HPA-based autoscaling для Flink

Что такое Reactive Mode

Default Flink cluster:
  - Multiple jobs может работать одновременно
  - Each job requires specific parallelism
  - Resource allocation shared
  - Manual scaling intervention

Reactive Mode cluster:
  - SINGLE job per cluster
  - Job parallelism = total available slots
  - Любые added slots -> автоматически использованы
  - Любые removed slots -> job rescales down
  - External component (autoscaler) controls cluster size

Запуск:
  jobmanager.scheduler: adaptive
  scheduler-mode: reactive

Это упрощение compute — cluster = one job — позволяет clean separation: scheduling logic в Flink, scaling decisions в external system.

Architectural change

Standard Flink:
  JobManager (multiple jobs)
    +-- Job A (parallelism=20, fixed)
    +-- Job B (parallelism=10, fixed)
    +-- Job C (parallelism=30, fixed)
  Total slots = 60+
  External: difficult to right-size, jobs contend

Reactive Mode:
  JobManager (single job)
    +-- Job (parallelism=current_slots)
  Total slots = changes dynamically
  External autoscaler:
    1. Monitor metrics (CPU, lag, throughput)
    2. Decide scale up/down
    3. Add/remove TaskManagers
  Flink reacts automatically

Configuration

# flink-conf.yaml

# Включить Adaptive Scheduler (required)
jobmanager.scheduler: adaptive

# Reactive Mode
scheduler-mode: reactive

# Job submission
# В Reactive Mode задаётся max parallelism per JobVertex
# Это upper bound; actual parallelism = available slots

# Resource limits
taskmanager.numberOfTaskSlots: 4    # slots per TM
taskmanager.memory.flink.size: 8gb

# Adaptive Scheduler tuning
jobmanager.adaptive-scheduler.resource-wait-timeout: 5min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10s
jobmanager.adaptive-scheduler.scaling-interval.min: 30s

# Что job делает при изменении resources
# Default: trigger rescale immediately
# Tunable: можно delay через stabilization-timeout

Внешний autoscaler — что нужно мониторить

Автоматическое scaling — это decision based на metrics. Какие metrics использовать?

1. CPU utilization
   - Самый простой метрика
   - Easy to interpret
   - But: для Flink CPU не всегда bottleneck
   - State I/O, network, GC могут быть bottleneck
   - OK для CPU-bound jobs

2. Backpressure metrics
   - busyTimeMsPerSecond, backPressureTimeMsPerSecond
   - Direct indicator overload
   - При backpressure > threshold (например, 500ms/sec) -> scale up
   - При backpressure = 0 длительно -> можно scale down

3. Lag в source (Kafka, Pulsar)
   - records-lag-max, fetch-lag
   - Direct measurement processing capacity
   - Growing lag -> behind, scale up
   - Lag = 0 -> caught up, may scale down

4. Throughput vs target
   - records-in-rate vs target rate
   - When records-out-rate < records-in-rate sustained -> backpressure будет
   - Predictive metric для scaling

5. State size growth
   - State growing -> может требовать more memory/CPU
   - При state explosion -> scale up для capacity

6. Custom business metrics
   - Per-event processing time
   - Specific SLA (event freshness)
   - Domain-specific signals

Простой autoscaler: pseudo-code

# Pseudo-code External Autoscaler для Reactive Mode

class FlinkAutoscaler:
    def __init__(self, cluster_id, min_slots=4, max_slots=128):
        self.cluster_id = cluster_id
        self.min_slots = min_slots
        self.max_slots = max_slots
        self.scale_cooldown = 60  # seconds between decisions
    
    def evaluate(self):
        metrics = self.get_metrics()
        current_slots = metrics['total_slots']
        
        # Decision rules
        if metrics['busy_time_ms_per_sec'] > 800:
            # Heavy backpressure -> scale up
            target = int(current_slots * 1.5)
        elif metrics['kafka_lag_growing']:
            # Lag accumulating -> scale up
            target = int(current_slots * 1.3)
        elif metrics['busy_time_ms_per_sec'] < 100 and \
             metrics['kafka_lag'] == 0:
            # Idle -> scale down
            target = int(current_slots * 0.75)
        else:
            # Stable -> no change
            return
        
        # Apply bounds
        target = max(self.min_slots, min(self.max_slots, target))
        
        if target != current_slots:
            self.scale_cluster(target)
    
    def scale_cluster(self, target_slots):
        target_tms = math.ceil(target_slots / SLOTS_PER_TM)
        # K8s: scale Flink Deployment
        k8s_api.scale_deployment(
            namespace="flink",
            name=f"taskmanager-{self.cluster_id}",
            replicas=target_tms
        )
        # Flink Adaptive Scheduler автоматически rescales job

Real production: Grab ML-based autoscaling

Grab (Southeast Asia super-app) использует Flink для real-time analytics, fraud detection, ETA prediction. Описывали свой ML-based autoscaler в blog post 2024.

Background:
  - Flink workloads imbalanced throughout day
  - Asia traffic: morning rush, evening commute, midnight quiet
  - Demand 10x variation between peak и trough
  - Static sizing wasteful (peak provisioning) или risky (avg sizing)
  - Manual scaling slow и labor-intensive

Solution: ML-based predictive autoscaler

Architecture:
  1. Telemetry collection
     - Flink metrics (CPU, throughput, lag, backpressure)
     - Business metrics (transaction rate, geographic distribution)
     - External signals (weather, events, holidays)
  
  2. ML model
     - LSTM-based time series prediction
     - Trained on 6 months historical data
     - Predicts traffic in next 30 minutes
     - Updated daily
  
  3. Capacity planner
     - Given predicted traffic, computes required slots
     - Pre-scales before traffic arrives (vs reactive)
     - Considers cold start time (5-10 min для new TMs)
  
  4. K8s integration
     - Scales Flink Deployment via K8s API
     - Sometimes also scales node pool (cluster autoscaler)
     - Reactive Mode handles job rescaling

Results:
  - 40% cost reduction (over-provisioning eliminated)
  - 99.9% SLA maintenance (predicted scaling)
  - 5 minute advance warning для capacity changes
  - Reduced manual ops by 80%

ML model architecture (Grab approach)

Input features (per minute):
  - Last 60 minutes Flink metrics
    * records-in-rate
    * records-out-rate
    * busy_time_ms
    * kafka_lag
  - Last 60 minutes business metrics
    * transaction count
    * unique users
    * geographic distribution heatmap
  - Time features
    * Hour of day, day of week, day of month
    * Holiday indicator, special event indicator
    * Weather features (для outdoor workloads)

Model:
  - LSTM (Long Short-Term Memory)
  - 2 LSTM layers (128 units each)
  - Dropout 0.2
  - Dense output: predicted records-in-rate next 30 min

Training:
  - 6 months historical data
  - Train/validation/test 70/20/10 split
  - Loss: MAE
  - Optimizer: Adam
  - Update model weekly

Inference:
  - Every 5 minutes
  - Get latest 60 min features
  - Predict next 30 min
  - If predicted load > current capacity * 1.2 -> scale up
  - If predicted load < current capacity * 0.6 -> scale down

Decision smoothing:
  - Confidence interval
  - Hysteresis (avoid flapping)
  - Cool-down period (min 5 min между scales)
Reactive Mode + ML Autoscaler architecture
Kafka eventsSource events: real-time stream events (Kafka). Volume varies hourly через day.
consume
Flink Cluster (Reactive)Flink cluster в Reactive Mode. Single job uses ALL available slots. Job parallelism = current cluster size.
output
SinksDownstream sinks: Kafka, ClickHouse, Cassandra, etc. Latency target dependent.
Metrics PipelineMetrics collection: Flink JMX, Prometheus, custom metrics. Аккумулирует за минуты для ML model.
features
ML Predictor (LSTM)LSTM ML model: predicts next 30 min throughput. Trained weekly на historical data.
prediction
Capacity PlannerCapacity planner: predicted throughput -> required slots. С учётом cold start time, hysteresis.
K8s Deployment ScalerK8s API: scales Flink TaskManager Deployment. May trigger cluster autoscaler (node provisioning).
add/remove TMs
TaskManager PoolNew TaskManagers come online (или existing ones removed). Flink Adaptive Scheduler reacts.
adaptive rescale
Flink job adaptsFlink rescales job parallelism через savepoint+restore. С ForstDB transparent в секунды.

Cold start considerations

Cold start time для new TaskManager:

K8s pod creation:                ~30 sec
TaskManager JVM startup:         ~30 sec
Class loading:                   ~5 sec
RegisterTaskExecutor:           ~5 sec
First slot allocation:           ~10 sec
Total cold start:                ~80 sec

При predicted traffic spike в 5 min:
  - Decision сейчас (t=0)
  - Pre-scale: provision новых TMs
  - Warm-up: cold start (~80 sec)
  - Ready by t=80 sec
  - Spike arrives t=5 min — system ready

vs Reactive-only (без predictive):
  - Spike arrives t=0
  - Backpressure начинается
  - Reactive autoscaler detects (~30 sec)
  - Scale up decision
  - Cold start 80 sec
  - Total response time: ~110 sec from spike start
  - Lag accumulated during this time

Predictive ML model даёт advance warning, что критично для streaming jobs где caught up важно.

Sins of Reactive Mode

Reactive Mode не perfect:

1. Single job per cluster
   - Может быть wasteful если jobs small
   - Multi-tenancy harder
   - Cost через multiple clusters

2. Rescaling overhead
   - Каждый rescale = savepoint + restore
   - С RocksDB local: minutes
   - С ForstDB: seconds
   - Frequent scaling может delay processing

3. State management
   - Skewed data after rescale
   - Hot key handling complicated
   - Some sinks (Kafka TX) handle poorly

4. Latency spikes during rescale
   - Brief halt processing
   - Watermark может regress
   - Recovery period after rescale

5. Autoscaler complexity
   - Requires sophisticated monitoring
   - Tuning thresholds non-trivial
   - Failures могут cascade (scale down too aggressive -> backpressure -> scale up)

Когда использовать Reactive Mode

Good fit:

1. Variable load patterns
   - Daily/weekly cycles
   - Event-driven spikes
   - Burst workloads

2. Cost optimization priority
   - Cloud deployments
   - Pay-per-use model
   - Spot/preemptible instances

3. Single primary job per cluster
   - Main analytics pipeline
   - One large fraud detector
   - Не множественные small jobs

4. ForstDB или Reactive-friendly state
   - Fast rescaling
   - Recovery в seconds

Bad fit:

1. Predictable steady load
   - Static sizing efficient
   - No need для adaptivity

2. Latency-critical (sub-ms)
   - Rescale spikes недопустимы
   - Static parallelism лучше

3. Multi-tenant requirement
   - Не one job per cluster

4. Complex coordination requirements
   - Tightly coupled jobs
   - Cross-job dependencies

Operational gotchas

1. Scaling thrashing
   - Too aggressive thresholds
   - Не дать stabilize между scales
   - Mitigation: cooldown period, hysteresis

2. Cluster autoscaler interaction
   - K8s cluster autoscaler провисion новых nodes
   - Latency: 2-5 min для new nodes
   - Need integration с Flink autoscaler

3. State backend choice
   - RocksDB rescale = minutes
   - Reactive frequent rescales = slow
   - ForstDB strongly recommended

4. Sink coordination
   - Kafka transactional sink: рекс должен sync
   - Some sinks: pause/resume не gracefully
   - Test specific sinks при rescale

5. Watermark coordination
   - After rescale: watermarks могут regress
   - Pipeline.watermark-alignment helps
   - Може require waiting recovery period
TIP

В Kubernetes Flink Operator (Apache Flink Kubernetes Operator) есть built-in autoscaler — Flink Autoscaler. Это production-ready решение, не нужно писать свой. Поддерживает CPU, throughput, lag-based decisions. См. урок 4 этого модуля для деталей.

Чтение source

Source:
  flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/
    AdaptiveScheduler.java
    -- supports Reactive Mode когда scheduler-mode: reactive

  flink-kubernetes-operator/
    src/main/java/org/apache/flink/kubernetes/operator/autoscaler/
    AutoScaler.java
    -- production K8s autoscaler

Production case studies:
  Grab blog: "Machine learning-driven autoscaling for Apache Flink"
  Netflix blog: "Cell-based architecture for Flink"
  Pinterest blog: "Real-time pipelines с auto-scaling Flink"
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие правила определяют Reactive Mode по сравнению с обычным Adaptive Scheduler?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4