Полный SQL pipeline в Flink
В предыдущем уроке мы разобрали Calcite как универсальный SQL frontend и оптимизатор. Но Calcite заканчивается на оптимизированном RelNode tree. От этого момента до реального выполнения на TaskManager расстояние огромное: нужно сгенерировать Java-код для операторов, собрать ExecNodeGraph, превратить его в Transformation, потом в StreamGraph, потом в JobGraph, потом передать JobManager, который рассует ExecutionGraph и распределит задачи по TaskManager.
Этот урок — полный обход того pipeline. Каждый этап ответственен за свой уровень абстракции, и каждый этап оставляет следы в логах, метриках, EXPLAIN-выводе. Понимание pipeline критично, когда нужно объяснить, почему SQL-запрос превратился в N задач, M chains, K shuffle границ.
SQL SELECT и агрегации в FlinkВысокоуровневый pipeline
SQL string
|
v
[1] SqlParser (Calcite) -> SqlNode AST
|
v
[2] SqlValidator (Calcite + Flink) -> Validated SqlNode + RelDataTypes
|
v
[3] SqlToRelConverter (Calcite) -> Logical RelNode tree
|
v
[4] OptimizerPhases (Flink + Calcite) -> Optimized Physical RelNode tree
phases: subquery rewrite,
correlate elimination,
logical opt,
physical conversion,
physical opt
|
v
[5] ExecNodeGraphGenerator (Flink) -> ExecNodeGraph
|
v
[6] ExecNode.translateToPlan (Flink) -> Transformation tree
|
v
[7] StreamGraphGenerator (Flink) -> StreamGraph
|
v
[8] StreamGraph.getJobGraph (Flink) -> JobGraph
|
v
[9] JobManager submission -> ExecutionGraph
|
v
[10] Slot allocation + deployment -> Tasks running on TaskManagers
Этапы 1–4 — это compile time (логика планирования и оптимизации). Этапы 5–8 — build time (формирование структуры джоба, ещё на клиентской JVM). 9–10 — runtime (выполнение). Граница между compile и build — переход от RelNode к ExecNode.
Этап 1: Parsing
Entry point: tableEnv.executeSql(...) или tableEnv.sqlQuery(...). Реализация в TableEnvironmentImpl.
String sql = "SELECT user, COUNT(*) FROM clicks GROUP BY user";
tableEnv.sqlQuery(sql)
-> PlannerBase.parse(sql)
-> CalciteParser.parse(sql)
-> SqlParser.create(sql, config).parseStmt() // Calcite
-> возвращает SqlNode (например, SqlSelect)
Парсер — JavaCC-generated класс из Parser.jj. Парсер не знает про каталог, типы, текущий контекст. Он только превращает текст в дерево SqlNode. На этом этапе можно поймать только синтаксические ошибки (например, забытая запятая, пропущенное FROM).
Время этапа: 1–10 мс для типичного запроса, до 100 мс для очень больших SQL (тысячи столбцов).
Этап 2: Validation
PlannerBase.translate(operations)
-> Operation создаётся через SqlToOperationConverter из SqlNode
-> Перед конвертацией: SqlValidator.validate(sqlNode)
-> resolveTables через CalciteCatalogReader
-> FlinkCalciteCatalogReader.getTable(qualifiedName)
-> CatalogManager.getTable
-> конкретный Catalog (Hive, Iceberg, In-Memory)
-> CatalogBaseTable -> ResolvedSchema
-> Calcite RelDataType
-> deriveTypes через FlinkTypeFactory
После валидации SqlNode обогащён информацией о типах каждого выражения. На этом этапе возможны ошибки:
Object 'foo' not found within 'cat.db'— таблицы нет.Column 'bar' not found in any table— столбца нет.Cannot apply '=' to arguments of type '<INT> = <CHAR(3)>'— несовместимые типы.Aggregate expression is illegal in WHERE clause— семантические нарушения.
Время этапа: 5–50 мс. Latency сильно зависит от каталога — например, HiveCatalog ходит в Hive Metastore по сети (10–100 мс на каждый getTable).
Этап 3: SqlToRelConverter
SqlToOperationConverter.convert(validatedSqlNode)
-> для DML/Query:
-> Calcite SqlToRelConverter.convertQuery
-> обход SqlNode сверху вниз
-> emit LogicalProject, LogicalFilter, LogicalAggregate, ...
-> возвращает RelRoot, содержащий Logical RelNode tree
-> для DDL (CREATE TABLE, USE CATALOG):
-> прямой Operation без RelNode (DDL не нуждается в оптимизации)
На выходе — дерево RelNode с конвенцией Convention.NONE (logical nodes). У них есть RelDataType (схема выхода), но нет физических traits (collation, distribution).
Время этапа: 1–20 мс. Подзапросы и коррелированные ссылки обрабатываются здесь же — генерируются LogicalCorrelate, Subquery RelNodes.
Этап 4: Optimization — главный этап
Это самый сложный и долгий этап. В Flink оптимизатор разбит на фазы (FlinkRelOptCluster, StreamCommonSubGraphBasedOptimizer):
Стандартный набор фаз для streaming (упрощённо):
Phase 1: Pre-Optimization (HepPlanner)
- Subquery rewriting
- Predicate normalization
- Time indicator conversion
Phase 2: Decorrelation (HepPlanner)
- Replace LogicalCorrelate with joins
- Push correlated predicates
Phase 3: Logical optimization (VolcanoPlanner)
- Filter pushdown
- Projection pushdown
- Join reordering (cost-based)
- Constant folding
- Sub-plan reuse (CSE)
Phase 4: Time materialization (HepPlanner)
- Materialize time attributes
- Pick rowtime vs proctime semantics
Phase 5: Physical conversion (VolcanoPlanner)
- Convention.NONE -> STREAM_PHYSICAL (или BATCH_PHYSICAL)
- Replace LogicalJoin with StreamPhysicalJoin или BatchPhysicalHashJoin
- Insert StreamPhysicalExchange (shuffle markers)
- Choose aggregate strategy
Phase 6: Physical rewrites (HepPlanner)
- MiniBatch enable
- TwoPhaseAgg (local + global aggregate split)
- Deduplicate optimization
- DeltaJoin conversion (Flink 2.1+)
- MultiJoin merging (Flink 2.2+)
Phase 7: Post-optimization (HepPlanner)
- Watermark assigner placement
- Sink materialization (ChangelogNormalize insertion)
- Final cleanup
Каждая фаза получает на вход RelNode tree и выдаёт новый. Фазы независимы по контракту — после каждой получается валидный план. Это позволяет частично исполнять только нужные фазы (для EXPLAIN PLAN можно остановиться на любом этапе).
Время этапа: 50–500 мс для нормального запроса, секунды или десятки секунд для очень сложных (10+ joins, deep subqueries).
Самый частый источник долгой компиляции — Phase 5 (physical conversion) с большим числом joins. VolcanoPlanner рассматривает разные порядки и алгоритмы соединения. Для join reordering пространство поиска факториальное. Если запрос компилируется минуты — попробуйте table.optimizer.join-reorder-enabled = false или JOIN hints (/*+ USE_HASH(t1, t2) */).
Этап 5: ExecNodeGraph
Оптимизированный RelNode tree ещё не описывает конкретный исполнитель. Нужно перейти к ExecNode — это узел “Flink execution plan”, сериализуемый и предназначенный для исполнения.
ExecNodeGenerator.generate(physicalRelTree)
-> для каждого StreamPhysicalRel создать StreamExecNode
-> StreamPhysicalCalc -> StreamExecCalc
-> StreamPhysicalGroupAggregate -> StreamExecGroupAggregate
-> StreamPhysicalJoin -> StreamExecJoin
-> StreamPhysicalExchange -> StreamExecExchange
...
-> Связать ExecNode по входам/выходам
-> Получить ExecNodeGraph
Зачем отдельный слой ExecNode? Несколько причин:
- Сериализация для compiled plans (JsonPlan). С Flink 1.15 можно сохранить план как JSON и переиспользовать. RelNode напрямую не сериализуется, ExecNode — да.
- Совместимость. ExecNode стабильнее RelNode. План, сохранённый в Flink 1.16, должен работать в 1.17. RelNode внутри может поменяться, ExecNode имеет аннотации
@ExecNodeMetadata(version = N)для бэкап-совместимости. - Multi-input операторы. ExecNode может иметь N входов, RelNode — обычно 1–2. Это нужно для DeltaJoin, MultiJoin и других оптимизаций.
- Codegen seam. ExecNode.translateToPlan генерирует Transformation и при этом запускает codegen для операторов.
Этап 6: Transformation
ExecNode.translateToPlan(plannerContext) — это вызов, который генерирует Transformation — низкоуровневое описание оператора Flink DataStream API.
ExecNode (Flink table-planner)
-> StreamExecCalc.translateToPlan
-> codegen ProcessFunction через CodeGenUtils
-> возвращает GeneratedFunction (Java исходник + class file)
-> создать OneInputTransformation
-> с operatorFactory оборачивающим GeneratedFunction
-> с указанием parallelism, slotSharingGroup, name
-> Transformation становится узлом в Transformation DAG
Transformation — это уже DataStream-уровень. То же самое, что вы получите из обычного Java DataStream API (.map(...), .process(...)). Это унификация: после ExecNode.translateToPlan мы попадаем в общий с DataStream API мир.
Codegen — отдельная большая тема. Кратко: Flink не интерпретирует RelNode/ExecNode, а генерирует Java-код через Janino (in-memory compiler) и компилирует на лету. Один RelNode превращается в один или несколько классов оператора. Это критично для performance — нет per-record reflection или virtual calls.
Этап 7: StreamGraph
Transformation tree (на самом деле DAG) превращается в StreamGraph через StreamGraphGenerator.generate(transformations). StreamGraph — это уже структура с узлами StreamNode и рёбрами StreamEdge:
StreamGraphGenerator:
-> Обходим Transformation DAG
-> Для каждого Transformation создаём StreamNode
-> StreamNode содержит OperatorFactory, type info, parallelism
-> Для каждого input создаём StreamEdge
-> StreamEdge содержит partitioner (forward, hash, rebalance...)
-> Назначаем resource specs (managed memory, network memory)
-> Назначаем slot sharing groups
-> Возвращаем StreamGraph
StreamGraph — последняя структура, которую видит клиентская JVM перед отправкой на JobManager.
Этап 8: JobGraph
StreamGraph.getJobGraph(jobId)
-> Operator chaining: соседние StreamNode объединяются в JobVertex,
если совпадают partitioner=FORWARD, parallelism, slot sharing
-> Это оптимизация: chain'ы выполняются в одном thread, без сетевой передачи
-> Назначение IntermediateDataSet (рёбра между JobVertex)
-> Сериализация ResourceSpec
-> Compute JobGraphPayload (для checkpoint config, blob references)
-> Создание JobGraph (immutable)
JobGraph — это финальная структура, которая отправляется в JobManager. Это client-side artifact, который JM использует для построения ExecutionGraph.
Где какие данные доступны для дебага
Тип данных Этап Команда / способ получить
SqlNode [1] Внутренний API, не виден извне без debug
Logical RelNode [3] EXPLAIN PLAN FOR <query>
Optimized RelNode [4] EXPLAIN PLAN FOR <query>; смотреть == Optimized Physical Plan ==
ExecNodeGraph [5] EXPLAIN PLAN_ADVICE FOR <query>
COMPILE PLAN '/tmp/plan.json' FOR <query> -- JsonPlan
Transformation tree [6] DataStream API: env.getStreamGraph() (после translate)
StreamGraph [7] env.getStreamGraph().getStreamingPlanAsJSON()
JobGraph [8] Логи клиента: "Submitting job ... with JobGraph ..."
ExecutionGraph [9] Flink Web UI / REST API /jobs/<jobId>
Running tasks [10] Web UI tasks tab, TaskManager logs
Команда EXPLAIN PLAN_ADVICE FOR ... (с Flink 1.18+) добавляет рекомендации: “Two-phase aggregation could improve throughput”, “Reduce state size with TTL”.
Как менять план через hints
Hints — это способ, которым SQL-автор намекает оптимизатору на предпочтения. Calcite поддерживает hints в формате /*+ HINT_NAME(args) */.
SELECT /*+ USE_HASH(orders, lineitem) */
o.id, sum(l.amount)
FROM orders o JOIN lineitem l ON o.id = l.order_id
GROUP BY o.id;
Flink-специфичные hints:
JOIN strategy hints (batch):
/*+ USE_HASH(t1, t2) */ -- HashJoin
/*+ USE_SORTMERGE(t1, t2) */ -- SortMergeJoin
/*+ USE_NL(t1, t2) */ -- NestedLoopJoin
/*+ BROADCAST(t1) */ -- broadcast меньшую таблицу
State TTL hints (streaming):
/*+ STATE_TTL('orders' = '12h', 'payments' = '24h') */
Lookup join hints:
/*+ LOOKUP('table'='dim', 'async'='true', 'output-mode'='ordered') */
/*+ LOOKUP('table'='dim', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay') */
Options hints (override connector options on the fly):
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */
Hints проходят через парсер как метаданные на SqlBasicCall и потом доходят до оптимизатора через RelHint объекты на RelNode.
Что происходит на JobManager после submission
После шага 9 — это уже не SQL pipeline, а runtime:
JobManager receives JobGraph
-> ExecutionGraphBuilder.build(jobGraph)
-> Для каждого JobVertex создать ExecutionJobVertex
-> с parallelism копиями ExecutionVertex (по одной на subtask)
-> ExecutionVertex имеет Execution (текущая попытка) и история
-> SlotPool.requestSlots
-> ResourceManager.requestNewWorker (если нет свободных)
-> TaskExecutor создаётся
-> Когда слоты получены: ExecutionVertex.deploy
-> TaskDeploymentDescriptor сериализуется, отправляется на TaskManager
-> TaskManager.submitTask(tdd)
-> Task создаётся, классы загружаются через UserCodeClassLoader
-> Operator chain instantiates через StreamTask, StreamOperator
-> StreamTask.invoke() запускает run loop
С этого момента runtime обработка идёт независимо от SQL planner. Любая ошибка в плане превратится в NullPointerException или type cast error в сгенерированном коде, и стек-трейс будет указывать на сгенерированный класс StreamExecCalc$1234 — это нормально, оператор был создан codegen.
Compiled plans (JsonPlan) и production
Flink 1.15+ поддерживает сохранение плана как JSON через COMPILE PLAN '/path/file.json' FOR <sql>. Это критически важно для production:
Сценарий: SQL job, который работает в production месяц
Проблема: после апгрейда Flink 1.20 -> 2.0 план может измениться
(новые правила, новая cost model). Это значит:
- Другой parallelism
- Другие операторы в state (incompatible state schema)
- Performance regression
Решение: COMPILE PLAN сохраняет ExecNodeGraph как JSON
-> При деплое EXECUTE PLAN загружает JSON, не пере-планирует
-> ExecNodeMetadata version гарантирует совместимость
-> State schema стабильна между апгрейдами
Пример workflow:
1. Compile: tEnv.compilePlanSql(sql).writeToFile("/plans/job.json")
2. Commit JSON в git
3. Deploy: tEnv.executePlan(PlanReference.fromFile("/plans/job.json"))
4. После апгрейда Flink JSON работает как был, savepoint compatible
Без compiled plans любой production SQL job — это бомба замедленного действия: следующий апгрейд может его сломать.
Чтение source
flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/
TableEnvironmentImpl.java -- entry point executeSql, sqlQuery
flink-table-planner/src/main/scala/org/apache/flink/table/planner/
delegation/PlannerBase.scala -- translate(operations)
delegation/StreamPlanner.scala -- streaming-specific optimizer phases
delegation/BatchPlanner.scala -- batch-specific
plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
plan/optimize/program/StreamOptimizeContext.scala
plan/optimize/program/FlinkOptimizeProgram.scala -- phase orchestration
plan/nodes/exec/ExecNode.java
plan/nodes/exec/stream/StreamExecCalc.java -- pattern для всех StreamExecXxx
codegen/CodeGenUtils.scala -- Janino codegen wrappers
flink-runtime/src/main/java/org/apache/flink/runtime/
jobgraph/JobGraph.java
executiongraph/ExecutionGraph.java
scheduler/SchedulerNG.java
Чтобы увидеть весь pipeline на работающем запросе: добавьте -Dlog4j.logger.org.apache.flink.table.planner=DEBUG в логи. Будут видны фазы оптимизатора и их результаты. Очень шумно, но информативно для расследования.
Зачем эти 10 этапов в продакшене
Сложность pipeline оправдана разделением слоёв:
- Compile vs build vs runtime — каждый слой можно оптимизировать независимо. Compile time меняется новыми правилами оптимизатора, runtime — новой реализацией оператора.
- Несколько frontend-ов — кроме SQL Flink поддерживает Table API и DataStream API. Все три приходят к одному Transformation DAG.
- Несколько backend-ов — streaming, batch, и в будущем native execution через Velox или similar. ExecNode абстракция позволяет менять executor.
- Compiled plans — без чёткой границы ExecNode не было бы возможности сериализовать план.
- Hot-swap оптимизаций — DeltaJoin, MultiJoin, TwoPhaseAgg добавлены в Phase 6 без изменения других этапов.