Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 25 мин
Продвинутый
Flink AutoscalerK8s OperatorSavepoint ScalingProductionMonitoringDecision Algorithm

Flink Autoscaler — production practice в K8s

Apache Flink Kubernetes Operator (с версии 1.7) включает built-in Flink Autoscaler — production-ready решение для autoscaling Flink jobs. Это не просто wrapper над Reactive Mode — это sophisticated decision engine с metric collection, scaling decisions, savepoint coordination.

В этом финальном уроке модуля разбираем, как Flink Autoscaler работает в production: какие metrics использует, как принимает решения, какие gotchas в реальной practice, и как настроить для своих workloads.

Performance tuning Kafka в production

Architecture overview

Apache Flink Kubernetes Operator:
  - K8s native operator для Flink jobs
  - Manages FlinkDeployment resources (CRDs)
  - Built-in Autoscaler module (с 1.7)

Architecture:
  FlinkDeployment CR (job spec)
    +-- JobManager Pod
    +-- TaskManager Deployment (scalable)
    +-- Autoscaler reconciler
        +-- Collects metrics
        +-- Computes scaling decisions
        +-- Triggers savepoint-based rescaling

Autoscaler работает как часть operator, не как отдельный процесс. Это упрощает deployment и coordination.

Decision algorithm

Algorithm более sophisticated чем simple “CPU > 80% scale up”:

Two main metrics для каждого vertex:

1. processingRate (records/sec):
   - Сколько events vertex может process с current parallelism
   - Measured за scaling-decision-window

2. targetProcessingRate:
   - Сколько records/sec нужно для caught-up
   - Computed from upstream rate + backlog

Decision:
  required_parallelism = ceil(targetProcessingRate / (processingRate / current_parallelism))

Example:
  Vertex: aggregate operator
  current_parallelism: 8
  processingRate: 50,000 records/sec (per current setup)
  Per-task: 50000/8 = 6,250 records/sec
  
  Upstream rate: 80,000 records/sec
  Lag growing: yes, accumulated 1M records
  Target rate: 90,000 records/sec (clear backlog в 5 min)
  
  required = ceil(90000 / 6250) = 15 tasks
  New parallelism: 15

Это smart — фactors в backlog, не просто instantaneous load.

Configuration

# FlinkDeployment manifest
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job
spec:
  image: flink:2.2
  flinkVersion: v2_2
  
  flinkConfiguration:
    # Adaptive Scheduler
    jobmanager.scheduler: adaptive
    
    # Autoscaler configuration
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 5m
    job.autoscaler.metrics.window: 10m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    
    # Per-vertex parallelism bounds
    job.autoscaler.vertex.min-parallelism: "1"
    job.autoscaler.vertex.max-parallelism: "100"
    
    # Cooldown
    job.autoscaler.scale.up.grace-period: 1h
    job.autoscaler.scale.down.grace-period: 10m

  taskManager:
    resource:
      memory: "8gb"
      cpu: 2
    replicas: 4    # initial
    
  job:
    jarURI: ...
    parallelism: 4   # initial
    upgradeMode: savepoint

Detailed metrics

Per-vertex metrics:

1. backpressure:
   - busyTimeMsPerSecond (percentage времени busy)
   - softBackPressureTimeMsPerSecond (waiting на downstream)
   - hardBackPressureTimeMsPerSecond (channel buffer full)

2. throughput:
   - numRecordsInPerSecond
   - numRecordsOutPerSecond
   - numLateRecordsDropped (если processing time issues)

3. processing:
   - processingRate (computed: out / time when busy)
   - parallelism (current)
   - true-output-rate-per-task

4. state:
   - state-size
   - state-size-growth

5. job-level:
   - lastCheckpointDuration
   - numberOfRestarts
   - uptime

Этот rich data позволяет autoscaler делать informed decisions, не reactive guessing.

Scaling decision flow

Autoscaler reconciliation loop (every 1 min):

1. Collect metrics за scaling-decision-window (10 min default)
   - Average rates
   - Median values для smoothing
   - Skip spikes (outliers)

2. Compute target utilization
   - Target: 0.6 (60% loaded -> comfortable)
   - Boundary: 0.2 (±20% acceptable range)
   - Outside [0.4, 0.8] -> consider scaling

3. Per-vertex decision
   - Foreach vertex in job:
     compute required parallelism
   - Check bounds (min, max)
   - Consider stability (grace periods)

4. Cluster-level coordination
   - Total slots required = max(vertex parallelisms)
   - If > current cluster size -> scale up cluster
   - If < current -> scale down cluster

