Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 18 мин
Средний
Job LifecycleCheckpointsFailoverSubmitRecovery

Job lifecycle: submit, schedule, run, checkpoint, finish

Каждый Flink job проходит через цепочку состояний: submitted -> scheduled -> running -> finished. По пути могут случаться checkpoints, failures, restarts. Понимание этой последовательности — основа для troubleshooting в production. Когда job застрял в RESTARTING или показывает FAILED — вы знаете, в какой фазе lifecycle проблема, и куда смотреть.

К концу урока вы сможете прочитать любое состояние job в Web UI и сказать, что происходит и что делать.


States: алфавит lifecycle

Flink job на уровне JobMaster имеет конечный набор состояний:

СостояниеЧто значит
CREATEDТолько что собран JobGraph, ещё ничего не запущено
RUNNINGSubtasks запущены и обрабатывают данные
FAILINGОдин или несколько subtasks упали, идёт failover
FAILEDFailover не удался, job полностью упал
CANCELLINGПользователь запросил cancel, идёт graceful shutdown
CANCELEDJob отменён пользователем
FINISHEDJob завершился штатно (bounded source закончился)
RESTARTINGJob перезапускается из последнего checkpoint
SUSPENDEDJob приостановлен (например, JobManager failover)
RECONCILINGВосстановление после JobManager failover в HA mode

В Web UI вы видите эти состояния как цветные индикаторы. Зелёный (RUNNING) — норма. Жёлтый/оранжевый (RESTARTING, FAILING) — внимание. Красный (FAILED) — авария.

Subtasks имеют свои подсостояния (CREATED, DEPLOYING, RUNNING, FAILED, CANCELED, FINISHED) — это видно при кликe на оператор в Web UI.


Submit: путь от вашего кода до RUNNING

Submit job: последовательность событий
Client (flink run)
Dispatcher
ResourceManager
JobMaster
TaskManager
POST /jobs (JobGraph + JAR)start JobMaster for this jobrequestSlots(parallelism=N)ensure TaskManager has free slotsslots allocateddeploySubtask(subtask, slot)subtask RUNNINGheartbeat + metrics (every 10s)

После последнего шага — job в RUNNING. Все subtasks обрабатывают данные. Pipeline работает.

Время от submit до RUNNING:

  • Standalone (наш docker compose): 1-3 секунды.
  • Native Kubernetes с уже работающим TaskManager pool: 5-10 секунд.
  • Native Kubernetes с нуля (нужны новые TM pods): 20-60 секунд (включая pull image, K8s scheduling).

Checkpoints: невидимая работа

Когда job в RUNNING, JobMaster регулярно (по execution.checkpointing.interval, типично 10-60 секунд) триггерит checkpoint — снимок distributed state. Это критически важная фоновая работа.

Зачем нужны checkpoints

При failover (TaskManager упал, или job упал, или вы хотите restart) Flink восстанавливает state из последнего успешного checkpoint. Без checkpoints state потерян, job начинает обработку “с нуля” — что для CDC или stateful aggregations означает дублирование или потерю данных.

Как это работает (обзор, без deep dive)

Chandy-Lamport algorithm: JobMaster инжектит checkpoint barrier в source streams. Barrier — это специальная маркерная запись с checkpoint ID. Source emit’ит barrier между нормальными записями.

Chandy-Lamport алгоритм: как barrier-ы распространяются

Каждый оператор, получая barrier на всех входах, snapshot’ит свой state (в RocksDB или HashMap), пишет в state backend (S3, HDFS, file), и пересылает barrier дальше. Когда barrier дошёл до всех sinks и они подтвердили — checkpoint complete.

Checkpoint flow (упрощённо)
JobMaster: trigger checkpoint #NJobMaster каждые execution.checkpointing.interval ms инициирует новый checkpoint. Барьер с ID = N инжектится во все sources одновременно.

Source snapshots offset

Source записывает текущий Kafka offset (или другую source position) в свой state и emit'ит barrier #N в down-stream.
barrier #N

Operator snapshots state

Оператор получает barrier. Ждёт barrier на всех входах (alignment). Snapshot'ит keyed/operator state. Emit'ит barrier дальше.
barrier #N

