Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 40 мин
Продвинутый
Apache PaimonChangelogChangelog ProducerStreaming ReadCDC_row_kindInput ModeLookup ModeFull-Compaction ModeChangelog Normalization

Changelog Producer

В Уроке 01 мы видели, что SST-файлы в Paimon хранят системную колонку _VALUE_KIND, кодирующую тип операции: +I (insert), -U (update before), +U (update after), -D (delete). В Уроке 02 — что merge engine’ы используют эти маркеры для merge. Теперь ключевой вопрос: как Paimon генерирует корректный changelog для downstream-потребителей?

Changelog — поток CDC-событий, описывающий каждое изменение в таблице. Для стриминговых pipeline’ов это фундамент: Flink job читает changelog одной таблицы и генерирует changelog для следующей.

Зачем нужен changelog?

Streaming Pipeline: changelog как связующее звено

Kafka CDC Source

Источник: Kafka topic с CDC-событиями (Debezium) из operational RDBMS. Каждое событие — insert, update или delete. Flink job читает topic и записывает в Paimon.

Paimon Table A

Paimon Table A: принимает CDC-поток, хранит через LSM-дерево с merge engine. При стриминговом чтении — отдаёт changelog: полный поток изменений для downstream. Changelog = вход для следующего звена pipeline.
changelog

Flink Aggregation

Flink job агрегации: читает changelog из Table A, вычисляет агрегаты (SUM, COUNT, AVG). Для корректной агрегации нужен ПОЛНЫЙ changelog: -U (вычесть старое) + +U (прибавить новое). Без -U агрегат будет некорректным.
changelog

Paimon Table B

Paimon Table B: материализованные агрегаты. Доступна для batch-запросов (dashboard, BI) и дальнейшего стриминга. Каждое звено получает changelog от предыдущего.

Проблема: не каждый changelog одинаково полезен. Для операторов агрегации (SUM, COUNT) нужен полный changelog с retraction-сообщениями: -U (вычесть старое значение) + +U (прибавить новое). Без ретракции SUM будет расти бесконечно вместо корректного обновления.

_row_kind: 4 типа CDC-событий

Paimon кодирует тип операции в колонке _row_kind (при чтении) и _VALUE_KIND (при хранении):

4 типа CDC-событий в Paimon
+I (INSERT)Новая запись. Первое появление primary key в таблице. Downstream-оператор: добавляет строку в состояние. SUM += new_value. COUNT += 1.
-U (UPDATE BEFORE)Retraction: старое значение перед обновлением. Содержит полную строку ДО изменения. Downstream-оператор: вычитает из агрегата. SUM -= old_value. Без -U агрегат будет двойным.
+U (UPDATE AFTER)Новое значение после обновления. Содержит полную строку ПОСЛЕ изменения. Downstream-оператор: прибавляет к агрегату. SUM += new_value. Пара -U/+U = атомарное обновление.
-D (DELETE)Удаление записи. Содержит полную строку на момент удаления. Downstream-оператор: удаляет из состояния. SUM -= deleted_value. COUNT -= 1.

Полный changelog — это поток, содержащий все 4 типа событий. Неполный changelog — только +I и +U (без retraction). Тип changelog определяется режимом changelog producer.

4 режима Changelog Producer

Paimon предлагает 4 режима, отличающихся полнотой changelog и стоимостью его генерации:

4 режима Changelog Producer
noneChangelog не генерируется. Стриминговое чтение видит только raw SST-файлы без _row_kind. Подходит ТОЛЬКО для append-only workload'ов или batch-только таблиц. Нельзя использовать для стриминговых агрегаций.
inputChangelog = входящий поток as-is. Если источник (Debezium) отправляет полный CDC (+I/-U/+U/-D), Paimon просто пробрасывает его. Нулевой overhead. Но если источник отправляет только +I/+U (без retraction) — changelog будет неполным.
lookupPaimon ищет СТАРОЕ значение в LSM-дереве перед записью нового. Если PK существует — генерирует -U (старое) + +U (новое). Если нет — генерирует +I. Overhead: один lookup на запись. Но changelog ВСЕГДА полный, независимо от источника.
full-compactionChangelog генерируется при компакции: diff между старым и новым sorted run. Нет overhead при записи. Changelog доступен только после компакции — задержка от секунд до минут. Подходит для near-real-time (не real-time).

none

CREATE TABLE batch_only_table (
 id BIGINT,
 value STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'changelog-producer' = 'none'
);

