Flink Autoscaler: production practice in K8s
The Apache Flink Kubernetes Operator includes a built-in Flink Autoscaler (the autoscaler module first shipped in operator release 1.4), a production-ready solution for autoscaling Flink jobs. It is not just a wrapper over Reactive Mode: it is a decision engine with metric collection, scaling decisions, and savepoint coordination.
In this final lesson of the module we look at how Flink Autoscaler works in production: which metrics it uses, how it makes decisions, which gotchas show up in real practice, and how to tune it for your workloads.
Architecture overview
Apache Flink Kubernetes Operator:
- K8s native operator для Flink jobs
- Manages FlinkDeployment resources (CRDs)
- Built-in Autoscaler module (since 1.4)
Architecture:
FlinkDeployment CR (job spec)
+-- JobManager Pod
+-- TaskManager Deployment (scalable)
+-- Autoscaler reconciler
    +-- Collects metrics
    +-- Computes scaling decisions
    +-- Triggers savepoint-based rescaling
The autoscaler runs as part of the operator, not as a separate process. This simplifies deployment and coordination.
Decision algorithm
The algorithm is more sophisticated than a simple "CPU > 80%, scale up" rule:
Two main metrics for each vertex:
1. processingRate (records/sec):
- How many events the vertex can process at its current parallelism
- Measured over the scaling decision window
2. targetProcessingRate:
- How many records/sec are needed to be caught up
- Computed from upstream rate + backlog
Decision:
required_parallelism = ceil(targetProcessingRate / (processingRate / current_parallelism))
Example:
Vertex: aggregate operator
current_parallelism: 8
processingRate: 50,000 records/sec (per current setup)
Per-task: 50000/8 = 6,250 records/sec
Upstream rate: 80,000 records/sec
Lag growing: yes, accumulated 1M records
Target rate: 90,000 records/sec (80,000 incoming + 10,000 to drain the 1M backlog in about 100 sec)
required = ceil(90000 / 6250) = 15 tasks
New parallelism: 15
This is smart: it factors in the backlog, not just the instantaneous load.
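The arithmetic of the example above can be reproduced with a few lines of Python. This is a minimal sketch of the formula as stated in this lesson, not the operator's actual code; the function names and the 100-second catch-up window are assumptions chosen so that the target rate comes out at the 90,000 records/sec used in the example.

```python
import math


def target_rate_with_backlog(upstream_rate: float, backlog_records: float,
                             catch_up_seconds: float) -> float:
    """Rate needed to keep up with the input and drain the backlog in time."""
    return upstream_rate + backlog_records / catch_up_seconds


def required_parallelism(target_rate: float, processing_rate: float,
                         current_parallelism: int) -> int:
    """ceil(targetProcessingRate / (processingRate / current_parallelism))."""
    if processing_rate <= 0 or current_parallelism <= 0:
        raise ValueError("processing_rate and current_parallelism must be positive")
    per_task_rate = processing_rate / current_parallelism
    return math.ceil(target_rate / per_task_rate)


# Numbers from the example: 8 tasks handle 50,000 records/sec in total.
target = target_rate_with_backlog(80_000, 1_000_000, 100)  # 90,000 records/sec
print(required_parallelism(target, 50_000, 8))  # 15
```

Note that the result is sensitive to the catch-up window: a longer window lowers the target rate and therefore the required parallelism.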
Configuration
# FlinkDeployment manifest
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job
spec:
  image: flink:2.2
  flinkVersion: v2_2
  flinkConfiguration:
    # Adaptive Scheduler
    jobmanager.scheduler: adaptive
    # Autoscaler configuration
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 5m
    job.autoscaler.metrics.window: 10m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    # Per-vertex parallelism bounds
    job.autoscaler.vertex.min-parallelism: "1"
    job.autoscaler.vertex.max-parallelism: "100"
    # Cooldown
    job.autoscaler.scale-up.grace-period: 1h
    job.autoscaler.scale.down.grace-period: 10m
  taskManager:
    resource:
      memory: "8gb"
      cpu: 2
    replicas: 4 # initial
  job:
    jarURI: ...
    parallelism: 4 # initial
    upgradeMode: savepoint
Detailed metrics
Per-vertex metrics:
1. backpressure:
- busyTimeMsPerSecond (milliseconds per second the task spends busy; divide by 1000 for a fraction)
- softBackPressuredTimeMsPerSecond (waiting on downstream)
- hardBackPressuredTimeMsPerSecond (channel buffer full)
2. throughput:
- numRecordsInPerSecond
- numRecordsOutPerSecond
- numLateRecordsDropped (records dropped because they arrived late, behind the watermark)
3. processing:
- processingRate (computed: out / time when busy)
- parallelism (current)
- true-output-rate-per-task
4. state:
- state-size
- state-size-growth
5. job-level:
- lastCheckpointDuration
- numRestarts
- uptime
This rich data lets the autoscaler make informed decisions rather than reactive guesses.
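To make the "rate while busy" idea concrete, here is a small sketch of how a capacity estimate could be derived from the observed throughput and busyTimeMsPerSecond. The function names are hypothetical and the formula is a simplification for illustration; the operator's real computation involves more inputs.

```python
def busy_fraction(busy_time_ms_per_second: float) -> float:
    """Convert busyTimeMsPerSecond (0..1000) into a fraction between 0 and 1."""
    return min(max(busy_time_ms_per_second / 1000.0, 0.0), 1.0)


def true_processing_rate(observed_records_per_second: float,
                         busy_time_ms_per_second: float) -> float:
    """Estimate the rate a task could sustain if it were busy 100% of the time."""
    fraction = busy_fraction(busy_time_ms_per_second)
    if fraction == 0.0:
        # A task that is never busy gives no information about its capacity.
        return float("inf")
    return observed_records_per_second / fraction


# A task that processes 30,000 records/sec while busy 600 ms out of every
# second could handle roughly 50,000 records/sec at full load.
print(true_processing_rate(30_000, 600))
```

The point of the division is that raw throughput understates capacity whenever the task is partly idle, which is why the autoscaler does not scale on throughput alone.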
Scaling decision flow
Autoscaler reconciliation loop (every 1 min):
1. Collect metrics over the scaling decision window (10 min in this configuration)
- Average rates
- Median values for smoothing
- Skip spikes (outliers)
2. Compute target utilization
- Target: 0.6 (60% loaded -> comfortable)
- Boundary: 0.2 (±20% acceptable range)
- Outside [0.4, 0.8] -> consider scaling
3. Per-vertex decision
- Foreach vertex in job:
compute required parallelism
- Check bounds (min, max)
- Consider stability (grace periods)
4. Cluster-level coordination
- Total slots required = max(vertex parallelisms)
- If > current cluster size -> scale up cluster
- If < current -> scale down cluster
5. Apply decisions
- Update FlinkDeployment parallelism specs
- Trigger savepoint
- Cancel job
- Reapply with new parallelism
- Restore from savepoint
- Resume processing
6. Monitor recovery
- Verify job started
- Verify watermark recovering
- Verify throughput restored
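Steps 2 and 3 of the flow above (utilization band, then per-vertex bounds) can be sketched as follows. This is a simplified model under stated assumptions: parallelism is rescaled in proportion to utilization / target, and the function and parameter names are made up for this illustration. It is not the operator's implementation.

```python
import math


def within_band(utilization: float, target: float = 0.6,
                boundary: float = 0.2) -> bool:
    """True if utilization lies inside [target - boundary, target + boundary]."""
    return (target - boundary) <= utilization <= (target + boundary)


def decide_parallelism(current: int, utilization: float,
                       target: float = 0.6, boundary: float = 0.2,
                       min_parallelism: int = 1,
                       max_parallelism: int = 100) -> int:
    """Keep the parallelism inside the band, otherwise rescale toward the target."""
    if within_band(utilization, target, boundary):
        return current  # inside the acceptable range: no scaling
    # The small epsilon guards against floating-point noise such as
    # 12.000000000000002 being rounded up to 13.
    desired = math.ceil(current * utilization / target - 1e-9)
    # Respect the per-vertex bounds (min-parallelism / max-parallelism).
    return max(min_parallelism, min(max_parallelism, desired))


print(decide_parallelism(8, 0.5))    # inside [0.4, 0.8], stays at 8
print(decide_parallelism(8, 0.9))    # overloaded, scales up to 12
print(decide_parallelism(80, 0.95))  # would need 127, capped at 100
```

Grace periods and the stabilization interval would sit on top of this: a real loop also has to remember when each vertex was last scaled before applying the decision.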
Savepoint-based scaling
Critical: scaling in Flink Autoscaler goes through a savepoint (not just an in-place rescale). This is safer but more expensive:
Why savepoint vs rescale:
Pros savepoint:
- Clean restart с new parallelism
- State backend-agnostic
- Compatible with any sinks/sources
- Predictable behavior
Cons savepoint:
- Downtime: savepoint duration + restart
- With RocksDB: minutes
- With ForstDB: 30-60 sec
- Frequent scaling = more downtime
Decision factors:
- Scaling frequency: rare -> savepoint OK
- Latency requirements: tight -> minimize scaling
- State backend: ForstDB minimizes overhead
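A quick way to reason about the trade-off above is to turn it into a daily downtime budget. The sketch below is plain arithmetic; the number of scale events and the per-event durations are illustrative assumptions, not measurements.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def daily_downtime_seconds(scale_events_per_day: int,
                           savepoint_seconds: float,
                           restart_seconds: float) -> float:
    """Total processing pause per day caused by savepoint-based rescaling."""
    return scale_events_per_day * (savepoint_seconds + restart_seconds)


def availability(downtime_seconds: float) -> float:
    """Fraction of the day during which the job is processing."""
    return 1.0 - downtime_seconds / SECONDS_PER_DAY


# Assumed: 5 scale events per day, 45 s savepoint, 15 s restart.
downtime = daily_downtime_seconds(5, 45, 15)
print(downtime)                          # 300 seconds per day
print(round(availability(downtime), 4))  # 0.9965
```

If each event takes minutes instead of tens of seconds, the same five events per day quickly become a latency problem, which is why scaling frequency and state backend are listed together as decision factors.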
Real production deployment
Production setup example (medium ecommerce real-time analytics):
K8s cluster:
- 50 nodes (m6i.4xlarge, 16 vCPU, 64 GB)
- K8s autoscaler (cluster-level)
- Flink Operator deployed
Flink job:
- Parallelism: dynamic (4-100)
- State: ForstDB, 300 GB state
- Sources: 2x Kafka topics, ~50K events/sec peak
- Sinks: Kafka, ClickHouse
Autoscaler config:
job.autoscaler.target.utilization: "0.7"
job.autoscaler.stabilization.interval: 5m
job.autoscaler.metrics.window: 10m
job.autoscaler.scale-up.grace-period: 30m
job.autoscaler.scale.down.grace-period: 10m
Daily pattern (in production):
00:00 - 06:00: low load, scale down to min=8
06:00 - 09:00: morning ramp, scale up to 40
09:00 - 18:00: business hours, scale to ~50-70
18:00 - 23:00: evening high, scale to peak ~85
23:00 - 00:00: cool down to 20
Cost impact:
- Static p=100 sizing: $5000/month
- Dynamic scaling avg p=50: $2500/month
- Savings: $2500/month (50%)
Latency impact:
- Static: avg 100 ms
- Dynamic: avg 120 ms (small overhead at scale events)
- Acceptable trade-off
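The "avg p=50" figure can be sanity-checked against the daily pattern with a time-weighted average. The sketch below assumes a midpoint of 60 for the "~50-70" business-hours range and assumes cost scales linearly with parallelism; both are simplifications for illustration.

```python
# (hours, parallelism) taken from the daily pattern above.
DAILY_PATTERN = [
    (6, 8),    # 00:00 - 06:00
    (3, 40),   # 06:00 - 09:00
    (9, 60),   # 09:00 - 18:00, midpoint of ~50-70 (assumption)
    (5, 85),   # 18:00 - 23:00
    (1, 20),   # 23:00 - 00:00
]


def average_parallelism(pattern):
    """Time-weighted average parallelism over the whole pattern."""
    total_hours = sum(hours for hours, _ in pattern)
    weighted = sum(hours * parallelism for hours, parallelism in pattern)
    return weighted / total_hours


def monthly_cost(avg_parallelism, static_parallelism=100, static_cost=5000.0):
    """Cost under the assumption that it is linear in parallelism."""
    return static_cost * avg_parallelism / static_parallelism


avg = average_parallelism(DAILY_PATTERN)
print(round(avg, 1))          # 48.0
print(round(monthly_cost(avg)))  # 2402
```

The result lands close to the rounded numbers quoted above (p=50, $2500/month), so the stated 50% savings is consistent with the daily pattern.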
Monitoring and alerting
Critical metrics for monitoring:
1. Autoscaler health:
- autoscaler.scale.events.count (per direction)
- autoscaler.scale.duration (savepoint + restart time)
- autoscaler.evaluation.errors
2. Job health after scaling:
- First checkpoint after rescale
- Watermark recovery time
- Throughput restoration
3. Cluster health:
- K8s pod failures
- TaskManager restarts
- Memory pressure
4. Cost metrics:
- Average parallelism over time
- Cluster size over time
- Cost per million events processed
Alerts:
- Frequent scaling (>10 events/hour): possible thrashing
- Scaling stuck (>30 min without progress)
- Job degradation after scale (lag growing)
- Autoscaler errors
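The thrashing alert above boils down to counting scale events in a sliding window. Here is a minimal sketch of that check; the function name and the way event timestamps are collected are assumptions, and in practice this rule would more likely live in your monitoring system than in application code.

```python
from datetime import datetime, timedelta


def is_thrashing(event_times, now, window=timedelta(hours=1), threshold=10):
    """True if more than `threshold` scale events happened within `window`."""
    recent = [t for t in event_times if now - window < t <= now]
    return len(recent) > threshold


now = datetime(2025, 1, 1, 12, 0, 0)
# One scale event every 5 minutes over the last 55 minutes: 11 events.
events = [now - timedelta(minutes=5 * i) for i in range(11)]
print(is_thrashing(events, now))       # True: 11 events in the last hour
print(is_thrashing(events[:10], now))  # False: exactly 10 is still allowed
```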
Gotchas in production
1. Hot key skew
Problem: one subtask carries 10x more load
Symptom: the autoscaler scales up the cluster, but one subtask remains the bottleneck
Solution:
- SplitAggregateRule for distinct aggregations
- Salt keys for better distribution
- Per-vertex max-parallelism
2. Scale-down too aggressive
Problem: cluster scales down at low load, then traffic returns
Symptom: lag accumulates, no time to catch up
Solution:
- Longer scale-down.grace-period
- Larger target.utilization.boundary
- Predictive scaling (like Grab ML)
3. Frequent scaling (thrashing)
Problem: workload oscillates around threshold
Symptom: 20+ scale events per hour, constant overhead
Solution:
- Larger stabilization.interval (10-15 min)
- Smoothed metrics (median over longer window)
- Hysteresis in decisions
4. Savepoint failures
Problem: savepoint times out, autoscaler fails decision
Symptom: scaling blocked, manual intervention needed
Solution:
- Increase savepoint timeout
- Switch to ForstDB (faster savepoints)
- Tune checkpoint interval
5. State migration after scale
Problem: the new parallelism does not distribute state evenly
Symptom: some tasks are idle, others are overloaded
Solution:
- Verify that the number of key groups (max parallelism) is divisible by the parallelism
- Use max-parallelism multiple of common factors
- Monitor per-subtask metrics
6. Source/sink coordination
Problem: Kafka source partitions do not match the parallelism
Symptom: some tasks read multiple partitions, so load is uneven
Solution:
- Match the number of Kafka partitions to the max parallelism
- Use partition assigning strategies
- Some sinks (Iceberg) coordinate writers
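Gotcha 5 above (uneven state after a scale) can be illustrated by counting how many key groups each subtask receives. The sketch mirrors, to the best of our understanding, the assignment rule Flink uses for keyed state (key group index times parallelism, integer-divided by max parallelism); the Python function names are hypothetical.

```python
from collections import Counter


def operator_index(key_group: int, parallelism: int, max_parallelism: int) -> int:
    """Subtask that owns a given key group."""
    return key_group * parallelism // max_parallelism


def key_groups_per_subtask(parallelism: int, max_parallelism: int):
    """Number of key groups assigned to each subtask, in subtask order."""
    counts = Counter(
        operator_index(kg, parallelism, max_parallelism)
        for kg in range(max_parallelism)
    )
    return [counts[i] for i in range(parallelism)]


# max parallelism 128: parallelism 16 divides it evenly, 15 does not.
print(key_groups_per_subtask(16, 128))  # every subtask gets 8 key groups
print(key_groups_per_subtask(15, 128))  # a mix of 8 and 9 key groups
```

With parallelism 15, some subtasks own 9 key groups and others 8, so roughly 12% more state and traffic lands on the heavier ones even when keys are perfectly uniform. Choosing parallelism values that divide the max parallelism avoids this.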
Comparison with Reactive Mode
Reactive Mode:
- Built-in Flink (no external operator)
- Single job per cluster
- Uses all available slots
- External actor (autoscaler) scales cluster
- You build the autoscaler
Flink Autoscaler in the K8s Operator:
- Production-ready full solution
- Multiple jobs supported (each independently scaled)
- Vertex-level parallelism decisions
- Sophisticated decision engine
- Pre-built and tested
Use Reactive Mode when:
- You want simplicity
- An external autoscaler already exists (Grab ML)
- Custom decision logic
Use Flink Autoscaler when:
- Want full solution
- K8s native deployment
- Need vertex-level control
- Less custom code preferred
Manual override and hints
The autoscaler can be overridden:
# Disable autoscaling temporarily
job.autoscaler.enabled: "false"
# Force specific parallelism (override autoscaler)
spec:
  job:
    parallelism: 50
# Vertex-specific bounds
job.autoscaler.vertex.bottleneck-vertex.min-parallelism: "20"
job.autoscaler.vertex.bottleneck-vertex.max-parallelism: "50"
# Per-vertex weight (priority)
job.autoscaler.vertex.bottleneck-vertex.scaling-priority: "high"
Hints for the autoscaler via job annotations:
# Via an annotation on the FlinkDeployment
metadata:
  annotations:
    flink.k8s.io/autoscaler.scaling-priority: "high"
    flink.k8s.io/autoscaler.allowed-utilization-range: "0.5-0.8"
Cost optimization patterns
1. Spot/preemptible instances
- The K8s autoscaler uses spot instances for TaskManagers
- Flink Autoscaler handles preemption gracefully
- 60-80% cost savings vs on-demand
- Risk: tasks may be restarted
2. Multi-zone deployment
- TaskManagers spread across AZs
- Flink fault tolerance handles zone failure
- Better availability + lower cost via spot
3. Schedule-based scaling
- Predictable patterns (business hours)
- Cron-based pre-scaling
- Combine with the reactive autoscaler
4. Pool reuse
- K8s NodePool persistent
- Fast TaskManager startup (no node provisioning)
- Cost: persistent baseline + variable burst
5. Right-sizing state backend
- ForstDB: cache size sufficient for the working set
- Avoid over-provision cache (RAM waste)
- Monitor cache hit rate
Reading the source
Flink Kubernetes Operator:
https://github.com/apache/flink-kubernetes-operator
Source:
flink-kubernetes-operator/
src/main/java/org/apache/flink/kubernetes/operator/autoscaler/
AutoScaler.java -- main autoscaler logic
ScalingMetricCollector.java
ScalingExecutor.java
JobAutoScalerImpl.java
src/main/java/org/apache/flink/kubernetes/operator/api/
spec/AutoscalerConfig.java -- configuration
Documentation:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs/
"Autoscaler" section
Production blogs:
Apple: "Autoscaling Flink in Kubernetes for cost optimization"
Pinterest: "Real-time pipelines autoscaling"
Apache Flink blog: "Flink Autoscaler in Operator"