Sink ACKs checkpoint

Sink получает barrier. Подтверждает в JobMaster. Когда все sinks подтвердили — checkpoint complete.
State backend: persist snapshotsState snapshots пишутся в state backend (S3, HDFS, file). Метаданные checkpoint пишутся в `_metadata` файл. Старые checkpoints (за пределами `state.checkpoints.num-retained`) удаляются.

Подробное устройство Chandy-Lamport, aligned vs unaligned checkpoints, incremental checkpoints в RocksDB — модуль 10. Сейчас важно понимать: checkpoint происходит в фоне, не блокирует обработку, и необходим для failover.


Что происходит при сбое

Сбой TaskManager — самый частый production-сценарий. Что происходит:

  1. Detection: JobMaster не получает heartbeat от TM в течение heartbeat.timeout (по умолчанию 50 с).
  2. Mark failed: JobMaster помечает все subtasks этого TM как FAILED.
  3. Failover strategy определяет, что перезапустить. Default — full restart всего job. С region-based failover (для embarrassingly parallel jobs) — только affected region.
  4. State restore: новые subtasks (на других TM или новых TM, если K8s создаёт) восстанавливают state из последнего successful checkpoint.
  5. Source rewind: sources читают с offset, который был на момент checkpoint (KafkaSource — с saved Kafka offset).
  6. Resume: job в RESTARTING -> RUNNING. Обработка возобновляется с момента последнего checkpoint.
WARNING

Restart значит повторную обработку данных от последнего checkpoint. Это нормально — Flink обеспечивает exactly-once семантику через checkpoint + transactional sinks. Но если ваш sink не transactional (например, обычный HTTP API без идемпотентности), то restart приведёт к дубликатам. Это покрывается в модуле 11.

Restart strategies

Flink имеет несколько restart strategies:

  • No restart: job упал -> finished. Используется в тестах.
  • Fixed delay restart: попытка restart N раз с задержкой M секунд. Если не получилось — FAILED.
  • Failure rate restart: разрешено M failures в N минут. Превысили — FAILED.
  • Exponential delay restart: задержка растёт экспоненциально (для перегруженных систем, чтобы не долбить).

Конфигурируется в FLINK_PROPERTIES:

restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 10min
restart-strategy.failure-rate.delay: 30s

Для production typically failure-rate с 5-10 failures за 1-2 часа — достаточно много для transient failures, недостаточно для постоянной проблемы.


Cancellation: graceful vs forced

Когда вы хотите остановить job:

Cancel (flink cancel <jobid>): JobMaster команды cancel всем subtasks. Source закрывается, оставшиеся в pipeline данные обрабатываются, sinks closing. Это graceful shutdown, но state не сохранён (нет savepoint).

Stop with savepoint (flink stop -s <savepoint-dir> <jobid>): то же самое, но перед закрытием триггерится финальный savepoint. State сохранён, можно restart с него. Правильный путь для production upgrades.

Kill (kill -9 на JobManager или TaskManager процессы): не graceful, state не сохранён. Только если ничего другое не работает.

В Flink Kubernetes Operator есть upgradeMode: savepoint — означает, что при обновлении (через FlinkDeployment CRD) Operator автоматически делает stop-with-savepoint, обновляет deployment, restart с savepoint. Это покрывается в модуле 15.


Bounded source: FINISHED state

Если job читает из bounded source (например, FileSource на ограниченном наборе файлов), то после прочтения всех данных:

  1. Source говорит “I’m done” в emit стрим.
  2. Все down-stream операторы получают “end of stream” сигнал.
  3. Window-операторы emit’ят финальные окна.
  4. Sink’и закрываются.
  5. Job переходит в FINISHED state.

Это batch processing на DataStream API — bounded execution. Application mode + bounded source = batch job. После FINISHED ресурсы (TaskManager pods в K8s) могут быть освобождены.

Для streaming-приложений с unbounded sources (Kafka, Pulsar) FINISHED обычно не достигается — job работает 24/7.


Failover при JobManager сбое (HA)

В Standalone (наш docker compose) JobManager — single point of failure. Если он упал — job останавливается, manual restart.

