Learning Platform
Глоссарий Troubleshooting
Урок 10.04 · 25 мин
Продвинутый
Stream ModeBatch ModeRetract StreamUpsert StreamChangelogModeJoin AlgorithmsMiniBatch

Stream vs batch — два мира под одним SQL

Flink — один из немногих движков, где stream и batch обрабатываются единым SQL planner. Это unifying selling point: один запрос работает и для исторических данных (batch), и для real-time потока (stream). Но внутри планер использует разные наборы правил и разные физические операторы. Стрим и бэтч живут вместе, но почти не пересекаются.

Правила оптимизации в Catalyst (Spark) Dynamic Tables и changelog в Flink SQL

В этом уроке мы разбираемся, где Flink делит мир на stream и batch, какие правила специфичны для каждого режима, и почему один и тот же SQL дает совершенно разные физические планы.

Откуда берутся два режима

Mode определяется на уровне TableEnvironment:

// Stream mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Batch mode
TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build()
);

// Unified (1.14+) — режим выводится из source
TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build()
);

В runtime эти режимы используют разные планеры:

StreamTableEnvironment -> StreamPlanner
TableEnvironment(BATCH) -> BatchPlanner

Оба расширяют PlannerBase, но:
  - Разные RelTrait.Convention (STREAM_PHYSICAL vs BATCH_PHYSICAL)
  - Разные наборы правил (FlinkStreamRuleSets vs FlinkBatchRuleSets)
  - Разные физические операторы (StreamExecXxx vs BatchExecXxx)
  - Разные ExecNode классы

После Phase 5 оптимизатора (physical conversion) узлы попадают либо в STREAM_PHYSICAL, либо в BATCH_PHYSICAL. Конвенции взаимоисключающи — не бывает узла, который и stream, и batch.

Концептуальная разница: что значит “запрос”

Batch SQL семантика:
  - Дано: фиксированный, конечный набор строк
  - Результат: запрос вычисляется один раз, выдает финальный ответ
  - Optimization: оптимизатор знает row count и может cost-based выбирать
  - Operators: блокирующие OK (сортировка, hash join build)

Stream SQL семантика:
  - Дано: бесконечный поток событий, может содержать обновления и удаления
  - Результат: запрос непрерывно обновляет результат
  - Optimization: cardinality unknown, статистики ограничены
  - Operators: must быть incremental, non-blocking

Стрим SQL имеет специальную семантику: каждая запись может быть insert, update (изменение существующей записи) или delete. Это называется changelog.

Changelog modes

ChangelogMode:
  INSERT_ONLY              -- только добавление (append-only)
  RETRACT                  -- удаление + добавление (полное представление update)
  UPSERT                   -- update_before optional, требуется primary key
  ALL                      -- любая комбинация

Поток как changelog:
  INSERT_ONLY:
    +I (user=A, val=1)
    +I (user=B, val=2)
    +I (user=A, val=3)

  RETRACT (например, поток после GROUP BY):
    +I (user=A, count=1)
    -U (user=A, count=1)   -- retraction (cancellation предыдущего)
    +U (user=A, count=2)   -- new value
    +I (user=B, count=1)

  UPSERT (sink на материализованную таблицу):
    +I (user=A, count=1)   -- insert
    +U (user=A, count=2)   -- update (по PK user)
    +I (user=B, count=1)

Stream SQL planner должен решить, в каком changelog mode каждый промежуточный узел работает. От этого зависит:

  • Какие операторы можно использовать.
  • Какой sink можно подключить (sink заявляет, какой changelog принимает).
  • Нужны ли промежуточные ChangelogNormalize узлы.

Stream-only правила (retract management)

ChangelogNormalize

Когда source поставляет UPSERT поток (например, CDC из Debezium), а downstream нужен RETRACT — нужно вставить ChangelogNormalize. Этот оператор хранит state с последним известным значением каждой строки по PK и эмитит retraction + new value:

UPSERT input:           ChangelogNormalize state:    RETRACT output:
                        (PK -> last row)

+I (id=1, v=A)         {1 -> (A)}                   +I (id=1, v=A)
+U (id=1, v=B)         {1 -> (B)}                   -U (id=1, v=A)
                                                     +U (id=1, v=B)
+I (id=2, v=C)         {1 -> (B), 2 -> (C)}         +I (id=2, v=C)

State = таблица из last-known-version всех ключей. Размер state равен числу уникальных PK. Может быть огромным для долгоживущего стрима.

