Learning Platform
Глоссарий Troubleshooting
Урок 10.03 · 25 мин
Продвинутый
Optimizer RulesPredicate PushdownProjection PushdownDecorrelationJoin ReorderHepPlannerVolcanoPlanner

Rule sets оптимизатора

В прошлом уроке мы видели, что Flink использует около 7 фаз оптимизатора с разными целями. Но содержимое этих фаз — это коллекции правил (RelOptRule). Каждое правило это паттерн “когда узел выглядит так, перепиши его этак”. Правила — главный механизм оптимизации в Calcite-стиле планерах.

В этом уроке мы разбираем самые важные группы правил, которые применяет Flink. Понимая, какие правила работают на каких узлах, вы сможете предсказывать, во что превратится ваш SQL, и точечно влиять на план через hints, конфиги и переписывание SQL.

Logical и Physical планы в Spark Catalyst

Анатомия RelOptRule

RelOptRule (или RelRule в новых версиях Calcite) состоит из трёх частей:

class MyRule extends RelOptRule {
  // 1. Pattern — какой subtree RelNode тригеррит правило
  operand(LogicalFilter.class,
    operand(LogicalProject.class, any()))

  // 2. matches() — дополнительные проверки (опционально)
  boolean matches(RelOptRuleCall call) {
    Filter filter = call.rel(0);
    return filter.getCondition() satisfies some condition;
  }

  // 3. onMatch() — действие: построить новый план и зарегистрировать
  void onMatch(RelOptRuleCall call) {
    Filter filter = call.rel(0);
    Project project = call.rel(1);
    RelNode newPlan = construct new tree;
    call.transformTo(newPlan);
  }
}

Когда правило срабатывает в VolcanoPlanner, оба варианта (старый и новый) остаются в RelSet. Планер потом выберет дешёвый. В HepPlanner старый узел заменяется.

Flink держит свои правила в flink-table-planner модуле: org.apache.flink.table.planner.plan.rules — пакет с подпакетами logical, physical/stream, physical/batch. Сотни правил суммарно.

Группа 1: Subquery rewriting

SQL позволяет подзапросы в трёх контекстах:

-- Scalar subquery (возвращает 1 значение)
SELECT a, (SELECT MAX(x) FROM t2) AS max_x FROM t1;

-- IN/NOT IN subquery
SELECT a FROM t1 WHERE a IN (SELECT b FROM t2);

-- EXISTS/NOT EXISTS subquery
SELECT a FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.b = t1.a);

Оптимизатор хочет переписать эти подзапросы в обычные join, потому что:

  • Naïve исполнение подзапроса в каждой строке = O(N×M).
  • Joins оптимизируются эффективно (hash join, sort merge, broadcast).
  • Подзапросы плохо переносят шафлы и параллелизм.
Правила subquery rewriting (упрощённо):

SubQueryRemoveRule:
  - LogicalFilter с IN(subquery) -> LogicalJoin(SEMI)
  - LogicalFilter с EXISTS(subquery) -> LogicalJoin(SEMI)
  - LogicalFilter с NOT IN(subquery) -> LogicalJoin(ANTI)
  - LogicalFilter с NOT EXISTS(subquery) -> LogicalJoin(ANTI)
  - LogicalProject со scalar subquery -> LogicalJoin(LEFT) + LogicalAggregate

Пример:
  Before:
    LogicalFilter(EXISTS(SELECT 1 FROM t2 WHERE t2.b = t1.a))
      LogicalTableScan(t1)

  After:
    LogicalJoin(condition=t1.a=t2.b, joinType=SEMI)
      LogicalTableScan(t1)
      LogicalTableScan(t2)

SEMI join выводит строки из левой стороны, для которых нашёлся match в правой — без дублирования. ANTI — наоборот, строки без match. Большинство современных RDBMS и Flink эффективно исполняют SEMI/ANTI joins.

Группа 2: Decorrelation

