Learning Platform
Глоссарий Troubleshooting
Урок 10.01 · 25 мин
Продвинутый
Apache CalciteRelational AlgebraSqlNodeRelNodeQuery OptimizationFlink SQL

Apache Calcite — relational algebra IR

Apache Flink не пишет SQL-парсер сам. Все системы, которые научились SQL — Hive, Drill, Storm SQL, Beam SQL, Kafka KSQL до версии 2, Flink SQL, MaxCompute, Samza SQL — берут готовый компонент Apache Calcite. Это не библиотека парсинга в стиле ANTLR, а полноценный optimizer framework: парсер, валидатор, IR на основе relational algebra, оптимизатор с правилами и cost-моделью, codegen в физический план.

В этом уроке мы разбираемся, что такое Calcite, почему он стал индустриальным стандартом для SQL-on-anything систем и какую роль он играет в Flink. Без понимания Calcite вы не сможете читать стек-трейсы Flink SQL ошибок, не отлавите regressions в плане после миграции и не объясните, почему два логически эквивалентных запроса дают разный JobGraph.

Catalyst Optimizer в Spark: тот же Calcite, другой контекст ksqlDB: SQL поверх Kafka Streams

Что такое Apache Calcite

Calcite — это библиотека для построения query processing систем. Apache top-level проект с 2015 года, исторически отделился от Apache Drill. Идея, которая делает Calcite уникальным: разделить SQL frontend и storage backend.

Традиционные СУБД (PostgreSQL, MySQL):
  SQL parser -> planner -> executor -> storage
  Все слои — монолит. SQL понятен только своему storage.

Calcite-based системы (Flink, Hive, Drill):
  SQL parser (Calcite) ->
    RelNode IR (Calcite) ->
      Rules + cost optimizer (Calcite) ->
        Physical plan (специфичный для движка) ->
          Executor (Flink, Spark, Hive...)

  Любой движок может использовать SQL frontend Calcite.
  Calcite сам не выполняет запросы — он строит план.

Ключевые компоненты Calcite:

  1. SQL parser — generated через JavaCC из грамматики Parser.jj. Возвращает абстрактное синтаксическое дерево SqlNode.
  2. Validator — проверяет ссылки на таблицы, типы столбцов, разрешает имена через каталог.
  3. SqlToRelConverter — конвертирует SqlNode в RelNode (relational algebra tree).
  4. HepPlanner — rule-based planner для детерминированных трансформаций.
  5. VolcanoPlanner — cost-based планировщик из работы Goetz Graefe (1995), Volcano/Cascades framework.
  6. RelNode + RelTrait + RelOptRule — extension points для добавления своих операторов и правил.
  7. Codegen + Linq4j — генерация runtime кода (используется в самом Calcite, но Flink игнорирует и делает свой codegen).
NOTE

Flink использует только frontend Calcite — парсер, validator, SqlToRelConverter, оптимизатор. Executor у Flink свой (StreamGraph и JobGraph). Это критически важное разделение: Calcite ничего не знает про watermarks, state, checkpoints. Flink добавляет свои RelNode-подклассы (StreamPhysicalRel) и свои правила оптимизатора.

SQL -> SqlNode: парсинг

Первый шаг pipeline — превратить текст SQL в дерево SqlNode. У Calcite своя грамматика, которую можно расширять — Flink добавляет свои синтаксические конструкции (например, MATCH_RECOGNIZE, WATERMARK FOR ..., CREATE TABLE ... WITH (...)).

Входной SQL:
  SELECT user_id, COUNT(*) AS cnt
  FROM clicks
  WHERE country = 'US'
  GROUP BY user_id

SqlNode tree (упрощённо):
  SqlSelect
  +-- selectList: [SqlIdentifier(user_id), SqlAsOperator(COUNT(*), cnt)]
  +-- from: SqlIdentifier(clicks)
  +-- where: SqlBasicCall(EQUALS, country, 'US')
  +-- groupBy: [SqlIdentifier(user_id)]

SqlNode — это синтаксическое дерево, оно ещё не знает про схему. На этом этапе нельзя сказать, существует ли таблица clicks и какие у неё столбцы. Это работа validator.

Validation: проверка типов и каталога