5. Apply decisions
   - Update FlinkDeployment parallelism specs
   - Trigger savepoint
   - Cancel job
   - Reapply с new parallelism
   - Restore from savepoint
   - Resume processing

6. Monitor recovery
   - Verify job started
   - Verify watermark recovering
   - Verify throughput restored

Savepoint-based scaling

Critical: scaling в Flink Autoscaler идёт через savepoint (не просто rescale). Это safer но more expensive:

Why savepoint vs rescale:

Pros savepoint:
  - Clean restart с new parallelism
  - State backend-agnostic
  - Compatible с любыми sinks/sources
  - Predictable behavior

Cons savepoint:
  - Downtime: savepoint duration + restart
  - С RocksDB: minutes
  - С ForstDB: 30-60 sec
  - Frequent scaling = более downtime

Decision factors:
  - Scaling frequency: rare -> savepoint OK
  - Latency requirements: tight -> minimize scaling
  - State backend: ForstDB minimizes overhead

Real production deployment

Production setup example (medium ecommerce real-time analytics):

K8s cluster:
  - 50 nodes (m6i.4xlarge, 16 vCPU, 64 GB)
  - K8s autoscaler (cluster-level)
  - Flink Operator deployed

Flink job:
  - Parallelism: dynamic (4-100)
  - State: ForstDB, 300 GB state
  - Sources: 2x Kafka topics, ~50K events/sec peak
  - Sinks: Kafka, ClickHouse

Autoscaler config:
  job.autoscaler.target.utilization: "0.7"
  job.autoscaler.stabilization.interval: 5m
  job.autoscaler.metrics.window: 10m
  job.autoscaler.scale.up.grace-period: 30m
  job.autoscaler.scale.down.grace-period: 10m

Daily pattern (in production):
  00:00 - 06:00: low load, scale down к min=8
  06:00 - 09:00: morning ramp, scale up к 40
  09:00 - 18:00: business hours, scale to ~50-70
  18:00 - 23:00: evening high, scale to peak ~85
  23:00 - 00:00: cool down к 20

Cost impact:
  - Static p=100 sizing: $5000/мес
  - Dynamic scaling avg p=50: $2500/мес
  - Savings: $2500/мес (50%)

Latency impact:
  - Static: avg 100 ms
  - Dynamic: avg 120 ms (small overhead at scale events)
  - Acceptable trade-off

Monitoring и alerting

Critical metrics для monitoring:

1. Autoscaler health:
   - autoscaler.scale.events.count (per direction)
   - autoscaler.scale.duration (savepoint + restart time)
   - autoscaler.evaluation.errors

2. Job health после scaling:
   - First checkpoint after rescale
   - Watermark recovery time
   - Throughput restoration

3. Cluster health:
   - K8s pod failures
   - TaskManager restarts
   - Memory pressure

4. Cost metrics:
   - Average parallelism over time
   - Cluster size over time
   - Cost per million events processed

Alerts:
  - Frequent scaling (>10 events/hour) — possible thrashing
  - Scaling stuck (>30 min без progress)
  - Job degradation after scale (lag growing)
  - Autoscaler errors
Flink Autoscaler decision loop
Reconciler tickReconciler triggered (default every 1 min). Collects metrics over scaling-decision-window (10 min).
collect metrics
Aggregated metricsMetrics: backpressure, throughput, lag, processing rate, parallelism per vertex. Median values to avoid spikes.
Compute new parallelismPer-vertex decision: required = ceil(target_rate / per_task_rate). Bounded by min/max. Skips if within stability boundary (0.4-0.8 default).
if changed
Check grace periodsCheck grace periods: scale.up.grace-period (30 min default), scale.down.grace-period (10 min). Prevent thrashing.
Trigger savepointTrigger savepoint. Duration depends on state backend: RocksDB minutes, ForstDB 30-60 sec. Job stops during savepoint.
save
Update spec + restartUpdate FlinkDeployment spec с new parallelism. K8s controller reconciles. Cancel job, scale TaskManager Deployment.
restore
Job restarts at new sizeJob restarts с new parallelism, restores from savepoint. State distributed по new key groups. Watermark recovery period.

Gotchas в production

1. Hot key skew
   Problem: одна subtask имеет 10x больше load
   Symptom: autoscaler scales up cluster, но одна subtask remains bottleneck
   Solution:
     - SplitAggregateRule для distinct
     - Salt keys для distribution
     - Per-vertex max-parallelism