Коррелированные подзапросы — это подзапросы, которые ссылаются на столбцы из enclosing query. Calcite моделирует их через LogicalCorrelate — оператор, который соединяет внешнюю и внутреннюю часть.

SELECT a, b
FROM t1
WHERE b < (SELECT AVG(x) FROM t2 WHERE t2.k = t1.k);  -- t1.k correlation

В наивной форме это LogicalCorrelate. Но корелляция мешает оптимизации: planner не может pushdown predicate, не может изменить порядок, не может выбрать join algorithm. Decorrelation превращает LogicalCorrelate в LogicalJoin с обычным equi-join condition.

Правила decorrelation:

RelDecorrelator (Calcite framework):
  - Анализирует LogicalCorrelate
  - Находит correlated references (RexFieldAccess на CorrelationId)
  - Перестраивает inner expression в форму, где correlation становится join key
  - Замещает LogicalCorrelate на LogicalJoin

FlinkRelDecorrelator (Flink-extended):
  - Дополнительные правила для streaming-специфичных случаев
  - Поддержка коррелированных temporal joins

После decorrelation:

Before:
  LogicalCorrelate(joinType=LEFT)
    LogicalTableScan(t1)
    LogicalAggregate(AVG(x))
      LogicalFilter(t2.k = $cor0.k)
        LogicalTableScan(t2)

After:
  LogicalJoin(condition=t1.k = t2_avg.k)
    LogicalTableScan(t1)
    LogicalAggregate(group by t2.k, AVG(x) as avg_x)
      LogicalTableScan(t2)
WARNING

Не все коррелированные подзапросы можно decorrelate. Если внутри есть LIMIT, OFFSET, или non-equi correlation, decorrelation fails. Flink выкинет Cannot decorrelate ... TableException: This is a bug, please file an issue. Workaround — переписать запрос вручную в форму join.

Группа 3: Predicate pushdown

Идея: переместить фильтр как можно ниже в плане (ближе к источникам). Это уменьшает число строк, которые обрабатывают вышестоящие операторы.

SELECT *
FROM orders o JOIN customers c ON o.cust_id = c.id
WHERE c.country = 'US' AND o.amount > 100;

Без pushdown:

LogicalFilter(c.country='US' AND o.amount>100)
  LogicalJoin(o.cust_id = c.id)
    LogicalTableScan(orders)   -- читает все orders
    LogicalTableScan(customers) -- читает всех customers

После pushdown:

LogicalJoin(o.cust_id = c.id)
  LogicalFilter(o.amount > 100)
    LogicalTableScan(orders)
  LogicalFilter(c.country = 'US')
    LogicalTableScan(customers)

В streaming это уменьшает throughput через сетевой shuffle. В batch — может pushdown ещё дальше, до уровня source, если source поддерживает SupportsFilterPushDown:

LogicalJoin(o.cust_id = c.id)
  LogicalTableScan(orders WITH filter o.amount > 100)  -- predicate pushed to source
  LogicalTableScan(customers WITH filter c.country='US')

Это особенно мощно для:

  • Iceberg/Paimon/Hudi sources — могут пропустить целые файлы через manifest pruning.
  • JDBC sources — добавляют WHERE прямо в SQL запрос к remote БД.
  • Parquet/ORC — фильтр через row group statistics (Min/Max), bloom filter pruning.

Flink правила:

PushFilterIntoTableSourceScanRule — main rule
FilterProjectTransposeRule          — переставляет filter ниже project
FilterJoinRule                      — pushdown поверх join (boundary check)
JoinPushExpressionsRule             — выталкивает expressions в outputs

Группа 4: Projection pushdown

Аналог predicate pushdown, но для столбцов. Если запрос использует только 3 столбца из таблицы с 100 столбцами, source должен читать только эти 3.

SELECT user_id, amount FROM orders WHERE country = 'US';

После projection pushdown:

LogicalTableScan(orders, fields=[user_id, amount, country], filter=country='US')

