Learning Platform
Глоссарий Troubleshooting
Урок 10.02 · 28 мин
Продвинутый
Flink SQL PipelineExecNodeTransformationStreamGraphJobGraphPlanner

Полный SQL pipeline в Flink

В предыдущем уроке мы разобрали Calcite как универсальный SQL frontend и оптимизатор. Но Calcite заканчивается на оптимизированном RelNode tree. От этого момента до реального выполнения на TaskManager расстояние огромное: нужно сгенерировать Java-код для операторов, собрать ExecNodeGraph, превратить его в Transformation, потом в StreamGraph, потом в JobGraph, потом передать JobManager, который рассует ExecutionGraph и распределит задачи по TaskManager.

Этот урок — полный обход того pipeline. Каждый этап ответственен за свой уровень абстракции, и каждый этап оставляет следы в логах, метриках, EXPLAIN-выводе. Понимание pipeline критично, когда нужно объяснить, почему SQL-запрос превратился в N задач, M chains, K shuffle границ.

SQL SELECT и агрегации в Flink

Высокоуровневый pipeline

SQL string
  |
  v
[1] SqlParser (Calcite)              -> SqlNode AST
  |
  v
[2] SqlValidator (Calcite + Flink)   -> Validated SqlNode + RelDataTypes
  |
  v
[3] SqlToRelConverter (Calcite)      -> Logical RelNode tree
  |
  v
[4] OptimizerPhases (Flink + Calcite) -> Optimized Physical RelNode tree
       phases: subquery rewrite,
               correlate elimination,
               logical opt,
               physical conversion,
               physical opt
  |
  v
[5] ExecNodeGraphGenerator (Flink)   -> ExecNodeGraph
  |
  v
[6] ExecNode.translateToPlan (Flink) -> Transformation tree
  |
  v
[7] StreamGraphGenerator (Flink)     -> StreamGraph
  |
  v
[8] StreamGraph.getJobGraph (Flink)  -> JobGraph
  |
  v
[9] JobManager submission            -> ExecutionGraph
  |
  v
[10] Slot allocation + deployment    -> Tasks running on TaskManagers

Этапы 1–4 — это compile time (логика планирования и оптимизации). Этапы 5–8 — build time (формирование структуры джоба, ещё на клиентской JVM). 9–10 — runtime (выполнение). Граница между compile и build — переход от RelNode к ExecNode.

Этап 1: Parsing

Entry point: tableEnv.executeSql(...) или tableEnv.sqlQuery(...). Реализация в TableEnvironmentImpl.

String sql = "SELECT user, COUNT(*) FROM clicks GROUP BY user";

tableEnv.sqlQuery(sql)
  -> PlannerBase.parse(sql)
    -> CalciteParser.parse(sql)
      -> SqlParser.create(sql, config).parseStmt()  // Calcite
        -> возвращает SqlNode (например, SqlSelect)

Парсер — JavaCC-generated класс из Parser.jj. Парсер не знает про каталог, типы, текущий контекст. Он только превращает текст в дерево SqlNode. На этом этапе можно поймать только синтаксические ошибки (например, забытая запятая, пропущенное FROM).

Время этапа: 1–10 мс для типичного запроса, до 100 мс для очень больших SQL (тысячи столбцов).

Этап 2: Validation

PlannerBase.translate(operations)
  -> Operation создаётся через SqlToOperationConverter из SqlNode
    -> Перед конвертацией: SqlValidator.validate(sqlNode)
      -> resolveTables через CalciteCatalogReader
        -> FlinkCalciteCatalogReader.getTable(qualifiedName)
          -> CatalogManager.getTable
            -> конкретный Catalog (Hive, Iceberg, In-Memory)
              -> CatalogBaseTable -> ResolvedSchema
                -> Calcite RelDataType
      -> deriveTypes через FlinkTypeFactory

После валидации SqlNode обогащён информацией о типах каждого выражения. На этом этапе возможны ошибки:

  • Object 'foo' not found within 'cat.db' — таблицы нет.
  • Column 'bar' not found in any table — столбца нет.
  • Cannot apply '=' to arguments of type '<INT> = <CHAR(3)>' — несовместимые типы.
  • Aggregate expression is illegal in WHERE clause — семантические нарушения.