Режим по умолчанию для batch-таблиц. Стриминговое чтение вернёт все записи из новых snapshot’ов, но без _row_kind. Downstream-операторы не могут различить INSERT и UPDATE — агрегации будут некорректными.

input

Input Mode: проброс входящего changelog

Debezium CDC: op=u before: amount=50 after: amount=75

Debezium CDC source: каждое событие содержит op (c/u/d), before и after. Полный CDC с retraction: при UPDATE отправляет и старое (before), и новое (after) значение. Flink конвертирует в _row_kind.
Flink: convert to _row_kind
-URetraction: старое значение. Flink преобразует Debezium before в -U запись. Paimon записывает её в MemTable и пробрасывает в changelog.
+UAccumulate: новое значение. Flink преобразует Debezium after в +U запись. Пара -U/+U — атомарное обновление, корректное для downstream агрегаций.
Paimon stores + produces changelog
ChangelogInput mode: changelog = входящий поток. Paimon не генерирует дополнительные события — просто пробрасывает то, что пришло от Flink. Overhead = 0. Но полнота changelog зависит от источника.

Когда использовать: источник уже генерирует полный CDC (Debezium, Maxwell, Canal). Нулевой overhead — Paimon не выполняет дополнительных lookup’ов.

Ограничение: если источник отправляет только INSERT-подобные события (например, обновление как новый INSERT без retraction), changelog будет неполным.

lookup

Lookup Mode: генерация полного changelog

Входящая запись: order_id=100, amount=75 (без старого значения)

Входящая запись: order_id=100, amount=75. Источник НЕ содержит старое значение (только новое). Например, API-вызов или INSERT INTO ... ON DUPLICATE KEY UPDATE. Paimon не знает, что было ДО обновления.
1. Lookup в LSM-дереве

LSM Lookup: order_id=100 Найдено: amount=50 (старое значение)

Paimon ищет текущее значение order_id=100 в LSM-дереве: MemTable → sorted runs (от Level 0 к max level). Находит: order_id=100, amount=50. Это СТАРОЕ значение для retraction.
2. Генерация retraction
-U (сгенерирован)Paimon генерирует retraction из найденного старого значения. -U: order_id=100, amount=50. Этот event НЕ существовал во входящем потоке — Paimon создал его из lookup результата.
+U (входящая)Новое значение — из входящей записи as-is. +U: order_id=100, amount=75. Пара -U/+U: downstream SUM корректно обновится: SUM -= 50, SUM += 75.
ChangelogПолный changelog с retraction: [-U(100, 50), +U(100, 75)]. Lookup mode ГАРАНТИРУЕТ полноту changelog независимо от источника. Цена: один LSM-дерево lookup на каждую запись с обновлением PK.

Когда использовать: источник не содержит retraction (только новые значения), но downstream нуждается в полном changelog для агрегаций. Типичный сценарий: API-запись, ETL с INSERT OVERWRITE-семантикой.

Стоимость: один lookup в LSM-дереве на каждую запись. Для write-heavy workload’ов (>100K записей/сек) overhead может быть значительным.

CREATE TABLE orders (
 order_id BIGINT,
 amount DECIMAL(10, 2),
 status STRING,
 PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
 'changelog-producer' = 'lookup',
 'merge-engine' = 'deduplicate'
);
TIP

Lookup mode использует changelog-producer.lookup-wait — время ожидания компакции перед lookup. По умолчанию 0 (lookup сразу). Если данные на нижних уровнях LSM-дерева ещё не скомпактированы, lookup может вернуть устаревшее значение. Для критичных pipeline’ов: настройте changelog-producer.compaction-interval для более частой компакции.

full-compaction

Full-Compaction Mode: changelog из diff'а компакции
До компакцииСостояние таблицы перед компакцией: order_id=100, amount=50. Это 'снимок' всех merged sorted runs на момент предыдущей компакции.
Новые записиSorted runs, накопившиеся с предыдущей компакции. Содержат: order_id=100, amount=75 (update) и order_id=200, amount=30 (insert). Эти данные ещё не merge'ены с основным sorted run.
full compaction

Merge + Diff вычисление

Компакция merge-ит все sorted runs в один. Одновременно вычисляет diff: что изменилось? order_id=100: 50→75 (update). order_id=200: не существовал → 30 (insert). Diff = changelog.
ChangelogРезультат diff: [+I(200, 30), -U(100, 50), +U(100, 75)]. Полный changelog с retraction. Доступен только ПОСЛЕ компакции — задержка = интервал компакции (по умолчанию зависит от trigger).
Snapshot N+1Новый snapshot после компакции: order_id=100, amount=75 и order_id=200, amount=30. Этот snapshot станет baseline для следующей компакции.

