Autoscaling setup — reactive, Flink Autoscaler, ML-based prediction
Static parallelism — главный антипаттерн в production Flink. Job настроен на parallelism=16 для обычной нагрузки, в Black Friday приходит 10x traffic, lag растёт до часов. Команда вручную скейлит до 64, через 3 часа peak проходит, но parallelism остаётся 64 — overprovisioned. Autoscaling решает обе проблемы: автоматически scale up при load, scale down при quiet.
В этом уроке разберём три уровня автоскейлинга для Flink 2.2: Reactive mode, Flink Autoscaler (FLIP-271), и ML-based prediction поверх стандартных метрик. Для bursty workloads предсказание важнее реакции.
Kafka consumer lag и performance tuningУровень 1: Reactive Mode — самое простое решение
Reactive mode (Flink 1.13+) — autopilot для parallelism. Flink сам подбирает parallelism под доступные TaskManager-ы. Если в кластере 32 slot — job будет parallelism=32. Если добавили ноду и стало 48 slots — job увеличит parallelism до 48 при следующем checkpoint.
spec:
flinkConfiguration:
scheduler-mode: reactive
jobmanager.adaptive-scheduler.resource-wait-timeout: 30 s
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 60 s
job:
parallelism: -1 # не нужно при reactive
Reactive mode требует:
- Adaptive Scheduler (default в Flink 2.x).
- Application Mode (per-job cluster, не session).
- Externalized state backend (S3 / disaggregated). При rescaling Flink перераспределяет key groups, требуется savepoint или incremental checkpoint с remote storage.
Scale up через HPA на TaskManager pods:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fraud-detection-tm
namespace: fraud-team
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fraud-detection-taskmanager
minReplicas: 4
maxReplicas: 32
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Limitations:
- Реактивный, не предиктивный — scale up после того, как load уже создал lag.
- Scale на CPU не всегда коррелирует с реальной нагрузкой Flink (Kafka lag — лучший signal).
- Rebalance overhead — каждый rescale = stop + restart from checkpoint, downtime секунды.
Reactive mode подходит для simple use cases. Для production с bursty workload нужен Flink Autoscaler.
Уровень 2: Flink Autoscaler (FLIP-271)
Flink Kubernetes Operator 1.7+ включает встроенный Autoscaler. Он использует Flink-specific metrics: backpressure, Kafka lag, processing rate per partition. На основе этих метрик рассчитывает целевой parallelism per operator.
Конфигурация Autoscaler:
spec:
flinkConfiguration:
# Включить Autoscaler
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 5min
job.autoscaler.metrics.window: 10min
# Target utilization (опасно <80%, мы хотим headroom)
job.autoscaler.target.utilization: "0.7"
job.autoscaler.target.utilization.boundary: "0.4"
# Bounds per operator
job.autoscaler.vertex.max-parallelism: "128"
job.autoscaler.vertex.min-parallelism: "1"
# Catch-up duration: за какое время planning to катch up Kafka lag
job.autoscaler.catch-up.duration: 5min
# Scaling cooldown
job.autoscaler.scale-up.grace-period: 1h
job.autoscaler.scale-down.interval: 10min
Ключевые параметры:
target.utilization=0.7 — целевая загрузка операторов 70%. 30% headroom для bursts. Если utilization > 0.7 — scale up. Если < 0.4 (boundary) — scale down.
catch-up.duration=5min — за сколько минут хотим catch up существующий Kafka lag. Меньшее значение = более агрессивный scale up.
scale-up.grace-period=1h — после scale up следующий scale up разрешён только через час. Защита от oscillation.
Метрики, экспонируемые Autoscaler в Prometheus:
# Текущая загрузка vertex
flink_autoscaler_vertex_busy_time_avg{vertex="Source: kafka"}
# Текущий vs target parallelism
flink_autoscaler_vertex_recommended_parallelism{vertex="Source: kafka"}
flink_autoscaler_vertex_current_parallelism{vertex="Source: kafka"}
# Kafka lag по vertex
flink_autoscaler_vertex_lag{vertex="Source: kafka"}
target.utilization=0.7 — это conservative. Для batch-like jobs можно 0.85+. Для критичных low-latency — 0.5. Tuning надо делать на основе actual production load profile, не теоретически.
Custom autoscaling: Kafka lag-based controller
Built-in Autoscaler работает на basic метриках. Для специфичных use cases пишут custom controller. Пример — scale based on absolute Kafka lag:
# kafka-lag-autoscaler.py
# Custom Python controller для scaling Flink job based on Kafka lag
import time
from kubernetes import client, config
from prometheus_api_client import PrometheusConnect
config.load_incluster_config()
api = client.CustomObjectsApi()
prom = PrometheusConnect(url="http://prometheus.monitoring:9090")
JOB_NAME = "fraud-detection"
NAMESPACE = "fraud-team"
LAG_THRESHOLD_SCALE_UP = 1_000_000 # messages
LAG_THRESHOLD_SCALE_DOWN = 100_000
MIN_PARALLELISM = 4
MAX_PARALLELISM = 64
def get_current_lag():
query = f'sum(kafka_consumer_records_lag_max{{consumergroup="{JOB_NAME}"}})'
result = prom.custom_query(query=query)
return float(result[0]["value"][1]) if result else 0
def get_current_parallelism():
deployment = api.get_namespaced_custom_object(
group="flink.apache.org",
version="v1beta1",
namespace=NAMESPACE,
plural="flinkdeployments",
name=JOB_NAME
)
return deployment["spec"]["job"]["parallelism"]
def update_parallelism(new_parallelism):
patch = {"spec": {"job": {"parallelism": new_parallelism}}}
api.patch_namespaced_custom_object(
group="flink.apache.org",
version="v1beta1",
namespace=NAMESPACE,
plural="flinkdeployments",
name=JOB_NAME,
body=patch
)
print(f"Updated parallelism to {new_parallelism}")
def main():
last_scale_time = 0
while True:
now = time.time()
if now - last_scale_time < 300: # 5 min cooldown
time.sleep(60)
continue
lag = get_current_lag()
current = get_current_parallelism()
if lag > LAG_THRESHOLD_SCALE_UP and current < MAX_PARALLELISM:
new_parallelism = min(current * 2, MAX_PARALLELISM)
update_parallelism(new_parallelism)
last_scale_time = now
elif lag < LAG_THRESHOLD_SCALE_DOWN and current > MIN_PARALLELISM:
new_parallelism = max(current // 2, MIN_PARALLELISM)
update_parallelism(new_parallelism)
last_scale_time = now
time.sleep(60)
if __name__ == "__main__":
main()
Этот controller простой, но illustrates паттерн: external script читает metrics, принимает scaling decisions, patches FlinkDeployment через Kubernetes API. В production нужно добавить: hysteresis, gradient-based scaling (не doubling, а percentage), per-operator scaling вместо job-wide.
Уровень 3: ML-based prediction для bursty workloads
Reactive scaling работает для steady-state changes. Для bursty workloads (Black Friday, sport events, news cycles) предсказание лучше реакции: scale up до того, как load создаст lag.
Базовая ML-based strategy через ARIMA на исторических Kafka lag:
# ml-predictor.py
# Прогноз Kafka lag на следующий час через ARIMA
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from prometheus_api_client import PrometheusConnect
prom = PrometheusConnect(url="http://prometheus.monitoring:9090")
def get_historical_lag(hours_back=24):
"""Get Kafka lag time series за последние N часов."""
query = 'sum(kafka_consumer_records_lag_max{consumergroup="fraud-detection"})'
result = prom.custom_query_range(
query=query,
start_time=f"{hours_back}h",
end_time="now",
step="60s"
)
values = result[0]["values"]
df = pd.DataFrame(values, columns=["timestamp", "lag"])
df["lag"] = df["lag"].astype(float)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
df = df.set_index("timestamp")
return df["lag"]
def predict_next_hour(historical_lag):
"""ARIMA forecast на 60 минут вперёд."""
model = ARIMA(historical_lag, order=(5, 1, 0))
fit = model.fit()
forecast = fit.forecast(steps=60) # 60 minutes
return forecast
def calculate_required_parallelism(predicted_peak_lag, processing_rate_per_task=1000):
"""Вычислить parallelism чтобы обработать predicted peak за 5 минут."""
target_rate = predicted_peak_lag / 300 # over 5 min
required_tasks = int(target_rate / processing_rate_per_task) + 1
return required_tasks
if __name__ == "__main__":
historical = get_historical_lag(hours_back=24)
forecast = predict_next_hour(historical)
peak_predicted = forecast.max()
required_parallelism = calculate_required_parallelism(peak_predicted)
print(f"Predicted peak lag (next hour): {peak_predicted}")
print(f"Recommended pre-scale parallelism: {required_parallelism}")
Этот predictor запускается каждые 15 минут, и если рекомендуемый parallelism > current, проактивно scale up. Хорошо работает для seasonal patterns (daily/weekly cycles).
Для more sophisticated prediction:
- Prophet (Facebook open-source) — лучше handles holidays, seasonality.
- LSTM — для сложных non-linear patterns, но требует много training data.
- Calendar-based augmentation — feature engineering: hour-of-day, day-of-week, is-holiday, is-promo-event.
ML prediction даёт false positive (scale up без реального roads) и false negative (proскипали burst). Не используйте ML без safety net в виде reactive scaling. Pre-scale ML рекомендует к минимуму, reactive скейлит дальше при необходимости.
Production deployment: combined approach
В production используется combination всех трёх уровней:
- Reactive mode disabled (используем Adaptive Scheduler standard).
- Flink Autoscaler включён с conservative settings (target.utilization=0.7, scale-up grace 30min).
- ML predictor runs каждые 15 минут, может pre-scale до Autoscaler.
- Safety net — Prometheus alert на extreme lag spikes для manual escalation.
# Combined autoscaling config
spec:
flinkConfiguration:
# Adaptive Scheduler (стандарт в 2.x)
scheduler-mode: adaptive
# Built-in Autoscaler — main scaling механизм
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 5min
job.autoscaler.target.utilization: "0.7"
job.autoscaler.target.utilization.boundary: "0.4"
job.autoscaler.scale-up.grace-period: 30min
job.autoscaler.scale-down.interval: 10min
job.autoscaler.catch-up.duration: 5min
# Vertex bounds
job.autoscaler.vertex.max-parallelism: "64"
job.autoscaler.vertex.min-parallelism: "4"
Plus отдельный Deployment с ML predictor:
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-predictor
namespace: fraud-team
spec:
replicas: 1
template:
spec:
containers:
- name: predictor
image: registry.company.com/ml-predictor:1.0
env:
- name: TARGET_JOB
value: fraud-detection
- name: PROMETHEUS_URL
value: http://prometheus.monitoring:9090
- name: PREDICTION_INTERVAL
value: "900" # 15 min
Итоги
Autoscaling Flink — трёхуровневая стратегия: Adaptive Scheduler + Flink Autoscaler для reactive scaling, ML-based prediction для proactive pre-scale, manual escalation для unexpected outliers. Combined approach даёт лучший balance между cost (не over-provisioned) и SLA (не страдает от lag spikes).
В следующем уроке разберём savepoint automation: периодические savepoints, retention policy, cross-region copy, rollback playbook.