Время этапа: 5–50 мс. Latency сильно зависит от каталога — например, HiveCatalog ходит в Hive Metastore по сети (10–100 мс на каждый getTable).

Этап 3: SqlToRelConverter

SqlToOperationConverter.convert(validatedSqlNode)
  -> для DML/Query:
    -> Calcite SqlToRelConverter.convertQuery
      -> обход SqlNode сверху вниз
        -> emit LogicalProject, LogicalFilter, LogicalAggregate, ...
      -> возвращает RelRoot, содержащий Logical RelNode tree
  -> для DDL (CREATE TABLE, USE CATALOG):
    -> прямой Operation без RelNode (DDL не нуждается в оптимизации)

На выходе — дерево RelNode с конвенцией Convention.NONE (logical nodes). У них есть RelDataType (схема выхода), но нет физических traits (collation, distribution).

Время этапа: 1–20 мс. Подзапросы и коррелированные ссылки обрабатываются здесь же — генерируются LogicalCorrelate, Subquery RelNodes.

Этап 4: Optimization — главный этап

Это самый сложный и долгий этап. В Flink оптимизатор разбит на фазы (FlinkRelOptCluster, StreamCommonSubGraphBasedOptimizer):

Стандартный набор фаз для streaming (упрощённо):

Phase 1: Pre-Optimization (HepPlanner)
  - Subquery rewriting
  - Predicate normalization
  - Time indicator conversion

Phase 2: Decorrelation (HepPlanner)
  - Replace LogicalCorrelate with joins
  - Push correlated predicates

Phase 3: Logical optimization (VolcanoPlanner)
  - Filter pushdown
  - Projection pushdown
  - Join reordering (cost-based)
  - Constant folding
  - Sub-plan reuse (CSE)

Phase 4: Time materialization (HepPlanner)
  - Materialize time attributes
  - Pick rowtime vs proctime semantics

Phase 5: Physical conversion (VolcanoPlanner)
  - Convention.NONE -> STREAM_PHYSICAL (или BATCH_PHYSICAL)
  - Replace LogicalJoin with StreamPhysicalJoin или BatchPhysicalHashJoin
  - Insert StreamPhysicalExchange (shuffle markers)
  - Choose aggregate strategy

Phase 6: Physical rewrites (HepPlanner)
  - MiniBatch enable
  - TwoPhaseAgg (local + global aggregate split)
  - Deduplicate optimization
  - DeltaJoin conversion (Flink 2.1+)
  - MultiJoin merging (Flink 2.2+)

Phase 7: Post-optimization (HepPlanner)
  - Watermark assigner placement
  - Sink materialization (ChangelogNormalize insertion)
  - Final cleanup

Каждая фаза получает на вход RelNode tree и выдаёт новый. Фазы независимы по контракту — после каждой получается валидный план. Это позволяет частично исполнять только нужные фазы (для EXPLAIN PLAN можно остановиться на любом этапе).

Время этапа: 50–500 мс для нормального запроса, секунды или десятки секунд для очень сложных (10+ joins, deep subqueries).

WARNING

Самый частый источник долгой компиляции — Phase 5 (physical conversion) с большим числом joins. VolcanoPlanner рассматривает разные порядки и алгоритмы соединения. Для join reordering пространство поиска факториальное. Если запрос компилируется минуты — попробуйте table.optimizer.join-reorder-enabled = false или JOIN hints (/*+ USE_HASH(t1, t2) */).

Этап 5: ExecNodeGraph

Оптимизированный RelNode tree ещё не описывает конкретный исполнитель. Нужно перейти к ExecNode — это узел “Flink execution plan”, сериализуемый и предназначенный для исполнения.

ExecNodeGenerator.generate(physicalRelTree)
  -> для каждого StreamPhysicalRel создать StreamExecNode
    -> StreamPhysicalCalc -> StreamExecCalc
    -> StreamPhysicalGroupAggregate -> StreamExecGroupAggregate
    -> StreamPhysicalJoin -> StreamExecJoin
    -> StreamPhysicalExchange -> StreamExecExchange
    ...
  -> Связать ExecNode по входам/выходам
  -> Получить ExecNodeGraph