Эффект особенно мощный для column-store форматов (Parquet, ORC):

  • Читаются только нужные column chunks.
  • Сетевой трафик уменьшается пропорционально.
  • IO пропорционально числу нужных столбцов.

Правила:

PushProjectIntoTableSourceScanRule
ProjectFilterTransposeRule  — проталкивает project ниже filter (если filter не использует выкинутые столбцы)
ProjectJoinTransposeRule
JoinProjectTransposeRule

Группа 5: Constant folding и predicate simplification

Самые “школьные” оптимизации:

1 + 2 -> 3                              -- ReduceExpressionsRule
WHERE 1=1 -> WHERE TRUE -> no filter     -- redundant predicate
WHERE country='US' AND TRUE -> WHERE country='US'
WHERE country IN ('US','US','UK') -> WHERE country IN ('US','UK')
WHERE x BETWEEN 1 AND 10 -> WHERE x >= 1 AND x <= 10
WHERE x = NULL -> WHERE FALSE          -- NULL semantics
SUBSTRING(s, 1, LENGTH(s)) -> s        -- function inlining

Calcite правила:

ReduceExpressionsRule (для Project/Filter/Calc)
ReduceJoinConditionRule
SimplifyFilterConditionRule
ConstantPropagationRule

Эти правила недорогие и применяются в Phase 1: Pre-Optimization через HepPlanner. Эффект — меньше работы downstream-операторам.

Группа 6: Join reordering — cost-based

Это самая сложная и важная группа. Порядок joins сильно влияет на performance:

SELECT *
FROM small_table s
JOIN huge_table h ON s.k = h.k
JOIN medium_table m ON h.j = m.j;

Плохой порядок: (huge JOIN medium) JOIN small — выпадает большой intermediate. Хороший порядок: (small JOIN huge) JOIN medium — фильтрация через small сразу уменьшает huge.

Cost-based join reordering работает только в VolcanoPlanner. Он перебирает разные порядки, оценивает каждый через cost model и выбирает дешёвый.

Стратегии reordering:

1. Left-deep tree (default)
   - Joins вкладываются слева
   - Пример: ((A JOIN B) JOIN C) JOIN D
   - Простой алгоритм, ограниченное пространство поиска

2. Bushy tree
   - Любая форма дерева
   - Пример: (A JOIN B) JOIN (C JOIN D)
   - Большое пространство поиска, но лучшие планы для больших joins
   - Включается через table.optimizer.bushy-tree-join-reorder-threshold=N

3. Greedy
   - Жадный выбор пар (cheapest first)
   - Fallback для очень больших join групп

Cost = function(row count, row width, join condition selectivity, algorithm). Flink использует FlinkRelMdRowCount для оценки row count после joins — это часто bottleneck качества плана.

TIP

Самая частая жалоба на Flink SQL — “plan был хороший, потом стал плохой”. Обычно это потому, что Statistics устарели или cardinality estimate неточный. Flink использует CatalogTableStatistics для row counts — если их нет, fallback на default constant. В production стоит регулярно обновлять статистики через ANALYZE TABLE или через external metadata.

Группа 7: Aggregate optimization

Aggregates имеют свои правила, важные для performance:

TwoPhaseAggregateRule:
  Идея: разделить агрегацию на local + global фазы
  Before:
    LogicalAggregate(group by user, SUM(amount))
      LogicalExchange(hash by user)
        Source
  After:
    LogicalAggregate(group by user, SUM(partial_sum))   -- global
      LogicalExchange(hash by user)
        LogicalAggregate(group by user, SUM(amount) AS partial_sum)  -- local
          Source

  Эффект: local aggregation сокращает данные перед сетевым shuffle.

AggregateUnionTransposeRule:
  Aggregate(Union(A, B)) -> Union(Aggregate(A), Aggregate(B))

AggregateProjectMergeRule:
  Project на Aggregate -> объединить в один Aggregate

TwoPhaseAggregateRule — критическое правило для streaming throughput. По умолчанию выключено для streaming (table.exec.mini-batch.enabled=false). Включается с MiniBatch processing (table.exec.mini-batch.enabled=true + size/latency параметры).