Validator (SqlValidator) работает в двух фазах:

  1. Scope resolution — определяет, к какой таблице относится каждый идентификатор.
  2. Type derivation — выводит тип каждого выражения снизу вверх по дереву.

Если столбца country нет в clicks, validator выкинет SqlValidatorException: Column 'country' not found. Если WHERE country = 5 — выкинет ошибку типа: integer не сравнивается со строкой без явного cast.

Каталог (CatalogReader) — это интерфейс, который Flink реализует через свой Catalog API (CatalogManager, Catalog, CatalogBaseTable). Когда вы регистрируете таблицу через tableEnv.createTemporaryTable(...) или через DDL CREATE TABLE, она попадает в каталог Flink, а Calcite читает оттуда схему.

SqlToRelConverter: переход к relational algebra

После валидации SqlToRelConverter превращает SqlNode в дерево RelNode. RelNode — это операторы реляционной алгебры. Каждый узел представляет операцию над отношением (таблицей или потоком).

SqlNode (синтаксис) -> RelNode (семантика)

SELECT user_id, COUNT(*) FROM clicks WHERE country='US' GROUP BY user_id

Превращается в:
  LogicalAggregate(group=[{0}], cnt=[COUNT()])
    LogicalProject(user_id=[$0])
      LogicalFilter(condition=[=($2, 'US')])
        LogicalTableScan(table=[clicks])

Базовые типы RelNode в Calcite (Logical-варианты):

  • LogicalTableScan — чтение таблицы из каталога.
  • LogicalProject — выбор и преобразование столбцов (часть SELECT).
  • LogicalFilter — фильтрация (часть WHERE/HAVING).
  • LogicalAggregate — группировка и агрегации (GROUP BY).
  • LogicalJoin — соединение двух отношений.
  • LogicalUnion / LogicalIntersect / LogicalMinus — теоретико-множественные операции.
  • LogicalSort — упорядочивание (ORDER BY) и/или LIMIT.
  • LogicalCorrelate — для коррелированных подзапросов.
  • LogicalWindow — оконные функции (OVER).

Логические узлы означают “что делаем”, не “как”. Физическая реализация — стрим vs batch, какой алгоритм join, какой тип агрегации — определяется на следующих фазах через правила.

Calcite transformation pipeline
SQL TextТекст SQL. На этом этапе ничего не известно про схему или типы. Парсер просто разбирает синтаксис.
JavaCC parser
SqlNodeАбстрактное синтаксическое дерево. SqlSelect, SqlIdentifier, SqlBasicCall. Не знает ни о таблицах, ни о типах. Расширяется грамматикой Flink (WATERMARK, MATCH_RECOGNIZE).
CatalogReaderCatalog Manager Flink: ссылается на CatalogBaseTable, source DDL, schema. Validator резолвит имена столбцов и таблиц через этот интерфейс.
SqlValidator
Validated SqlNode + RelDataTypesВалидированный SqlNode + типы каждого выражения. Если столбец не найден или типы несовместимы — exception здесь.
SqlToRelConverterSqlToRelConverter обходит SqlNode сверху вниз, материализуя реляционные операторы. Подзапросы и коррелированные ссылки превращаются в LogicalCorrelate и Subquery RelNodes.
emit
Logical RelNode treeДерево LogicalProject/LogicalFilter/LogicalAggregate/LogicalJoin/LogicalTableScan. Это вход для оптимизатора. На этом этапе нет различия между stream и batch.
HepPlanner + VolcanoPlannerHepPlanner (rule-based) и VolcanoPlanner (cost-based). Применяют правила: predicate pushdown, projection pushdown, join reordering, subquery decorrelation. На выходе — optimized RelNode.
rules
Physical RelNode (Flink-specific)Физический план — StreamPhysicalRel или BatchPhysicalRel (классы Flink). Дальше Flink codegen генерирует Java-код и собирает ExecNode -> JobGraph.

RelNode как универсальный IR

Почему RelNode такой важный? Потому что это универсальный язык для описания запросов. Если ваш SQL валиден, он будет однозначно конвертирован в RelNode-дерево. После этого все оптимизации — это трансформации RelNode -> RelNode, не имеющие отношения к синтаксису SQL.

Два разных SQL:
  Q1: SELECT * FROM a JOIN b ON a.id = b.id WHERE a.x > 5
  Q2: SELECT * FROM a, b WHERE a.id = b.id AND a.x > 5

