Learning Platform
Глоссарий Troubleshooting
Урок 16.04 · 22 мин
Средний
High AvailabilityJobManagerLeader ElectionConfigMapKubernetes

JobManager HA через Kubernetes leader election

JobManager — это мозг Flink-кластера. Он держит граф джоба, координирует чекпойнты, общается с TaskManager-ами через heartbeat, принимает REST-запросы. Если JM падает — без High Availability весь джоб умирает: даже если TaskManager-ы живы, без JM-а они не знают, что делать.

В Kubernetes есть способ сделать так, чтобы при падении одного JM-пода второй мгновенно подхватил роль leader-а — через ConfigMap-based leader election. Это родной Kubernetes-механизм, тот же, что используют etcd-сервисы и контроллеры. Этот урок — про то, как это работает в Flink и как правильно настроить.


Без HA: одна точка отказа

Минимальный Flink-кластер без HA: один JobManager-под (Deployment, replicas: 1) и N TaskManager-подов. JM держит в памяти:

  • ExecutionGraph: граф операторов с их parallelism и slot assignments.
  • Чекпойнт-координатор: знает, какой checkpoint когда триггерить, куда писать.
  • Slot pool: кто из TM-ов какие слоты занимает.
  • Метаданные джоба: job ID, jar URI, savepoint trigger nonces.

Если JM-под падает (OOM, K8s eviction, node crash), Kubernetes пересоздаст его через 10-30 секунд. Но новый JM-под не знает ничего о текущем джобе.

Kubernetes leader election и operator pattern Он стартует пустой. TaskManager-ы, потеряв heartbeat, прибивают свои tasks. Чтобы джоб поднялся снова, оператор должен запустить его с нуля.

Что теряется без HA при падении JM:

  • Текущий джоб полностью прерывается, все tasks убиваются.
  • Прогресс с последнего checkpoint теряется (откат на last checkpoint).
  • Время на recovery: 30-60 секунд минимум (новый JM, новый план, deploy tasks, restore state).
  • При нестабильной ноде — циклический crash, never reaches STABLE.
WARNING

В production без HA Flink-джоб уязвим к любому ребуту ноды, OOM-у JM (а это случается чаще, чем кажется, особенно при работе с большими ExecutionGraph), а также к плановым maintenance node-ов. JobManager HA — обязателен для всех production джобов.


С HA: два JM-а, один активный

При HA Kubernetes мы поднимаем два или более JobManager-подов. Один становится leader (активный — обслуживает запросы, координирует чекпойнты), остальные — standby (ждут своего часа). Если leader падает, один из standby становится новым leader-ом, подхватывает джоб и продолжает работу — без потери, без рестарта tasks.

JobManager HA: leader election
JM-0 (Leader)Активный JobManager. Держит ExecutionGraph, координирует чекпойнты, отвечает на REST API, шлёт heartbeats в TaskManagers.
lease
K8s ConfigMapLeader Election ConfigMap. Содержит lease annotation с current leader ID и timestamp. Leader регулярно (каждые 5s) обновляет lease.
watch
JM-1 (Standby)Standby JobManager. Watching ConfigMap. Если lease истечёт (15s без обновления) - попытается забрать lease на себя через atomic update.
JM-0 падаетOOM, node failure, eviction - JM-0 больше не обновляет lease.
15s timeout
JM-1 takes overJM-1 видит stale lease - делает CompareAndSet на ConfigMap. Если успешно - становится leader. Загружает ExecutionGraph из metadata storage (S3).
No task restartTaskManagers продолжают работать. Они подключаются к новому leader через service discovery (через Service в K8s).