Когда использовать: write-heavy workload, где lookup overhead неприемлем, но near-real-time changelog достаточен. Задержка changelog = интервал компакции.

Стоимость: нет overhead при записи. Changelog генерируется как побочный продукт компакции — “бесплатно” с точки зрения дополнительных I/O.

CREATE TABLE metrics (
 metric_id STRING,
 value DOUBLE,
 ts TIMESTAMP,
 PRIMARY KEY (metric_id) NOT ENFORCED
) WITH (
 'changelog-producer' = 'full-compaction',
 'full-compaction.delta-commits' = '3'
);
NOTE

full-compaction.delta-commits определяет, через сколько delta-коммитов (snapshot’ов) запускается full compaction. Значение 3 означает: каждые 3 snapshot’а = одна full compaction = один changelog batch. Меньше = чаще changelog, но больше I/O. Больше = реже changelog, но меньше overhead.

Сравнение режимов

4 режима: полнота vs стоимость
noneНет changelog. Стриминг видит raw данные без _row_kind. Стоимость: 0. Полнота: нет. Задержка: N/A. Для batch-only таблиц.
inputПроброс входящего потока. Стоимость: 0. Полнота: зависит от источника (Debezium = полный, API = неполный). Задержка: мгновенная.
lookupLSM-lookup для каждой записи. Стоимость: один lookup на запись. Полнота: всегда полный. Задержка: мгновенная. Универсальный, но дорогой для write-heavy.
full-compactionDiff при компакции. Стоимость: 0 при записи (только при компакции). Полнота: всегда полный. Задержка: интервал компакции. Near-real-time для write-heavy.
РежимПолный changelogOverhead записиЗадержкаBest for
none0N/ABatch-only таблицы
inputЕсли источник полный0МгновеннаяCDC (Debezium)
lookupВсегда1 lookup/записьМгновеннаяЛюбой источник, real-time
full-compactionВсегда0Интервал компакцииWrite-heavy, near-RT

Changelog нормализация

Некоторые downstream-операторы (например, Flink GROUP BY с UDAF) требуют нормализованный changelog: каждый UPDATE представлен как пара -U/+U, каждый INSERT — как +I, каждый DELETE — как -D. Paimon автоматически нормализует changelog при стриминговом чтении:

Changelog Normalization: raw → normalized

Raw changelog (из sorted runs)

Raw changelog из LSM-дерева может содержать 'грязные' события: например, +U без предшествующего -U (если предыдущая версия была на другом уровне LSM-дерева). Или +I для PK, который уже существует на нижнем уровне.
normalization
До нормализацииRaw events: [+U(100, 75)]. Downstream оператор SUM не знает, что вычесть — нет -U. Или: [+I(100, 50), +I(100, 75)] — два INSERT для одного PK, хотя второй должен быть UPDATE.
После нормализацииNormalized events: [-U(100, 50), +U(100, 75)] — полная пара. Или: [+I(100, 50), -U(100, 50), +U(100, 75)] — первый INSERT + корректный UPDATE. Downstream операторы работают корректно.
NOTE

Нормализация происходит на стороне reader, не writer. Paimon хранит events as-is в SST-файлах. При стриминговом чтении, reader конструирует normalized changelog, добавляя недостающие -U events из предыдущего состояния таблицы. Для changelog-producer = 'lookup' и 'full-compaction' нормализация уже встроена — они генерируют полные пары.

Streaming Pipeline Patterns

Pattern 1: CDC → Paimon → Aggregation

Pattern 1: CDC Pipeline с changelog

MySQL + Debezium

MySQL с Debezium: полный CDC (op: c/u/d + before/after). Flink CDC connector конвертирует в _row_kind (+I/-U/+U/-D). Источник уже полный — input mode достаточен.
CDC

ODS Table (input mode)

Paimon ODS table: merge-engine=deduplicate, changelog-producer=input. Хранит актуальное состояние + changelog для downstream. Input mode: нулевой overhead, потому что Debezium уже генерирует полный CDC.
changelog stream

Flink: GROUP BY user_id SUM(amount)

Flink job: GROUP BY user_id, SUM(amount). Читает changelog из ODS table. -U: вычитает старое amount из SUM. +U: прибавляет новое. Результат: real-time агрегат.
changelog

DWS Table (aggregates)

Paimon DWS table: агрегированные метрики. merge-engine=deduplicate. Downstream: BI dashboard, API. Весь pipeline — streaming, end-to-end задержка: секунды.

