Learning Platform
Глоссарий Troubleshooting
Урок 13.01 · 20 мин
Средний
Table APISQLDynamic TablesChangelogContinuous Query

Dynamic Tables и changelog

Когда вы пишете SELECT count(*) FROM clicks GROUP BY user_id в обычном Postgres, запрос отрабатывает один раз: сканирует таблицу, агрегирует, возвращает результат. В Flink SQL та же самая строка делает нечто принципиально иное. Запрос запускается один раз и работает бесконечно — каждое новое событие в топике clicks мгновенно обновляет результат. Источник никогда не заканчивается, и выходной “результат” — это не статичный набор строк, а поток обновлений.

Чтобы такое было математически корректно, Flink опирается на концепцию dynamic table. Это и есть тот мостик, который связывает декларативный SQL с потоковой моделью. В этом уроке мы разберём, что такое dynamic table, какие изменения она представляет (+I, -U, +U, -D), и почему continuous query на dynamic table — это просто SQL на бесконечном входе.


От static к dynamic

Классическая реляционная таблица — это snapshot: множество строк в момент времени. Запрос SELECT * FROM users возвращает то, что лежит в таблице сейчас. Через секунду таблица может измениться, но это уже другой запрос.

Flink меняет правила. Dynamic table — это таблица, которая меняется во времени. Формально: dynamic table в момент t — это snapshot, который получится, если применить все изменения от начала до t. Через секунду snapshot будет другой, через минуту — третий.

Источник изменений — это changelog stream. Каждое событие в changelog описывает дельту: какая строка появилась, какая обновилась, какая исчезла. Dynamic table и changelog stream — это дуальные представления одних и тех же данных. По changelog можно восстановить таблицу в любой момент. По двум snapshot’ам таблицы можно восстановить дельту.

NOTE

Эта дуальность не уникальна для Flink. Точно та же идея лежит в основе Kafka Streams (KStream vs KTable), Materialize, RisingWave и Spark Structured Streaming. Все современные SQL-streaming движки строятся на dynamic tables.

KStream и KTable: та же дуальность в Kafka Streams

Четыре типа изменений

Изменения в changelog кодируются четырьмя event types:

  • +I (insert) — новая строка появилась в таблице.
  • -U (update before) — старое значение строки перед обновлением.
  • +U (update after) — новое значение строки после обновления.
  • -D (delete) — строка удалена из таблицы.

Update — это пара событий -U/+U, идущих подряд. Это нужно, чтобы downstream-оператор мог корректно “откатить” агрегат: если предыдущее значение баланса было 100, а новое — 120, то sum должен сначала вычесть 100, потом прибавить 120. Без -U downstream не знал бы, что именно вычитать.

Dynamic table как changelog stream
События приходят в changelog, dynamic table формируется применением событий по порядку
Event 1+I (user=alice, clicks=1). Insert новой строки. До этого таблица была пуста, теперь содержит одну запись.
Event 2+I (user=bob, clicks=1). Вторая insert. Таблица теперь содержит две независимые записи: alice и bob.
Event 3-4-U (alice, 1) + +U (alice, 2). Update: alice кликнула повторно. Старое значение 1 откатывается, новое 2 применяется. Эта пара идёт атомарно — downstream видит её как одну транзакцию.
Event 5-D (bob, 1). Delete. Bob исчезает из таблицы. После этого события dynamic table содержит только alice=2.

Snapshot: alice=2

Snapshot dynamic table после применения всех событий: одна запись (alice=2). bob удалён, alice обновлена до 2.

Append-only vs upsert vs retract

В зависимости от запроса и типа источника, dynamic table может работать в разных режимах:

Append-only — таблица только растёт. Никаких -U, -D. Это самый простой случай: Kafka топик с raw-событиями, fact table в DWH. Stateless-операции (filter, projection) сохраняют append-only режим. Большинство sink-коннекторов нативно поддерживают только append-only.

Upsert — таблица обновляется по primary key. События +I для нового ключа, +U для существующего, -D для удаления. Семантика близка к key-value store: один ключ — одна актуальная строка. Источники с upsert-семантикой: CDC-стримы (MySQL CDC), агрегации GROUP BY pk, upsert-Kafka.

Retract — полный changelog с -U/+U парами. Самый общий, но и самый дорогой по объёму трафика и сложности downstream-операторов. Возникает в агрегациях без primary key, в outer join’ах, в windowing.

TIP

