Learning Platform
Глоссарий Troubleshooting
Урок 16.01 · 25 мин
Средний
KubernetesOperatorFlinkDeploymentFlinkSessionJobCRDHelm

Flink Kubernetes Operator: FlinkDeployment и FlinkSessionJob

В production Flink почти всегда живёт в Kubernetes. Flink Kubernetes Operator — это официальный способ Apache Flink развернуть, обновить и поддерживать стриминговые джобы в K8s-кластере, не воюя руками с Deployment-ами, ConfigMap-ами и StatefulSet-ами. Вы описываете джоб декларативно через CRD, а оператор делает всё остальное — поднимает JobManager и TaskManager, сохраняет savepoint при обновлении, отслеживает HA через ConfigMap.

В этом уроке разберём, что внутри оператора, какие у него CRD, и напишем минимальный, но production-ready FlinkDeployment.


Без оператора деплой Flink-джоба в Kubernetes выглядит так: вы вручную пишете Deployment для JobManager, Deployment (или ReplicaSet) для TaskManager, ConfigMap с flink-conf.yaml, Service для REST API JobManager, Service для blob-сервера, PVC для checkpoint-ов, Job для submit-а jar-ника. Это десятки YAML-ов на каждый стрим-джоб. И когда нужно обновить версию джара — savepoint, скейл-даун, замена образа, restore-from-savepoint, скейл-ап — всё руками или через сложные хелм-чарты.

Flink Kubernetes Operator решает эту боль. Он предоставляет два CRD: FlinkDeployment (полностью управляемый кластер с одним джобом или session cluster) и FlinkSessionJob (submit джоба в существующий session cluster). Оператор смотрит на эти ресурсы, генерирует все нужные Deployment-ы и ConfigMap-ы внутри себя, отслеживает их статус, делает savepoint при upgrade-е, перезапускает джоб при сбое.

NOTE

Версия Flink K8s Operator 1.14.0 (Feb 2026) — это стабильный релиз, который поддерживает Flink 1.18 — 2.2. В уроке всё будет ориентировано на эту пару: оператор 1.14, Flink 2.2.

Что делает оператор
Вы пишетеОдин декларативный YAML с FlinkDeployment CRD. Описываете образ, parallelism, ресурсы, конфиг — что хотите получить.
kubectl apply
K8s APIAPI-сервер принимает CRD, валидирует через openAPI-схему, складывает в etcd. Никаких подов ещё нет.
watch
ОператорКонтроллер оператора слушает изменения CRD через watch API. Видит новый FlinkDeployment — запускает reconcile loop.
create
Реальные ресурсыОператор создаёт JobManager Deployment, TaskManager Deployment, ConfigMap с flink-conf.yaml, Service для REST API, HA ConfigMap-ы для leader election.

Ключевая идея — reconciliation loop. Оператор постоянно сравнивает желаемое состояние (то, что в CRD) с реальным (поды, конфиги).

Kubernetes CRD и operator pattern: reconciliation loop Если расхождение — приводит реальное к желаемому. Это тот же паттерн, что у нативных контроллеров Kubernetes (Deployment-контроллер, StatefulSet-контроллер).


CRD: FlinkDeployment vs FlinkSessionJob

Оператор работает с двумя CRD. Понимать разницу — обязательно, потому что от выбора зависит модель управления джобами.

FlinkDeployment

FlinkDeployment — это весь Flink-кластер, описанный одним ресурсом. Внутри может быть либо один application mode-джоб (один кластер = один джоб), либо session cluster (один кластер, в который потом отдельно сабмитятся джобы через FlinkSessionJob). Application mode — это рекомендованный production-паттерн: один кластер на один джоб, изоляция отказов, простой lifecycle.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: orders-pipeline
spec:
  image: my-registry/orders-pipeline:1.2.3
  flinkVersion: v2_2
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/usrlib/orders-pipeline.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Когда вы делаете kubectl apply -f с этим YAML, оператор создаёт отдельный кластер для этого джоба. Образ — ваш JAR упакован в Flink-образ. Через job.jarURI оператор знает, что запускать. Через job.state: running управляете жизненным циклом (running / suspended).

