Dynamic Tables и changelog
Когда вы пишете SELECT count(*) FROM clicks GROUP BY user_id в обычном Postgres, запрос отрабатывает один раз: сканирует таблицу, агрегирует, возвращает результат. В Flink SQL та же самая строка делает нечто принципиально иное. Запрос запускается один раз и работает бесконечно — каждое новое событие в топике clicks мгновенно обновляет результат. Источник никогда не заканчивается, и выходной “результат” — это не статичный набор строк, а поток обновлений.
Чтобы такое было математически корректно, Flink опирается на концепцию dynamic table. Это и есть тот мостик, который связывает декларативный SQL с потоковой моделью. В этом уроке мы разберём, что такое dynamic table, какие изменения она представляет (+I, -U, +U, -D), и почему continuous query на dynamic table — это просто SQL на бесконечном входе.
От static к dynamic
Классическая реляционная таблица — это snapshot: множество строк в момент времени. Запрос SELECT * FROM users возвращает то, что лежит в таблице сейчас. Через секунду таблица может измениться, но это уже другой запрос.
Flink меняет правила. Dynamic table — это таблица, которая меняется во времени. Формально: dynamic table в момент t — это snapshot, который получится, если применить все изменения от начала до t. Через секунду snapshot будет другой, через минуту — третий.
Источник изменений — это changelog stream. Каждое событие в changelog описывает дельту: какая строка появилась, какая обновилась, какая исчезла. Dynamic table и changelog stream — это дуальные представления одних и тех же данных. По changelog можно восстановить таблицу в любой момент. По двум snapshot’ам таблицы можно восстановить дельту.
Эта дуальность не уникальна для Flink. Точно та же идея лежит в основе Kafka Streams (KStream vs KTable), Materialize, RisingWave и Spark Structured Streaming. Все современные SQL-streaming движки строятся на dynamic tables.
Четыре типа изменений
Изменения в changelog кодируются четырьмя event types:
+I(insert) — новая строка появилась в таблице.-U(update before) — старое значение строки перед обновлением.+U(update after) — новое значение строки после обновления.-D(delete) — строка удалена из таблицы.
Update — это пара событий -U/+U, идущих подряд. Это нужно, чтобы downstream-оператор мог корректно “откатить” агрегат: если предыдущее значение баланса было 100, а новое — 120, то sum должен сначала вычесть 100, потом прибавить 120. Без -U downstream не знал бы, что именно вычитать.
Snapshot: alice=2
Snapshot dynamic table после применения всех событий: одна запись (alice=2). bob удалён, alice обновлена до 2.Append-only vs upsert vs retract
В зависимости от запроса и типа источника, dynamic table может работать в разных режимах:
Append-only — таблица только растёт. Никаких -U, -D. Это самый простой случай: Kafka топик с raw-событиями, fact table в DWH. Stateless-операции (filter, projection) сохраняют append-only режим. Большинство sink-коннекторов нативно поддерживают только append-only.
Upsert — таблица обновляется по primary key. События +I для нового ключа, +U для существующего, -D для удаления. Семантика близка к key-value store: один ключ — одна актуальная строка. Источники с upsert-семантикой: CDC-стримы (MySQL CDC), агрегации GROUP BY pk, upsert-Kafka.
Retract — полный changelog с -U/+U парами. Самый общий, но и самый дорогой по объёму трафика и сложности downstream-операторов. Возникает в агрегациях без primary key, в outer join’ах, в windowing.
Когда выбираете sink-коннектор, всегда проверяйте, какой режим он поддерживает. JDBC sink с PRIMARY KEY умеет upsert. Iceberg/Paimon — upsert и append. Kafka обычный — только append; для upsert нужен upsert-kafka коннектор. Если ваш запрос производит retract changelog, а sink умеет только append — Flink выкинет ошибку на этапе планирования.
Continuous query
Запрос на dynamic table называется continuous query. По определению — это запрос, который никогда не завершается. Он подписан на изменения входной dynamic table и инкрементально обновляет выходную dynamic table.
-- Source: append-only поток кликов из Kafka
CREATE TABLE clicks (
user_id STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Continuous query: подсчёт кликов по пользователю
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;
Этот запрос порождает retract-changelog: для каждого нового клика Alice сначала отправляется -U (alice, N), потом +U (alice, N+1). Если запустить тот же SELECT в Postgres, мы получим один снэпшот. В Flink — бесконечную последовательность обновлений.
Важное свойство continuous query: результат семантически эквивалентен тому, что получился бы из batch-запроса на снэпшоте источника в момент t. Это и есть унификация streaming/batch: один SQL — две модели исполнения. Если вы понимаете, что вернёт обычный GROUP BY в SQL, вы понимаете, что вернёт continuous query в Flink.
Сравнение с другими streaming SQL движками
| Движок | Модель | Changelog | Особенности |
|---|---|---|---|
| Flink Table/SQL | Dynamic tables | +I/-U/+U/-D | Унификация streaming/batch, retract/upsert/append |
| Kafka Streams | KStream/KTable | Update (для KTable) | Низкоуровневый DSL, без full SQL |
| ksqlDB | Pull/Push queries | INSERT/UPDATE | SQL надстройка над Kafka Streams |
| Spark Structured Streaming | Micro-batch DataFrame | Append/Update/Complete output modes | Micro-batch (1s+), не event-at-a-time |
| Materialize | Differential dataflow | Multiset с timestamps | Incremental view maintenance, sub-second latency |
| RisingWave | Materialized views | Postgres-совместимый CDC | PostgreSQL wire-protocol |
Flink stands out двумя вещами: (1) первоклассная unified batch/streaming семантика — один и тот же SQL работает и в batch-режиме, и в streaming; (2) zero-CDC-loss поддержка retract-changelog для сложных запросов вроде outer join.
SQL и DataStream — мосты
DataStream API оперирует низкоуровневыми событиями: DataStream<Click>. Table API/SQL оперирует dynamic tables. Между ними есть мосты:
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// DataStream -> Table (append-only)
DataStream<Click> clicks = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "clicks");
Table clicksTable = tEnv.fromDataStream(clicks);
// Table -> DataStream (только если результат append-only)
DataStream<Row> result = tEnv.toDataStream(clicksTable);
// Table -> DataStream с changelog (retract или upsert)
DataStream<Row> changelog = tEnv.toChangelogStream(aggregatedTable);
Detail в уроке 5 этого модуля. Главное запомнить: toDataStream работает только для append-only, для retract/upsert используется toChangelogStream.
Попробуй сам
- Опиши, какой changelog породит запрос
SELECT MAX(ts) FROM clicks GROUP BY user_idесли на вход приходят клики(alice, t1),(alice, t2),(bob, t3). - Подумай, может ли запрос
SELECT * FROM clicks WHERE ts > NOW() - INTERVAL '1' MINUTEбыть выражен в Flink. Какие будут проблемы сNOW()в streaming-контексте? - Возьми реальную бизнес-задачу (например, “топ-10 продуктов за последний час”) и попробуй сформулировать, какие dynamic tables вам нужны и какой будет changelog режим у итоговой.