Когда выбираете sink-коннектор, всегда проверяйте, какой режим он поддерживает. JDBC sink с PRIMARY KEY умеет upsert. Iceberg/Paimon — upsert и append. Kafka обычный — только append; для upsert нужен upsert-kafka коннектор. Если ваш запрос производит retract changelog, а sink умеет только append — Flink выкинет ошибку на этапе планирования.


Continuous query

Запрос на dynamic table называется continuous query. По определению — это запрос, который никогда не завершается. Он подписан на изменения входной dynamic table и инкрементально обновляет выходную dynamic table.

-- Source: append-only поток кликов из Kafka
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Continuous query: подсчёт кликов по пользователю
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;

Этот запрос порождает retract-changelog: для каждого нового клика Alice сначала отправляется -U (alice, N), потом +U (alice, N+1). Если запустить тот же SELECT в Postgres, мы получим один снэпшот. В Flink — бесконечную последовательность обновлений.

Важное свойство continuous query: результат семантически эквивалентен тому, что получился бы из batch-запроса на снэпшоте источника в момент t. Это и есть унификация streaming/batch: один SQL — две модели исполнения. Если вы понимаете, что вернёт обычный GROUP BY в SQL, вы понимаете, что вернёт continuous query в Flink.


Сравнение с другими streaming SQL движками

ДвижокМодельChangelogОсобенности
Flink Table/SQLDynamic tables+I/-U/+U/-DУнификация streaming/batch, retract/upsert/append
Kafka StreamsKStream/KTableUpdate (для KTable)Низкоуровневый DSL, без full SQL
ksqlDBPull/Push queriesINSERT/UPDATESQL надстройка над Kafka Streams
Spark Structured StreamingMicro-batch DataFrameAppend/Update/Complete output modesMicro-batch (1s+), не event-at-a-time
MaterializeDifferential dataflowMultiset с timestampsIncremental view maintenance, sub-second latency
RisingWaveMaterialized viewsPostgres-совместимый CDCPostgreSQL wire-protocol

Flink stands out двумя вещами: (1) первоклассная unified batch/streaming семантика — один и тот же SQL работает и в batch-режиме, и в streaming; (2) zero-CDC-loss поддержка retract-changelog для сложных запросов вроде outer join.


SQL и DataStream — мосты

DataStream API оперирует низкоуровневыми событиями: DataStream<Click>. Table API/SQL оперирует dynamic tables. Между ними есть мосты:

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// DataStream -> Table (append-only)
DataStream<Click> clicks = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "clicks");
Table clicksTable = tEnv.fromDataStream(clicks);

// Table -> DataStream (только если результат append-only)
DataStream<Row> result = tEnv.toDataStream(clicksTable);

// Table -> DataStream с changelog (retract или upsert)
DataStream<Row> changelog = tEnv.toChangelogStream(aggregatedTable);

Detail в уроке 5 этого модуля. Главное запомнить: toDataStream работает только для append-only, для retract/upsert используется toChangelogStream.


Попробуй сам

  1. Опиши, какой changelog породит запрос SELECT MAX(ts) FROM clicks GROUP BY user_id если на вход приходят клики (alice, t1), (alice, t2), (bob, t3).
  2. Подумай, может ли запрос SELECT * FROM clicks WHERE ts > NOW() - INTERVAL '1' MINUTE быть выражен в Flink. Какие будут проблемы с NOW() в streaming-контексте?
  3. Возьми реальную бизнес-задачу (например, “топ-10 продуктов за последний час”) и попробуй сформулировать, какие dynamic tables вам нужны и какой будет changelog режим у итоговой.
Проверка знанийKnowledge check
Запрос SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id запущен в Flink SQL. На вход приходят три события: (alice), (bob), (alice). Какой changelog породит этот запрос, и почему он не может быть append-only?
ОтветAnswer
Запрос произведёт retract-changelog: +I (alice, 1), +I (bob, 1), -U (alice, 1), +U (alice, 2). Третье событие (повторный клик alice) требует обновления уже отправленной вниз по pipeline строки (alice, 1). Append-only не позволяет это сделать — downstream получил бы две строки (alice, 1) и (alice, 2) и не знал бы, что вторая отменяет первую. Retract решает это явной парой -U/+U: downstream сначала вычитает старое значение, потом прибавляет новое. Альтернатива — upsert-changelog, если у нас есть primary key (user_id здесь и есть PK группы): +I (alice, 1), +I (bob, 1), +U (alice, 2). Upsert компактнее, но требует, чтобы sink поддерживал upsert семантику по PK.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Запрос SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id выполняется в Flink SQL. Какой changelog поток он порождает и почему он не может быть чисто append-only?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5