Paimon tags и branches: time travel, DR, эксперименты
Один из killer features lakehouse — time travel. Возможность query state таблицы как он был X дней назад. В Iceberg/Hudi это есть, в Paimon тоже — но дополнительно есть две концепции, которые отсутствуют в других форматах: tags (named immutable snapshots) и branches (independent timelines данных).
В этом уроке: что такое tags vs snapshots, как использовать tags для DR и архивации, как branches позволяют experimenting без touch prod data, и production-практики управления retention.
Apache Iceberg: snapshots и time travelSnapshots: автоматические, retention-managed
Каждый Flink checkpoint в Paimon = новый snapshot. Это автоматическое поведение.
-- Все snapshots в таблице
SELECT * FROM orders$snapshots;
/*
snapshot_id | schema_id | commit_user | commit_time | commit_kind | total_record_count
1 | 0 | flink-job-orders | 2026-05-19 09:00:00.000 | APPEND | 100000
2 | 0 | flink-job-orders | 2026-05-19 09:00:30.000 | APPEND | 150000
3 | 0 | flink-job-orders | 2026-05-19 09:01:00.000 | APPEND | 200000
...
1440 | 0 | flink-job-orders | 2026-05-20 09:00:00.000 | APPEND | 86400000
*/
При 30-second checkpoint interval — 2880 snapshots/day. Это growth, который не можно копить вечно.
Retention controls:
ALTER TABLE orders SET (
'snapshot.num-retained.min' = '10', -- always keep last 10
'snapshot.num-retained.max' = '5000', -- never more than 5000
'snapshot.time-retained' = '7 d', -- TTL 7 days
'snapshot.expire.execution-mode' = 'sync' -- expire синхронно при commit
);
Каждый commit Paimon делает expire:
- Считает текущие snapshots: 5001 total.
- Удаляет oldest ones, пока не останется max (5000) или пока age > 7d (whatever happens first).
- Удаляет orphan files (parquet, на которые нет manifest references).
Expired snapshots исчезают навсегда — больше нельзя query historical state. Это не подходит для:
- DR (disaster recovery) — нужны long-term recovery points (1-30 дней назад).
- Archival — нужны specific dates для compliance / audit.
- Reproducibility — нужны specific states для ML training reproducibility.
Для этого — tags.
Tags: named, immutable, не подлежат expire
Tag — это именованная ссылка на specific snapshot, который не удаляется по retention.
-- Создать tag
CALL sys.create_tag(
table => 'orders',
tag => 'before-promotion-launch',
snapshot_id => 12345
);
-- Или из текущего snapshot
CALL sys.create_tag(
table => 'orders',
tag => 'eod-2026-05-19'
);
-- List tags
SELECT * FROM orders$tags;
/*
tag_name | snapshot_id | schema_id | commit_time | record_count
before-promotion-launch | 12345 | 2 | 2026-05-19 09:00:00.000 | 86_400_000
eod-2026-05-19 | 14432 | 2 | 2026-05-19 23:59:30.000 | 92_100_000
*/
Snapshot 12345 теперь “frozen” — даже если retention policy 7 дней, snapshot 12345 не expire-нится. Все parquet files, на которые он указывает, тоже сохраняются.
Tag-and-forget pattern для DR:
-- Auto-create tag каждый день в полночь (через scheduled SQL)
CALL sys.create_tag(
table => 'orders',
tag => CONCAT('eod-', DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd'))
);
Можно настроить через tag.automatic-creation:
ALTER TABLE orders SET (
'tag.automatic-creation' = 'daily', -- auto-create daily tags
'tag.creation-period' = 'daily',
'tag.creation-delay' = '10 min', -- 10 минут после midnight (на late events)
'tag.num-retained-max' = '365', -- keep 1 year of daily tags
'tag.default-time-retained' = '365 d' -- TTL для default tags
);
После этого Paimon сам создаёт tag-and-day automatic-ally. Reading historical day:
-- Read как было EOD 2026-04-30 (2 weeks ago)
SELECT * FROM orders /*+ OPTIONS('scan.tag-name'='eod-2026-04-30') */;
-- Time travel via timestamp
SELECT * FROM orders /*+ OPTIONS('scan.timestamp-millis'='1714521600000') */;
-- Via snapshot ID
SELECT * FROM orders /*+ OPTIONS('scan.snapshot-id'='12345') */;
Tags для disaster recovery
DR pattern: data corruption discovered, нужно restore до известного состояния.
Сценарий: 2026-05-20 утром обнаружено, что job вчера записал bad data (bug в transformation logic). Нужно rollback до состояния EOD 2026-05-18.
-- Step 1: проверить что tag exists
SELECT * FROM orders$tags WHERE tag_name = 'eod-2026-05-18';
-- snapshot_id = 12345
-- Step 2: rollback таблицы на этот snapshot
CALL sys.rollback_to(
table => 'orders',
snapshot_id => 12345
);
-- ИЛИ
CALL sys.rollback_to(
table => 'orders',
tag => 'eod-2026-05-18'
);
rollback_to создаёт новый snapshot, который реferences те же files что и snapshot 12345. LATEST pointer updates. Все snapshots > 12345 expired/cleaned.
Caution: rollback irreversible (если subsequent snapshots cleaned). Лучше: создать new tag перед rollback для backup.
-- Safer:
CALL sys.create_tag(
table => 'orders',
tag => 'pre-rollback-backup',
snapshot_id => 14500 -- current latest
);
CALL sys.rollback_to(table => 'orders', tag => 'eod-2026-05-18');
-- Если что — обратно через rollback_to с pre-rollback-backup
Rollback применяется immediately и breaks downstream consumers. Если у тебя streaming downstream consume Paimon table — он будет confused (last seen snapshot 14500, now LATEST=12346). Best practice: pause downstream consumers, rollback, restart with fresh state. Также — coordinate с team, document когда был rollback (для audit).
Branches: independent timelines
Branches — features Paimon (отсутствует в Iceberg/Hudi на момент 2026). Идея заимствована из Git: branch — это independent “линия development” данных.
-- Создать branch от текущего snapshot
CALL sys.create_branch(
table => 'orders',
branch => 'feature-new-pricing'
);
-- Или от specific snapshot/tag
CALL sys.create_branch(
table => 'orders',
branch => 'experiment-2026-Q2',
tag => 'eod-2026-05-19'
);
После create branch:
- Main branch (
main) продолжает receive writes от prod Flink job. - Feature branch (
feature-new-pricing) — independent, можно writing/reading независимо. - Storage: branch имеет свою snapshot/, manifest/ — но shares data files с main initially (copy-on-write).
Use cases:
1. A/B testing models / aggregations.
Создаём branch, на нём applies new pricing logic. Сравниваем results с main без rebuilding всей pipeline.
-- Write в branch
SET 'table.exec.sink.upsert-materialize' = 'NONE';
INSERT INTO orders /*+ OPTIONS('branch'='feature-new-pricing') */
SELECT id, name, new_pricing(amount) AS amount, ...
FROM source;
-- Сравнение
SELECT
m.id,
m.amount AS main_amount,
b.amount AS branch_amount
FROM orders m
JOIN orders /*+ OPTIONS('branch'='feature-new-pricing') */ b
ON m.id = b.id
WHERE m.amount != b.amount;
2. Schema migration testing.
Test backward-incompatible schema change на branch без breaking main.
3. Time-bounded analysis.
“Какие были orders 7 дней назад, ESL ли мы применили refund-logic, который сейчас в discussion?”. Create branch from tag, apply refund logic, query results.
Branch merge и delete
Если эксперимент successful, можно merge branch в main:
-- Merge нет direct command — обычно через INSERT OVERWRITE main from branch
INSERT OVERWRITE orders
SELECT * FROM orders /*+ OPTIONS('branch'='feature-new-pricing') */;
-- Cleanup
CALL sys.delete_branch(
table => 'orders',
branch => 'feature-new-pricing'
);
Если эксперимент unsuccessful — просто delete branch без merge:
CALL sys.delete_branch(
table => 'orders',
branch => 'experiment-failed'
);
Delete branch removes branch-specific manifests/snapshots, но не data files если они referenced главным branch-ем. Только orphaned files (только этим branch-ем referenced) cleaned.
Time travel queries
Time travel — query таблицы как она была в past. Несколько способов pinning:
-- 1. По snapshot ID
SELECT * FROM orders /*+ OPTIONS('scan.snapshot-id'='12345') */;
-- 2. По tag name
SELECT * FROM orders /*+ OPTIONS('scan.tag-name'='eod-2026-05-19') */;
-- 3. По timestamp (epoch milliseconds)
SELECT * FROM orders /*+ OPTIONS('scan.timestamp-millis'='1716067200000') */;
-- 4. По human-readable timestamp (Flink SQL)
SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2026-05-19 09:00:00';
-- 5. По branch
SELECT * FROM orders /*+ OPTIONS('branch'='feature-new-pricing') */;
При timestamp — Paimon finds snapshot который был active на этот timestamp (closest before-or-equal).
Use cases time travel:
- Audit: “Что был state на 9:00, до bug fix?”
- Compliance: “Show data на end of fiscal year”.
- ML reproducibility: “Train new model на тех же features, что были N days ago”.
- Debug: “Compare snapshot N vs N+1 to understand what changed”.
Snapshot diff и changelog
Paimon может generate changelog — описание changes между snapshots:
ALTER TABLE orders SET ('changelog-producer' = 'input');
-- 'input', 'full-compaction', 'lookup', 'none'
Changelog modes:
- input: changes generated при write time (-U for old value, +U for new). Cheapest, но requires upstream Flink job знать deletes.
- full-compaction: changes generated при compaction. Larger latency (после compaction visible), но accurate.
- lookup: changes generated при write через lookup в existing data. Highest cost, но accurate even without upstream support.
- none: changelog disabled. Streaming reads only get new INSERTs.
Streaming read с changelog:
-- Read как changelog (включая -U, +U records)
SELECT * FROM orders /*+ OPTIONS('streaming'='true', 'changelog-producer'='input') */;
Это позволяет downstream consume полную информацию о mutations (для materialized view maintenance, CDC propagation).
Snapshot inspection и debugging
Системные tables для inspection:
-- Все snapshots
SELECT * FROM orders$snapshots;
-- Все tags
SELECT * FROM orders$tags;
-- Все branches
SELECT * FROM orders$branches;
-- Files current snapshot
SELECT * FROM orders$files;
/*
partition | bucket | file_name | level | min_key | max_key | record_count | size_bytes
date=... | 0 | data-uuid-1.parquet | 0 | {id:1} | {id:100}| 1000 | 50000
date=... | 0 | data-uuid-2.parquet | 1 | {id:1} | {id:500}| 5000 | 200000
*/
-- Schema versions
SELECT * FROM orders$schemas;
-- Manifests
SELECT * FROM orders$manifests;
-- Compactions
SELECT * FROM orders$audit_log;
Debugging slow query через EXPLAIN:
EXPLAIN SELECT * FROM orders WHERE id = 12345;
/*
== Optimized Physical Plan ==
TableSourceScan(table=[[paimon, db, orders]],
fields=[id, name, ...],
pushedFilters=[id=12345],
bucketIds=[7])
predicate: id = 12345
*/
Если bucketIds=[ALL] — bucket-key не используется для filter pushdown, query будет scan все buckets.
Production-перспектива: full strategy
Recommended retention strategy для production:
ALTER TABLE orders SET (
-- Snapshot retention: keep last 24 hours + at least 50
'snapshot.time-retained' = '24 h',
'snapshot.num-retained.min' = '50',
'snapshot.num-retained.max' = '5000',
-- Daily tags: keep 90 days
'tag.automatic-creation' = 'daily',
'tag.creation-period' = 'daily',
'tag.creation-delay' = '5 min',
'tag.num-retained-max' = '90',
'tag.default-time-retained' = '90 d',
-- Weekly tags: keep 1 year (configure separately)
-- Manual create каждую неделю или через scheduled SQL
);
Pattern: hot snapshots (24h, для real-time queries / quick rollback) + daily tags (90d, для analytics historical) + weekly/monthly tags (1y+, для compliance/DR).
Storage cost analysis:
- 24h * 2880 snapshots/day = 2880 snapshots. Snapshot metadata mostly small (~KB). Data files shared между snapshots — single set.
- 90 daily tags = 90 frozen snapshots. Each tag preserves data files от этой snapshot.
- Если daily compaction merges most files в L2, tags reference relatively stable L2 files. Storage growth modest.
DR drill (recommended quarterly):
- Pick random old tag (например, eod-2026-02-15).
CALL sys.rollback_to(table => 'test_orders', tag => 'eod-2026-02-15').- Verify data consistent (counts match historical reports).
- Restore from
pre-rollback-backupto current.
Этот drill catches retention misconfiguration (tag deleted раньше времени) или corruption (S3 object missing).