Справочник ключевых терминов курса Apache Flink Internals.
Компонент JobManager, который принимает входящие job submissions через REST API, создаёт отдельный JobMaster на каждый job и хранит JobGraphStore (для HA — в ZooKeeper или Kubernetes ConfigMap). При рестарте Dispatcher восстанавливает все running jobs из persisted JobGraph storage. Это entry point для всех client-side операций — submit, cancel, savepoint trigger.
REST вызов POST /jobs приходит в Dispatcher, который сохраняет JobGraph в JobGraphStore и спавнит JobMaster.Компонент Flink, ответственный за allocation и tracking task slots в кластере. В Standalone-режиме держит статический пул TaskManager-ов; в Kubernetes/YARN-режимах динамически просит у внешнего orchestrator-а новые TM-pods/контейнеры через SlotManager. Различает ResourceManager Flink и внешний (k8s scheduler) — они работают вместе.
При scale-up adaptive scheduler запрашивает у RM N новых slots, RM просит у K8s N новых TM-pod через KubernetesResourceManagerDriver.Per-job компонент в JobManager process, который владеет жизненным циклом конкретного job: разворачивает JobGraph в ExecutionGraph, scheduleит ExecutionVertex на slots, управляет CheckpointCoordinator, обрабатывает failures и restarts. До Flink 1.6 был JobManager-monolith; сейчас JobMaster + Dispatcher + RM разделены.
При partial failure subtask JobMaster триггерит RegionFailoverStrategy и рестартит только pipeline region вместо всего job.Logical-уровень представление job, который клиент отправляет в Dispatcher. Получается из StreamGraph после оптимизаций — chaining операторов, slot sharing groups, co-location groups. Содержит JobVertices (соответствуют operator chains) и IntermediateDataSets (логические результаты vertices). Сериализуется и хранится в JobGraphStore для HA.
Physical-уровень граф, который JobMaster разворачивает из JobGraph. Каждая JobVertex превращается в N ExecutionVertex (по parallelism), каждый — в Execution (текущая попытка запуска). ExecutionGraph хранит state каждого vertex (CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED/FAILED) и управляет failover.
Самое раннее (logical) представление DataStream pipeline до оптимизаций. Строится в client-process из вызовов API (env.fromSource, map, keyBy и т.д.). StreamGraphGenerator затем превращает StreamGraph в JobGraph, применяя operator chaining, fixing slot sharing и resolving forward edges.
Логическая группа операторов, чьи subtasks с одинаковым subtask-index делят один task slot. По дефолту все операторы в default group — это снижает резервирование slots до max(parallelism). Можно разделить через .slotSharingGroup("name") — полезно для изоляции heavy операторов (RocksDB-stateful от lightweight map).
stream.map(...).slotSharingGroup("compute").keyBy(...).process(new StatefulFn()).slotSharingGroup("state");Более строгая чем slot sharing: subtasks с одинаковым subtask-index ГАРАНТИРОВАННО запускаются в одном slot. Используется для iterations (head и tail итерации должны быть на одном TM для tight loop) и для co-located join без shuffle. Конфигурируется через CoLocationConstraint, не пользовательским API напрямую.
Единица resource allocation в TaskManager — фиксированная часть managed memory и CPU. По дефолту 1 TM = N slots (taskmanager.numberOfTaskSlots), и каждый slot хостит one subtask per operator chain. С slot sharing N разных operators могут жить в одном slot, деля JVM heap и network buffers.
TM с 4 slots при parallelism=4 джобе хостит 4 параллельных pipeline replicas (по одной в каждый slot).Network flow control mechanism (FLIP-27 era — с 1.5), при котором downstream subtask явно объявляет upstream subtask сколько buffers он готов принять (credits). Upstream НЕ шлёт данные пока downstream не выдал кредиты. Заменил старый TCP-backpressure через сетевой stack и убрал head-of-line blocking между subtasks на одном TM.
Default network stack Flink, основанный на Netty. Хостит ResultPartition (output buffers оператора), InputGate (input buffers downstream subtask), а также NetworkBufferPool — глобальный пул direct-memory сегментов. Поддерживает credit-based flow control и pipelined результаты для streaming, blocking — для batch.
Network buffer, выделенный конкретно одному input channel (один upstream subtask -> один downstream channel). По дефолту 2 exclusive buffers на channel (taskmanager.network.memory.buffers-per-channel). Гарантирует минимальную пропускную способность даже при contention, но фрагментирует память.
Pool of network buffers, распределяемых динамически между channels одного InputGate. Если один channel hot (быстрый upstream), он получает больше floating buffers; медленные обходятся exclusive minimum. Размер пула — taskmanager.network.memory.floating-buffers-per-gate (default 8).
Adaptive механизм (FLIP-183, Flink 1.14+) который динамически уменьшает размер buffer-аs в очереди, целясь на фиксированное время обработки (taskmanager.network.memory.buffer-debloat.target-ms, default 1s). Снижает checkpoint alignment time под backpressure ценой небольшого throughput penalty. Включается через taskmanager.network.memory.buffer-debloat.enabled=true.
Состояние, когда downstream оператор не успевает обрабатывать поступающие данные, и upstream вынужден замедляться (через credit exhaust). Видна в Flink UI как gradient (yellow/red) per subtask. Метрики: backPressuredTimeMsPerSecond, isBackPressured. Под backpressure aligned checkpoints раздуваются — нужны unaligned checkpoints + buffer debloating.
Off-heap память Flink, управляемая внутренне через MemoryManager. Используется RocksDB (block cache + write buffers через WriteBufferManager), batch операторами (hash join, sort) и Python (Beam fn worker). Размер задаётся taskmanager.memory.managed.size или .fraction (default 0.4 от total flink). Главное преимущество — отсутствие GC pressure.
Direct off-heap память для network buffers (Netty). Размер: taskmanager.memory.network.min/max/fraction (default 0.1 от total flink, 64MB-1GB). Делится между ResultPartition (output) и InputGate (input). Под backpressure именно эти buffers держат накопленные данные перед barrier.
Память вне JVM heap, не управляемая GC. В Flink делится на Managed (off-heap) и Direct (network buffers + framework off-heap). У TM total off-heap = managed + network + framework.off-heap + task.off-heap + jvm metaspace + jvm overhead. На K8s именно эта сумма + heap должна вмещаться в pod memory limit.
JVM-allocated off-heap buffers через ByteBuffer.allocateDirect или Unsafe. Используется Netty network stack и DirectMemorySegment. Лимит: -XX:MaxDirectMemorySize, в Flink выставляется автоматически = network + task.off-heap + framework.off-heap. Превышение -> OOMKilled на k8s (видно не сразу).
Off-heap область JVM для class metadata (после Java 8 заменила PermGen). В Flink: taskmanager.memory.jvm-metaspace.size (default 256MB). Если deploy много jobs с разными dependencies на один TM — может разрастаться, OOM Metaspace. На Application Mode (1 jar per cluster) — обычно не проблема.
Резерв в Flink memory model на thread stacks, code cache, GC structures, JNI native allocations (не покрытые managed/direct). Default: taskmanager.memory.jvm-overhead.fraction=0.1 от total process memory (min 192MB, max 1GB). Если RocksDB или сторонние JNI-библиотеки текут — этот резерв первый кандидат на увеличение перед OOMKilled.
Embedded LSM-tree KV-store от Facebook, основной state backend Flink для больших state (TB-scale per TM). Хранит state на локальном диске, не в heap — устраняет GC pressure. Каждый ColumnFamily = один state primitive (ValueState, MapState и т.д.) per operator. RocksDB же лежит в основе ForstDB (disaggregated state, Flink 2.0).
Log-Structured Merge tree — структура данных для write-optimized KV-store. Запись идёт в in-memory MemTable, который при заполнении flush'ится на диск как immutable SST file. Background compaction сливает SST разных уровней для уменьшения read amplification. Trade-off: low write amp + high read amp (надо смотреть в несколько SST).
In-memory write buffer RocksDB (по дефолту skip-list, sorted). Все записи (Put/Delete/Merge) идут сначала сюда + в WAL. При достижении write_buffer_size (default 64MB) MemTable immutable -> flush на диск как L0 SST. Количество MemTable одновременно: max_write_buffer_number (default 2-4). В Flink WAL отключён — durability через checkpoints.
Sorted String Table — immutable on-disk файл RocksDB, формат для одного flush'а MemTable или результата compaction. Содержит блоки данных (4KB-32KB), index block, filter block (Bloom), footer. Уровни L0...L6: L0 — flush из MemTable (могут overlap по ключам), L1+ — non-overlapping после compaction.
Логически отдельное keyspace в RocksDB с собственными MemTable и SST files (но shared WAL). Flink создаёт одну CF на каждый state primitive в операторе: ValueState count -> cf 'count'. У каждой CF свои options (write_buffer_size, block_cache, compression) — можно тюнить per-state.
То же что MemTable, термин из RocksDB options API: write_buffer_size, max_write_buffer_number. В Flink 1.10+ через WriteBufferManager все CF в одном TM делят общий бюджет (sharing managed memory), чтобы избежать unbounded memory leak при многих CF.
RocksDB-level LRU кэш блоков прочитанных SST-файлов. Без него каждый read = disk I/O. По дефолту 8MB на CF; в Flink через managed memory все CF делят shared cache. Метрика rocksdb_block_cache_hit / rocksdb_block_cache_miss — ниже 90% hit ratio = stateful read-heavy джоб надо тюнить.
Probabilistic data structure в footer каждого SST: «точно нет такого ключа в SST» или «возможно есть». Используется RocksDB при point lookup, чтобы пропускать SST без disk read. Включается через cf_option.bloom_filter (default 10 bits/key -> ~1% false positive). Критично для read-heavy workloads с большим N SST.
Background процесс слияния SST разных уровней для устранения дубликатов, применения tombstones и уменьшения read amplification. Стратегии: Level (default, low space amp), Universal (low write amp, для logging), FIFO (TTL-based). Compaction stalls — write throttle когда L0 переполнен -> flush блокируется -> write latency скачет в 10x.
Классический distributed snapshot algorithm (1985), на котором основан Flink checkpoint. Идея: marker (barrier) пускается от sources вниз по DAG; каждый процесс при получении marker фиксирует свой state и forward'ит marker дальше. Получается globally consistent snapshot без stop-the-world. Flink-вариант называется Asynchronous Barrier Snapshotting.
Flink-вариант Chandy-Lamport, описан в paper Carbone et al. 2015. CheckpointCoordinator инжектит barrier в sources с monotonically растущим ID; barrier пробегает DAG, каждый оператор при получении barrier на всех input channels снимает snapshot своего state (asynchronously, через RocksDB native checkpoint API) и forward'ит barrier downstream. Sink ack-ит — checkpoint complete.
Default режим: оператор с несколькими input channels БЛОКИРУЕТ уже-получивший-barrier channel пока остальные не получат свой barrier. Это гарантирует consistent snapshot без необходимости snapshot-ить in-flight buffer. Под backpressure alignment time раздувается до минут — отсюда unaligned checkpoints как альтернатива.
FLIP-76 (Flink 1.11+) альтернатива aligned: при получении barrier на ANY input channel оператор сразу снимает snapshot и пишет ВСЕ in-flight buffers (включая overtaken-data на других channels) в checkpoint. Барьер obgонит buffers и пробежит DAG быстро. Trade-off: больший checkpoint size, но низкая end-to-end latency под backpressure.
Optimization для RocksDB state backend: вместо полного snapshot копируется только новые SST с прошлого checkpoint (плюс manifest и MANIFEST-журнал). RocksDB native CheckpointAPI создаёт hardlinks на SST без копирования файлов. Снижает checkpoint size в 10-100x для slowly-evolving state. Включается state.backend.incremental=true.
Newer (Flink 1.15+, FLIP-158) механизм через changelog state backend: каждое state update сразу пишется в durable changelog (DFS append-only log), а MaterializationCheckpoint периодически коммитит RocksDB snapshot. Снижает checkpoint duration до миллисекунд независимо от state size — barrier надо лишь дописать changelog offset.
Wrapper-state-backend (FLIP-158): любое write в state дополнительно append-ится в durable changelog file на DFS. При checkpoint надо коммитить только offset в changelog (а не SST). Базовый state backend (RocksDB или EmbeddedRocksDBStateBackend) периодически materialize'ится в фоне. Низкая checkpoint duration ценой увеличенного write amplification.
Компонент JobMaster, инициирующий checkpoints по schedule (execution.checkpointing.interval), отслеживающий ack от sources/operators/sinks и финализирующий checkpoint metadata в _metadata файл на DFS. При timeout (execution.checkpointing.timeout) abortит checkpoint. Хранит список complete checkpoints (max-concurrent-checkpoints, max-retained).
Специальный stream-record (не event), который CheckpointCoordinator инжектит в sources при старте checkpoint. Содержит checkpoint ID и timestamp. Каждый оператор пропускает barrier strictly после events перед ним и strictly до events после — это гарантирует exactly-once semantics между checkpoints.
Backend-agnostic savepoint format (default с Flink 1.15, FLIP-203). Сериализуется независимо от state backend, что позволяет: HashMapStateBackend savepoint -> restore в EmbeddedRocksDBStateBackend и наоборот. Slower чем native, но enables backend migration. Триггерится через --type canonical.
Savepoint в формате конкретного state backend (для RocksDB — bunch SST + manifest). Быстрее create/restore (без сериализационного слоя), но завязан на backend. Не позволяет менять state backend между runs. Default для EmbeddedRocksDBStateBackend pre-1.15.
Batch-API (на DataSet, теперь на DataStream) для read/write savepoints как обычный dataset. Use cases: bootstrap state из исторических данных перед запуском streaming job, debug state-related bugs, migration state schema, форсированная injection компенсирующих записей. SavepointWriter.forNewSavepoint / SavepointReader.read.
SavepointWriter.newSavepoint(env, new HashMapStateBackend(), 4).withOperator("counter", transformation).write("file:///bootstrap-savepoint");Stable identifier оператора (uid("name")) для matching state из savepoint при restore. Если UID не задан — Flink генерирует из StreamGraph topology, что хрупко: любое изменение DAG -> state не найдется. Best practice: каждому stateful оператору явный uid(). Без UID restore с allowNonRestoredState=false фейлится при несовпадении.
stream.keyBy(...).process(new MyStatefulFn()).uid("stateful-fn").name("stateful-fn");Флаг при restore из savepoint (--allowNonRestoredState или -n), который позволяет игнорировать state, не сопоставленный с оператором (например, удалили оператор из DAG). По умолчанию false — Flink fail-fast'ит при mismatch. Опасный флаг: можно случайно потерять state из-за typo в UID.
Интерфейс (с Flink 1.11+, заменил AssignerWithPeriodicWatermarks) для генерации watermark'ов из event timestamp. Two methods: onEvent (per-record, для punctuated), onPeriodicEmit (по таймеру, для periodic). Регистрируется через WatermarkStrategy. Бывает forBoundedOutOfOrderness, forMonotonousTimestamps, custom.
FLIP-182 механизм (Flink 1.15+) для синхронизации watermark между параллельными source readers — если один partition быстро двигается вперёд по event time, а другой отстаёт, alignment пауzит fast reader пока slow не догонит. Конфигурируется .withWatermarkAlignment("group", maxDrift, updateInterval). Решает проблему unbounded state роста от idle partitions.
Состояние source/partition, который не emit'ит events дольше threshold (.withIdleness(Duration.ofMinutes(5))). Idle partition прекращает блокировать watermark от других partitions (отправляет idle signal). Без idleness один idle Kafka partition остановит global watermark и заморозит все windows на DAG.
Watermark, генерируемый по специальному маркеру в потоке (например, end-of-batch signal от upstream system). Implements WatermarkGenerator.onEvent, инспектирует поле события и emit'ит ctx.emitWatermark(...) когда встречает marker. Используется когда event-rate низкий, periodic не подходит.
Watermark, генерируемый по таймеру (default 200ms, pipeline.auto-watermark-interval). Реализуется через WatermarkGenerator.onPeriodicEmit. forBoundedOutOfOrderness — periodic вариант, держит max-seen-timestamp - outOfOrderness и emit'ит по таймеру.
Open-source SQL parser + query optimizer framework, на котором построен Flink SQL planner. Calcite предоставляет SqlNode (AST), RelNode (relational algebra), HepPlanner (rule-based optimizer) и VolcanoPlanner (cost-based). Flink расширяет Calcite через FlinkRelMdSelectivity, FlinkRelMdRowCount, кастомные rules в FlinkStreamRuleSets.
Calcite AST-нода после parsing — представляет элементы SQL syntax (SELECT, identifiers, expressions, joins). SqlValidator проверяет references, type coercions; SqlToRelConverter превращает SqlNode -> RelNode. На уровне SqlNode оптимизаций нет — только syntactic/semantic анализ.
Calcite relational algebra node — представляет операцию (TableScan, Filter, Project, Join, Aggregate). Каждый RelNode имеет input(s), traits (convention, collation, distribution) и реализуется конкретным backend (FlinkLogicalRel, FlinkPhysicalRel). Оптимизатор работает над plan tree из RelNode, применяя Rules.
Calcite-rule (RelOptRule), трансформирующий plan tree: matches pattern из RelNode -> produces equivalent (но более эффективный) sub-tree. Примеры: FilterMergeRule, ProjectMergeRule, JoinPushExpressionsRule, FlinkStreamingJoinReorderRule. HepPlanner применяет rules последовательно; VolcanoPlanner — top-down с cost.
VolcanoPlanner Calcite — оптимизатор, выбирающий план с минимальной cost. Cost = (rowCount, cpu, io). Flink использует FlinkRelMdRowCount для estimation cardinality (на основе table statistics или эвристик). CBO выбирает join order, decides build/probe side в HashJoin, push down predicates. Без statistics CBO работает на эвристиках — часто неоптимально.
Calcite-трансформация (RelDecorrelator), превращающая correlated subquery в JOIN. Например, WHERE x IN (SELECT y FROM t WHERE t.k=outer.k) -> semi-join. Без decorrelation subquery выполнится N раз (для каждой outer row), что катастрофично для streaming. Flink-планер триггерит decorrelation в logical phase.
Оптимизация: filter predicate (WHERE) проталкивается вниз по plan tree — в Source connector (если поддерживает SupportsFilterPushDown — Iceberg, Paimon, JDBC), тем самым уменьшая объём прочитанных строк. Для streaming критично — Kafka source не поддерживает predicate pushdown (broker не знает schema), Paimon — поддерживает.
CBO-оптимизация: для N-way join подобрать порядок join (A⋈B)⋈C vs A⋈(B⋈C) с минимальным intermediate result. Для streaming сложнее чем batch — нет статистик cardinality, regular join требует materialization. Flink с 1.17 поддерживает streaming join reorder для regular joins; для interval/temporal — нет.
Streaming-specific optimization (Flink 1.19+, FLIP-470 era): для regular join CDC-источников вместо materialization обеих сторон выполняется delta-join — каждое изменение в left/right сразу emit'ится merged в downstream. Существенно снижает state size (нужны только индексы), требует source с full snapshot (changelog source).
Calcite MultiJoin node — представление N-way join как single node вместо вложенных Join. Позволяет JoinReorderRule swap order по cost. В Flink активируется для regular streaming join >= 3 таблиц через JoinToMultiJoinRule. Если join содержит condition зависящий от outer (correlated), MultiJoin не применится.
Disaggregated KV-store от Alibaba (с Flink 2.0+, FLIP-423), форк RocksDB с separated compute и storage. Hot data — в local SSD/memory cache, cold — на S3/HDFS через ForstFs file system. SST файлы лежат на DFS, local — только cache. Позволяет TM-pods быть stateless (восстановление за секунды без download N TB state).
Async-friendly state API (Flink 2.0+, FLIP-424), заменяющий legacy ValueState.value() (sync, блокирующий thread). State V2: StateFuture<T> value().thenAccept(v -> ...). Все state operations стали async, batch'ятся внутри для амортизации latency disaggregated storage (S3 round-trip = 10-50ms). Required для ForstDB.
Pattern в State V2 API: state operation возвращает StateFuture<T>, который complete'ится после batch I/O round-trip к DFS. Flink runtime аккумулирует pending state operations в AEC (Async Execution Controller) и batch-flushит. User не должен делать .get() на StateFuture в processElement — это убьёт async.
Архитектурный паттерн (Flink 2.0+): state лежит на shared DFS (S3/HDFS), не на local disk TM. TM держит только working set в cache, restart = download cache (секунды) вместо download всего state (минуты на TB). Снимает coupling между state size и compute resources. Trade-off: latency state access вырастает с микросекунд до миллисекунд -> требует async API.
Scheduler (Flink 1.13+, FLIP-160), который не fail'ит job при недостатке slots, а вместо этого ждёт available resources и рестартит pipeline с tomorrow меньшим parallelism. Поддерживает up-scaling: при доступности новых slots — rescale пайплайна без manual savepoint/restart. Включается jobmanager.scheduler=adaptive.
Режим Flink (FLIP-159), при котором job всегда занимает ВСЕ доступные slots в кластере (jobmanager.scheduler=adaptive + scheduler-mode=reactive). Сам ResourceManager не requests slots — кластер scale up/down externally (k8s HPA, manual), а Flink reactively adjusts parallelism. Используется с autoscaler (например, Kubernetes operator) для elastic streaming.
Scheduler для batch jobs (FLIP-187, Flink 1.15+), который решает parallelism на runtime на основе фактического размера intermediate dataset (а не задаётся статически). Каждый stage запускается только после complete предыдущего; если предыдущий output 10TB — scheduler ставит большой parallelism, если 1MB — маленький. jobmanager.scheduler=adaptive-batch.
Batch-only optimization (FLIP-168, Flink 1.16+): для slow subtasks scheduler запускает дублирующую copy на другом TM; кто первый закончит — тот winner, медленный killed. Решает проблему straggler в batch (медленный диск, hot CPU). Stream — не применим (нельзя duplicate state-update). Конфигурация: execution.batch.speculative.enabled.
FLink Improvement Proposal #160 (2021), который представил adaptive scheduler как general framework для rescale без savepoint/restart. До FLIP-160 любая смена parallelism требовала canceling job + restore из savepoint. С adaptive scheduler — graceful rescale в пределах текущего execution.
Distributed transaction protocol: Phase 1 — coordinator говорит всем participants prepare (writes но не commit), participants ack. Phase 2 — coordinator говорит commit или abort. Flink использует 2PC в TwoPhaseCommittingSink: preCommit на checkpoint barrier, commit после notifyCheckpointComplete. Гарантирует exactly-once end-to-end с transactional sinks (Kafka, JDBC XA, Iceberg).
Интерфейс Sink V2 (FLIP-191), реализующий 2PC через SinkWriter (write + prepareCommit на checkpoint) и Committer (commit pre-committed transactions после notifyCheckpointComplete). Заменяет deprecated TwoPhaseCommitSinkFunction. Используется в KafkaSink, IcebergSink, JdbcXaSink. Гарантирует exactly-once при сочетании с checkpointing.
Уникальный идентификатор producer transaction в Kafka (transactional.id). Flink Kafka exactly-once sink генерирует TID per subtask + per checkpoint (prefix.subtask.checkpointId), что позволяет fencing — при restart старый TID становится invalid (epoch++), новый получает Producer ID. Без уникального TID два sink subtask с одинаковым TID повредят consistency.
Kafka mechanism: каждый init transactions с TID получает новый epoch (transactional.producer.id_and_epoch). Старая producer-инстанция с тем же TID но старым epoch получит InvalidProducerEpoch на любое write и не сможет corrupt transaction. Критично при Flink failover — старый зависший Kafka producer не разрушит state committed новой instance.
Kafka consumer setting (isolation.level=read_committed), при котором downstream consumer видит только commit-нутые транзакции, пропуская aborted и in-flight. Required для downstream consumer Flink exactly-once Kafka sink — без него consumer прочитает uncommitted сообщения (могут быть rolled back). Snickers подобный read-uncommitted дефолт.
JobMaster-side компонент Source V2 (FLIP-27), отвечающий за discovery splits (Kafka partitions, Iceberg manifest entries, files в S3) и assignment к SourceReader. Может работать в single-threaded mode на coordinator. Поддерживает checkpoint своего state (что назначено, что осталось). Replaces legacy SourceFunction который смешивал enumeration и reading.
TM-side компонент Source V2, читает assigned splits через SplitReader (pluggable, обёртка над KafkaConsumer/Avro DataFileReader). Emit events через ReaderOutput.collect и watermarks через emitWatermark. Поддерживает asynchronous IO через FutureCompletingBlockingQueue. Сохраняет split offsets в checkpoint.
FLink Improvement Proposal #27 (2019, GA в 1.12+) — unified Source API заменивший legacy SourceFunction. Разделяет SplitEnumerator (coordinator side) и SourceReader (TM side), что даёт: batch+stream unified, watermark alignment, dynamic split discovery, deterministic split assignment. Все новые source connectors (Kafka, Iceberg, Paimon) реализуют FLIP-27.
FLink Improvement Proposal #143 (2020, GA в 1.14+) — Sink API V2. Аналогично FLIP-27 для sinks: разделяет SinkWriter (на TM, пишет), Committer (комитит, опциональный), GlobalCommitter (single committer на cluster для idempotent commit). Поддерживает TwoPhaseCommittingSink из коробки. Заменяет SinkFunction.
Unified API для source/sink connectors в Flink, реализованный через FLIP-27 (sources) и FLIP-143 (sinks). Поддерживает batch+stream unified semantic, watermark alignment, checkpointable enumerator state, transactional commits. Все built-in connectors мигрированы. Legacy SourceFunction/SinkFunction deprecated с 1.17.
Apache Paimon (бывший Flink Table Store) — lakehouse-формат на LSM-tree на DFS (S3/HDFS). Каждая партиция = bucket'ы (key-range partitioning), каждый bucket — LSM с L0...Ln SST на DFS. Append/insert идут в L0, compaction merges в Ln. Дает streaming-frequency upserts (минут intervals) на batch-формате против Iceberg/Delta.
Атомарная версия Paimon table — соответствует одному commit. Содержит manifest list (ссылки на manifest файлы, которые ссылаются на data files). Каждый writer создаёт новый snapshot, reader видит consistent view. Snapshots expire по политике (max-retained-numbers, max-retained-time). Live snapshots — те, на которые есть activeбранч/tag.
Paimon tag — named immutable snapshot, не подлежащий expiration. Используется для archive (раз в день snapshot тегается "daily-2026-05-19" и доступен годами для time-travel queries). Tags также используются для cross-cluster replication (DR) — snapshot на active кластере тегается, отправляется через Fluss/MirrorMaker.
Paimon branch — копия snapshot для concurrent writes без конфликтов (testing, A/B writes). Например, branch "experiment" пишется в течение недели, не влияя на main; в конце мерджится через MERGE INTO. Реализован через snapshot pointer на отдельный manifest tree. Аналог git branch для lakehouse.
Streaming storage, спроектированный для Flink (apache.org incubator с 2024). Combines Kafka-like real-time read/write с lakehouse-friendly columnar storage и primary-key индексами. Поддерживает: column projection на сервере, predicate pushdown, partial updates, materialized tables. Конкурирует с Kafka+Iceberg-стеком как unified streaming + storage.
Flink SQL объект (FLIP-435, Flink 1.19+) — таблица, у которой данные автоматически поддерживаются Flink streaming джобой из CONTINUOUSLY запроса. CREATE MATERIALIZED TABLE m FRESHNESS = INTERVAL '1' MINUTE AS SELECT ... FROM src GROUP BY ... — Flink спавнит background джоб, обновляющий m. Аналог DBT incremental на streaming-движке.
Non-deterministic Finite Automaton — внутренняя модель Flink CEP для evaluation patterns. Каждый event может trigger transition в N partial-match states (например, pattern A followed by A* followed by B на event A создаст множество branched states). Storage стоимость растёт с pattern complexity и event rate — главный source of state bloat в CEP.
Java/Scala API для CEP в Flink: Pattern.begin("a").where(a -> ...).followedBy("b").where(b -> ...).within(Time.minutes(5)). Компилируется в NFA. Поддерживает: contiguity modes (strict/relaxed/non-deterministic), quantifiers (oneOrMore, times), within/until, groups. Альтернатива SQL MATCH_RECOGNIZE.
SQL syntax (ANSI SQL:2016) для row-pattern recognition в Flink SQL. SELECT * FROM events MATCH_RECOGNIZE (PARTITION BY user ORDER BY ts MEASURES ... PATTERN (A B+ C) DEFINE A AS ..., B AS ..., C AS ...). Транслируется в тот же NFA backend что Pattern API. Декларативнее, но менее гибкий чем Java API.
Pattern API constraint .within(Time.minutes(5)) — pattern должен полностью completeться за указанное время от первого matched event. Если истек — pattern abort'ится, partial match выкидывается. Критично для bounded CEP state — без within state растёт unbounded при greedy patterns (A+).
Pattern API contiguity: pattern.followedBy("b") — событие b может прийти после a с любыми не-matched events между ними (skip-till-next-match). Strict version — .next("b") — только immediately next event. NonDeterministic — .followedByAny("b") — все возможные комбинации (взрыв state). Default — relaxed.
Flink SQL DDL (FLIP-437, Flink 2.1+) для регистрации AI model в catalog: CREATE MODEL my_model INPUT (text STRING) OUTPUT (embedding ARRAY<FLOAT>) WITH ('provider'='openai', 'task'='embedding', 'api-key'=...). После регистрации model доступна через ML_PREDICT, ML_EVALUATE table functions.
Table-Valued Function для batch/stream inference по registered model: SELECT * FROM ML_PREDICT(TABLE inputs, MODEL my_model, DESCRIPTOR(text)). Под капотом batches input rows и async-вызывает model endpoint (OpenAI, AWS Bedrock, custom). Поддерживает retry, timeout, max concurrency через config. Главный риск в проде — external endpoint latency блокирует pipeline.
Flink SQL TVF для approximate nearest-neighbor search в external vector DB (Milvus, Qdrant, Pinecone). SELECT * FROM VECTOR_SEARCH(TABLE queries, vector_table, DESCRIPTOR(query_vec), 10). Поддерживает HNSW индексы; filter pushdown (по metadata) часто не работает — full scan + post-filter становится bottleneck.
Java API для programmatic регистрации AI model в Flink catalog (без SQL DDL). TableEnvironment.createModel("name", ModelDescriptor.forProvider("openai").schema(...).option(...).build()). Используется при building reusable libs или когда нужно dynamically регистрировать model по runtime config.
Flink тип serializer для классов с public no-arg constructor + public getters/setters (или public fields). Faster чем Kryo, schema-aware (поддерживает evolution в savepoint через PojoSerializerSnapshot). Если класс не PojoFriendly, Flink silently fallback на Kryo (slow + не state-evolution-friendly). Detection: pipeline.generic-types=false fail-fast.
Schema-evolution-friendly row-based binary format. Flink TypeInfo.of(SpecificRecord.class) — генерирует AvroSerializer, который при mismatch schema авто-resolve'ит с help писаной schema fingerprint. Recommended формат для long-running stateful Flink jobs где state schema эволюционирует (added/removed fields).
Schema-driven binary format от Google. Flink Protobuf SerializerFactory (через flink-protobuf) использует ProtobufSerializer с встроенным schema evolution. Менее богатые evolution rules чем Avro (нельзя rename field, только add optional / remove deprecated). Common в Kafka-based pipelines где proto-format dominates.
Generic Java serializer, fallback для типов которые не подходят под POJO/Tuple/Avro. Медленный, не schema-aware (state не migrate-able между app versions при изменении класса). В production должен быть disabled через pipeline.generic-types=false — иначе скрытый Kryo fallback испортит upgrade.
Flink native tuples (Tuple1...Tuple25) — наиболее эффективный serializer, field-positional. Используется в внутренних operators (KeyedStream хранит ключ как Tuple). Для user code лучше POJO (читаемее) — performance overhead над Tuple минимален.
PyFlink user-defined function (UDF, UDAF, UDTF) выполняется в отдельном Python-процессе (Beam fn worker), не в JVM. Каждый TM спавнит N Python-процессов (python.fn-execution.bundle.size, .bundle.time). Java и Python обмениваются данными через Beam shared memory или TCP. Это overhead vs pure Java джоб.
Framework Apache Beam для cross-language UDF execution. Flink PyFlink использует Beam fn-api: Java side (Flink runtime) общается с Python side (user UDF) через стандартизированный protobuf protocol. Это даёт PyFlink reuse Beam Python SDK код, но привносит overhead serializaition + RPC между JVM и Python.
PyFlink использует Cython для critical-path Python модулей (event serialization, type conversion) — это AOT-compile в C. Снижает Python-Java boundary overhead в ~3-5x по сравнению с pure Python. Включён по дефолту в PyPI-wheel pyflink; building from source требует Cython >= 0.29.
Java Flight Recorder — встроенный JDK profiler (с Java 11 open-source). Записывает sampled events (CPU, allocations, GC, locks, IO) с очень низким overhead (~1%). В Flink: env.java.opts=-XX:StartFlightRecording=duration=60s,filename=/flink/jfr/run.jfr. Анализируется в JDK Mission Control. Default для prod profiling Flink над async-profiler.
Low-overhead sampling profiler от Andrey Pangin (open-source). Использует perf_events на Linux для CPU, malloc hooks для allocations. Strength: точные native+Java stack traces (включая JNI в RocksDB). Output: flame graph (SVG). Подключается через -agentpath:libasyncProfiler.so или java -jar profiler.jar attach <pid>.
Visualization invented Brendan Gregg для profile output: x-axis — function calls aggregated by stack, y-axis — call depth, ширина пропорциональна samples (time spent). Wide bars в top — hot functions. Critical tool для finding bottlenecks в Flink (например, Kryo serialization wide -> POJO migration justifies).
Подбор JVM GC + параметров для минимизации pause time. Flink-defaults: G1 (Java 11+) — хороший для streaming. Для очень больших heap (>50GB) — ZGC или Shenandoah (sub-millisecond pause). Метрики: -Xlog:gc*,gc+heap=info, JMX GC times. Для Flink цель — < 100ms pause (длиннее -> checkpoint timeout + watermark stall).
Kafka cross-cluster replication tool (на основе Kafka Connect). Реплицирует topics, consumer offsets, ACLs. Для Flink multi-region: source-кластер -> MM2 -> DR-кластер; Flink failover читает из DR. Limitations: replication лаг -> potential data loss; topic renaming (cluster_alias.topic_name) ломает Flink kafka-source если не предусмотреть.
Confluent Cloud / Confluent Platform feature — Kafka brokers напрямую replicate данные между кластерами на network-level (без Connect-кластера как у MM2). Лучше latency и throughput, простая операционка. Open-source альтернативы — нет (Apache Kafka не имеет cluster linking). Для Flink DR — означает один источник truth с failover на secondary.
Recovery Point Objective — максимально допустимая потеря данных в случае failure. Для Flink: RPO = max(checkpoint_interval, kafka_replication_lag). Если checkpoint interval = 1 минута и kafka mm2 lag = 30s -> RPO = 1.5 минуты. Снижение RPO -> чаще checkpoints + sync replication, ценой throughput.
Recovery Time Objective — максимально допустимое время восстановления после disaster. Для Flink: RTO = время с failure до полного processing на DR-кластере = (detect failure) + (start jobs from savepoint) + (restore state) + (catch up backlog). Disaggregated state (ForstDB) драматически снижает RTO — не надо downloading TB state на TM.