Дают эквивалентные LogicalJoin + LogicalFilter после canonical form.
Оптимизатор может работать с одним представлением, не различая исходный SQL.

RelNode имеет три ключевые характеристики:

  1. Inputs — другие RelNode, от которых зависит этот. Дерево строится снизу вверх.
  2. RowType — схема выходных столбцов с типами (RelDataType).
  3. Traits — физические характеристики (collation, distribution, convention). Используются Volcano-планером для матчинга rule-based и cost-based.

Trait Convention определяет “calling convention” — в каком физическом представлении находится узел. У Calcite по умолчанию есть Convention.NONE (логические узлы) и EnumerableConvention (in-memory исполнение через Linq4j). Flink определяет свои conventions: FlinkConventions.STREAM_PHYSICAL и FlinkConventions.BATCH_PHYSICAL.

Когда логический план готов, начинается главная работа — поиск оптимального физического плана. У Volcano это search task:

1. Группировка эквивалентных планов в RelSubset
   - Каждое логическое выражение принадлежит RelSet (group of equivalent expressions)
   - В RelSet несколько RelSubset, по одному на конвенцию (STREAM_PHYSICAL, BATCH_PHYSICAL...)

2. Применение правил
   - RelOptRule имеет matching pattern на RelNode subtree
   - Когда rule matches, генерирует новый RelNode и добавляет в тот же RelSet
   - Все варианты сохраняются — это memoization

3. Cost calculation
   - Каждый физический оператор оценивается RelMetadataQuery
   - Cost модель: cpu + io + memory + network
   - Volcano выбирает best plan через top-down dynamic programming

4. Best plan extraction
   - Из RelSet root выбирается subplan с минимальной стоимостью
   - Этот subplan — физический план для исполнения

В Flink cost model переписана: добавлены метрики для streaming (rate-based costs, state size). Для batch — стандартная модель строк × bytes per row × фактор операции.

Rule-based vs cost-based

Calcite использует два типа planner и оба важны:

  • HepPlanner — детерминированный, применяет правила в фиксированном порядке. Хорош для обязательных трансформаций (subquery decorrelation, filter normalization). Быстрый, не требует cost model.
  • VolcanoPlanner — exploratory, рассматривает множество вариантов, выбирает по cost. Хорош для join reordering, выбора алгоритма join, выбора index/scan.

Flink использует обоих в pipeline: сначала HepPlanner для подготовки и обязательных правил, потом VolcanoPlanner для cost-based поиска физических планов.

TIP

Если запрос компилируется минуты — почти всегда виноват VolcanoPlanner. Для запроса с 10+ joins пространство поиска экспоненциальное. Решения: timeout (table.optimizer.search.timeout), greedy join reordering (table.optimizer.join-reorder-enabled = false отключает), bushy plans (table.optimizer.bushy-tree-join-reorder-threshold).

Flink не использует Calcite “из коробки”. Большой объём кода добавляется поверх:

Расширения Flink:
  flink-table-planner
  +-- FlinkPlannerImpl — оборачивает Calcite planner
  +-- FlinkRelOptCluster — расширенный RelOptCluster с Flink-specific
  +-- FlinkConvention.STREAM_PHYSICAL, BATCH_PHYSICAL
  +-- StreamPhysicalRel base class
  +-- StreamPhysicalCalc, StreamPhysicalGroupAggregate, StreamPhysicalJoin...
  +-- FlinkRelMetadataQuery — переопределённые метрики
  +-- FlinkRelMdRowCount, FlinkRelMdSelectivity — cost оценки
  +-- StreamCommonSubGraphBasedOptimizer — главный оптимизатор для streaming
  +-- BatchCommonSubGraphBasedOptimizer — для batch
  +-- 200+ собственных RelOptRule

Conventions:
  - LOGICAL (Calcite дефолтные)
  - STREAM_PHYSICAL (узлы, исполняемые в streaming mode)
  - BATCH_PHYSICAL (узлы, исполняемые в batch mode)

Stream-only узлы (нет batch-аналога):
  - StreamPhysicalDeduplicate
  - StreamPhysicalGroupAggregate (с retract support)
  - StreamPhysicalChangelogNormalize
  - StreamPhysicalWatermarkAssigner
  - StreamPhysicalIntervalJoin