Группа 8: Stream-only правила

Эти правила работают только для streaming pipeline:

MiniBatchIntervalInferRule:
  Анализирует sink retract behaviour, выставляет mini-batch interval
  для downstream-агрегаций

DeduplicateLastRowRule:
  ROW_NUMBER() OVER (PARTITION BY k ORDER BY t DESC) FETCH FIRST 1 ROW
  -> StreamPhysicalDeduplicate operator

DeltaJoinRule (Flink 2.1+):
  Streaming joins, где обе стороны имеют primary key и часто обновляются
  -> DeltaJoin operator с минимальным state footprint

MultiJoinRule (Flink 2.2):
  Каскадные joins без intermediate state
  -> MultiJoin operator

Подробнее об этих оптимизациях — в уроках 4 и 5 этого модуля.

Группа 9: Batch-only правила

JoinPushTransitivePredicatesRule:
  Если a.x = b.x AND b.x = 5, то можно вывести a.x = 5 и push в источник

JoinDistributionRule:
  Выбирает PARTITIONED vs BROADCAST distribution для join
  Cost-based

EnumerableJoinConverterRule (стандартный Calcite, не используется Flink):
  Только в Calcite Enumerable convention

Flink batch:
  BatchPhysicalJoinRule — выбор HashJoin / SortMergeJoin / NestedLoopJoin
  BatchPhysicalAggregateRule — HashAggregate / SortAggregate

Rule-based (HepPlanner) vs Cost-based (VolcanoPlanner)

Aspect              HepPlanner                  VolcanoPlanner

Strategy            Apply rules in fixed order  Search alternatives, pick cheapest
Memoization         Replace in-place            Keep all variants in RelSet
Cost model          Not used                    Required
Determinism         Always same plan            Same plan for same statistics
Speed               Fast (linear in rules)      Slow (exponential in worst case)
Use case            Required normalizations,    Choosing algorithms,
                    rewriting non-negotiables   join ordering, physical opts
Rule trigger        once per rule per node      whenever subtree changes

Flink использует обоих в разных фазах:

Phase 1 (Pre-Opt):           HepPlanner — обязательная нормализация
Phase 2 (Decorrelation):     HepPlanner — детерминированное переписывание
Phase 3 (Logical opt):       VolcanoPlanner — поиск + join reorder
Phase 4 (Time material.):    HepPlanner — Flink-specific обязательно
Phase 5 (Physical conv.):    VolcanoPlanner — выбор физических узлов
Phase 6 (Physical rewrite):  HepPlanner — финальные оптимизации
Phase 7 (Post-opt):          HepPlanner — sink wrapping, watermarks

Как влиять на план

1. Hints — внутри SQL
   SELECT /*+ USE_HASH(a, b) */ ... — выбрать join algorithm

2. Configs — глобально через TableConfig
   table.optimizer.join-reorder-enabled = true/false
   table.optimizer.distinct-agg.split.enabled = true
   table.exec.mini-batch.enabled = true
   table.optimizer.bushy-tree-join-reorder-threshold = 12

3. Statistics — через ANALYZE TABLE или Catalog
   Без статистик cost-based оптимизатор работает по default'ам

4. Rewriting SQL — последний шанс
   Если SQL получается плохо, попробовать переписать
   (вынести подзапрос как CTE, переупорядочить joins вручную)

5. Compiled plans — заморозить лучший план
   COMPILE PLAN, EXECUTE PLAN — план не меняется между апгрейдами
