Changelog Producer
В Уроке 01 мы видели, что SST-файлы в Paimon хранят системную колонку _VALUE_KIND, кодирующую тип операции: +I (insert), -U (update before), +U (update after), -D (delete). В Уроке 02 — что merge engine’ы используют эти маркеры для merge. Теперь ключевой вопрос: как Paimon генерирует корректный changelog для downstream-потребителей?
Changelog — поток CDC-событий, описывающий каждое изменение в таблице. Для стриминговых pipeline’ов это фундамент: Flink job читает changelog одной таблицы и генерирует changelog для следующей.
Зачем нужен changelog?
Kafka CDC Source
Источник: Kafka topic с CDC-событиями (Debezium) из operational RDBMS. Каждое событие — insert, update или delete. Flink job читает topic и записывает в Paimon.Paimon Table A
Paimon Table A: принимает CDC-поток, хранит через LSM-дерево с merge engine. При стриминговом чтении — отдаёт changelog: полный поток изменений для downstream. Changelog = вход для следующего звена pipeline.Flink Aggregation
Flink job агрегации: читает changelog из Table A, вычисляет агрегаты (SUM, COUNT, AVG). Для корректной агрегации нужен ПОЛНЫЙ changelog: -U (вычесть старое) + +U (прибавить новое). Без -U агрегат будет некорректным.Paimon Table B
Paimon Table B: материализованные агрегаты. Доступна для batch-запросов (dashboard, BI) и дальнейшего стриминга. Каждое звено получает changelog от предыдущего.Проблема: не каждый changelog одинаково полезен. Для операторов агрегации (SUM, COUNT) нужен полный changelog с retraction-сообщениями: -U (вычесть старое значение) + +U (прибавить новое). Без ретракции SUM будет расти бесконечно вместо корректного обновления.
_row_kind: 4 типа CDC-событий
Paimon кодирует тип операции в колонке _row_kind (при чтении) и _VALUE_KIND (при хранении):
Полный changelog — это поток, содержащий все 4 типа событий. Неполный changelog — только +I и +U (без retraction). Тип changelog определяется режимом changelog producer.
4 режима Changelog Producer
Paimon предлагает 4 режима, отличающихся полнотой changelog и стоимостью его генерации:
none
CREATE TABLE batch_only_table (
id BIGINT,
value STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'changelog-producer' = 'none'
);
Режим по умолчанию для batch-таблиц. Стриминговое чтение вернёт все записи из новых snapshot’ов, но без _row_kind. Downstream-операторы не могут различить INSERT и UPDATE — агрегации будут некорректными.
input
Debezium CDC: op=u before: amount=50 after: amount=75
Debezium CDC source: каждое событие содержит op (c/u/d), before и after. Полный CDC с retraction: при UPDATE отправляет и старое (before), и новое (after) значение. Flink конвертирует в _row_kind.Когда использовать: источник уже генерирует полный CDC (Debezium, Maxwell, Canal). Нулевой overhead — Paimon не выполняет дополнительных lookup’ов.
Ограничение: если источник отправляет только INSERT-подобные события (например, обновление как новый INSERT без retraction), changelog будет неполным.
lookup
Входящая запись: order_id=100, amount=75 (без старого значения)
Входящая запись: order_id=100, amount=75. Источник НЕ содержит старое значение (только новое). Например, API-вызов или INSERT INTO ... ON DUPLICATE KEY UPDATE. Paimon не знает, что было ДО обновления.LSM Lookup: order_id=100 Найдено: amount=50 (старое значение)
Paimon ищет текущее значение order_id=100 в LSM-дереве: MemTable → sorted runs (от Level 0 к max level). Находит: order_id=100, amount=50. Это СТАРОЕ значение для retraction.Когда использовать: источник не содержит retraction (только новые значения), но downstream нуждается в полном changelog для агрегаций. Типичный сценарий: API-запись, ETL с INSERT OVERWRITE-семантикой.
Стоимость: один lookup в LSM-дереве на каждую запись. Для write-heavy workload’ов (>100K записей/сек) overhead может быть значительным.
CREATE TABLE orders (
order_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'changelog-producer' = 'lookup',
'merge-engine' = 'deduplicate'
);
Lookup mode использует changelog-producer.lookup-wait — время ожидания компакции перед lookup. По умолчанию 0 (lookup сразу). Если данные на нижних уровнях LSM-дерева ещё не скомпактированы, lookup может вернуть устаревшее значение. Для критичных pipeline’ов: настройте changelog-producer.compaction-interval для более частой компакции.
full-compaction
Merge + Diff вычисление
Компакция merge-ит все sorted runs в один. Одновременно вычисляет diff: что изменилось? order_id=100: 50→75 (update). order_id=200: не существовал → 30 (insert). Diff = changelog.Когда использовать: write-heavy workload, где lookup overhead неприемлем, но near-real-time changelog достаточен. Задержка changelog = интервал компакции.
Стоимость: нет overhead при записи. Changelog генерируется как побочный продукт компакции — “бесплатно” с точки зрения дополнительных I/O.
CREATE TABLE metrics (
metric_id STRING,
value DOUBLE,
ts TIMESTAMP,
PRIMARY KEY (metric_id) NOT ENFORCED
) WITH (
'changelog-producer' = 'full-compaction',
'full-compaction.delta-commits' = '3'
);
full-compaction.delta-commits определяет, через сколько delta-коммитов (snapshot’ов) запускается full compaction. Значение 3 означает: каждые 3 snapshot’а = одна full compaction = один changelog batch. Меньше = чаще changelog, но больше I/O. Больше = реже changelog, но меньше overhead.
Сравнение режимов
| Режим | Полный changelog | Overhead записи | Задержка | Best for |
|---|---|---|---|---|
| none | 0 | N/A | Batch-only таблицы | |
| input | Если источник полный | 0 | Мгновенная | CDC (Debezium) |
| lookup | Всегда | 1 lookup/запись | Мгновенная | Любой источник, real-time |
| full-compaction | Всегда | 0 | Интервал компакции | Write-heavy, near-RT |
Changelog нормализация
Некоторые downstream-операторы (например, Flink GROUP BY с UDAF) требуют нормализованный changelog: каждый UPDATE представлен как пара -U/+U, каждый INSERT — как +I, каждый DELETE — как -D. Paimon автоматически нормализует changelog при стриминговом чтении:
Raw changelog (из sorted runs)
Raw changelog из LSM-дерева может содержать 'грязные' события: например, +U без предшествующего -U (если предыдущая версия была на другом уровне LSM-дерева). Или +I для PK, который уже существует на нижнем уровне.Нормализация происходит на стороне reader, не writer. Paimon хранит events as-is в SST-файлах. При стриминговом чтении, reader конструирует normalized changelog, добавляя недостающие -U events из предыдущего состояния таблицы. Для changelog-producer = 'lookup' и 'full-compaction' нормализация уже встроена — они генерируют полные пары.
Streaming Pipeline Patterns
Pattern 1: CDC → Paimon → Aggregation
MySQL + Debezium
MySQL с Debezium: полный CDC (op: c/u/d + before/after). Flink CDC connector конвертирует в _row_kind (+I/-U/+U/-D). Источник уже полный — input mode достаточен.ODS Table (input mode)
Paimon ODS table: merge-engine=deduplicate, changelog-producer=input. Хранит актуальное состояние + changelog для downstream. Input mode: нулевой overhead, потому что Debezium уже генерирует полный CDC.Flink: GROUP BY user_id SUM(amount)
Flink job: GROUP BY user_id, SUM(amount). Читает changelog из ODS table. -U: вычитает старое amount из SUM. +U: прибавляет новое. Результат: real-time агрегат.DWS Table (aggregates)
Paimon DWS table: агрегированные метрики. merge-engine=deduplicate. Downstream: BI dashboard, API. Весь pipeline — streaming, end-to-end задержка: секунды.Pattern 2: API → Paimon → Streaming Join
REST API (только новые значения)
REST API: отправляет только текущее состояние заказа (нет before-значения). Paimon не знает старое значение — input mode дал бы неполный changelog. Нужен lookup mode.Orders Table (lookup mode)
Paimon orders table: changelog-producer=lookup. При каждой записи — lookup в LSM-дереве для генерации -U (старое значение). Overhead: один lookup на запись, но changelog ВСЕГДА полный.Flink: orders JOIN users
Flink streaming join: orders JOIN users ON user_id. Для корректного join нужен полный changelog — при UPDATE заказа нужно обновить join-результат. Без retraction join будет дублировать строки.Enriched Orders
Paimon enriched orders: результат join. Содержит данные заказа + данные пользователя. Dashboard-ready.Pattern 3: High-Volume → Near-RT Analytics
IoT Sensors (1M+ events/sec)
IoT sensor stream: 1M+ events/sec. Lookup mode при таком объёме — overhead 1M lookup'ов/sec, неприемлемо. Full-compaction mode: 0 overhead при записи, changelog генерируется при компакции.Sensor Table (full-compaction)
Paimon sensor table: changelog-producer=full-compaction, full-compaction.delta-commits=5. Запись: максимальная скорость (нет lookup). Changelog: каждые 5 snapshot'ов (30-60 секунд). Near-real-time, не real-time.Flink: AVG, MAX per region
Flink aggregation: AVG(temperature), MAX(pressure) по region. Changelog приходит batch'ами (после компакции). Агрегаты обновляются не в real-time, а near-real-time. Для IoT-аналитики — достаточно.Dashboard (near-RT)
Dashboard table: обновляется каждые 30-60 секунд. Для мониторинга IoT — приемлемая задержка. Для payment processing — нет (нужен lookup mode).Сравнение с CDC в других форматах
| Аспект | Paimon | Hudi | Delta Lake |
|---|---|---|---|
| Changelog в storage | (_VALUE_KIND) | Частично (log files) | CDF-файлы (опционально) |
| Streaming native | (producer modes) | Incremental query | readChanges() batch |
| Retraction (-U) | (lookup/full-compaction) | нативно | update_preimage |
| Overhead | 0 (input) — 1 lookup (lookup) | overhead | Удвоение записи (CDF) |
| Downstream агрегации | Полная поддержка | Ограниченная | Ограниченная |
Changelog producer — главное конкурентное преимущество Paimon. Ни один другой формат не предлагает 4 встроенных режима генерации changelog с нативной нормализацией. Для streaming-first архитектур (Flink → Paimon → Flink → Paimon) это фундамент.
Ключевые выводы
- _row_kind кодирует 4 типа CDC-событий: +I (insert), -U (update before), +U (update after), -D (delete)
- 4 режима changelog producer: none (batch), input (проброс), lookup (LSM lookup), full-compaction (diff при компакции)
- Полный changelog с retraction (-U/+U пары) необходим для корректных стриминговых агрегаций
- lookup — универсальный, но дорогой; full-compaction — дешёвый, но с задержкой; input — бесплатный, но зависит от источника
- Нормализация происходит на стороне reader — Paimon хранит raw events, а нормализует при стриминговом чтении
- Paimon — единственный из 4 форматов (Delta, Iceberg, Hudi, Paimon) с нативным changelog producer как первоклассной концепцией