Batch-only узлы:
  - BatchPhysicalHashJoin
  - BatchPhysicalSortMergeJoin
  - BatchPhysicalNestedLoopJoin
  - BatchPhysicalHashAggregate
  - BatchPhysicalSortAggregate

Это разделение позволяет Flink использовать одну SQL-логику для streaming и batch, но физическая реализация и cost model различные. Подробно о различиях — в уроке 4 этого модуля.

Calcite сам каталога не имеет — он использует абстракцию RelOptSchema / CatalogReader. Flink реализует через CatalogManager:

CatalogManager (Flink)
  +-- GenericInMemoryCatalog (default in-memory)
  +-- HiveCatalog (Hive Metastore)
  +-- JdbcCatalog (PostgreSQL, MySQL)
  +-- Paimon Catalog (lakehouse tables)
  +-- HudiCatalog, IcebergCatalog (через connectors)

При SQL compile:
  1. CalciteCatalogReader.getTable(qualifiedName)
  2. CalciteCatalogReader -> FlinkSchema -> CatalogManager.getTable
  3. CatalogManager -> конкретный Catalog (например HiveCatalog)
  4. HiveCatalog -> Hive Metastore (Thrift call) -> CatalogBaseTable
  5. CatalogBaseTable -> ResolvedSchema -> RelDataType для Calcite

Поэтому DDL CREATE TABLE foo (...) WITH (...) в Flink — это запись в Catalog. Когда вы потом пишете SELECT * FROM foo, Calcite через CatalogReader находит схему в том же Catalog и строит LogicalTableScan.

Расширение грамматики SQL

Flink добавляет свои синтаксические конструкции. Это делается через расширение Parser.jj:

flink-sql-parser/.../Parser.jj — extended grammar

Добавляет:
  CREATE TABLE ... WITH (key = 'value', ...)
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
  TUMBLE(TABLE clicks, DESCRIPTOR(rowtime), INTERVAL '1' HOUR) — table function syntax
  MATCH_RECOGNIZE (PARTITION BY ... ORDER BY ... PATTERN ...)
  EXPLAIN PLAN FOR ... INCLUDE ALL ATTRIBUTES
  STATEMENT SET BEGIN ... END
  EXECUTE STATEMENT SET ...
  USE CATALOG ...
  SHOW CATALOGS / DATABASES / TABLES / FUNCTIONS

Парсер re-generates через JavaCC в build time.
Изменения в Parser.jj требуют compile flink-sql-parser module.

Чтение source

Если хочется покопаться:

flink-table-planner/src/main/java/org/apache/flink/table/planner/
  +-- delegation/PlannerBase.scala — главный entry point planner
  +-- delegation/StreamPlanner.scala
  +-- delegation/BatchPlanner.scala
  +-- plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
  +-- plan/nodes/physical/stream/StreamPhysicalRel.scala
  +-- plan/rules/physical/stream/StreamExecRules.scala
  +-- plan/cost/FlinkRelMdRowCount.scala

flink-sql-parser/src/main/codegen/templates/Parser.jj — грамматика

Calcite-side:
  org.apache.calcite.sql — SqlNode иерархия
  org.apache.calcite.rel — RelNode иерархия
  org.apache.calcite.plan.volcano — VolcanoPlanner
  org.apache.calcite.plan.hep — HepPlanner
  org.apache.calcite.sql2rel — SqlToRelConverter

Три типичных сценария, где это окупается:

  1. Чтение Explain Plans. EXPLAIN PLAN FOR SELECT ... показывает RelNode tree. Если не знаешь типы узлов и trait — план непонятен. После изменения SQL знание Calcite позволяет понять, почему planner выбрал другой алгоритм.

  2. Catalog ошибки. Когда Table 'foo' not found in any of the catalogs — это validator на этапе resolution. Понимание catalog chain CatalogManager -> Catalog -> Schema -> Table ускоряет дебаг.

  3. Performance regressions. При апгрейде Flink с 1.20 на 2.0 план может измениться (новые правила, новая cost model). Чтобы локально воспроизвести и зафиксировать через hints — нужно знать, какие правила Flink применяет в каком порядке.

Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какую роль играет Apache Calcite в Flink SQL pipeline?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5