Зачем отдельный слой ExecNode? Несколько причин:

  1. Сериализация для compiled plans (JsonPlan). С Flink 1.15 можно сохранить план как JSON и переиспользовать. RelNode напрямую не сериализуется, ExecNode — да.
  2. Совместимость. ExecNode стабильнее RelNode. План, сохранённый в Flink 1.16, должен работать в 1.17. RelNode внутри может поменяться, ExecNode имеет аннотации @ExecNodeMetadata(version = N) для бэкап-совместимости.
  3. Multi-input операторы. ExecNode может иметь N входов, RelNode — обычно 1–2. Это нужно для DeltaJoin, MultiJoin и других оптимизаций.
  4. Codegen seam. ExecNode.translateToPlan генерирует Transformation и при этом запускает codegen для операторов.

Этап 6: Transformation

ExecNode.translateToPlan(plannerContext) — это вызов, который генерирует Transformation — низкоуровневое описание оператора Flink DataStream API.

ExecNode (Flink table-planner)
  -> StreamExecCalc.translateToPlan
    -> codegen ProcessFunction через CodeGenUtils
      -> возвращает GeneratedFunction (Java исходник + class file)
    -> создать OneInputTransformation
      -> с operatorFactory оборачивающим GeneratedFunction
      -> с указанием parallelism, slotSharingGroup, name
  -> Transformation становится узлом в Transformation DAG

Transformation — это уже DataStream-уровень. То же самое, что вы получите из обычного Java DataStream API (.map(...), .process(...)). Это унификация: после ExecNode.translateToPlan мы попадаем в общий с DataStream API мир.

Codegen — отдельная большая тема. Кратко: Flink не интерпретирует RelNode/ExecNode, а генерирует Java-код через Janino (in-memory compiler) и компилирует на лету. Один RelNode превращается в один или несколько классов оператора. Это критично для performance — нет per-record reflection или virtual calls.

Этап 7: StreamGraph

Transformation tree (на самом деле DAG) превращается в StreamGraph через StreamGraphGenerator.generate(transformations). StreamGraph — это уже структура с узлами StreamNode и рёбрами StreamEdge:

StreamGraphGenerator:
  -> Обходим Transformation DAG
    -> Для каждого Transformation создаём StreamNode
      -> StreamNode содержит OperatorFactory, type info, parallelism
    -> Для каждого input создаём StreamEdge
      -> StreamEdge содержит partitioner (forward, hash, rebalance...)
    -> Назначаем resource specs (managed memory, network memory)
    -> Назначаем slot sharing groups
  -> Возвращаем StreamGraph

StreamGraph — последняя структура, которую видит клиентская JVM перед отправкой на JobManager.

Этап 8: JobGraph

StreamGraph.getJobGraph(jobId)
  -> Operator chaining: соседние StreamNode объединяются в JobVertex,
     если совпадают partitioner=FORWARD, parallelism, slot sharing
    -> Это оптимизация: chain'ы выполняются в одном thread, без сетевой передачи
  -> Назначение IntermediateDataSet (рёбра между JobVertex)
  -> Сериализация ResourceSpec
  -> Compute JobGraphPayload (для checkpoint config, blob references)
  -> Создание JobGraph (immutable)

JobGraph — это финальная структура, которая отправляется в JobManager. Это client-side artifact, который JM использует для построения ExecutionGraph.

