Learning Platform
Глоссарий Troubleshooting
Урок 22.03 · 25 мин
Продвинутый
AutoscalingReactive modeFlink AutoscalerFLIP-271ML prediction

Autoscaling setup — reactive, Flink Autoscaler, ML-based prediction

Static parallelism — главный антипаттерн в production Flink. Job настроен на parallelism=16 для обычной нагрузки, в Black Friday приходит 10x traffic, lag растёт до часов. Команда вручную скейлит до 64, через 3 часа peak проходит, но parallelism остаётся 64 — overprovisioned. Autoscaling решает обе проблемы: автоматически scale up при load, scale down при quiet.

В этом уроке разберём три уровня автоскейлинга для Flink 2.2: Reactive mode, Flink Autoscaler (FLIP-271), и ML-based prediction поверх стандартных метрик. Для bursty workloads предсказание важнее реакции.

Kafka consumer lag и performance tuning

Уровень 1: Reactive Mode — самое простое решение

Reactive mode (Flink 1.13+) — autopilot для parallelism. Flink сам подбирает parallelism под доступные TaskManager-ы. Если в кластере 32 slot — job будет parallelism=32. Если добавили ноду и стало 48 slots — job увеличит parallelism до 48 при следующем checkpoint.

spec:
  flinkConfiguration:
    scheduler-mode: reactive
    jobmanager.adaptive-scheduler.resource-wait-timeout: 30 s
    jobmanager.adaptive-scheduler.resource-stabilization-timeout: 60 s
  job:
    parallelism: -1  # не нужно при reactive

Reactive mode требует:

  • Adaptive Scheduler (default в Flink 2.x).
  • Application Mode (per-job cluster, не session).
  • Externalized state backend (S3 / disaggregated). При rescaling Flink перераспределяет key groups, требуется savepoint или incremental checkpoint с remote storage.

Scale up через HPA на TaskManager pods:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fraud-detection-tm
  namespace: fraud-team
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fraud-detection-taskmanager
  minReplicas: 4
  maxReplicas: 32
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

Limitations:

  • Реактивный, не предиктивный — scale up после того, как load уже создал lag.
  • Scale на CPU не всегда коррелирует с реальной нагрузкой Flink (Kafka lag — лучший signal).
  • Rebalance overhead — каждый rescale = stop + restart from checkpoint, downtime секунды.

Reactive mode подходит для simple use cases. Для production с bursty workload нужен Flink Autoscaler.


Flink Kubernetes Operator 1.7+ включает встроенный Autoscaler. Он использует Flink-specific metrics: backpressure, Kafka lag, processing rate per partition. На основе этих метрик рассчитывает целевой parallelism per operator.

Flink Autoscaler control loop
1. Collect metricsМетрики собираются из JobManager REST API каждые scaling.interval (default 60s). Operator pull-ит numRecordsInPerSecond, busyTimeMsPerSecond, backpressure для каждого vertex.
2. Compute true rateCompute true processing rate: throughput / (1 - backpressure). Backpressure means downstream slow — true capacity outside backpressure ниже того, что мы видим.
3. Compute target rateTarget processing rate = current Kafka lag / target catch-up time + steady-state input rate. Если lag = 1M messages, target catch-up 5 min, input rate 10K/s — target rate = 1M/300 + 10K = 13.3K/s.
4. Compute target parallelismTarget parallelism = target rate / true rate per task. Это per-vertex calculation. Operator вычисляет parallelism для каждого оператора независимо.
5. ValidateValidation: проверить scaling decision against min/max bounds, scale-up cooldown, scale-down cooldown. Не скейлим если recent rescale был `< cooldown` ago.
6. Apply: trigger rescaleApply: update FlinkDeployment spec.job.parallelism. Operator triggers savepoint, stops job, starts with new parallelism. Downtime 10-60s в зависимости от state size.

Конфигурация Autoscaler:

spec:
  flinkConfiguration:
    # Включить Autoscaler
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 5min
    job.autoscaler.metrics.window: 10min

    # Target utilization (опасно <80%, мы хотим headroom)
    job.autoscaler.target.utilization: "0.7"
    job.autoscaler.target.utilization.boundary: "0.4"

    # Bounds per operator
    job.autoscaler.vertex.max-parallelism: "128"
    job.autoscaler.vertex.min-parallelism: "1"

    # Catch-up duration: за какое время planning to катch up Kafka lag
    job.autoscaler.catch-up.duration: 5min

    # Scaling cooldown
    job.autoscaler.scale-up.grace-period: 1h
    job.autoscaler.scale-down.interval: 10min

Ключевые параметры:

target.utilization=0.7 — целевая загрузка операторов 70%. 30% headroom для bursts. Если utilization > 0.7 — scale up. Если < 0.4 (boundary) — scale down.