FlinkSessionJob

FlinkSessionJob — это только джоб, который сабмитится в уже работающий session-кластер. Session-кластер описывается отдельным FlinkDeployment без секции job:. Полезно когда у вас десятки маленьких джобов, которые делят один кластер ради экономии ресурсов JobManager-а.

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: realtime-alerts
spec:
  deploymentName: shared-session-cluster
  job:
    jarURI: https://artifacts.internal/jars/realtime-alerts-0.4.0.jar
    parallelism: 2
    upgradeMode: savepoint
WARNING

В production для критичных пайплайнов используйте application mode (отдельный FlinkDeployment с секцией job:), а не session-кластеры. Изоляция: упавший джоб не утянет за собой соседей. Деплой проще: savepoint и rollback на уровне всего кластера. Session-режим оправдан только для пачки лёгких ad-hoc джобов на edge-команды.

FlinkDeployment vs FlinkSessionJob
Application modeОдин FlinkDeployment = один кластер = один джоб. Изоляция отказов. Рекомендованный паттерн для production пайплайнов.
ИзоляцияКаждый джоб в своём кластере. Сбой одного не влияет на других. Ресурсы JM и TM выделены отдельно.
Session modeFlinkDeployment без job: запускает пустой кластер. FlinkSessionJob сабмитятся туда. Один JM управляет несколькими джобами.
Совместное использованиеВсе джобы делят JM и TM-пул. Экономия ресурсов, но риск: один глюк может уронить весь кластер с остальными джобами.

Установка оператора через Helm

Оператор ставится одним helm-чартом из официального репозитория Apache Flink. Чарт ставит сам оператор-под, его RBAC (ClusterRole для управления Deployment-ами в watched namespace-ах), CRD-схемы и webhook для валидации.

# Добавляем репо
helm repo add flink-operator-repo \
  https://downloads.apache.org/flink/flink-kubernetes-operator-1.14.0/

helm repo update

# Установка в namespace flink
kubectl create namespace flink

helm install flink-kubernetes-operator \
  flink-operator-repo/flink-kubernetes-operator \
  --version 1.14.0 \
  --namespace flink \
  --set image.repository=ghcr.io/apache/flink-kubernetes-operator \
  --set image.tag=1.14.0 \
  --set watchNamespaces={flink-jobs}

watchNamespaces — это критичный параметр. Если оставить пустым, оператор будет следить за всем кластером (ему нужны cluster-wide права). Безопаснее явно указать namespace-ы, где будут жить FlinkDeployment-ы. Тогда оператор получит только Role в этих namespace-ах вместо ClusterRole — соответствует principle of least privilege.

После установки проверьте:

kubectl -n flink get pods
# NAME                                          READY   STATUS
# flink-kubernetes-operator-7d8b9c5d4f-x9k2m    1/1     Running

kubectl get crd | grep flink
# flinkdeployments.flink.apache.org
# flinksessionjobs.flink.apache.org
TIP

Cert-manager: оператор использует webhook для валидации CRD-манифестов. Helm-чарт по умолчанию ставит самоподписный cert через init-container, но в production-кластерах принято использовать cert-manager. Если у вас cert-manager есть — добавьте --set webhook.create=true --set certManager.create=false и оператор сгенерирует cert-manager Issuer.


Минимальный production-ready FlinkDeployment

Разберём по частям рабочий манифест для джоба, который читает Kafka и пишет в S3-Iceberg. Это шаблон, который вы можете прямо использовать как основу.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: orders-to-iceberg
  namespace: flink-jobs
