Job lifecycle: submit, schedule, run, checkpoint, finish
Каждый Flink job проходит через цепочку состояний: submitted -> scheduled -> running -> finished. По пути могут случаться checkpoints, failures, restarts. Понимание этой последовательности — основа для troubleshooting в production. Когда job застрял в RESTARTING или показывает FAILED — вы знаете, в какой фазе lifecycle проблема, и куда смотреть.
К концу урока вы сможете прочитать любое состояние job в Web UI и сказать, что происходит и что делать.
States: алфавит lifecycle
Flink job на уровне JobMaster имеет конечный набор состояний:
| Состояние | Что значит |
|---|---|
| CREATED | Только что собран JobGraph, ещё ничего не запущено |
| RUNNING | Subtasks запущены и обрабатывают данные |
| FAILING | Один или несколько subtasks упали, идёт failover |
| FAILED | Failover не удался, job полностью упал |
| CANCELLING | Пользователь запросил cancel, идёт graceful shutdown |
| CANCELED | Job отменён пользователем |
| FINISHED | Job завершился штатно (bounded source закончился) |
| RESTARTING | Job перезапускается из последнего checkpoint |
| SUSPENDED | Job приостановлен (например, JobManager failover) |
| RECONCILING | Восстановление после JobManager failover в HA mode |
В Web UI вы видите эти состояния как цветные индикаторы. Зелёный (RUNNING) — норма. Жёлтый/оранжевый (RESTARTING, FAILING) — внимание. Красный (FAILED) — авария.
Subtasks имеют свои подсостояния (CREATED, DEPLOYING, RUNNING, FAILED, CANCELED, FINISHED) — это видно при кликe на оператор в Web UI.
Submit: путь от вашего кода до RUNNING
После последнего шага — job в RUNNING. Все subtasks обрабатывают данные. Pipeline работает.
Время от submit до RUNNING:
- Standalone (наш docker compose): 1-3 секунды.
- Native Kubernetes с уже работающим TaskManager pool: 5-10 секунд.
- Native Kubernetes с нуля (нужны новые TM pods): 20-60 секунд (включая pull image, K8s scheduling).
Checkpoints: невидимая работа
Когда job в RUNNING, JobMaster регулярно (по execution.checkpointing.interval, типично 10-60 секунд) триггерит checkpoint — снимок distributed state. Это критически важная фоновая работа.
Зачем нужны checkpoints
При failover (TaskManager упал, или job упал, или вы хотите restart) Flink восстанавливает state из последнего успешного checkpoint. Без checkpoints state потерян, job начинает обработку “с нуля” — что для CDC или stateful aggregations означает дублирование или потерю данных.
Как это работает (обзор, без deep dive)
Chandy-Lamport algorithm: JobMaster инжектит checkpoint barrier в source streams. Barrier — это специальная маркерная запись с checkpoint ID. Source emit’ит barrier между нормальными записями.
Chandy-Lamport алгоритм: как barrier-ы распространяютсяКаждый оператор, получая barrier на всех входах, snapshot’ит свой state (в RocksDB или HashMap), пишет в state backend (S3, HDFS, file), и пересылает barrier дальше. Когда barrier дошёл до всех sinks и они подтвердили — checkpoint complete.
Source snapshots offset
Source записывает текущий Kafka offset (или другую source position) в свой state и emit'ит barrier #N в down-stream.Operator snapshots state
Оператор получает barrier. Ждёт barrier на всех входах (alignment). Snapshot'ит keyed/operator state. Emit'ит barrier дальше.Sink ACKs checkpoint
Sink получает barrier. Подтверждает в JobMaster. Когда все sinks подтвердили — checkpoint complete.Подробное устройство Chandy-Lamport, aligned vs unaligned checkpoints, incremental checkpoints в RocksDB — модуль 10. Сейчас важно понимать: checkpoint происходит в фоне, не блокирует обработку, и необходим для failover.
Что происходит при сбое
Сбой TaskManager — самый частый production-сценарий. Что происходит:
- Detection: JobMaster не получает heartbeat от TM в течение
heartbeat.timeout(по умолчанию 50 с). - Mark failed: JobMaster помечает все subtasks этого TM как FAILED.
- Failover strategy определяет, что перезапустить. Default — full restart всего job. С region-based failover (для embarrassingly parallel jobs) — только affected region.
- State restore: новые subtasks (на других TM или новых TM, если K8s создаёт) восстанавливают state из последнего successful checkpoint.
- Source rewind: sources читают с offset, который был на момент checkpoint (KafkaSource — с saved Kafka offset).
- Resume: job в RESTARTING -> RUNNING. Обработка возобновляется с момента последнего checkpoint.
Restart значит повторную обработку данных от последнего checkpoint. Это нормально — Flink обеспечивает exactly-once семантику через checkpoint + transactional sinks. Но если ваш sink не transactional (например, обычный HTTP API без идемпотентности), то restart приведёт к дубликатам. Это покрывается в модуле 11.
Restart strategies
Flink имеет несколько restart strategies:
- No restart: job упал -> finished. Используется в тестах.
- Fixed delay restart: попытка restart N раз с задержкой M секунд. Если не получилось — FAILED.
- Failure rate restart: разрешено M failures в N минут. Превысили — FAILED.
- Exponential delay restart: задержка растёт экспоненциально (для перегруженных систем, чтобы не долбить).
Конфигурируется в FLINK_PROPERTIES:
restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 10min
restart-strategy.failure-rate.delay: 30s
Для production typically failure-rate с 5-10 failures за 1-2 часа — достаточно много для transient failures, недостаточно для постоянной проблемы.
Cancellation: graceful vs forced
Когда вы хотите остановить job:
Cancel (flink cancel <jobid>): JobMaster команды cancel всем subtasks. Source закрывается, оставшиеся в pipeline данные обрабатываются, sinks closing. Это graceful shutdown, но state не сохранён (нет savepoint).
Stop with savepoint (flink stop -s <savepoint-dir> <jobid>): то же самое, но перед закрытием триггерится финальный savepoint. State сохранён, можно restart с него. Правильный путь для production upgrades.
Kill (kill -9 на JobManager или TaskManager процессы): не graceful, state не сохранён. Только если ничего другое не работает.
В Flink Kubernetes Operator есть upgradeMode: savepoint — означает, что при обновлении (через FlinkDeployment CRD) Operator автоматически делает stop-with-savepoint, обновляет deployment, restart с savepoint. Это покрывается в модуле 15.
Bounded source: FINISHED state
Если job читает из bounded source (например, FileSource на ограниченном наборе файлов), то после прочтения всех данных:
- Source говорит “I’m done” в emit стрим.
- Все down-stream операторы получают “end of stream” сигнал.
- Window-операторы emit’ят финальные окна.
- Sink’и закрываются.
- Job переходит в FINISHED state.
Это batch processing на DataStream API — bounded execution. Application mode + bounded source = batch job. После FINISHED ресурсы (TaskManager pods в K8s) могут быть освобождены.
Для streaming-приложений с unbounded sources (Kafka, Pulsar) FINISHED обычно не достигается — job работает 24/7.
Failover при JobManager сбое (HA)
В Standalone (наш docker compose) JobManager — single point of failure. Если он упал — job останавливается, manual restart.
В Native Kubernetes mode с HA (high availability):
- JobManager state хранится в HA storage (Kubernetes ConfigMap или ZooKeeper).
- Lock на лидерство держится через K8s API (lease) или ZooKeeper.
- Если active JobManager упал — standby обнаруживает потерю lock через timeout, берёт lock, становится active.
- Новый JobManager восстанавливает состояние: ExecutionGraph, последний checkpoint, кто на каких слотах.
- TaskManager’ы обнаруживают нового JobManager (через service discovery), переподключаются.
- Job переходит в RECONCILING -> RUNNING.
Время failover JobManager — 30 секунд - 2 минуты. Это покрывается в модуле 15 (Kubernetes deployment).
Failover scenarios: что вы увидите в Web UI
Попробуй сам
- Submit job через docker compose из урока 00.3.
- Найдите checkpoints tab в Web UI. Какой интервал? Как долго длится checkpoint? Какой размер state?
- Симулируйте сбой:
docker kill flink-tm-1. Что произошло в Web UI? Сколько времени job был в RESTARTING? После recovery — состояние правильно? - Cancel правильно:
flink stop -s file:///tmp/flink-savepoints/ <jobid>. Откройте /tmp/flink-savepoints в JM container — увидите savepoint. Это то, что вы можете использовать для restart с сохранённым state. - Symuliruйте restart strategy: настройте failure-rate restart, и убейте TM несколько раз подряд. После N failures job FAILED — это default behavior, защищающий от endless restart loop.