Полный SQL pipeline — от строки до JobGraph
SQL TextТекст SQL запроса. Только клиентская JVM. Никаких обращений к каталогу, JobManager, TaskManagers.
[1] Parse
SqlNodeАбстрактное синтаксическое дерево после JavaCC-парсера. Только синтаксис, ничего про схему. 1-10 мс.
[2] Validate
Validated SqlNodeSqlValidator резолвит таблицы через CatalogManager Flink (Hive/InMemory/Iceberg). Выводит типы. 5-50 мс.
Logical RelNodeSqlToRelConverter превращает синтаксическое дерево в реляционную алгебру. Logical-узлы: Project/Filter/Aggregate/Join. 1-20 мс.
[4] Optimize
Optimized Physical RelNodeГлавный этап. 7 фаз оптимизации: subquery rewrite, decorrelation, logical opt, time materialization, physical conversion, physical rewrites, post-opt. 50-500 мс.
ExecNodeGraphExecNode — сериализуемый план Flink. Стабильный API для compiled plans (JsonPlan). Поддерживает multi-input операторы (DeltaJoin, MultiJoin).
[6] translateToPlan + codegen
Transformation DAGCodegen Janino генерирует Java-классы для операторов. Создаются OneInputTransformation / TwoInputTransformation. Уровень DataStream API.
StreamGraphStreamNode + StreamEdge + partitioner + resource spec. Граница между Table API и DataStream API исчезла.
[8] chaining
JobGraphOperator chaining объединяет соседние StreamNode в JobVertex (один thread, no network). Финальный артефакт клиентской JVM.
[9] submit
ExecutionGraph (JM)JobManager строит ExecutionGraph, заказывает слоты у ResourceManager, деплоит задачи на TaskManager. Здесь начинается runtime.

Где какие данные доступны для дебага

Тип данных          Этап          Команда / способ получить

SqlNode             [1]           Внутренний API, не виден извне без debug
Logical RelNode     [3]           EXPLAIN PLAN FOR <query>
Optimized RelNode   [4]           EXPLAIN PLAN FOR <query>; смотреть == Optimized Physical Plan ==
ExecNodeGraph       [5]           EXPLAIN PLAN_ADVICE FOR <query>
                                  COMPILE PLAN '/tmp/plan.json' FOR <query>  -- JsonPlan
Transformation tree [6]           DataStream API: env.getStreamGraph() (после translate)
StreamGraph         [7]           env.getStreamGraph().getStreamingPlanAsJSON()
JobGraph            [8]           Логи клиента: "Submitting job ... with JobGraph ..."
ExecutionGraph      [9]           Flink Web UI / REST API /jobs/<jobId>
Running tasks       [10]          Web UI tasks tab, TaskManager logs

Команда EXPLAIN PLAN_ADVICE FOR ... (с Flink 1.18+) добавляет рекомендации: “Two-phase aggregation could improve throughput”, “Reduce state size with TTL”.

Как менять план через hints

Hints — это способ, которым SQL-автор намекает оптимизатору на предпочтения. Calcite поддерживает hints в формате /*+ HINT_NAME(args) */.

SELECT /*+ USE_HASH(orders, lineitem) */
       o.id, sum(l.amount)
FROM orders o JOIN lineitem l ON o.id = l.order_id
GROUP BY o.id;

Flink-специфичные hints:

JOIN strategy hints (batch):
  /*+ USE_HASH(t1, t2) */          -- HashJoin
  /*+ USE_SORTMERGE(t1, t2) */     -- SortMergeJoin
  /*+ USE_NL(t1, t2) */            -- NestedLoopJoin
  /*+ BROADCAST(t1) */              -- broadcast меньшую таблицу

State TTL hints (streaming):
  /*+ STATE_TTL('orders' = '12h', 'payments' = '24h') */

Lookup join hints:
  /*+ LOOKUP('table'='dim', 'async'='true', 'output-mode'='ordered') */
  /*+ LOOKUP('table'='dim', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay') */

Options hints (override connector options on the fly):
  /*+ OPTIONS('scan.startup.mode'='earliest-offset') */

Hints проходят через парсер как метаданные на SqlBasicCall и потом доходят до оптимизатора через RelHint объекты на RelNode.

Что происходит на JobManager после submission

После шага 9 — это уже не SQL pipeline, а runtime:

