Rule sets оптимизатора
В прошлом уроке мы видели, что Flink использует около 7 фаз оптимизатора с разными целями. Но содержимое этих фаз — это коллекции правил (RelOptRule). Каждое правило это паттерн “когда узел выглядит так, перепиши его этак”. Правила — главный механизм оптимизации в Calcite-стиле планерах.
В этом уроке мы разбираем самые важные группы правил, которые применяет Flink. Понимая, какие правила работают на каких узлах, вы сможете предсказывать, во что превратится ваш SQL, и точечно влиять на план через hints, конфиги и переписывание SQL.
Logical и Physical планы в Spark CatalystАнатомия RelOptRule
RelOptRule (или RelRule в новых версиях Calcite) состоит из трёх частей:
class MyRule extends RelOptRule {
// 1. Pattern — какой subtree RelNode тригеррит правило
operand(LogicalFilter.class,
operand(LogicalProject.class, any()))
// 2. matches() — дополнительные проверки (опционально)
boolean matches(RelOptRuleCall call) {
Filter filter = call.rel(0);
return filter.getCondition() satisfies some condition;
}
// 3. onMatch() — действие: построить новый план и зарегистрировать
void onMatch(RelOptRuleCall call) {
Filter filter = call.rel(0);
Project project = call.rel(1);
RelNode newPlan = construct new tree;
call.transformTo(newPlan);
}
}
Когда правило срабатывает в VolcanoPlanner, оба варианта (старый и новый) остаются в RelSet. Планер потом выберет дешёвый. В HepPlanner старый узел заменяется.
Flink держит свои правила в flink-table-planner модуле: org.apache.flink.table.planner.plan.rules — пакет с подпакетами logical, physical/stream, physical/batch. Сотни правил суммарно.
Группа 1: Subquery rewriting
SQL позволяет подзапросы в трёх контекстах:
-- Scalar subquery (возвращает 1 значение)
SELECT a, (SELECT MAX(x) FROM t2) AS max_x FROM t1;
-- IN/NOT IN subquery
SELECT a FROM t1 WHERE a IN (SELECT b FROM t2);
-- EXISTS/NOT EXISTS subquery
SELECT a FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.b = t1.a);
Оптимизатор хочет переписать эти подзапросы в обычные join, потому что:
- Naïve исполнение подзапроса в каждой строке = O(N×M).
- Joins оптимизируются эффективно (hash join, sort merge, broadcast).
- Подзапросы плохо переносят шафлы и параллелизм.
Правила subquery rewriting (упрощённо):
SubQueryRemoveRule:
- LogicalFilter с IN(subquery) -> LogicalJoin(SEMI)
- LogicalFilter с EXISTS(subquery) -> LogicalJoin(SEMI)
- LogicalFilter с NOT IN(subquery) -> LogicalJoin(ANTI)
- LogicalFilter с NOT EXISTS(subquery) -> LogicalJoin(ANTI)
- LogicalProject со scalar subquery -> LogicalJoin(LEFT) + LogicalAggregate
Пример:
Before:
LogicalFilter(EXISTS(SELECT 1 FROM t2 WHERE t2.b = t1.a))
LogicalTableScan(t1)
After:
LogicalJoin(condition=t1.a=t2.b, joinType=SEMI)
LogicalTableScan(t1)
LogicalTableScan(t2)
SEMI join выводит строки из левой стороны, для которых нашёлся match в правой — без дублирования. ANTI — наоборот, строки без match. Большинство современных RDBMS и Flink эффективно исполняют SEMI/ANTI joins.
Группа 2: Decorrelation
Коррелированные подзапросы — это подзапросы, которые ссылаются на столбцы из enclosing query. Calcite моделирует их через LogicalCorrelate — оператор, который соединяет внешнюю и внутреннюю часть.
SELECT a, b
FROM t1
WHERE b < (SELECT AVG(x) FROM t2 WHERE t2.k = t1.k); -- t1.k correlation
В наивной форме это LogicalCorrelate. Но корелляция мешает оптимизации: planner не может pushdown predicate, не может изменить порядок, не может выбрать join algorithm. Decorrelation превращает LogicalCorrelate в LogicalJoin с обычным equi-join condition.
Правила decorrelation:
RelDecorrelator (Calcite framework):
- Анализирует LogicalCorrelate
- Находит correlated references (RexFieldAccess на CorrelationId)
- Перестраивает inner expression в форму, где correlation становится join key
- Замещает LogicalCorrelate на LogicalJoin
FlinkRelDecorrelator (Flink-extended):
- Дополнительные правила для streaming-специфичных случаев
- Поддержка коррелированных temporal joins
После decorrelation:
Before:
LogicalCorrelate(joinType=LEFT)
LogicalTableScan(t1)
LogicalAggregate(AVG(x))
LogicalFilter(t2.k = $cor0.k)
LogicalTableScan(t2)
After:
LogicalJoin(condition=t1.k = t2_avg.k)
LogicalTableScan(t1)
LogicalAggregate(group by t2.k, AVG(x) as avg_x)
LogicalTableScan(t2)
Не все коррелированные подзапросы можно decorrelate. Если внутри есть LIMIT, OFFSET, или non-equi correlation, decorrelation fails. Flink выкинет Cannot decorrelate ... TableException: This is a bug, please file an issue. Workaround — переписать запрос вручную в форму join.
Группа 3: Predicate pushdown
Идея: переместить фильтр как можно ниже в плане (ближе к источникам). Это уменьшает число строк, которые обрабатывают вышестоящие операторы.
SELECT *
FROM orders o JOIN customers c ON o.cust_id = c.id
WHERE c.country = 'US' AND o.amount > 100;
Без pushdown:
LogicalFilter(c.country='US' AND o.amount>100)
LogicalJoin(o.cust_id = c.id)
LogicalTableScan(orders) -- читает все orders
LogicalTableScan(customers) -- читает всех customers
После pushdown:
LogicalJoin(o.cust_id = c.id)
LogicalFilter(o.amount > 100)
LogicalTableScan(orders)
LogicalFilter(c.country = 'US')
LogicalTableScan(customers)
В streaming это уменьшает throughput через сетевой shuffle. В batch — может pushdown ещё дальше, до уровня source, если source поддерживает SupportsFilterPushDown:
LogicalJoin(o.cust_id = c.id)
LogicalTableScan(orders WITH filter o.amount > 100) -- predicate pushed to source
LogicalTableScan(customers WITH filter c.country='US')
Это особенно мощно для:
- Iceberg/Paimon/Hudi sources — могут пропустить целые файлы через manifest pruning.
- JDBC sources — добавляют WHERE прямо в SQL запрос к remote БД.
- Parquet/ORC — фильтр через row group statistics (Min/Max), bloom filter pruning.
Flink правила:
PushFilterIntoTableSourceScanRule — main rule
FilterProjectTransposeRule — переставляет filter ниже project
FilterJoinRule — pushdown поверх join (boundary check)
JoinPushExpressionsRule — выталкивает expressions в outputs
Группа 4: Projection pushdown
Аналог predicate pushdown, но для столбцов. Если запрос использует только 3 столбца из таблицы с 100 столбцами, source должен читать только эти 3.
SELECT user_id, amount FROM orders WHERE country = 'US';
После projection pushdown:
LogicalTableScan(orders, fields=[user_id, amount, country], filter=country='US')
Эффект особенно мощный для column-store форматов (Parquet, ORC):
- Читаются только нужные column chunks.
- Сетевой трафик уменьшается пропорционально.
- IO пропорционально числу нужных столбцов.
Правила:
PushProjectIntoTableSourceScanRule
ProjectFilterTransposeRule — проталкивает project ниже filter (если filter не использует выкинутые столбцы)
ProjectJoinTransposeRule
JoinProjectTransposeRule
Группа 5: Constant folding и predicate simplification
Самые “школьные” оптимизации:
1 + 2 -> 3 -- ReduceExpressionsRule
WHERE 1=1 -> WHERE TRUE -> no filter -- redundant predicate
WHERE country='US' AND TRUE -> WHERE country='US'
WHERE country IN ('US','US','UK') -> WHERE country IN ('US','UK')
WHERE x BETWEEN 1 AND 10 -> WHERE x >= 1 AND x <= 10
WHERE x = NULL -> WHERE FALSE -- NULL semantics
SUBSTRING(s, 1, LENGTH(s)) -> s -- function inlining
Calcite правила:
ReduceExpressionsRule (для Project/Filter/Calc)
ReduceJoinConditionRule
SimplifyFilterConditionRule
ConstantPropagationRule
Эти правила недорогие и применяются в Phase 1: Pre-Optimization через HepPlanner. Эффект — меньше работы downstream-операторам.
Группа 6: Join reordering — cost-based
Это самая сложная и важная группа. Порядок joins сильно влияет на performance:
SELECT *
FROM small_table s
JOIN huge_table h ON s.k = h.k
JOIN medium_table m ON h.j = m.j;
Плохой порядок: (huge JOIN medium) JOIN small — выпадает большой intermediate.
Хороший порядок: (small JOIN huge) JOIN medium — фильтрация через small сразу уменьшает huge.
Cost-based join reordering работает только в VolcanoPlanner. Он перебирает разные порядки, оценивает каждый через cost model и выбирает дешёвый.
Стратегии reordering:
1. Left-deep tree (default)
- Joins вкладываются слева
- Пример: ((A JOIN B) JOIN C) JOIN D
- Простой алгоритм, ограниченное пространство поиска
2. Bushy tree
- Любая форма дерева
- Пример: (A JOIN B) JOIN (C JOIN D)
- Большое пространство поиска, но лучшие планы для больших joins
- Включается через table.optimizer.bushy-tree-join-reorder-threshold=N
3. Greedy
- Жадный выбор пар (cheapest first)
- Fallback для очень больших join групп
Cost = function(row count, row width, join condition selectivity, algorithm). Flink использует FlinkRelMdRowCount для оценки row count после joins — это часто bottleneck качества плана.
Самая частая жалоба на Flink SQL — “plan был хороший, потом стал плохой”. Обычно это потому, что Statistics устарели или cardinality estimate неточный. Flink использует CatalogTableStatistics для row counts — если их нет, fallback на default constant. В production стоит регулярно обновлять статистики через ANALYZE TABLE или через external metadata.
Группа 7: Aggregate optimization
Aggregates имеют свои правила, важные для performance:
TwoPhaseAggregateRule:
Идея: разделить агрегацию на local + global фазы
Before:
LogicalAggregate(group by user, SUM(amount))
LogicalExchange(hash by user)
Source
After:
LogicalAggregate(group by user, SUM(partial_sum)) -- global
LogicalExchange(hash by user)
LogicalAggregate(group by user, SUM(amount) AS partial_sum) -- local
Source
Эффект: local aggregation сокращает данные перед сетевым shuffle.
AggregateUnionTransposeRule:
Aggregate(Union(A, B)) -> Union(Aggregate(A), Aggregate(B))
AggregateProjectMergeRule:
Project на Aggregate -> объединить в один Aggregate
TwoPhaseAggregateRule — критическое правило для streaming throughput. По умолчанию выключено для streaming (table.exec.mini-batch.enabled=false). Включается с MiniBatch processing (table.exec.mini-batch.enabled=true + size/latency параметры).
Группа 8: Stream-only правила
Эти правила работают только для streaming pipeline:
MiniBatchIntervalInferRule:
Анализирует sink retract behaviour, выставляет mini-batch interval
для downstream-агрегаций
DeduplicateLastRowRule:
ROW_NUMBER() OVER (PARTITION BY k ORDER BY t DESC) FETCH FIRST 1 ROW
-> StreamPhysicalDeduplicate operator
DeltaJoinRule (Flink 2.1+):
Streaming joins, где обе стороны имеют primary key и часто обновляются
-> DeltaJoin operator с минимальным state footprint
MultiJoinRule (Flink 2.2):
Каскадные joins без intermediate state
-> MultiJoin operator
Подробнее об этих оптимизациях — в уроках 4 и 5 этого модуля.
Группа 9: Batch-only правила
JoinPushTransitivePredicatesRule:
Если a.x = b.x AND b.x = 5, то можно вывести a.x = 5 и push в источник
JoinDistributionRule:
Выбирает PARTITIONED vs BROADCAST distribution для join
Cost-based
EnumerableJoinConverterRule (стандартный Calcite, не используется Flink):
Только в Calcite Enumerable convention
Flink batch:
BatchPhysicalJoinRule — выбор HashJoin / SortMergeJoin / NestedLoopJoin
BatchPhysicalAggregateRule — HashAggregate / SortAggregate
Rule-based (HepPlanner) vs Cost-based (VolcanoPlanner)
Aspect HepPlanner VolcanoPlanner
Strategy Apply rules in fixed order Search alternatives, pick cheapest
Memoization Replace in-place Keep all variants in RelSet
Cost model Not used Required
Determinism Always same plan Same plan for same statistics
Speed Fast (linear in rules) Slow (exponential in worst case)
Use case Required normalizations, Choosing algorithms,
rewriting non-negotiables join ordering, physical opts
Rule trigger once per rule per node whenever subtree changes
Flink использует обоих в разных фазах:
Phase 1 (Pre-Opt): HepPlanner — обязательная нормализация
Phase 2 (Decorrelation): HepPlanner — детерминированное переписывание
Phase 3 (Logical opt): VolcanoPlanner — поиск + join reorder
Phase 4 (Time material.): HepPlanner — Flink-specific обязательно
Phase 5 (Physical conv.): VolcanoPlanner — выбор физических узлов
Phase 6 (Physical rewrite): HepPlanner — финальные оптимизации
Phase 7 (Post-opt): HepPlanner — sink wrapping, watermarks
Как влиять на план
1. Hints — внутри SQL
SELECT /*+ USE_HASH(a, b) */ ... — выбрать join algorithm
2. Configs — глобально через TableConfig
table.optimizer.join-reorder-enabled = true/false
table.optimizer.distinct-agg.split.enabled = true
table.exec.mini-batch.enabled = true
table.optimizer.bushy-tree-join-reorder-threshold = 12
3. Statistics — через ANALYZE TABLE или Catalog
Без статистик cost-based оптимизатор работает по default'ам
4. Rewriting SQL — последний шанс
Если SQL получается плохо, попробовать переписать
(вынести подзапрос как CTE, переупорядочить joins вручную)
5. Compiled plans — заморозить лучший план
COMPILE PLAN, EXECUTE PLAN — план не меняется между апгрейдами
Кейс: один SQL — три плана
Один и тот же SQL может дать существенно разные планы в зависимости от:
SELECT customer_id, COUNT(*) AS orders_cnt
FROM orders
WHERE country = 'US' AND order_date > DATE '2025-01-01'
GROUP BY customer_id;
Stream mode (default):
StreamPhysicalSink
StreamPhysicalGroupAggregate (group by customer_id, COUNT)
StreamPhysicalExchange (hash by customer_id)
StreamPhysicalCalc (filter, project customer_id)
StreamPhysicalTableSourceScan (orders, no filter pushdown)
Stream + MiniBatch enabled:
StreamPhysicalSink
StreamPhysicalGlobalGroupAggregate (group by customer_id, SUM(local_cnt))
StreamPhysicalExchange (hash by customer_id)
StreamPhysicalLocalGroupAggregate (group by customer_id, COUNT AS local_cnt)
StreamPhysicalCalc
StreamPhysicalTableSourceScan
Batch mode (Paimon/Iceberg source):
BatchPhysicalSink
BatchPhysicalHashAggregate (group by customer_id, COUNT)
BatchPhysicalExchange (hash by customer_id)
BatchPhysicalCalc
BatchPhysicalTableSourceScan(orders WITH filter, projection)
-- filter и projection pushdown в Paimon
-- file pruning через manifests
Чтение source
Calcite rule base:
org.apache.calcite.plan.RelOptRule -- base class
org.apache.calcite.rel.rules -- стандартные правила (200+)
FilterJoinRule, ProjectMergeRule, AggregateProjectMergeRule, ...
Flink streaming rules:
flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/
logical/ -- общие правила для stream + batch
physical/stream/ -- stream-only
physical/batch/ -- batch-only
ScalaCase classes: FlinkStreamRuleSets, FlinkBatchRuleSets
Конкретные интересные:
PushFilterIntoTableSourceScanRule
PushProjectIntoTableSourceScanRule
TwoStageOptimizedAggregateRule
SplitAggregateRule
SubQueryDecorrelator
StreamPhysicalDeltaJoinRule (Flink 2.1+)
StreamPhysicalMultiJoinRule (Flink 2.2)
Когда самим писать правила
Это редкий, но legitimate сценарий — extension для специального connector’а или custom function:
1. Custom source с особыми pushdown возможностями
-> implement SupportsFilterPushDown, SupportsProjectionPushDown
-> Flink правила сделают остальное автоматически
2. Custom physical operator (например, native execution)
-> Создать новый Convention, RelNode subclass, RelOptRule
3. Domain-specific оптимизации
-> Например, для geo-data: rewrite ST_Distance(...) < r в bounding box predicate
-> Реализовать как HepPlanner rule
В 95% случаев пишут не свои правила, а используют hints и configs. Свои правила — для библиотек/коннекторов, не для бизнес-логики.