spec:
  image: my-registry/orders-to-iceberg:v0.4.2
  imagePullPolicy: IfNotPresent
  flinkVersion: v2_2
  serviceAccount: flink

  flinkConfiguration:
    # State backend - RocksDB + S3 checkpoints
    state.backend.type: rocksdb
    state.checkpoints.dir: s3://flink-state/orders-to-iceberg/checkpoints
    state.savepoints.dir: s3://flink-state/orders-to-iceberg/savepoints
    execution.checkpointing.interval: "60s"
    execution.checkpointing.min-pause: "30s"
    execution.checkpointing.timeout: "10min"
    execution.checkpointing.unaligned: "true"

    # HA через K8s ConfigMap
    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-state/orders-to-iceberg/ha

    # Метрики - Prometheus
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: |
      org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"

  jobManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1

  taskManager:
    replicas: 3
    resource:
      memory: "4096m"
      cpu: 2

  job:
    jarURI: local:///opt/flink/usrlib/orders-to-iceberg.jar
    entryClass: com.acme.flink.OrdersToIceberg
    parallelism: 6
    upgradeMode: savepoint
    state: running

Разберём по блокам, на что обратить внимание.

Метаданные и образ. image — это ваш Flink-образ с вшитым JAR. Стандартный подход: на базе flink:2.2.0-java17 копируете свой usrlib/orders-to-iceberg.jar. flinkVersion: v2_2 — оператор использует это для валидации совместимости и выбора правильных дефолтов конфига.

flinkConfiguration — это содержимое flink-conf.yaml. Оператор сгенерирует ConfigMap из этих ключей. Несколько критичных пунктов:

  • state.backend.type: rocksdb — для джобов с большим стейтом (миллионы записей). Для совсем маленьких можно hashmap, но в production почти всегда RocksDB.
  • execution.checkpointing.interval: 60s — чекпойнты каждую минуту. Меньше — больше нагрузка, чаще снапшоты, меньше потерь при перезапуске. Больше — наоборот.
  • execution.checkpointing.unaligned: true — выручает при backpressure, не блокирует чекпойнт ожиданием выравнивания барьеров. Включайте в production с backpressure-чувствительными джобами.
  • high-availability.type: kubernetes — leader election JobManager через ConfigMap-ы Kubernetes. Без этого кластер не переживёт падение JM. Этому будет отдельный урок.

jobManager.replicas: 2 — обязательно для HA. Один JM активен, второй standby, переключение через leader election.

taskManager.replicas: 3 + parallelism: 6 — три TM по 2 слота (по умолчанию taskmanager.numberOfTaskSlots: 1, но рекомендуется 2 через flinkConfiguration). 3 TM × 2 slots = 6 slots для parallelism 6.

job.upgradeMode: savepoint — при изменении spec оператор сделает savepoint, выключит джоб, развернёт новый образ, восстановится из savepoint. Деталям upgrade modes посвящён следующий урок.

job.state: running — переключение на suspended останавливает джоб (с savepoint). Возврат на running — поднимает обратно.

WARNING

serviceAccount: flink — оператор не создаёт SA автоматически. Вам нужно заранее создать SA с правами на доступ к S3 (если стейт хранится там через IRSA в EKS / Workload Identity в GKE) и Kubernetes RBAC для HA ConfigMap-ов. Helm-чарт ставит правильные права на cluster-роль операторa, но для job-ов нужен отдельный SA.


Лайфцикл: что делает оператор после apply

