JobManager HA через Kubernetes leader election
JobManager — это мозг Flink-кластера. Он держит граф джоба, координирует чекпойнты, общается с TaskManager-ами через heartbeat, принимает REST-запросы. Если JM падает — без High Availability весь джоб умирает: даже если TaskManager-ы живы, без JM-а они не знают, что делать.
В Kubernetes есть способ сделать так, чтобы при падении одного JM-пода второй мгновенно подхватил роль leader-а — через ConfigMap-based leader election. Это родной Kubernetes-механизм, тот же, что используют etcd-сервисы и контроллеры. Этот урок — про то, как это работает в Flink и как правильно настроить.
Без HA: одна точка отказа
Минимальный Flink-кластер без HA: один JobManager-под (Deployment, replicas: 1) и N TaskManager-подов. JM держит в памяти:
- ExecutionGraph: граф операторов с их parallelism и slot assignments.
- Чекпойнт-координатор: знает, какой checkpoint когда триггерить, куда писать.
- Slot pool: кто из TM-ов какие слоты занимает.
- Метаданные джоба: job ID, jar URI, savepoint trigger nonces.
Если JM-под падает (OOM, K8s eviction, node crash), Kubernetes пересоздаст его через 10-30 секунд. Но новый JM-под не знает ничего о текущем джобе.
Kubernetes leader election и operator pattern Он стартует пустой. TaskManager-ы, потеряв heartbeat, прибивают свои tasks. Чтобы джоб поднялся снова, оператор должен запустить его с нуля.
Что теряется без HA при падении JM:
- Текущий джоб полностью прерывается, все tasks убиваются.
- Прогресс с последнего checkpoint теряется (откат на last checkpoint).
- Время на recovery: 30-60 секунд минимум (новый JM, новый план, deploy tasks, restore state).
- При нестабильной ноде — циклический crash, never reaches STABLE.
В production без HA Flink-джоб уязвим к любому ребуту ноды, OOM-у JM (а это случается чаще, чем кажется, особенно при работе с большими ExecutionGraph), а также к плановым maintenance node-ов. JobManager HA — обязателен для всех production джобов.
С HA: два JM-а, один активный
При HA Kubernetes мы поднимаем два или более JobManager-подов. Один становится leader (активный — обслуживает запросы, координирует чекпойнты), остальные — standby (ждут своего часа). Если leader падает, один из standby становится новым leader-ом, подхватывает джоб и продолжает работу — без потери, без рестарта tasks.
Ключевые моменты:
- Lease в ConfigMap — это annotation вида
control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"jm-0","leaseDurationSeconds":15,...}'. Leader периодически делает PATCH на эту annotation. - Failover timing: ~15 секунд по умолчанию (lease duration). Можно настроить меньше — но риск ложного failover при network blip. Больше — медленнее реакция на real failure.
- Восстановление графа: новый leader загружает JobGraph и checkpoint pointer из
high-availability.storageDir(например,s3://flink-state/orders/ha). TM-ы регистрируются у нового JM, tasks продолжают (а не рестартуют) — это и есть смысл HA.
Конфигурация HA в FlinkDeployment
Минимальная конфигурация HA в манифесте:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: orders-pipeline
spec:
image: my-registry/orders-pipeline:1.2.3
flinkVersion: v2_2
flinkConfiguration:
# HA через K8s
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/orders-pipeline/ha
kubernetes.cluster-id: orders-pipeline-cluster
# Опционально: настройка timing-а
high-availability.kubernetes.leader-election.lease-duration: "15s"
high-availability.kubernetes.leader-election.renew-deadline: "10s"
high-availability.kubernetes.leader-election.retry-period: "2s"
jobManager:
replicas: 2 # обязательно >= 2 для HA
resource:
memory: "2048m"
cpu: 1
taskManager:
replicas: 3
resource:
memory: "4096m"
cpu: 2
job:
jarURI: local:///opt/flink/usrlib/orders-pipeline.jar
parallelism: 6
upgradeMode: savepoint
Разберём ключевые параметры:
high-availability.type: kubernetes— используем встроенный в Flink K8s HA service. Альтернативы (legacy):zookeeper. В production-Kubernetes 99% случаев — kubernetes-type.high-availability.storageDir— путь, где хранится восстанавливаемое содержимое: JobGraph, checkpoint metadata. ConfigMap содержит только pointer на этот storage. Без storageDir HA не работает — новый leader не сможет восстановить ExecutionGraph.kubernetes.cluster-id— уникальный идентификатор кластера в namespace. Используется как часть имени ConfigMap-ов (например,orders-pipeline-cluster-config-map).jobManager.replicas: 2— обязательно. С 1 replica HA не работает (некому failover).
Оператор сам создаёт три ConfigMap-а для leader election: один для cluster (resource manager), один для dispatcher (job submission), один для джоба (checkpoint coordinator). Вы их увидите в namespace через kubectl get cm | grep orders-pipeline. Их не нужно менять руками — оператор управляет ими.
RBAC: чтобы JM мог писать в ConfigMap
JobManager-pod должен иметь права на чтение и запись в ConfigMap своего namespace. Без RBAC он не сможет обновлять lease — failover сломан.
Helm-чарт Flink Kubernetes Operator при установке создаёт flink ServiceAccount в watched namespace-ах с правильными правами:
# Из чарта оператора (упрощено)
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: flink
namespace: flink-jobs
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["pods", "services", "events"]
verbs: ["get", "list", "watch"]
В spec FlinkDeployment-а указывайте spec.serviceAccount: flink — это то, под чем будут запускаться JM-поды.
Если используете кастомный SA — обязательно дайте ему верные права. Без update configmaps failover не сработает, и при падении leader-а второй JM не сможет забрать lease.
# Проверка прав
kubectl auth can-i update configmaps \
--as=system:serviceaccount:flink-jobs:flink \
-n flink-jobs
# yes <- так должно быть
Что происходит при failover: пошагово
Допустим, кластер запущен с JM-0 (leader) и JM-1 (standby). 100 секунд работает нормально. Потом JM-0 падает (OOM в JVM).
t=0s: JM-0 OOM, kubelet прибивает контейнер. JM-0 pod в CrashLoopBackOff.
t=2s: kubelet перестартовывает JM-0 контейнер. Новый JVM, но в нём ничего не загружено.
t=5s: JM-1 видит, что lease в ConfigMap всё ещё annotated на JM-0 (с timestamp t=0). Ждёт истечения lease.
t=15s (leaseDurationSeconds): lease истёк. JM-1 делает atomic update на ConfigMap: holderIdentity = “jm-1”. Успешно (JM-0 ещё не успел вернуться).
t=16s: JM-1 теперь leader. Начинает восстановление:
- Загружает submittedJobGraph из storageDir (S3).
- Читает latest checkpoint metadata оттуда же.
- Объявляет себя через Kubernetes Service (JM Service selector обновляется через label change).
t=20s: TaskManagers получают timeout heartbeat с JM-0, переключаются на новый адрес (через Service). TM-ы повторно регистрируются у JM-1.
t=25s: JM-1 видит свои TM-ы, ExecutionGraph восстановлен. tasks продолжают работать (они не рестартовали, только потеряли координацию). Чекпойнт coordinator берёт следующий checkpoint.
Итого: failover ~25 секунд, tasks не рестартуют, стейт в памяти TM-ов сохранён. Это огромная разница с no-HA, где tasks бы все умерли и пересоздавались с откатом на checkpoint.
Если ваш джоб ОЧЕНЬ чувствителен к downtime — можно уменьшить leaseDurationSeconds до 5 (failover ~5 секунд). Но это увеличивает риск false failover при network blip между JM и K8s API. По умолчанию 15s — хороший балланс.
Когда JM-1 НЕ может failover
HA не магия — есть случаи, когда failover не сработает:
- storageDir недоступен. Если JM-1 не может прочитать JobGraph из S3 (creds сломаны, bucket удалён) — failover не сработает.
- ConfigMap удалён или повреждён (кто-то прошёлся
kubectl delete cm). Restart кластера от оператора. - TaskManager-ы умерли вместе с JM-0 (node failure целиком уносит несколько pod-ов). JM-1 поднимется, но без TM-ов нечего координировать — оператор подождёт переподнятия TM.
- Сетевой split: JM-1 не видит K8s API, не может обновить lease. Если в это же время умирает JM-0 — кластер недоступен до восстановления связи.
Для критичных пайплайнов используйте PodAntiAffinity, чтобы JM-0 и JM-1 жили на разных нодах:
spec:
jobManager:
podTemplate:
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
component: jobmanager
topologyKey: kubernetes.io/hostname
Тогда падение ноды никогда не уносит оба JM сразу.
Проверка, что HA работает
После apply манифеста с HA проверьте:
# 1. Два JM-пода
kubectl -n flink-jobs get pods -l component=jobmanager
# orders-pipeline-jobmanager-7c8f5d4b-x1k2m 1/1 Running
# orders-pipeline-jobmanager-7c8f5d4b-y3l4n 1/1 Running
# 2. ConfigMap-ы для leader election созданы
kubectl -n flink-jobs get cm | grep orders-pipeline
# orders-pipeline-cluster-config-map
# orders-pipeline-dispatcher-leader
# orders-pipeline-resourcemanager-leader
# orders-pipeline-restserver-leader
# 7a8e3f2b1d4c9a5e-jobmanager-leader <- per-job
# 3. Lease аннотация активна
kubectl -n flink-jobs get cm orders-pipeline-restserver-leader -o yaml | grep holderIdentity
# control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"jm-0",...}'
# 4. Тест: убиваем leader-под, проверяем переключение
LEADER_POD=$(kubectl -n flink-jobs get cm orders-pipeline-restserver-leader \
-o jsonpath='{.metadata.annotations.control-plane\.alpha\.kubernetes\.io/leader}' \
| jq -r .holderIdentity)
echo "Leader: $LEADER_POD"
kubectl -n flink-jobs delete pod $LEADER_POD
# Подождать 20 секунд и проверить нового лидера
sleep 20
kubectl -n flink-jobs get cm orders-pipeline-restserver-leader \
-o jsonpath='{.metadata.annotations.control-plane\.alpha\.kubernetes\.io/leader}' | jq -r .holderIdentity
# Должен быть другой pod ID
Если новый pod после failover стал leader-ом и джоб продолжает работать (running tasks остались, новые checkpoints успешно) — HA работает.
Ключевые выводы
-
JobManager — single point of failure без HA. Падение JM-а без HA убивает весь джоб, требует рестарта с откатом на last checkpoint.
-
HA через Kubernetes leader election: 2+ JM-пода, leader держит lease в ConfigMap, при падении standby забирает lease (~15s default).
-
Конфигурация требует трёх вещей:
high-availability.type: kubernetes,high-availability.storageDir: s3://...,jobManager.replicas: 2+. -
storageDir обязателен — без него standby не сможет восстановить ExecutionGraph при failover.
-
RBAC: ServiceAccount JM-пода должен иметь права на configmaps в namespace. Helm-чарт оператора создаёт правильную Role.
-
Failover ~25 секунд — leader election + восстановление графа. Tasks не рестартуют — только переподключаются к новому JM.
-
PodAntiAffinity на JM-ах раскидывает их по разным нодам — защита от падения целой ноды.
-
Проверка работы: kill leader-pod, через 20s другой pod становится leader-ом, джоб continues.