Правило: StreamPhysicalChangelogNormalizeRule
  Добавляет ChangelogNormalize, когда:
    - upstream changelog mode = UPSERT
    - downstream требует RETRACT
  ChangelogNormalize требует PK на source — иначе ошибка.

Retract aggregations

StreamPhysicalGroupAggregate поддерживает RETRACT входов. Если на вход прилетает retraction предыдущего значения, агрегат должен “откатить” вычисление:

SUM с retract support:
  state = {sum: long}

  on +I (val=10):  state.sum += 10
  on +U (val=15) which retracts +I (val=10):
    state.sum -= 10   -- retract
    state.sum += 15   -- new

Для не-retract sources (insert-only) можно использовать оптимизированный append-only aggregate без retract handling. Это правило StreamPhysicalAppendOnlyGroupAggregateRule.

Deduplicate operator

ROW_NUMBER() OVER (PARTITION BY k ORDER BY t DESC) FETCH FIRST 1 ROW — типичный паттерн для “взять последнее значение по ключу”. В стриме это превращается не в обычный Sort + Limit (это были бы blocking операторы), а в специальный StreamPhysicalDeduplicate:

SQL:
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY user ORDER BY ts DESC) AS rn
    FROM events
  ) WHERE rn = 1;

Stream физический план:
  StreamPhysicalDeduplicate (key=user, mode=keepLastRow)
    StreamPhysicalExchange (hash by user)
      StreamPhysicalTableSourceScan(events)

State per key: единственная последняя строка.
Когда приходит новая строка с большим ts — retract старую, emit новую.

Правило: DeduplicateLastRowRule
  Распознает ROW_NUMBER + FETCH FIRST паттерн.

MiniBatch

MiniBatch — оптимизация для streaming аггрегаций. Без неё каждая входящая запись триггерит state lookup + update + emit. С MiniBatch несколько записей буферизуются и обрабатываются батчем.

Без MiniBatch:
  per record:
    state.get(key)        -- RocksDB read (10 µs)
    update value
    state.put(key, value) -- RocksDB write (5 µs)
    emit                  -- send downstream (1 µs)

С MiniBatch (size=1000):
  буфер 1000 records
  group by key in buffer (in-memory)
  для каждого уникального ключа:
    state.get(key)        -- 1 read
    apply all updates
    state.put(key, value) -- 1 write
    emit final result     -- 1 emit
  Эффект: до 10x throughput для high-skew workloads

Включается:

table.exec.mini-batch.enabled = true
table.exec.mini-batch.allow-latency = '2s'
table.exec.mini-batch.size = 5000

Правила:

TwoStageOptimizedAggregateRule:
  - Разделяет агрегат на local + global
  - local: per parallelism subtask
  - global: per key после shuffle
  - Local working с MiniBatch буфером
  - Global нужен для финального ответа

SplitAggregateRule:
  - Для distinct aggregates (COUNT DISTINCT)
  - Разбивает на bucket + final
  - Решает проблему data skew по ключу

Window aggregate

Время — first-class concept в стриме. Window aggregates имеют свои правила:

WindowAggregateRule:
  TUMBLE/HOP/SESSION/CUMULATE table functions
  -> StreamPhysicalWindowAggregate operator

Watermark management:
  WatermarkAssigner placement
  Late events handling
  Window emit strategy

Window join:
  StreamPhysicalWindowJoin
  Только для events в одинаковом окне

Window операторы используют WindowTriggerStrategy, EventTimeSemantics — это собственный мир в Flink.

Batch-only правила (cost-based algorithm selection)

Выбор join algorithm

В batch у вас есть полный набор алгоритмов join, каждый со своими trade-off:

HashJoin (StreamingHashJoin в batch):
  Build side: меньшая таблица, hash в memory
  Probe side: большая таблица, scan + probe
  Memory: O(build_size)
  Time: O(build + probe)
  Best when: build fits in memory

SortMergeJoin:
  Обе стороны сортируются по join key (если ещё не отсортированы)
  Merge phase: walking simultaneously
  Memory: O(1) если sorted, O(N) для sort
  Time: O(N log N + M log M + N + M)
  Best when: large data, есть ordering или индекс

NestedLoopJoin:
  Inner cross product с условием
  Memory: O(1)
  Time: O(N × M)
  Best when: маленькие таблицы, non-equi conditions