2. Scale-down too aggressive
   Problem: cluster scales down at low load, then traffic returns
   Symptom: lag accumulates, no time для caught up
   Solution:
     - Longer scale-down.grace-period
     - Larger target.utilization.boundary
     - Predictive scaling (как Grab ML)

3. Frequent scaling (thrashing)
   Problem: workload oscillates around threshold
   Symptom: 20+ scale events per hour, constant overhead
   Solution:
     - Larger stabilization.interval (10-15 min)
     - Smoothed metrics (median over longer window)
     - Hysteresis в decisions

4. Savepoint failures
   Problem: savepoint times out, autoscaler fails decision
   Symptom: scaling blocked, manual intervention needed
   Solution:
     - Increase savepoint timeout
     - Switch к ForstDB (faster savepoints)
     - Tune checkpoint interval

5. State migration after scale
   Problem: new parallelism не distributes state evenly
   Symptom: некоторые tasks idle, others overloaded
   Solution:
     - Verify key groups divisible
     - Use max-parallelism multiple of common factors
     - Monitor per-subtask metrics

6. Source/sink coordination
   Problem: Kafka source partitions не match parallelism
   Symptom: некоторые tasks читают multiple partitions, uneven
   Solution:
     - Number Kafka partitions matches max parallelism
     - Use partition assigning strategies
     - Some sinks (Iceberg) coordinate writers

Comparison с Reactive Mode

Reactive Mode:
  - Built-in Flink (no external operator)
  - Single job per cluster
  - Uses all available slots
  - External actor (autoscaler) scales cluster
  - You build the autoscaler

Flink Autoscaler в K8s Operator:
  - Production-ready full solution
  - Multiple jobs supported (each independently scaled)
  - Vertex-level parallelism decisions
  - Sophisticated decision engine
  - Pre-built и tested

Use Reactive Mode when:
  - You want simplicity
  - External autoscaler уже existing (Grab ML)
  - Custom decision logic

Use Flink Autoscaler when:
  - Want full solution
  - K8s native deployment
  - Need vertex-level control
  - Less custom code preferred

Manual override и hints

Autoscaler можно override:

# Disable autoscaling temporarily
job.autoscaler.enabled: "false"

# Force specific parallelism (override autoscaler)
spec:
  job:
    parallelism: 50

# Vertex-specific bounds
job.autoscaler.vertex.bottleneck-vertex.min-parallelism: "20"
job.autoscaler.vertex.bottleneck-vertex.max-parallelism: "50"

# Per-vertex weight (priority)
job.autoscaler.vertex.bottleneck-vertex.scaling-priority: "high"

Hints для autoscaler через job annotations:

# Через Annotation в FlinkDeployment
metadata:
  annotations:
    flink.k8s.io/autoscaler.scaling-priority: "high"
    flink.k8s.io/autoscaler.allowed-utilization-range: "0.5-0.8"

Cost optimization patterns

1. Spot/preemptible instances
   - K8s autoscaler использует spot для TaskManagers
   - Flink Autoscaler handles preemption gracefully
   - 60-80% cost savings vs on-demand
   - Risk: tasks могут быть restarted

2. Multi-zone deployment
   - TaskManagers spread across AZs
   - Flink fault tolerance handles zone failure
   - Better availability + cost через spot

3. Schedule-based scaling
   - Predictable patterns (business hours)
   - Cron-based pre-scaling
   - Combine с reactive autoscaler

4. Pool reuse
   - K8s NodePool persistent
   - Fast TaskManager startup (no node provisioning)
   - Cost: persistent baseline + variable burst

5. Right-sizing state backend
   - ForstDB: cache size sufficient для working set
   - Avoid over-provision cache (RAM waste)
   - Monitor cache hit rate

Чтение source

Flink Kubernetes Operator:
  https://github.com/apache/flink-kubernetes-operator
  
Source:
  flink-kubernetes-operator/
    src/main/java/org/apache/flink/kubernetes/operator/autoscaler/
      AutoScaler.java                    -- main autoscaler logic
      ScalingMetricCollector.java
      ScalingExecutor.java
      JobAutoScalerImpl.java
    
  src/main/java/org/apache/flink/kubernetes/operator/api/
    spec/AutoscalerConfig.java          -- configuration

Documentation:
  https://nightlies.apache.org/flink/flink-kubernetes-operator-docs/
  "Autoscaler" section

Production blogs:
  Apple: "Autoscaling Flink в Kubernetes for cost optimization"
  Pinterest: "Real-time pipelines autoscaling"
  Apache Flink blog: "Flink Autoscaler in Operator"
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Как Flink Autoscaler в K8s Operator принимает решение о parallelism для vertex?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4