Ключевые моменты:

  • Lease в ConfigMap — это annotation вида control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"jm-0","leaseDurationSeconds":15,...}'. Leader периодически делает PATCH на эту annotation.
  • Failover timing: ~15 секунд по умолчанию (lease duration). Можно настроить меньше — но риск ложного failover при network blip. Больше — медленнее реакция на real failure.
  • Восстановление графа: новый leader загружает JobGraph и checkpoint pointer из high-availability.storageDir (например, s3://flink-state/orders/ha). TM-ы регистрируются у нового JM, tasks продолжают (а не рестартуют) — это и есть смысл HA.

Конфигурация HA в FlinkDeployment

Минимальная конфигурация HA в манифесте:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: orders-pipeline
spec:
  image: my-registry/orders-pipeline:1.2.3
  flinkVersion: v2_2

  flinkConfiguration:
    # HA через K8s
    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-state/orders-pipeline/ha
    kubernetes.cluster-id: orders-pipeline-cluster
    # Опционально: настройка timing-а
    high-availability.kubernetes.leader-election.lease-duration: "15s"
    high-availability.kubernetes.leader-election.renew-deadline: "10s"
    high-availability.kubernetes.leader-election.retry-period: "2s"

  jobManager:
    replicas: 2   # обязательно >= 2 для HA
    resource:
      memory: "2048m"
      cpu: 1

  taskManager:
    replicas: 3
    resource:
      memory: "4096m"
      cpu: 2

  job:
    jarURI: local:///opt/flink/usrlib/orders-pipeline.jar
    parallelism: 6
    upgradeMode: savepoint

Разберём ключевые параметры:

  • high-availability.type: kubernetes — используем встроенный в Flink K8s HA service. Альтернативы (legacy): zookeeper. В production-Kubernetes 99% случаев — kubernetes-type.
  • high-availability.storageDir — путь, где хранится восстанавливаемое содержимое: JobGraph, checkpoint metadata. ConfigMap содержит только pointer на этот storage. Без storageDir HA не работает — новый leader не сможет восстановить ExecutionGraph.
  • kubernetes.cluster-id — уникальный идентификатор кластера в namespace. Используется как часть имени ConfigMap-ов (например, orders-pipeline-cluster-config-map).
  • jobManager.replicas: 2 — обязательно. С 1 replica HA не работает (некому failover).
NOTE

Оператор сам создаёт три ConfigMap-а для leader election: один для cluster (resource manager), один для dispatcher (job submission), один для джоба (checkpoint coordinator). Вы их увидите в namespace через kubectl get cm | grep orders-pipeline. Их не нужно менять руками — оператор управляет ими.


RBAC: чтобы JM мог писать в ConfigMap

JobManager-pod должен иметь права на чтение и запись в ConfigMap своего namespace. Без RBAC он не сможет обновлять lease — failover сломан.

Helm-чарт Flink Kubernetes Operator при установке создаёт flink ServiceAccount в watched namespace-ах с правильными правами:

# Из чарта оператора (упрощено)
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink
  namespace: flink-jobs
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["pods", "services", "events"]
    verbs: ["get", "list", "watch"]

В spec FlinkDeployment-а указывайте spec.serviceAccount: flink — это то, под чем будут запускаться JM-поды.

Если используете кастомный SA — обязательно дайте ему верные права. Без update configmaps failover не сработает, и при падении leader-а второй JM не сможет забрать lease.

# Проверка прав
kubectl auth can-i update configmaps \
  --as=system:serviceaccount:flink-jobs:flink \
  -n flink-jobs
# yes  <- так должно быть

Что происходит при failover: пошагово

Допустим, кластер запущен с JM-0 (leader) и JM-1 (standby). 100 секунд работает нормально. Потом JM-0 падает (OOM в JVM).

t=0s: JM-0 OOM, kubelet прибивает контейнер. JM-0 pod в CrashLoopBackOff.

t=2s: kubelet перестартовывает JM-0 контейнер. Новый JVM, но в нём ничего не загружено.

t=5s: JM-1 видит, что lease в ConfigMap всё ещё annotated на JM-0 (с timestamp t=0). Ждёт истечения lease.

t=15s (leaseDurationSeconds): lease истёк. JM-1 делает atomic update на ConfigMap: holderIdentity = “jm-1”. Успешно (JM-0 ещё не успел вернуться).

t=16s: JM-1 теперь leader. Начинает восстановление:

  • Загружает submittedJobGraph из storageDir (S3).
  • Читает latest checkpoint metadata оттуда же.
  • Объявляет себя через Kubernetes Service (JM Service selector обновляется через label change).

t=20s: TaskManagers получают timeout heartbeat с JM-0, переключаются на новый адрес (через Service). TM-ы повторно регистрируются у JM-1.

t=25s: JM-1 видит свои TM-ы, ExecutionGraph восстановлен. tasks продолжают работать (они не рестартовали, только потеряли координацию). Чекпойнт coordinator берёт следующий checkpoint.

Итого: failover ~25 секунд, tasks не рестартуют, стейт в памяти TM-ов сохранён. Это огромная разница с no-HA, где tasks бы все умерли и пересоздавались с откатом на checkpoint.

WARNING

Если ваш джоб ОЧЕНЬ чувствителен к downtime — можно уменьшить leaseDurationSeconds до 5 (failover ~5 секунд). Но это увеличивает риск false failover при network blip между JM и K8s API. По умолчанию 15s — хороший балланс.


Когда JM-1 НЕ может failover

HA не магия — есть случаи, когда failover не сработает:

  1. storageDir недоступен. Если JM-1 не может прочитать JobGraph из S3 (creds сломаны, bucket удалён) — failover не сработает.
  2. ConfigMap удалён или повреждён (кто-то прошёлся kubectl delete cm). Restart кластера от оператора.
  3. TaskManager-ы умерли вместе с JM-0 (node failure целиком уносит несколько pod-ов). JM-1 поднимется, но без TM-ов нечего координировать — оператор подождёт переподнятия TM.
  4. Сетевой split: JM-1 не видит K8s API, не может обновить lease. Если в это же время умирает JM-0 — кластер недоступен до восстановления связи.

Для критичных пайплайнов используйте PodAntiAffinity, чтобы JM-0 и JM-1 жили на разных нодах:

spec:
  jobManager:
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchLabels:
                    component: jobmanager
                topologyKey: kubernetes.io/hostname

Тогда падение ноды никогда не уносит оба JM сразу.


Проверка, что HA работает

После apply манифеста с HA проверьте:

# 1. Два JM-пода
kubectl -n flink-jobs get pods -l component=jobmanager
# orders-pipeline-jobmanager-7c8f5d4b-x1k2m   1/1   Running
# orders-pipeline-jobmanager-7c8f5d4b-y3l4n   1/1   Running

# 2. ConfigMap-ы для leader election созданы
kubectl -n flink-jobs get cm | grep orders-pipeline
# orders-pipeline-cluster-config-map
# orders-pipeline-dispatcher-leader
# orders-pipeline-resourcemanager-leader
# orders-pipeline-restserver-leader
# 7a8e3f2b1d4c9a5e-jobmanager-leader   <- per-job

# 3. Lease аннотация активна
kubectl -n flink-jobs get cm orders-pipeline-restserver-leader -o yaml | grep holderIdentity
# control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"jm-0",...}'

# 4. Тест: убиваем leader-под, проверяем переключение
LEADER_POD=$(kubectl -n flink-jobs get cm orders-pipeline-restserver-leader \
  -o jsonpath='{.metadata.annotations.control-plane\.alpha\.kubernetes\.io/leader}' \
  | jq -r .holderIdentity)
echo "Leader: $LEADER_POD"

kubectl -n flink-jobs delete pod $LEADER_POD

# Подождать 20 секунд и проверить нового лидера
sleep 20
kubectl -n flink-jobs get cm orders-pipeline-restserver-leader \
  -o jsonpath='{.metadata.annotations.control-plane\.alpha\.kubernetes\.io/leader}' | jq -r .holderIdentity
# Должен быть другой pod ID

Если новый pod после failover стал leader-ом и джоб продолжает работать (running tasks остались, новые checkpoints успешно) — HA работает.


Ключевые выводы

  1. JobManager — single point of failure без HA. Падение JM-а без HA убивает весь джоб, требует рестарта с откатом на last checkpoint.

  2. HA через Kubernetes leader election: 2+ JM-пода, leader держит lease в ConfigMap, при падении standby забирает lease (~15s default).

  3. Конфигурация требует трёх вещей: high-availability.type: kubernetes, high-availability.storageDir: s3://..., jobManager.replicas: 2+.

  4. storageDir обязателен — без него standby не сможет восстановить ExecutionGraph при failover.

  5. RBAC: ServiceAccount JM-пода должен иметь права на configmaps в namespace. Helm-чарт оператора создаёт правильную Role.

  6. Failover ~25 секунд — leader election + восстановление графа. Tasks не рестартуют — только переподключаются к новому JM.

  7. PodAntiAffinity на JM-ах раскидывает их по разным нодам — защита от падения целой ноды.

  8. Проверка работы: kill leader-pod, через 20s другой pod становится leader-ом, джоб continues.

Проверка знанийKnowledge check
После настройки HA вы делаете тест: убиваете leader JM-под через kubectl delete pod. Ожидаете failover за 25 секунд. Но через 60 секунд новый leader так и не появился, кластер недоступен. Какие самые вероятные причины и как продиагностировать?
ОтветAnswer
Топ-причины: 1. ServiceAccount не имеет прав на ConfigMap update: - Проверка: kubectl auth can-i update configmaps --as=system:serviceaccount:flink-jobs:flink -n flink-jobs - Если "no" - JM не может обновить lease. Standby не сможет забрать lease, потому что нет прав. - Фикс: применить правильную Role/RoleBinding. Helm-чарт ставит её, проверьте, что не перезаписана кастомным SA. 2. storageDir недоступен или нет прав: - Проверка: kubectl logs standby-jm-pod | grep -i "high-availability|storage" - При попытке загрузить JobGraph из S3 - 403 AccessDenied или connection timeout. - Фикс: проверить IRSA-роль на pod-е (EKS), Workload Identity (GKE), или S3 endpoint reachability. 3. ConfigMap-ы повреждены или удалены: - Проверка: kubectl get cm -n flink-jobs | grep orders-pipeline - Если ConfigMap-ов нет - оператор должен их пересоздать (или вы их случайно удалили). - Фикс: kubectl delete flinkdeployment + recreate (последний резерв). 4. Standby JM в CrashLoopBackOff: - Проверка: kubectl get pods -l component=jobmanager - Если standby тоже падает - двойной отказ. Смотреть kubectl describe pod и логи. - Фикс: причина указана в crash - чаще всего OOM или missing config. 5. Lease duration слишком большой: - Проверка: значение high-availability.kubernetes.leader-election.lease-duration в flinkConfiguration. - Если 60s+ - failover потребует столько же времени. 6. PodAntiAffinity не сработал, оба JM на одной ноде, нода упала: - Проверка: kubectl describe nodes - есть ли pending pods. - Если новая нода ещё scheduling - подождать. Если pending из-за taints - проверить tolerations. Алгоритм диагностики: 1. kubectl logs standby-jm-pod -c flink-main-container --tail=100 2. kubectl auth can-i update configmaps как SA 3. kubectl get events -n flink-jobs --sort-by=.lastTimestamp | tail -20 4. kubectl get cm orders-pipeline-restserver-leader -o yaml

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Конфигурация FlinkDeployment включает jobManager.replicas: 2, но не указан high-availability.storageDir. Что произойдёт при падении leader JobManager-а?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5