Поток правил оптимизатора по фазам
Logical RelNodeСвежий Logical RelNode tree после SqlToRelConverter. Конвенция None.
Phase 1: Hep
After Pre-OptSubQueryRemoveRule, ReduceExpressionsRule, FilterProjectTranspose. Обязательная нормализация.
DecorrelatedRelDecorrelator превращает LogicalCorrelate в LogicalJoin. Может выкинуть exception если decorrelation невозможен.
Phase 3: Volcano
Logical OptimizedPredicate/projection pushdown, join reorder, constant folding. Cost-based выбор порядка joins. Может занять секунды для сложных запросов.
Time materializedHepPlanner: time materialization, выбор rowtime/proctime семантики, sink rewriting для retract streams.
Phase 5: Volcano
Physical RelNodeLogical -> Physical conversion. Convention Convention.NONE становится STREAM_PHYSICAL или BATCH_PHYSICAL. Choosing join algorithm by cost.
Physical RewritesPhase 6: MiniBatch, TwoPhaseAgg, DeltaJoin, MultiJoin, deduplicate. Финальные физические оптимизации.
Phase 7: Hep
Final PlanWatermark assigner placement, ChangelogNormalize insertion, sink wrapping. Финальный план, готовый для ExecNode generation.

Кейс: один SQL — три плана

Один и тот же SQL может дать существенно разные планы в зависимости от:

SELECT customer_id, COUNT(*) AS orders_cnt
FROM orders
WHERE country = 'US' AND order_date > DATE '2025-01-01'
GROUP BY customer_id;
Stream mode (default):
  StreamPhysicalSink
    StreamPhysicalGroupAggregate (group by customer_id, COUNT)
      StreamPhysicalExchange (hash by customer_id)
        StreamPhysicalCalc (filter, project customer_id)
          StreamPhysicalTableSourceScan (orders, no filter pushdown)

Stream + MiniBatch enabled:
  StreamPhysicalSink
    StreamPhysicalGlobalGroupAggregate (group by customer_id, SUM(local_cnt))
      StreamPhysicalExchange (hash by customer_id)
        StreamPhysicalLocalGroupAggregate (group by customer_id, COUNT AS local_cnt)
          StreamPhysicalCalc
            StreamPhysicalTableSourceScan

Batch mode (Paimon/Iceberg source):
  BatchPhysicalSink
    BatchPhysicalHashAggregate (group by customer_id, COUNT)
      BatchPhysicalExchange (hash by customer_id)
        BatchPhysicalCalc
          BatchPhysicalTableSourceScan(orders WITH filter, projection)
            -- filter и projection pushdown в Paimon
            -- file pruning через manifests

Чтение source

Calcite rule base:
  org.apache.calcite.plan.RelOptRule  -- base class
  org.apache.calcite.rel.rules        -- стандартные правила (200+)
    FilterJoinRule, ProjectMergeRule, AggregateProjectMergeRule, ...

Flink streaming rules:
  flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/
    logical/                  -- общие правила для stream + batch
    physical/stream/          -- stream-only
    physical/batch/           -- batch-only

  ScalaCase classes: FlinkStreamRuleSets, FlinkBatchRuleSets

  Конкретные интересные:
    PushFilterIntoTableSourceScanRule
    PushProjectIntoTableSourceScanRule
    TwoStageOptimizedAggregateRule
    SplitAggregateRule
    SubQueryDecorrelator
    StreamPhysicalDeltaJoinRule  (Flink 2.1+)
    StreamPhysicalMultiJoinRule  (Flink 2.2)

Когда самим писать правила

Это редкий, но legitimate сценарий — extension для специального connector’а или custom function:

1. Custom source с особыми pushdown возможностями
   -> implement SupportsFilterPushDown, SupportsProjectionPushDown
   -> Flink правила сделают остальное автоматически

2. Custom physical operator (например, native execution)
   -> Создать новый Convention, RelNode subclass, RelOptRule

3. Domain-specific оптимизации
   -> Например, для geo-data: rewrite ST_Distance(...) < r в bounding box predicate
   -> Реализовать как HepPlanner rule

В 95% случаев пишут не свои правила, а используют hints и configs. Свои правила — для библиотек/коннекторов, не для бизнес-логики.

Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Что делает SubQueryRemoveRule с подзапросом WHERE col IN (SELECT ...)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5