В Native Kubernetes mode с HA (high availability):

  1. JobManager state хранится в HA storage (Kubernetes ConfigMap или ZooKeeper).
  2. Lock на лидерство держится через K8s API (lease) или ZooKeeper.
  3. Если active JobManager упал — standby обнаруживает потерю lock через timeout, берёт lock, становится active.
  4. Новый JobManager восстанавливает состояние: ExecutionGraph, последний checkpoint, кто на каких слотах.
  5. TaskManager’ы обнаруживают нового JobManager (через service discovery), переподключаются.
  6. Job переходит в RECONCILING -> RUNNING.

Время failover JobManager — 30 секунд - 2 минуты. Это покрывается в модуле 15 (Kubernetes deployment).


Failover scenarios: что вы увидите в Web UI

Common failover scenarios
TM OOMTaskManager закрылся (OOM kill). Job в RESTARTING ~30 секунд (timeout + restart delay), потом RUNNING. Состояние восстановилось из последнего checkpoint. В логах JM: 'lost TaskManager'.
TM сетевой timeoutTM не отвечает (сеть, GC pause). JM считает мёртвым. Failover. Если TM был жив — он переподключается, но JM уже инициировал restart. Возможен race condition в логах.
Slow checkpoints / failuresCheckpoints не успевают за интервал. Накапливаются. В Web UI: Checkpoints tab показывает 'In Progress' и failures. Не блокирует job, но указывает на проблему (большой state, slow IO).
Code exception (NPE и т.п.)Subtask упал с RuntimeException. Failover полного job (или region). Restart восстанавливает state, но если bug в коде — упадёт снова. Restart strategy определяет, сколько раз пробовать.
Source unreachableKafka unavailable. Source падает с exception. Failover. Restart — source пытается reconnect. Если Kafka down 5+ минут — job FAILED по failure-rate strategy.
JM upgrade (rolling)С Flink Kubernetes Operator: savepoint + cancel + новый pod + restart с savepoint. Downtime: 30 секунд - 2 минуты. State сохраняется.

Попробуй сам

  1. Submit job через docker compose из урока 00.3.
  2. Найдите checkpoints tab в Web UI. Какой интервал? Как долго длится checkpoint? Какой размер state?
  3. Симулируйте сбой: docker kill flink-tm-1. Что произошло в Web UI? Сколько времени job был в RESTARTING? После recovery — состояние правильно?
  4. Cancel правильно: flink stop -s file:///tmp/flink-savepoints/ <jobid>. Откройте /tmp/flink-savepoints в JM container — увидите savepoint. Это то, что вы можете использовать для restart с сохранённым state.
  5. Symuliruйте restart strategy: настройте failure-rate restart, и убейте TM несколько раз подряд. После N failures job FAILED — это default behavior, защищающий от endless restart loop.
Проверка знанийKnowledge check
Production Flink job показывает в Web UI: 'RESTARTING' уже 5 минут. Checkpoints tab показывает все zelyne галочки на последних 10 checkpoint'ах. В чём вероятная причина и где смотреть?
ОтветAnswer
Checkpoint history зелёная — значит сами snapshots работают, но job не может полностью восстановиться. Вероятные причины: (1) недостаточно slots для restart — например, упал TaskManager, и в кластере не хватает свободных slots для restart subtasks. Смотреть в Web UI: Overview -> Available Task Slots. Если 0 — нужно добавить TM или дождаться возврата упавшего. (2) Source unreachable — например, Kafka брокер недоступен, KafkaSource не может reconnect. Смотреть TaskManager logs на subtask Source — там будут ConnectionRefusedException. (3) Restart с savepoint, но savepoint corrupted или несовместим (изменилась схема state) — смотреть JobManager logs на 'savepoint' ошибки. (4) Restart strategy с long backoff — например, exponential backoff достиг минут задержки. Смотреть FLINK_PROPERTIES restart-strategy конфиг. Первое, что делать: открыть TaskManager logs (если они есть) и JobManager logs за последние 10 минут.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Production Flink job уже 5 минут в состоянии RESTARTING. Checkpoints tab показывает успешные checkpoints на последних 10 запусках. Какая вероятная причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4