catch-up.duration=5min — за сколько минут хотим catch up существующий Kafka lag. Меньшее значение = более агрессивный scale up.

scale-up.grace-period=1h — после scale up следующий scale up разрешён только через час. Защита от oscillation.

Метрики, экспонируемые Autoscaler в Prometheus:

# Текущая загрузка vertex
flink_autoscaler_vertex_busy_time_avg{vertex="Source: kafka"}

# Текущий vs target parallelism
flink_autoscaler_vertex_recommended_parallelism{vertex="Source: kafka"}
flink_autoscaler_vertex_current_parallelism{vertex="Source: kafka"}

# Kafka lag по vertex
flink_autoscaler_vertex_lag{vertex="Source: kafka"}
TIP

target.utilization=0.7 — это conservative. Для batch-like jobs можно 0.85+. Для критичных low-latency — 0.5. Tuning надо делать на основе actual production load profile, не теоретически.


Custom autoscaling: Kafka lag-based controller

Built-in Autoscaler работает на basic метриках. Для специфичных use cases пишут custom controller. Пример — scale based on absolute Kafka lag:

# kafka-lag-autoscaler.py
# Custom Python controller для scaling Flink job based on Kafka lag
import time
from kubernetes import client, config
from prometheus_api_client import PrometheusConnect

config.load_incluster_config()
api = client.CustomObjectsApi()
prom = PrometheusConnect(url="http://prometheus.monitoring:9090")

JOB_NAME = "fraud-detection"
NAMESPACE = "fraud-team"
LAG_THRESHOLD_SCALE_UP = 1_000_000  # messages
LAG_THRESHOLD_SCALE_DOWN = 100_000
MIN_PARALLELISM = 4
MAX_PARALLELISM = 64

def get_current_lag():
    query = f'sum(kafka_consumer_records_lag_max{{consumergroup="{JOB_NAME}"}})'
    result = prom.custom_query(query=query)
    return float(result[0]["value"][1]) if result else 0

def get_current_parallelism():
    deployment = api.get_namespaced_custom_object(
        group="flink.apache.org",
        version="v1beta1",
        namespace=NAMESPACE,
        plural="flinkdeployments",
        name=JOB_NAME
    )
    return deployment["spec"]["job"]["parallelism"]

def update_parallelism(new_parallelism):
    patch = {"spec": {"job": {"parallelism": new_parallelism}}}
    api.patch_namespaced_custom_object(
        group="flink.apache.org",
        version="v1beta1",
        namespace=NAMESPACE,
        plural="flinkdeployments",
        name=JOB_NAME,
        body=patch
    )
    print(f"Updated parallelism to {new_parallelism}")