BroadcastHashJoin:
  Меньшая таблица broadcast на все subtasks
  Большая таблица — без shuffle
  Memory: O(broadcast_size) на каждой subtask
  Time: O(probe per subtask)
  Best when: одна таблица очень маленькая (~MB), вторая огромная

В streaming доступны только HashJoin (через RocksDB state) и interval join. SortMerge невозможен — поток не отсортирован.

Batch правила:

JoinDistributionStrategyRule:
  Решает: PARTITIONED (обе стороны shuffle by key)
          или BROADCAST (меньшая broadcast)
  Cost-based по сравнению network cost

JoinExecStrategyRule:
  После distribution выбирает HashJoin / SortMergeJoin / NestedLoopJoin
  Cost-based по сравнению с memory budget

JoinReorderRule (общая, но cost очень разный):
  В batch known cardinalities -> чёткое join reordering
  В stream cardinality unknown -> часто не работает

Aggregate algorithm

HashAggregate:
  Hash table in memory: key -> aggregator state
  Streaming output: всё in-memory или spill to disk
  Best when: умеренное число groups

SortAggregate:
  Сначала sort by group key, потом scan with state per group
  Memory: O(1) после sort
  Best when: огромное число groups, не помещается в hash table

Правило BatchExecAggregateRule выбирает на основе оценки числа групп и memory budget.

Source pushdown — full power

Batch sources почти всегда поддерживают:

  • SupportsFilterPushDown — predicates как WHERE в Parquet/Iceberg.
  • SupportsProjectionPushDown — только нужные колонки.
  • SupportsPartitionPushDown — partition pruning (Hive partitions, Iceberg partition specs).
  • SupportsAggregatePushDown — некоторые aggregations выполняются в источнике (например, JDBC).
  • SupportsLimitPushDown — LIMIT в источник.

В streaming большинство этих оптимизаций не применимы — нет static partitions, цикл бесконечный.

Различия конкретных операторов

Stream vs Batch — физические операторы
Operation
Stream operator
Batch operator

Specific: stream join с RocksDB state

В стриме у join нет понятия “build side fits in memory”. Обе стороны бесконечны. Решение — обе стороны хранятся в state (RocksDB или ForStDB):

Stream regular join (INNER):
  Left state:  RocksDB MapState<JoinKey, List<LeftRow>>
  Right state: RocksDB MapState<JoinKey, List<RightRow>>

  On left record:
    1. state.put(key, currentLeftRows + thisRow)
    2. lookup right state by key
    3. for each right row matching: emit join result

  On right record:
    1. state.put(key, currentRightRows + thisRow)
    2. lookup left state by key
    3. for each left row matching: emit join result

State growth = ∞ (без TTL). Memory bound by ключи.

Это критично: stream regular join без TTL = unbounded state, в итоге OOM.

Solution 1: state TTL
  /*+ STATE_TTL('orders' = '12h', 'payments' = '24h') */
  SELECT ... FROM orders JOIN payments ON ...
  -- старые записи удаляются автоматически

Solution 2: interval join (event time)
  SELECT *
  FROM orders o JOIN payments p
  ON o.order_id = p.order_id
  AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR
  -- State bound by interval width

Solution 3: temporal join (для dimension data)
  SELECT *
  FROM orders o JOIN dim FOR SYSTEM_TIME AS OF o.ts ON o.k = dim.k
  -- Lookup join, dim state управляется connector'ом

Solution 4: DeltaJoin (Flink 2.1+, default 2.2+)
  -- См. урок 5 этого модуля

Specific: stream aggregate emit strategy

В batch aggregate эмитит результат один раз, после обработки всего входа. В стриме результат меняется со временем, и emit strategy критична:

EmitStrategy для streaming aggregate:

1. CONTINUOUS (default для GROUP BY):
   Эмит обновления как только меняется агрегат
   Для каждого ключа: +I (initial) -> -U (retract) -> +U (new) ...
   High network/sink load

2. FINAL_ONLY (для window aggregate):
   Эмит только финальный результат, когда watermark прошёл конец окна
   Один emit per window per key
   Low load, но increased latency

3. EARLY (early firing):
   Эмит preliminary результат до конца окна
   Контролируется через TableConfig.WINDOW_ALLOW_RETRACT
   Trade-off: latency vs network

4. LATE (allowed lateness):
   Если приходит late event после window close, retract + emit corrected
   Контролируется через TableConfig

Правила emit strategy в StreamPhysicalGroupWindowAggregateRule и связанных.

