Stream vs batch — два мира под одним SQL
Flink — один из немногих движков, где stream и batch обрабатываются единым SQL planner. Это unifying selling point: один запрос работает и для исторических данных (batch), и для real-time потока (stream). Но внутри планер использует разные наборы правил и разные физические операторы. Стрим и бэтч живут вместе, но почти не пересекаются.
Правила оптимизации в Catalyst (Spark) Dynamic Tables и changelog в Flink SQLВ этом уроке мы разбираемся, где Flink делит мир на stream и batch, какие правила специфичны для каждого режима, и почему один и тот же SQL дает совершенно разные физические планы.
Откуда берутся два режима
Mode определяется на уровне TableEnvironment:
// Stream mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Batch mode
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().inBatchMode().build()
);
// Unified (1.14+) — режим выводится из source
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().inStreamingMode().build()
);
В runtime эти режимы используют разные планеры:
StreamTableEnvironment -> StreamPlanner
TableEnvironment(BATCH) -> BatchPlanner
Оба расширяют PlannerBase, но:
- Разные RelTrait.Convention (STREAM_PHYSICAL vs BATCH_PHYSICAL)
- Разные наборы правил (FlinkStreamRuleSets vs FlinkBatchRuleSets)
- Разные физические операторы (StreamExecXxx vs BatchExecXxx)
- Разные ExecNode классы
После Phase 5 оптимизатора (physical conversion) узлы попадают либо в STREAM_PHYSICAL, либо в BATCH_PHYSICAL. Конвенции взаимоисключающи — не бывает узла, который и stream, и batch.
Концептуальная разница: что значит “запрос”
Batch SQL семантика:
- Дано: фиксированный, конечный набор строк
- Результат: запрос вычисляется один раз, выдает финальный ответ
- Optimization: оптимизатор знает row count и может cost-based выбирать
- Operators: блокирующие OK (сортировка, hash join build)
Stream SQL семантика:
- Дано: бесконечный поток событий, может содержать обновления и удаления
- Результат: запрос непрерывно обновляет результат
- Optimization: cardinality unknown, статистики ограничены
- Operators: must быть incremental, non-blocking
Стрим SQL имеет специальную семантику: каждая запись может быть insert, update (изменение существующей записи) или delete. Это называется changelog.
Changelog modes
ChangelogMode:
INSERT_ONLY -- только добавление (append-only)
RETRACT -- удаление + добавление (полное представление update)
UPSERT -- update_before optional, требуется primary key
ALL -- любая комбинация
Поток как changelog:
INSERT_ONLY:
+I (user=A, val=1)
+I (user=B, val=2)
+I (user=A, val=3)
RETRACT (например, поток после GROUP BY):
+I (user=A, count=1)
-U (user=A, count=1) -- retraction (cancellation предыдущего)
+U (user=A, count=2) -- new value
+I (user=B, count=1)
UPSERT (sink на материализованную таблицу):
+I (user=A, count=1) -- insert
+U (user=A, count=2) -- update (по PK user)
+I (user=B, count=1)
Stream SQL planner должен решить, в каком changelog mode каждый промежуточный узел работает. От этого зависит:
- Какие операторы можно использовать.
- Какой sink можно подключить (sink заявляет, какой changelog принимает).
- Нужны ли промежуточные ChangelogNormalize узлы.
Stream-only правила (retract management)
ChangelogNormalize
Когда source поставляет UPSERT поток (например, CDC из Debezium), а downstream нужен RETRACT — нужно вставить ChangelogNormalize. Этот оператор хранит state с последним известным значением каждой строки по PK и эмитит retraction + new value:
UPSERT input: ChangelogNormalize state: RETRACT output:
(PK -> last row)
+I (id=1, v=A) {1 -> (A)} +I (id=1, v=A)
+U (id=1, v=B) {1 -> (B)} -U (id=1, v=A)
+U (id=1, v=B)
+I (id=2, v=C) {1 -> (B), 2 -> (C)} +I (id=2, v=C)
State = таблица из last-known-version всех ключей. Размер state равен числу уникальных PK. Может быть огромным для долгоживущего стрима.
Правило: StreamPhysicalChangelogNormalizeRule
Добавляет ChangelogNormalize, когда:
- upstream changelog mode = UPSERT
- downstream требует RETRACT
ChangelogNormalize требует PK на source — иначе ошибка.
Retract aggregations
StreamPhysicalGroupAggregate поддерживает RETRACT входов. Если на вход прилетает retraction предыдущего значения, агрегат должен “откатить” вычисление:
SUM с retract support:
state = {sum: long}
on +I (val=10): state.sum += 10
on +U (val=15) which retracts +I (val=10):
state.sum -= 10 -- retract
state.sum += 15 -- new
Для не-retract sources (insert-only) можно использовать оптимизированный append-only aggregate без retract handling. Это правило StreamPhysicalAppendOnlyGroupAggregateRule.
Deduplicate operator
ROW_NUMBER() OVER (PARTITION BY k ORDER BY t DESC) FETCH FIRST 1 ROW — типичный паттерн для “взять последнее значение по ключу”. В стриме это превращается не в обычный Sort + Limit (это были бы blocking операторы), а в специальный StreamPhysicalDeduplicate:
SQL:
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY user ORDER BY ts DESC) AS rn
FROM events
) WHERE rn = 1;
Stream физический план:
StreamPhysicalDeduplicate (key=user, mode=keepLastRow)
StreamPhysicalExchange (hash by user)
StreamPhysicalTableSourceScan(events)
State per key: единственная последняя строка.
Когда приходит новая строка с большим ts — retract старую, emit новую.
Правило: DeduplicateLastRowRule
Распознает ROW_NUMBER + FETCH FIRST паттерн.
MiniBatch
MiniBatch — оптимизация для streaming аггрегаций. Без неё каждая входящая запись триггерит state lookup + update + emit. С MiniBatch несколько записей буферизуются и обрабатываются батчем.
Без MiniBatch:
per record:
state.get(key) -- RocksDB read (10 µs)
update value
state.put(key, value) -- RocksDB write (5 µs)
emit -- send downstream (1 µs)
С MiniBatch (size=1000):
буфер 1000 records
group by key in buffer (in-memory)
для каждого уникального ключа:
state.get(key) -- 1 read
apply all updates
state.put(key, value) -- 1 write
emit final result -- 1 emit
Эффект: до 10x throughput для high-skew workloads
Включается:
table.exec.mini-batch.enabled = true
table.exec.mini-batch.allow-latency = '2s'
table.exec.mini-batch.size = 5000
Правила:
TwoStageOptimizedAggregateRule:
- Разделяет агрегат на local + global
- local: per parallelism subtask
- global: per key после shuffle
- Local working с MiniBatch буфером
- Global нужен для финального ответа
SplitAggregateRule:
- Для distinct aggregates (COUNT DISTINCT)
- Разбивает на bucket + final
- Решает проблему data skew по ключу
Window aggregate
Время — first-class concept в стриме. Window aggregates имеют свои правила:
WindowAggregateRule:
TUMBLE/HOP/SESSION/CUMULATE table functions
-> StreamPhysicalWindowAggregate operator
Watermark management:
WatermarkAssigner placement
Late events handling
Window emit strategy
Window join:
StreamPhysicalWindowJoin
Только для events в одинаковом окне
Window операторы используют WindowTriggerStrategy, EventTimeSemantics — это собственный мир в Flink.
Batch-only правила (cost-based algorithm selection)
Выбор join algorithm
В batch у вас есть полный набор алгоритмов join, каждый со своими trade-off:
HashJoin (StreamingHashJoin в batch):
Build side: меньшая таблица, hash в memory
Probe side: большая таблица, scan + probe
Memory: O(build_size)
Time: O(build + probe)
Best when: build fits in memory
SortMergeJoin:
Обе стороны сортируются по join key (если ещё не отсортированы)
Merge phase: walking simultaneously
Memory: O(1) если sorted, O(N) для sort
Time: O(N log N + M log M + N + M)
Best when: large data, есть ordering или индекс
NestedLoopJoin:
Inner cross product с условием
Memory: O(1)
Time: O(N × M)
Best when: маленькие таблицы, non-equi conditions
BroadcastHashJoin:
Меньшая таблица broadcast на все subtasks
Большая таблица — без shuffle
Memory: O(broadcast_size) на каждой subtask
Time: O(probe per subtask)
Best when: одна таблица очень маленькая (~MB), вторая огромная
В streaming доступны только HashJoin (через RocksDB state) и interval join. SortMerge невозможен — поток не отсортирован.
Batch правила:
JoinDistributionStrategyRule:
Решает: PARTITIONED (обе стороны shuffle by key)
или BROADCAST (меньшая broadcast)
Cost-based по сравнению network cost
JoinExecStrategyRule:
После distribution выбирает HashJoin / SortMergeJoin / NestedLoopJoin
Cost-based по сравнению с memory budget
JoinReorderRule (общая, но cost очень разный):
В batch known cardinalities -> чёткое join reordering
В stream cardinality unknown -> часто не работает
Aggregate algorithm
HashAggregate:
Hash table in memory: key -> aggregator state
Streaming output: всё in-memory или spill to disk
Best when: умеренное число groups
SortAggregate:
Сначала sort by group key, потом scan with state per group
Memory: O(1) после sort
Best when: огромное число groups, не помещается в hash table
Правило BatchExecAggregateRule выбирает на основе оценки числа групп и memory budget.
Source pushdown — full power
Batch sources почти всегда поддерживают:
SupportsFilterPushDown— predicates как WHERE в Parquet/Iceberg.SupportsProjectionPushDown— только нужные колонки.SupportsPartitionPushDown— partition pruning (Hive partitions, Iceberg partition specs).SupportsAggregatePushDown— некоторые aggregations выполняются в источнике (например, JDBC).SupportsLimitPushDown— LIMIT в источник.
В streaming большинство этих оптимизаций не применимы — нет static partitions, цикл бесконечный.
Различия конкретных операторов
Specific: stream join с RocksDB state
В стриме у join нет понятия “build side fits in memory”. Обе стороны бесконечны. Решение — обе стороны хранятся в state (RocksDB или ForStDB):
Stream regular join (INNER):
Left state: RocksDB MapState<JoinKey, List<LeftRow>>
Right state: RocksDB MapState<JoinKey, List<RightRow>>
On left record:
1. state.put(key, currentLeftRows + thisRow)
2. lookup right state by key
3. for each right row matching: emit join result
On right record:
1. state.put(key, currentRightRows + thisRow)
2. lookup left state by key
3. for each left row matching: emit join result
State growth = ∞ (без TTL). Memory bound by ключи.
Это критично: stream regular join без TTL = unbounded state, в итоге OOM.
Solution 1: state TTL
/*+ STATE_TTL('orders' = '12h', 'payments' = '24h') */
SELECT ... FROM orders JOIN payments ON ...
-- старые записи удаляются автоматически
Solution 2: interval join (event time)
SELECT *
FROM orders o JOIN payments p
ON o.order_id = p.order_id
AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR
-- State bound by interval width
Solution 3: temporal join (для dimension data)
SELECT *
FROM orders o JOIN dim FOR SYSTEM_TIME AS OF o.ts ON o.k = dim.k
-- Lookup join, dim state управляется connector'ом
Solution 4: DeltaJoin (Flink 2.1+, default 2.2+)
-- См. урок 5 этого модуля
Specific: stream aggregate emit strategy
В batch aggregate эмитит результат один раз, после обработки всего входа. В стриме результат меняется со временем, и emit strategy критична:
EmitStrategy для streaming aggregate:
1. CONTINUOUS (default для GROUP BY):
Эмит обновления как только меняется агрегат
Для каждого ключа: +I (initial) -> -U (retract) -> +U (new) ...
High network/sink load
2. FINAL_ONLY (для window aggregate):
Эмит только финальный результат, когда watermark прошёл конец окна
Один emit per window per key
Low load, но increased latency
3. EARLY (early firing):
Эмит preliminary результат до конца окна
Контролируется через TableConfig.WINDOW_ALLOW_RETRACT
Trade-off: latency vs network
4. LATE (allowed lateness):
Если приходит late event после window close, retract + emit corrected
Контролируется через TableConfig
Правила emit strategy в StreamPhysicalGroupWindowAggregateRule и связанных.
Specific: batch dynamic partition pruning
Batch имеет уникальную оптимизацию — Dynamic Partition Pruning (DPP):
SELECT *
FROM fact_orders f JOIN dim_dates d ON f.date_id = d.id
WHERE d.year = 2025;
Без DPP:
Read all partitions of fact_orders
Join with filtered dim_dates
С DPP:
Read filtered dim_dates -> determine d.id values for 2025
Broadcast d.id list back to fact_orders source
Read only partitions of fact_orders matching d.id
Then join
DPP — это runtime партиционирующий pushdown. Включается через table.optimizer.dynamic-filtering.enabled = true. В streaming концептуально не работает — нет фиксированных partitions для prune.
Unified API ловушки
Один SQL может работать иначе:
SELECT user_id, COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(rowtime, INTERVAL '1' HOUR);
В batch (tableEnv.from("clicks_paimon")):
- Один проход по данным, фильтр окон, group by, emit финального результата
- Operations blocking
- Cardinality known
В stream (tableEnv.from("clicks_kafka")):
- WindowAggregate с триггером по watermark
- State: per (user_id, window_start) -> count
- Emit когда watermark >= window_end
- Late events handling
- Operations non-blocking, incremental
Один SQL -> две разные runtime реализации. Это сила и сложность unified API.
Streaming SQL не поддерживает все то, что batch. ORDER BY без time-bound, OFFSET без top-N, polynomial-time joins — недоступно. Если SQL компилируется в batch но падает в stream — это первая причина. Обычно решение: переписать запрос через window operations или CEP pattern matching.
Где смотреть на код правил
flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/
logical/ -- shared logical rules
FlinkAggregateRemoveRule
FlinkAggregateRule
SubQueryDecorrelator
physical/stream/ -- stream-only physical rules
StreamPhysicalGroupAggregateRule
StreamPhysicalChangelogNormalizeRule
StreamPhysicalDeduplicateRule
StreamPhysicalIntervalJoinRule
StreamPhysicalTemporalJoinRule
StreamPhysicalDeltaJoinRule
StreamPhysicalMultiJoinRule
StreamPhysicalWindowAggregateRule
StreamPhysicalGlobalGroupAggregateRule
StreamPhysicalLocalGroupAggregateRule
physical/batch/
BatchPhysicalHashJoinRule
BatchPhysicalSortMergeJoinRule
BatchPhysicalNestedLoopJoinRule
BatchPhysicalHashAggregateRule
BatchPhysicalSortAggregateRule
BatchPhysicalDynamicFilteringTableSourceScanRule
Rule sets aggregators:
FlinkStreamRuleSets.scala -- какие правила в каких фазах для stream
FlinkBatchRuleSets.scala -- то же для batch
Как Flink принимает решение stream vs batch
Decision point: EnvironmentSettings.inStreamingMode()/inBatchMode()
-> StreamPlanner или BatchPlanner
Внутри планера:
-> Использует STREAM_PHYSICAL или BATCH_PHYSICAL conventions
-> Применяет FlinkStreamRuleSets или FlinkBatchRuleSets
-> Использует разные cost models
Что не зависит от mode:
- SQL parser (тот же)
- Validator (тот же)
- SqlToRelConverter (тот же)
- Logical optimization rules (большинство shared)
- Calcite framework (тот же)
Что зависит от mode:
- Phase 4-7 оптимизации (physical conversion onwards)
- Все физические операторы (StreamExecXxx vs BatchExecXxx)
- Codegen некоторые details (state vs no-state)
- Sink decision (какой ChangelogMode acceptable)
Streaming vs batch — checklist при дизайне SQL
Spawn streaming jobs:
Watermark strategy? -- если есть time-based agg
State TTL для joins? -- предотвращение unbounded state
Idempotent sink? -- handling retractions
MiniBatch enabled? -- throughput optimization
Compiled plan saved? -- защита от plan drift
Spawn batch jobs:
Statistics свежие? -- для cost-based optimization
Partition columns? -- для DPP
Broadcast threshold? -- для broadcast join trigger
Spill-to-disk memory? -- для huge joins/aggs
Resource manager? -- YARN/K8s/Standalone