def main():
    last_scale_time = 0
    while True:
        now = time.time()
        if now - last_scale_time < 300:  # 5 min cooldown
            time.sleep(60)
            continue

        lag = get_current_lag()
        current = get_current_parallelism()

        if lag > LAG_THRESHOLD_SCALE_UP and current < MAX_PARALLELISM:
            new_parallelism = min(current * 2, MAX_PARALLELISM)
            update_parallelism(new_parallelism)
            last_scale_time = now
        elif lag < LAG_THRESHOLD_SCALE_DOWN and current > MIN_PARALLELISM:
            new_parallelism = max(current // 2, MIN_PARALLELISM)
            update_parallelism(new_parallelism)
            last_scale_time = now

        time.sleep(60)

if __name__ == "__main__":
    main()

Этот controller простой, но illustrates паттерн: external script читает metrics, принимает scaling decisions, patches FlinkDeployment через Kubernetes API. В production нужно добавить: hysteresis, gradient-based scaling (не doubling, а percentage), per-operator scaling вместо job-wide.


Уровень 3: ML-based prediction для bursty workloads

Reactive scaling работает для steady-state changes. Для bursty workloads (Black Friday, sport events, news cycles) предсказание лучше реакции: scale up до того, как load создаст lag.

Базовая ML-based strategy через ARIMA на исторических Kafka lag:

# ml-predictor.py
# Прогноз Kafka lag на следующий час через ARIMA
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect(url="http://prometheus.monitoring:9090")

def get_historical_lag(hours_back=24):
    """Get Kafka lag time series за последние N часов."""
    query = 'sum(kafka_consumer_records_lag_max{consumergroup="fraud-detection"})'
    result = prom.custom_query_range(
        query=query,
        start_time=f"{hours_back}h",
        end_time="now",
        step="60s"
    )
    values = result[0]["values"]
    df = pd.DataFrame(values, columns=["timestamp", "lag"])
    df["lag"] = df["lag"].astype(float)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    df = df.set_index("timestamp")
    return df["lag"]

def predict_next_hour(historical_lag):
    """ARIMA forecast на 60 минут вперёд."""
    model = ARIMA(historical_lag, order=(5, 1, 0))
    fit = model.fit()
    forecast = fit.forecast(steps=60)  # 60 minutes
    return forecast

def calculate_required_parallelism(predicted_peak_lag, processing_rate_per_task=1000):
    """Вычислить parallelism чтобы обработать predicted peak за 5 минут."""
    target_rate = predicted_peak_lag / 300  # over 5 min
    required_tasks = int(target_rate / processing_rate_per_task) + 1
    return required_tasks

if __name__ == "__main__":
    historical = get_historical_lag(hours_back=24)
    forecast = predict_next_hour(historical)
    peak_predicted = forecast.max()
    required_parallelism = calculate_required_parallelism(peak_predicted)
    print(f"Predicted peak lag (next hour): {peak_predicted}")
    print(f"Recommended pre-scale parallelism: {required_parallelism}")

Этот predictor запускается каждые 15 минут, и если рекомендуемый parallelism > current, проактивно scale up. Хорошо работает для seasonal patterns (daily/weekly cycles).

Для more sophisticated prediction:

  • Prophet (Facebook open-source) — лучше handles holidays, seasonality.
  • LSTM — для сложных non-linear patterns, но требует много training data.
  • Calendar-based augmentation — feature engineering: hour-of-day, day-of-week, is-holiday, is-promo-event.
WARNING

ML prediction даёт false positive (scale up без реального roads) и false negative (proскипали burst). Не используйте ML без safety net в виде reactive scaling. Pre-scale ML рекомендует к минимуму, reactive скейлит дальше при необходимости.


Production deployment: combined approach

В production используется combination всех трёх уровней:

  1. Reactive mode disabled (используем Adaptive Scheduler standard).
  2. Flink Autoscaler включён с conservative settings (target.utilization=0.7, scale-up grace 30min).
  3. ML predictor runs каждые 15 минут, может pre-scale до Autoscaler.
  4. Safety net — Prometheus alert на extreme lag spikes для manual escalation.
# Combined autoscaling config
spec:
  flinkConfiguration:
    # Adaptive Scheduler (стандарт в 2.x)
    scheduler-mode: adaptive

    # Built-in Autoscaler — main scaling механизм
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 5min
    job.autoscaler.target.utilization: "0.7"
    job.autoscaler.target.utilization.boundary: "0.4"
    job.autoscaler.scale-up.grace-period: 30min
    job.autoscaler.scale-down.interval: 10min
    job.autoscaler.catch-up.duration: 5min

    # Vertex bounds
    job.autoscaler.vertex.max-parallelism: "64"
    job.autoscaler.vertex.min-parallelism: "4"

Plus отдельный Deployment с ML predictor:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-predictor
  namespace: fraud-team
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: predictor
        image: registry.company.com/ml-predictor:1.0
        env:
        - name: TARGET_JOB
          value: fraud-detection
        - name: PROMETHEUS_URL
          value: http://prometheus.monitoring:9090
        - name: PREDICTION_INTERVAL
          value: "900"  # 15 min

Итоги

Autoscaling Flink — трёхуровневая стратегия: Adaptive Scheduler + Flink Autoscaler для reactive scaling, ML-based prediction для proactive pre-scale, manual escalation для unexpected outliers. Combined approach даёт лучший balance между cost (не over-provisioned) и SLA (не страдает от lag spikes).

В следующем уроке разберём savepoint automation: периодические savepoints, retention policy, cross-region copy, rollback playbook.

Проверка знанийKnowledge check
Flink Autoscaler настроен с target.utilization=0.7 и catch-up.duration=5min. После Black Friday peak lag в Kafka = 50M сообщений, обычная processing rate = 10K msg/s per task. Какой parallelism порекомендует Autoscaler, и нужен ли ML predictor если у нас уже есть reactive Autoscaler?
ОтветAnswer
Calculation: target processing rate = lag / catch-up + steady input. 50M / 300s = 167K msg/s for catch-up, plus steady input rate. Per task processing rate = 10K msg/s, so raw parallelism = 17. С учётом target.utilization=0.7 (хотим работать на 70 процентов, 30 процентов headroom): 17 / 0.7 примерно 24. Это правильный recovery scaling. Но главное — это REACTIVE. Lag 50M уже создан, downstream consumers уже опаздывают, business impact уже произошёл (например, fraud transactions проскочили). ML predictor позволяет PROACTIVELY scale up за час до известного peak event, избегая lag спайка вообще. Для bursty workloads с predictable patterns (sales events, scheduled promotions, daily cycles) reactive scaling всегда отстаёт — ML prediction критична.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Flink Autoscaler (FLIP-271) использует backpressure-based scaling. Объясните формулу 'true processing rate' и почему backpressure correction необходима.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5