Specific: batch dynamic partition pruning

Batch имеет уникальную оптимизацию — Dynamic Partition Pruning (DPP):

SELECT *
FROM fact_orders f JOIN dim_dates d ON f.date_id = d.id
WHERE d.year = 2025;

Без DPP:
  Read all partitions of fact_orders
  Join with filtered dim_dates
  
С DPP:
  Read filtered dim_dates -> determine d.id values for 2025
  Broadcast d.id list back to fact_orders source
  Read only partitions of fact_orders matching d.id
  Then join

DPP — это runtime партиционирующий pushdown. Включается через table.optimizer.dynamic-filtering.enabled = true. В streaming концептуально не работает — нет фиксированных partitions для prune.

Unified API ловушки

Один SQL может работать иначе:

SELECT user_id, COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(rowtime, INTERVAL '1' HOUR);

В batch (tableEnv.from("clicks_paimon")):

  • Один проход по данным, фильтр окон, group by, emit финального результата
  • Operations blocking
  • Cardinality known

В stream (tableEnv.from("clicks_kafka")):

  • WindowAggregate с триггером по watermark
  • State: per (user_id, window_start) -> count
  • Emit когда watermark >= window_end
  • Late events handling
  • Operations non-blocking, incremental

Один SQL -> две разные runtime реализации. Это сила и сложность unified API.

WARNING

Streaming SQL не поддерживает все то, что batch. ORDER BY без time-bound, OFFSET без top-N, polynomial-time joins — недоступно. Если SQL компилируется в batch но падает в stream — это первая причина. Обычно решение: переписать запрос через window operations или CEP pattern matching.

Где смотреть на код правил

flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/
  logical/                                  -- shared logical rules
    FlinkAggregateRemoveRule
    FlinkAggregateRule
    SubQueryDecorrelator
  physical/stream/                          -- stream-only physical rules
    StreamPhysicalGroupAggregateRule
    StreamPhysicalChangelogNormalizeRule
    StreamPhysicalDeduplicateRule
    StreamPhysicalIntervalJoinRule
    StreamPhysicalTemporalJoinRule
    StreamPhysicalDeltaJoinRule
    StreamPhysicalMultiJoinRule
    StreamPhysicalWindowAggregateRule
    StreamPhysicalGlobalGroupAggregateRule
    StreamPhysicalLocalGroupAggregateRule
  physical/batch/
    BatchPhysicalHashJoinRule
    BatchPhysicalSortMergeJoinRule
    BatchPhysicalNestedLoopJoinRule
    BatchPhysicalHashAggregateRule
    BatchPhysicalSortAggregateRule
    BatchPhysicalDynamicFilteringTableSourceScanRule

Rule sets aggregators:
  FlinkStreamRuleSets.scala  -- какие правила в каких фазах для stream
  FlinkBatchRuleSets.scala   -- то же для batch
Decision point: EnvironmentSettings.inStreamingMode()/inBatchMode()
  -> StreamPlanner или BatchPlanner

Внутри планера:
  -> Использует STREAM_PHYSICAL или BATCH_PHYSICAL conventions
  -> Применяет FlinkStreamRuleSets или FlinkBatchRuleSets
  -> Использует разные cost models

Что не зависит от mode:
  - SQL parser (тот же)
  - Validator (тот же)
  - SqlToRelConverter (тот же)
  - Logical optimization rules (большинство shared)
  - Calcite framework (тот же)

Что зависит от mode:
  - Phase 4-7 оптимизации (physical conversion onwards)
  - Все физические операторы (StreamExecXxx vs BatchExecXxx)
  - Codegen некоторые details (state vs no-state)
  - Sink decision (какой ChangelogMode acceptable)

Streaming vs batch — checklist при дизайне SQL

Spawn streaming jobs:
  Watermark strategy?           -- если есть time-based agg
  State TTL для joins?          -- предотвращение unbounded state
  Idempotent sink?              -- handling retractions
  MiniBatch enabled?            -- throughput optimization
  Compiled plan saved?          -- защита от plan drift

Spawn batch jobs:
  Statistics свежие?            -- для cost-based optimization
  Partition columns?            -- для DPP
  Broadcast threshold?          -- для broadcast join trigger
  Spill-to-disk memory?         -- для huge joins/aggs
  Resource manager?             -- YARN/K8s/Standalone
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие join algorithms доступны в streaming режиме и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5