JobManager receives JobGraph
  -> ExecutionGraphBuilder.build(jobGraph)
    -> Для каждого JobVertex создать ExecutionJobVertex
      -> с parallelism копиями ExecutionVertex (по одной на subtask)
      -> ExecutionVertex имеет Execution (текущая попытка) и история
  -> SlotPool.requestSlots
    -> ResourceManager.requestNewWorker (если нет свободных)
      -> TaskExecutor создаётся
  -> Когда слоты получены: ExecutionVertex.deploy
    -> TaskDeploymentDescriptor сериализуется, отправляется на TaskManager
  -> TaskManager.submitTask(tdd)
    -> Task создаётся, классы загружаются через UserCodeClassLoader
    -> Operator chain instantiates через StreamTask, StreamOperator
    -> StreamTask.invoke() запускает run loop

С этого момента runtime обработка идёт независимо от SQL planner. Любая ошибка в плане превратится в NullPointerException или type cast error в сгенерированном коде, и стек-трейс будет указывать на сгенерированный класс StreamExecCalc$1234 — это нормально, оператор был создан codegen.

Compiled plans (JsonPlan) и production

Flink 1.15+ поддерживает сохранение плана как JSON через COMPILE PLAN '/path/file.json' FOR <sql>. Это критически важно для production:

Сценарий: SQL job, который работает в production месяц
  Проблема: после апгрейда Flink 1.20 -> 2.0 план может измениться
            (новые правила, новая cost model). Это значит:
            - Другой parallelism
            - Другие операторы в state (incompatible state schema)
            - Performance regression

Решение: COMPILE PLAN сохраняет ExecNodeGraph как JSON
  -> При деплое EXECUTE PLAN загружает JSON, не пере-планирует
  -> ExecNodeMetadata version гарантирует совместимость
  -> State schema стабильна между апгрейдами

Пример workflow:
  1. Compile: tEnv.compilePlanSql(sql).writeToFile("/plans/job.json")
  2. Commit JSON в git
  3. Deploy: tEnv.executePlan(PlanReference.fromFile("/plans/job.json"))
  4. После апгрейда Flink JSON работает как был, savepoint compatible

Без compiled plans любой production SQL job — это бомба замедленного действия: следующий апгрейд может его сломать.

Чтение source

flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/
  TableEnvironmentImpl.java -- entry point executeSql, sqlQuery

flink-table-planner/src/main/scala/org/apache/flink/table/planner/
  delegation/PlannerBase.scala -- translate(operations)
  delegation/StreamPlanner.scala -- streaming-specific optimizer phases
  delegation/BatchPlanner.scala  -- batch-specific
  plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
  plan/optimize/program/StreamOptimizeContext.scala
  plan/optimize/program/FlinkOptimizeProgram.scala -- phase orchestration
  plan/nodes/exec/ExecNode.java
  plan/nodes/exec/stream/StreamExecCalc.java -- pattern для всех StreamExecXxx
  codegen/CodeGenUtils.scala -- Janino codegen wrappers

flink-runtime/src/main/java/org/apache/flink/runtime/
  jobgraph/JobGraph.java
  executiongraph/ExecutionGraph.java
  scheduler/SchedulerNG.java
TIP

Чтобы увидеть весь pipeline на работающем запросе: добавьте -Dlog4j.logger.org.apache.flink.table.planner=DEBUG в логи. Будут видны фазы оптимизатора и их результаты. Очень шумно, но информативно для расследования.

Зачем эти 10 этапов в продакшене

Сложность pipeline оправдана разделением слоёв:

  1. Compile vs build vs runtime — каждый слой можно оптимизировать независимо. Compile time меняется новыми правилами оптимизатора, runtime — новой реализацией оператора.
  2. Несколько frontend-ов — кроме SQL Flink поддерживает Table API и DataStream API. Все три приходят к одному Transformation DAG.
  3. Несколько backend-ов — streaming, batch, и в будущем native execution через Velox или similar. ExecNode абстракция позволяет менять executor.
  4. Compiled plans — без чёткой границы ExecNode не было бы возможности сериализовать план.
  5. Hot-swap оптимизаций — DeltaJoin, MultiJoin, TwoPhaseAgg добавлены в Phase 6 без изменения других этапов.
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Зачем Flink ввёл слой ExecNode отдельно от RelNode?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5