Apache Calcite — relational algebra IR
Apache Flink не пишет SQL-парсер сам. Все системы, которые научились SQL — Hive, Drill, Storm SQL, Beam SQL, Kafka KSQL до версии 2, Flink SQL, MaxCompute, Samza SQL — берут готовый компонент Apache Calcite. Это не библиотека парсинга в стиле ANTLR, а полноценный optimizer framework: парсер, валидатор, IR на основе relational algebra, оптимизатор с правилами и cost-моделью, codegen в физический план.
В этом уроке мы разбираемся, что такое Calcite, почему он стал индустриальным стандартом для SQL-on-anything систем и какую роль он играет в Flink. Без понимания Calcite вы не сможете читать стек-трейсы Flink SQL ошибок, не отлавите regressions в плане после миграции и не объясните, почему два логически эквивалентных запроса дают разный JobGraph.
Catalyst Optimizer в Spark: тот же Calcite, другой контекст ksqlDB: SQL поверх Kafka StreamsЧто такое Apache Calcite
Calcite — это библиотека для построения query processing систем. Apache top-level проект с 2015 года, исторически отделился от Apache Drill. Идея, которая делает Calcite уникальным: разделить SQL frontend и storage backend.
Традиционные СУБД (PostgreSQL, MySQL):
SQL parser -> planner -> executor -> storage
Все слои — монолит. SQL понятен только своему storage.
Calcite-based системы (Flink, Hive, Drill):
SQL parser (Calcite) ->
RelNode IR (Calcite) ->
Rules + cost optimizer (Calcite) ->
Physical plan (специфичный для движка) ->
Executor (Flink, Spark, Hive...)
Любой движок может использовать SQL frontend Calcite.
Calcite сам не выполняет запросы — он строит план.
Ключевые компоненты Calcite:
- SQL parser — generated через JavaCC из грамматики Parser.jj. Возвращает абстрактное синтаксическое дерево SqlNode.
- Validator — проверяет ссылки на таблицы, типы столбцов, разрешает имена через каталог.
- SqlToRelConverter — конвертирует SqlNode в RelNode (relational algebra tree).
- HepPlanner — rule-based planner для детерминированных трансформаций.
- VolcanoPlanner — cost-based планировщик из работы Goetz Graefe (1995), Volcano/Cascades framework.
- RelNode + RelTrait + RelOptRule — extension points для добавления своих операторов и правил.
- Codegen + Linq4j — генерация runtime кода (используется в самом Calcite, но Flink игнорирует и делает свой codegen).
Flink использует только frontend Calcite — парсер, validator, SqlToRelConverter, оптимизатор. Executor у Flink свой (StreamGraph и JobGraph). Это критически важное разделение: Calcite ничего не знает про watermarks, state, checkpoints. Flink добавляет свои RelNode-подклассы (StreamPhysicalRel) и свои правила оптимизатора.
SQL -> SqlNode: парсинг
Первый шаг pipeline — превратить текст SQL в дерево SqlNode. У Calcite своя грамматика, которую можно расширять — Flink добавляет свои синтаксические конструкции (например, MATCH_RECOGNIZE, WATERMARK FOR ..., CREATE TABLE ... WITH (...)).
Входной SQL:
SELECT user_id, COUNT(*) AS cnt
FROM clicks
WHERE country = 'US'
GROUP BY user_id
SqlNode tree (упрощённо):
SqlSelect
+-- selectList: [SqlIdentifier(user_id), SqlAsOperator(COUNT(*), cnt)]
+-- from: SqlIdentifier(clicks)
+-- where: SqlBasicCall(EQUALS, country, 'US')
+-- groupBy: [SqlIdentifier(user_id)]
SqlNode — это синтаксическое дерево, оно ещё не знает про схему. На этом этапе нельзя сказать, существует ли таблица clicks и какие у неё столбцы. Это работа validator.
Validation: проверка типов и каталога
Validator (SqlValidator) работает в двух фазах:
- Scope resolution — определяет, к какой таблице относится каждый идентификатор.
- Type derivation — выводит тип каждого выражения снизу вверх по дереву.
Если столбца country нет в clicks, validator выкинет SqlValidatorException: Column 'country' not found. Если WHERE country = 5 — выкинет ошибку типа: integer не сравнивается со строкой без явного cast.
Каталог (CatalogReader) — это интерфейс, который Flink реализует через свой Catalog API (CatalogManager, Catalog, CatalogBaseTable). Когда вы регистрируете таблицу через tableEnv.createTemporaryTable(...) или через DDL CREATE TABLE, она попадает в каталог Flink, а Calcite читает оттуда схему.
SqlToRelConverter: переход к relational algebra
После валидации SqlToRelConverter превращает SqlNode в дерево RelNode. RelNode — это операторы реляционной алгебры. Каждый узел представляет операцию над отношением (таблицей или потоком).
SqlNode (синтаксис) -> RelNode (семантика)
SELECT user_id, COUNT(*) FROM clicks WHERE country='US' GROUP BY user_id
Превращается в:
LogicalAggregate(group=[{0}], cnt=[COUNT()])
LogicalProject(user_id=[$0])
LogicalFilter(condition=[=($2, 'US')])
LogicalTableScan(table=[clicks])
Базовые типы RelNode в Calcite (Logical-варианты):
- LogicalTableScan — чтение таблицы из каталога.
- LogicalProject — выбор и преобразование столбцов (часть SELECT).
- LogicalFilter — фильтрация (часть WHERE/HAVING).
- LogicalAggregate — группировка и агрегации (GROUP BY).
- LogicalJoin — соединение двух отношений.
- LogicalUnion / LogicalIntersect / LogicalMinus — теоретико-множественные операции.
- LogicalSort — упорядочивание (ORDER BY) и/или LIMIT.
- LogicalCorrelate — для коррелированных подзапросов.
- LogicalWindow — оконные функции (OVER).
Логические узлы означают “что делаем”, не “как”. Физическая реализация — стрим vs batch, какой алгоритм join, какой тип агрегации — определяется на следующих фазах через правила.
RelNode как универсальный IR
Почему RelNode такой важный? Потому что это универсальный язык для описания запросов. Если ваш SQL валиден, он будет однозначно конвертирован в RelNode-дерево. После этого все оптимизации — это трансформации RelNode -> RelNode, не имеющие отношения к синтаксису SQL.
Два разных SQL:
Q1: SELECT * FROM a JOIN b ON a.id = b.id WHERE a.x > 5
Q2: SELECT * FROM a, b WHERE a.id = b.id AND a.x > 5
Дают эквивалентные LogicalJoin + LogicalFilter после canonical form.
Оптимизатор может работать с одним представлением, не различая исходный SQL.
RelNode имеет три ключевые характеристики:
- Inputs — другие RelNode, от которых зависит этот. Дерево строится снизу вверх.
- RowType — схема выходных столбцов с типами (RelDataType).
- Traits — физические характеристики (collation, distribution, convention). Используются Volcano-планером для матчинга rule-based и cost-based.
Trait Convention определяет “calling convention” — в каком физическом представлении находится узел. У Calcite по умолчанию есть Convention.NONE (логические узлы) и EnumerableConvention (in-memory исполнение через Linq4j). Flink определяет свои conventions: FlinkConventions.STREAM_PHYSICAL и FlinkConventions.BATCH_PHYSICAL.
Volcano planner: cost-based search
Когда логический план готов, начинается главная работа — поиск оптимального физического плана. У Volcano это search task:
1. Группировка эквивалентных планов в RelSubset
- Каждое логическое выражение принадлежит RelSet (group of equivalent expressions)
- В RelSet несколько RelSubset, по одному на конвенцию (STREAM_PHYSICAL, BATCH_PHYSICAL...)
2. Применение правил
- RelOptRule имеет matching pattern на RelNode subtree
- Когда rule matches, генерирует новый RelNode и добавляет в тот же RelSet
- Все варианты сохраняются — это memoization
3. Cost calculation
- Каждый физический оператор оценивается RelMetadataQuery
- Cost модель: cpu + io + memory + network
- Volcano выбирает best plan через top-down dynamic programming
4. Best plan extraction
- Из RelSet root выбирается subplan с минимальной стоимостью
- Этот subplan — физический план для исполнения
В Flink cost model переписана: добавлены метрики для streaming (rate-based costs, state size). Для batch — стандартная модель строк × bytes per row × фактор операции.
Rule-based vs cost-based
Calcite использует два типа planner и оба важны:
- HepPlanner — детерминированный, применяет правила в фиксированном порядке. Хорош для обязательных трансформаций (subquery decorrelation, filter normalization). Быстрый, не требует cost model.
- VolcanoPlanner — exploratory, рассматривает множество вариантов, выбирает по cost. Хорош для join reordering, выбора алгоритма join, выбора index/scan.
Flink использует обоих в pipeline: сначала HepPlanner для подготовки и обязательных правил, потом VolcanoPlanner для cost-based поиска физических планов.
Если запрос компилируется минуты — почти всегда виноват VolcanoPlanner. Для запроса с 10+ joins пространство поиска экспоненциальное. Решения: timeout (table.optimizer.search.timeout), greedy join reordering (table.optimizer.join-reorder-enabled = false отключает), bushy plans (table.optimizer.bushy-tree-join-reorder-threshold).
Где Flink расширяет Calcite
Flink не использует Calcite “из коробки”. Большой объём кода добавляется поверх:
Расширения Flink:
flink-table-planner
+-- FlinkPlannerImpl — оборачивает Calcite planner
+-- FlinkRelOptCluster — расширенный RelOptCluster с Flink-specific
+-- FlinkConvention.STREAM_PHYSICAL, BATCH_PHYSICAL
+-- StreamPhysicalRel base class
+-- StreamPhysicalCalc, StreamPhysicalGroupAggregate, StreamPhysicalJoin...
+-- FlinkRelMetadataQuery — переопределённые метрики
+-- FlinkRelMdRowCount, FlinkRelMdSelectivity — cost оценки
+-- StreamCommonSubGraphBasedOptimizer — главный оптимизатор для streaming
+-- BatchCommonSubGraphBasedOptimizer — для batch
+-- 200+ собственных RelOptRule
Conventions:
- LOGICAL (Calcite дефолтные)
- STREAM_PHYSICAL (узлы, исполняемые в streaming mode)
- BATCH_PHYSICAL (узлы, исполняемые в batch mode)
Stream-only узлы (нет batch-аналога):
- StreamPhysicalDeduplicate
- StreamPhysicalGroupAggregate (с retract support)
- StreamPhysicalChangelogNormalize
- StreamPhysicalWatermarkAssigner
- StreamPhysicalIntervalJoin
Batch-only узлы:
- BatchPhysicalHashJoin
- BatchPhysicalSortMergeJoin
- BatchPhysicalNestedLoopJoin
- BatchPhysicalHashAggregate
- BatchPhysicalSortAggregate
Это разделение позволяет Flink использовать одну SQL-логику для streaming и batch, но физическая реализация и cost model различные. Подробно о различиях — в уроке 4 этого модуля.
Каталог в Flink
Calcite сам каталога не имеет — он использует абстракцию RelOptSchema / CatalogReader. Flink реализует через CatalogManager:
CatalogManager (Flink)
+-- GenericInMemoryCatalog (default in-memory)
+-- HiveCatalog (Hive Metastore)
+-- JdbcCatalog (PostgreSQL, MySQL)
+-- Paimon Catalog (lakehouse tables)
+-- HudiCatalog, IcebergCatalog (через connectors)
При SQL compile:
1. CalciteCatalogReader.getTable(qualifiedName)
2. CalciteCatalogReader -> FlinkSchema -> CatalogManager.getTable
3. CatalogManager -> конкретный Catalog (например HiveCatalog)
4. HiveCatalog -> Hive Metastore (Thrift call) -> CatalogBaseTable
5. CatalogBaseTable -> ResolvedSchema -> RelDataType для Calcite
Поэтому DDL CREATE TABLE foo (...) WITH (...) в Flink — это запись в Catalog. Когда вы потом пишете SELECT * FROM foo, Calcite через CatalogReader находит схему в том же Catalog и строит LogicalTableScan.
Расширение грамматики SQL
Flink добавляет свои синтаксические конструкции. Это делается через расширение Parser.jj:
flink-sql-parser/.../Parser.jj — extended grammar
Добавляет:
CREATE TABLE ... WITH (key = 'value', ...)
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
TUMBLE(TABLE clicks, DESCRIPTOR(rowtime), INTERVAL '1' HOUR) — table function syntax
MATCH_RECOGNIZE (PARTITION BY ... ORDER BY ... PATTERN ...)
EXPLAIN PLAN FOR ... INCLUDE ALL ATTRIBUTES
STATEMENT SET BEGIN ... END
EXECUTE STATEMENT SET ...
USE CATALOG ...
SHOW CATALOGS / DATABASES / TABLES / FUNCTIONS
Парсер re-generates через JavaCC в build time.
Изменения в Parser.jj требуют compile flink-sql-parser module.
Чтение source
Если хочется покопаться:
flink-table-planner/src/main/java/org/apache/flink/table/planner/
+-- delegation/PlannerBase.scala — главный entry point planner
+-- delegation/StreamPlanner.scala
+-- delegation/BatchPlanner.scala
+-- plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+-- plan/nodes/physical/stream/StreamPhysicalRel.scala
+-- plan/rules/physical/stream/StreamExecRules.scala
+-- plan/cost/FlinkRelMdRowCount.scala
flink-sql-parser/src/main/codegen/templates/Parser.jj — грамматика
Calcite-side:
org.apache.calcite.sql — SqlNode иерархия
org.apache.calcite.rel — RelNode иерархия
org.apache.calcite.plan.volcano — VolcanoPlanner
org.apache.calcite.plan.hep — HepPlanner
org.apache.calcite.sql2rel — SqlToRelConverter
Зачем разработчику Flink знать Calcite
Три типичных сценария, где это окупается:
-
Чтение Explain Plans.
EXPLAIN PLAN FOR SELECT ...показывает RelNode tree. Если не знаешь типы узлов и trait — план непонятен. После изменения SQL знание Calcite позволяет понять, почему planner выбрал другой алгоритм. -
Catalog ошибки. Когда
Table 'foo' not found in any of the catalogs— это validator на этапе resolution. Понимание catalog chain CatalogManager -> Catalog -> Schema -> Table ускоряет дебаг. -
Performance regressions. При апгрейде Flink с 1.20 на 2.0 план может измениться (новые правила, новая cost model). Чтобы локально воспроизвести и зафиксировать через hints — нужно знать, какие правила Flink применяет в каком порядке.