Flink Kubernetes Operator: FlinkDeployment и FlinkSessionJob
В production Flink почти всегда живёт в Kubernetes. Flink Kubernetes Operator — это официальный способ Apache Flink развернуть, обновить и поддерживать стриминговые джобы в K8s-кластере, не воюя руками с Deployment-ами, ConfigMap-ами и StatefulSet-ами. Вы описываете джоб декларативно через CRD, а оператор делает всё остальное — поднимает JobManager и TaskManager, сохраняет savepoint при обновлении, отслеживает HA через ConfigMap.
В этом уроке разберём, что внутри оператора, какие у него CRD, и напишем минимальный, но production-ready FlinkDeployment.
Зачем нужен оператор: декларативный Flink
Без оператора деплой Flink-джоба в Kubernetes выглядит так: вы вручную пишете Deployment для JobManager, Deployment (или ReplicaSet) для TaskManager, ConfigMap с flink-conf.yaml, Service для REST API JobManager, Service для blob-сервера, PVC для checkpoint-ов, Job для submit-а jar-ника. Это десятки YAML-ов на каждый стрим-джоб. И когда нужно обновить версию джара — savepoint, скейл-даун, замена образа, restore-from-savepoint, скейл-ап — всё руками или через сложные хелм-чарты.
Flink Kubernetes Operator решает эту боль. Он предоставляет два CRD: FlinkDeployment (полностью управляемый кластер с одним джобом или session cluster) и FlinkSessionJob (submit джоба в существующий session cluster). Оператор смотрит на эти ресурсы, генерирует все нужные Deployment-ы и ConfigMap-ы внутри себя, отслеживает их статус, делает savepoint при upgrade-е, перезапускает джоб при сбое.
Версия Flink K8s Operator 1.14.0 (Feb 2026) — это стабильный релиз, который поддерживает Flink 1.18 — 2.2. В уроке всё будет ориентировано на эту пару: оператор 1.14, Flink 2.2.
Ключевая идея — reconciliation loop. Оператор постоянно сравнивает желаемое состояние (то, что в CRD) с реальным (поды, конфиги).
Kubernetes CRD и operator pattern: reconciliation loop Если расхождение — приводит реальное к желаемому. Это тот же паттерн, что у нативных контроллеров Kubernetes (Deployment-контроллер, StatefulSet-контроллер).
CRD: FlinkDeployment vs FlinkSessionJob
Оператор работает с двумя CRD. Понимать разницу — обязательно, потому что от выбора зависит модель управления джобами.
FlinkDeployment
FlinkDeployment — это весь Flink-кластер, описанный одним ресурсом. Внутри может быть либо один application mode-джоб (один кластер = один джоб), либо session cluster (один кластер, в который потом отдельно сабмитятся джобы через FlinkSessionJob). Application mode — это рекомендованный production-паттерн: один кластер на один джоб, изоляция отказов, простой lifecycle.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: orders-pipeline
spec:
image: my-registry/orders-pipeline:1.2.3
flinkVersion: v2_2
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "4096m"
cpu: 2
job:
jarURI: local:///opt/flink/usrlib/orders-pipeline.jar
parallelism: 4
upgradeMode: savepoint
state: running
Когда вы делаете kubectl apply -f с этим YAML, оператор создаёт отдельный кластер для этого джоба. Образ — ваш JAR упакован в Flink-образ. Через job.jarURI оператор знает, что запускать. Через job.state: running управляете жизненным циклом (running / suspended).
FlinkSessionJob
FlinkSessionJob — это только джоб, который сабмитится в уже работающий session-кластер. Session-кластер описывается отдельным FlinkDeployment без секции job:. Полезно когда у вас десятки маленьких джобов, которые делят один кластер ради экономии ресурсов JobManager-а.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: realtime-alerts
spec:
deploymentName: shared-session-cluster
job:
jarURI: https://artifacts.internal/jars/realtime-alerts-0.4.0.jar
parallelism: 2
upgradeMode: savepoint
В production для критичных пайплайнов используйте application mode (отдельный FlinkDeployment с секцией job:), а не session-кластеры. Изоляция: упавший джоб не утянет за собой соседей. Деплой проще: savepoint и rollback на уровне всего кластера. Session-режим оправдан только для пачки лёгких ad-hoc джобов на edge-команды.
Установка оператора через Helm
Оператор ставится одним helm-чартом из официального репозитория Apache Flink. Чарт ставит сам оператор-под, его RBAC (ClusterRole для управления Deployment-ами в watched namespace-ах), CRD-схемы и webhook для валидации.
# Добавляем репо
helm repo add flink-operator-repo \
https://downloads.apache.org/flink/flink-kubernetes-operator-1.14.0/
helm repo update
# Установка в namespace flink
kubectl create namespace flink
helm install flink-kubernetes-operator \
flink-operator-repo/flink-kubernetes-operator \
--version 1.14.0 \
--namespace flink \
--set image.repository=ghcr.io/apache/flink-kubernetes-operator \
--set image.tag=1.14.0 \
--set watchNamespaces={flink-jobs}
watchNamespaces — это критичный параметр. Если оставить пустым, оператор будет следить за всем кластером (ему нужны cluster-wide права). Безопаснее явно указать namespace-ы, где будут жить FlinkDeployment-ы. Тогда оператор получит только Role в этих namespace-ах вместо ClusterRole — соответствует principle of least privilege.
После установки проверьте:
kubectl -n flink get pods
# NAME READY STATUS
# flink-kubernetes-operator-7d8b9c5d4f-x9k2m 1/1 Running
kubectl get crd | grep flink
# flinkdeployments.flink.apache.org
# flinksessionjobs.flink.apache.org
Cert-manager: оператор использует webhook для валидации CRD-манифестов. Helm-чарт по умолчанию ставит самоподписный cert через init-container, но в production-кластерах принято использовать cert-manager. Если у вас cert-manager есть — добавьте --set webhook.create=true --set certManager.create=false и оператор сгенерирует cert-manager Issuer.
Минимальный production-ready FlinkDeployment
Разберём по частям рабочий манифест для джоба, который читает Kafka и пишет в S3-Iceberg. Это шаблон, который вы можете прямо использовать как основу.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: orders-to-iceberg
namespace: flink-jobs
spec:
image: my-registry/orders-to-iceberg:v0.4.2
imagePullPolicy: IfNotPresent
flinkVersion: v2_2
serviceAccount: flink
flinkConfiguration:
# State backend - RocksDB + S3 checkpoints
state.backend.type: rocksdb
state.checkpoints.dir: s3://flink-state/orders-to-iceberg/checkpoints
state.savepoints.dir: s3://flink-state/orders-to-iceberg/savepoints
execution.checkpointing.interval: "60s"
execution.checkpointing.min-pause: "30s"
execution.checkpointing.timeout: "10min"
execution.checkpointing.unaligned: "true"
# HA через K8s ConfigMap
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/orders-to-iceberg/ha
# Метрики - Prometheus
metrics.reporters: prom
metrics.reporter.prom.factory.class: |
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: "9249"
jobManager:
replicas: 2
resource:
memory: "2048m"
cpu: 1
taskManager:
replicas: 3
resource:
memory: "4096m"
cpu: 2
job:
jarURI: local:///opt/flink/usrlib/orders-to-iceberg.jar
entryClass: com.acme.flink.OrdersToIceberg
parallelism: 6
upgradeMode: savepoint
state: running
Разберём по блокам, на что обратить внимание.
Метаданные и образ. image — это ваш Flink-образ с вшитым JAR. Стандартный подход: на базе flink:2.2.0-java17 копируете свой usrlib/orders-to-iceberg.jar. flinkVersion: v2_2 — оператор использует это для валидации совместимости и выбора правильных дефолтов конфига.
flinkConfiguration — это содержимое flink-conf.yaml. Оператор сгенерирует ConfigMap из этих ключей. Несколько критичных пунктов:
state.backend.type: rocksdb— для джобов с большим стейтом (миллионы записей). Для совсем маленьких можноhashmap, но в production почти всегда RocksDB.execution.checkpointing.interval: 60s— чекпойнты каждую минуту. Меньше — больше нагрузка, чаще снапшоты, меньше потерь при перезапуске. Больше — наоборот.execution.checkpointing.unaligned: true— выручает при backpressure, не блокирует чекпойнт ожиданием выравнивания барьеров. Включайте в production с backpressure-чувствительными джобами.high-availability.type: kubernetes— leader election JobManager через ConfigMap-ы Kubernetes. Без этого кластер не переживёт падение JM. Этому будет отдельный урок.
jobManager.replicas: 2 — обязательно для HA. Один JM активен, второй standby, переключение через leader election.
taskManager.replicas: 3 + parallelism: 6 — три TM по 2 слота (по умолчанию taskmanager.numberOfTaskSlots: 1, но рекомендуется 2 через flinkConfiguration). 3 TM × 2 slots = 6 slots для parallelism 6.
job.upgradeMode: savepoint — при изменении spec оператор сделает savepoint, выключит джоб, развернёт новый образ, восстановится из savepoint. Деталям upgrade modes посвящён следующий урок.
job.state: running — переключение на suspended останавливает джоб (с savepoint). Возврат на running — поднимает обратно.
serviceAccount: flink — оператор не создаёт SA автоматически. Вам нужно заранее создать SA с правами на доступ к S3 (если стейт хранится там через IRSA в EKS / Workload Identity в GKE) и Kubernetes RBAC для HA ConfigMap-ов. Helm-чарт ставит правильные права на cluster-роль операторa, но для job-ов нужен отдельный SA.
Лайфцикл: что делает оператор после apply
Когда вы делаете kubectl apply -f orders-to-iceberg.yaml, оператор проходит несколько фаз. Знание этих фаз помогает дебажить.
- CREATED — CRD принят, оператор увидел, начинает обработку.
- DEPLOYING — оператор создаёт ConfigMap (
flink-conf.yaml, log4j), JobManager Deployment (с 2 replicas если HA), TaskManager Deployment (с указанным числом replicas), Service для REST/blob, HA ConfigMap-ы. - DEPLOYED — поды стартанули, JM выбрал leader-а, REST API отвечает.
- SUBMITTED — JM получил JAR (через
local://из образа или через blob server), сабмитнул джоб, ExecutionGraph создан. - STABLE — джоб running, первый checkpoint успешно сделан.
Каждый переход виден в kubectl describe flinkdeployment orders-to-iceberg в секции Events и Status. Если что-то не так — kubectl logs -n flink deployment/flink-kubernetes-operator покажет, на каком шаге застряли.
kubectl -n flink-jobs get flinkdeployment
# NAME JOB STATUS LIFECYCLE STATE
# orders-to-iceberg RUNNING STABLE
kubectl -n flink-jobs describe flinkdeployment orders-to-iceberg
# ...
# Status:
# Job Status:
# Job Id: 7a8e3f2b1d4c9a5e
# Job Name: OrdersToIcebergJob
# State: RUNNING
# Start Time: 1715342401234
# Lifecycle State: STABLE
Чего оператор НЕ делает (на что обратить внимание)
-
Не управляет хранилищем для стейта. S3-бакет, IRSA-роль, политики доступа — это ваша инфра. Оператор только записывает туда чекпойнты через
state.checkpoints.dir. -
Не делает мониторинг. Prometheus reporter включается в
flinkConfiguration, но Prometheus-сервер, ServiceMonitor, дашборды Grafana — отдельная инфра. Этому посвящён урок 5. -
Не обновляет образ оператора сам. Helm upgrade при выходе новой версии — ручная операция. Перед upgrade проверьте
compatibility matrixдля вашей версии Flink. -
Не валидирует ваш JAR. Если в коде ошибка и джоб падает при старте — оператор увидит это и попробует пересоздать pod. Циклическое падение приведёт к
RestartingStateбез выхода. -
Не управляет секретами. Kafka SASL, S3 credentials — это ваши K8s Secret-ы, монтируются в job pod через
podTemplate.
podTemplate в FlinkDeployment.spec — это мощный механизм инъекции произвольной спецификации Pod (volumes, env, sidecars, securityContext). Используйте его для секретов, volume mounts и инициализационных контейнеров. Документация на сайте оператора показывает примеры.
Ключевые выводы
-
Flink Kubernetes Operator — декларативное управление Flink-кластерами через CRD: вы описываете желаемое состояние, оператор приводит реальное к нему.
-
FlinkDeployment — весь кластер (один JM-Deployment, один TM-Deployment, ConfigMap). С секцией
job:— application mode (рекомендуется). Без — session mode. -
FlinkSessionJob — отдельный джоб в существующем session-кластере. Удобно для пачки маленьких ad-hoc джобов.
-
Установка: Helm-чарт из официального репозитория Apache Flink, обязательно ограничить
watchNamespaces. -
Минимальный production-ready манифест включает: HA через K8s, RocksDB + S3 checkpoints, JM replicas: 2, TM resource limits, Prometheus reporter, upgradeMode: savepoint.
-
Lifecycle states: CREATED -> DEPLOYING -> DEPLOYED -> SUBMITTED -> STABLE. Видны в
kubectl describe. -
Оператор не делает за вас: инфру для стейта, мониторинг, секреты, кастомные образы.