Когда вы делаете kubectl apply -f orders-to-iceberg.yaml, оператор проходит несколько фаз. Знание этих фаз помогает дебажить.

  1. CREATED — CRD принят, оператор увидел, начинает обработку.
  2. DEPLOYING — оператор создаёт ConfigMap (flink-conf.yaml, log4j), JobManager Deployment (с 2 replicas если HA), TaskManager Deployment (с указанным числом replicas), Service для REST/blob, HA ConfigMap-ы.
  3. DEPLOYED — поды стартанули, JM выбрал leader-а, REST API отвечает.
  4. SUBMITTED — JM получил JAR (через local:// из образа или через blob server), сабмитнул джоб, ExecutionGraph создан.
  5. STABLE — джоб running, первый checkpoint успешно сделан.

Каждый переход виден в kubectl describe flinkdeployment orders-to-iceberg в секции Events и Status. Если что-то не так — kubectl logs -n flink deployment/flink-kubernetes-operator покажет, на каком шаге застряли.

kubectl -n flink-jobs get flinkdeployment
# NAME                JOB STATUS   LIFECYCLE STATE
# orders-to-iceberg   RUNNING      STABLE

kubectl -n flink-jobs describe flinkdeployment orders-to-iceberg
# ...
# Status:
#   Job Status:
#     Job Id: 7a8e3f2b1d4c9a5e
#     Job Name: OrdersToIcebergJob
#     State: RUNNING
#     Start Time: 1715342401234
#   Lifecycle State: STABLE

Чего оператор НЕ делает (на что обратить внимание)

  1. Не управляет хранилищем для стейта. S3-бакет, IRSA-роль, политики доступа — это ваша инфра. Оператор только записывает туда чекпойнты через state.checkpoints.dir.

  2. Не делает мониторинг. Prometheus reporter включается в flinkConfiguration, но Prometheus-сервер, ServiceMonitor, дашборды Grafana — отдельная инфра. Этому посвящён урок 5.

  3. Не обновляет образ оператора сам. Helm upgrade при выходе новой версии — ручная операция. Перед upgrade проверьте compatibility matrix для вашей версии Flink.

  4. Не валидирует ваш JAR. Если в коде ошибка и джоб падает при старте — оператор увидит это и попробует пересоздать pod. Циклическое падение приведёт к RestartingState без выхода.

  5. Не управляет секретами. Kafka SASL, S3 credentials — это ваши K8s Secret-ы, монтируются в job pod через podTemplate.

TIP

podTemplate в FlinkDeployment.spec — это мощный механизм инъекции произвольной спецификации Pod (volumes, env, sidecars, securityContext). Используйте его для секретов, volume mounts и инициализационных контейнеров. Документация на сайте оператора показывает примеры.


Ключевые выводы

  1. Flink Kubernetes Operator — декларативное управление Flink-кластерами через CRD: вы описываете желаемое состояние, оператор приводит реальное к нему.

  2. FlinkDeployment — весь кластер (один JM-Deployment, один TM-Deployment, ConfigMap). С секцией job: — application mode (рекомендуется). Без — session mode.

  3. FlinkSessionJob — отдельный джоб в существующем session-кластере. Удобно для пачки маленьких ad-hoc джобов.

  4. Установка: Helm-чарт из официального репозитория Apache Flink, обязательно ограничить watchNamespaces.

  5. Минимальный production-ready манифест включает: HA через K8s, RocksDB + S3 checkpoints, JM replicas: 2, TM resource limits, Prometheus reporter, upgradeMode: savepoint.

  6. Lifecycle states: CREATED -> DEPLOYING -> DEPLOYED -> SUBMITTED -> STABLE. Видны в kubectl describe.

  7. Оператор не делает за вас: инфру для стейта, мониторинг, секреты, кастомные образы.

Проверка знанийKnowledge check
У вас 30 микро-джобов, каждый обрабатывает поток мониторинговых событий от одной команды (parallelism = 1, малый стейт). Также есть 3 критичных пайплайна для биллинга (parallelism = 8, большой стейт, требуют чёткой SLA). Как оптимально разнести их по FlinkDeployment / FlinkSessionJob?
ОтветAnswer
30 микро-джобов имеет смысл поднять как FlinkSessionJob внутри одного-двух session-кластеров (FlinkDeployment без job:). Экономия ресурсов: один JobManager на все 30 джобов, общий пул TaskManager-ов. Простой деплой через kubectl apply отдельных FlinkSessionJob YAML. 3 критичных пайплайна для биллинга должны быть отдельными FlinkDeployment (application mode) — каждый в своём кластере. Изоляция отказов критична: сбой одного билинг-джоба не должен утащить с собой другие. Ресурсы выделены отдельно (свои TM, свой JM), savepoint и rollback на уровне отдельного кластера. HA включается per-кластер (JM replicas: 2). Это стандартный production-паттерн: критичные = application mode, лёгкие edge = session mode.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда разворачивает Flink Kubernetes Operator 1.14 в кластере. Они хотят, чтобы оператор управлял только джобами в двух namespace-ах: prod-flink и stage-flink. Какой helm-параметр обеспечит наименьшие привилегии?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5