Pattern 2: API → Paimon → Streaming Join

Pattern 2: Lookup Mode для API-источника

REST API (только новые значения)

REST API: отправляет только текущее состояние заказа (нет before-значения). Paimon не знает старое значение — input mode дал бы неполный changelog. Нужен lookup mode.

Orders Table (lookup mode)

Paimon orders table: changelog-producer=lookup. При каждой записи — lookup в LSM-дереве для генерации -U (старое значение). Overhead: один lookup на запись, но changelog ВСЕГДА полный.
full changelog

Flink: orders JOIN users

Flink streaming join: orders JOIN users ON user_id. Для корректного join нужен полный changelog — при UPDATE заказа нужно обновить join-результат. Без retraction join будет дублировать строки.

Enriched Orders

Paimon enriched orders: результат join. Содержит данные заказа + данные пользователя. Dashboard-ready.

Pattern 3: High-Volume → Near-RT Analytics

Pattern 3: Full-Compaction для high-volume

IoT Sensors (1M+ events/sec)

IoT sensor stream: 1M+ events/sec. Lookup mode при таком объёме — overhead 1M lookup'ов/sec, неприемлемо. Full-compaction mode: 0 overhead при записи, changelog генерируется при компакции.

Sensor Table (full-compaction)

Paimon sensor table: changelog-producer=full-compaction, full-compaction.delta-commits=5. Запись: максимальная скорость (нет lookup). Changelog: каждые 5 snapshot'ов (30-60 секунд). Near-real-time, не real-time.
changelog (каждые 30-60 сек)

Flink: AVG, MAX per region

Flink aggregation: AVG(temperature), MAX(pressure) по region. Changelog приходит batch'ами (после компакции). Агрегаты обновляются не в real-time, а near-real-time. Для IoT-аналитики — достаточно.

Dashboard (near-RT)

Dashboard table: обновляется каждые 30-60 секунд. Для мониторинга IoT — приемлемая задержка. Для payment processing — нет (нужен lookup mode).

Сравнение с CDC в других форматах

Changelog: Paimon vs Hudi vs Delta Lake
PaimonChangelog — first-class citizen. 4 режима producer, _VALUE_KIND в SST-файлах, нормализация на чтении. Стриминговый changelog — основной паттерн потребления. Оптимизирован для Flink streaming pipeline'ов.
HudiCDC через incremental queries: snapshot → snapshot diff. MOR log files содержат CDC-события, но формат привязан к Hudi internals. CDC-extraction — отдельная операция, не встроенный changelog поток.
Delta LakeChange Data Feed (CDF): активируется через delta.enableChangeDataFeed=true. Записывает _change_type (insert/update_preimage/update_postimage/delete) в отдельные файлы. Batch-ориентирован: readChanges() для временного диапазона.
АспектPaimonHudiDelta Lake
Changelog в storage (_VALUE_KIND)Частично (log files)CDF-файлы (опционально)
Streaming native (producer modes)Incremental queryreadChanges() batch
Retraction (-U) (lookup/full-compaction) нативноupdate_preimage
Overhead0 (input) — 1 lookup (lookup) overheadУдвоение записи (CDF)
Downstream агрегацииПолная поддержкаОграниченнаяОграниченная
TIP

Changelog producer — главное конкурентное преимущество Paimon. Ни один другой формат не предлагает 4 встроенных режима генерации changelog с нативной нормализацией. Для streaming-first архитектур (Flink → Paimon → Flink → Paimon) это фундамент.

Ключевые выводы

  1. _row_kind кодирует 4 типа CDC-событий: +I (insert), -U (update before), +U (update after), -D (delete)
  2. 4 режима changelog producer: none (batch), input (проброс), lookup (LSM lookup), full-compaction (diff при компакции)
  3. Полный changelog с retraction (-U/+U пары) необходим для корректных стриминговых агрегаций
  4. lookup — универсальный, но дорогой; full-compaction — дешёвый, но с задержкой; input — бесплатный, но зависит от источника
  5. Нормализация происходит на стороне reader — Paimon хранит raw events, а нормализует при стриминговом чтении
  6. Paimon — единственный из 4 форматов (Delta, Iceberg, Hudi, Paimon) с нативным changelog producer как первоклассной концепцией

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Flink pipeline читает changelog из Paimon-таблицы (changelog-producer=input) и выполняет SUM(amount) GROUP BY user_id. Источник — Debezium CDC из PostgreSQL. При UPDATE заказа (amount: 50→75) SUM корректно обновляется. Почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6