Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 24 мин
Продвинутый
PaimonTagsBranchesTime travelDisaster recoverySnapshot retention

Paimon tags и branches: time travel, DR, эксперименты

Один из killer features lakehouse — time travel. Возможность query state таблицы как он был X дней назад. В Iceberg/Hudi это есть, в Paimon тоже — но дополнительно есть две концепции, которые отсутствуют в других форматах: tags (named immutable snapshots) и branches (independent timelines данных).

В этом уроке: что такое tags vs snapshots, как использовать tags для DR и архивации, как branches позволяют experimenting без touch prod data, и production-практики управления retention.

Apache Iceberg: snapshots и time travel

Snapshots: автоматические, retention-managed

Каждый Flink checkpoint в Paimon = новый snapshot. Это автоматическое поведение.

-- Все snapshots в таблице
SELECT * FROM orders$snapshots;

/*
snapshot_id | schema_id | commit_user      | commit_time              | commit_kind | total_record_count
1           | 0         | flink-job-orders | 2026-05-19 09:00:00.000 | APPEND      | 100000
2           | 0         | flink-job-orders | 2026-05-19 09:00:30.000 | APPEND      | 150000
3           | 0         | flink-job-orders | 2026-05-19 09:01:00.000 | APPEND      | 200000
...
1440        | 0         | flink-job-orders | 2026-05-20 09:00:00.000 | APPEND      | 86400000
*/

При 30-second checkpoint interval — 2880 snapshots/day. Это growth, который не можно копить вечно.

Retention controls:

ALTER TABLE orders SET (
  'snapshot.num-retained.min' = '10',      -- always keep last 10
  'snapshot.num-retained.max' = '5000',    -- never more than 5000
  'snapshot.time-retained' = '7 d',        -- TTL 7 days
  'snapshot.expire.execution-mode' = 'sync' -- expire синхронно при commit
);

Каждый commit Paimon делает expire:

  1. Считает текущие snapshots: 5001 total.
  2. Удаляет oldest ones, пока не останется max (5000) или пока age > 7d (whatever happens first).
  3. Удаляет orphan files (parquet, на которые нет manifest references).

Expired snapshots исчезают навсегда — больше нельзя query historical state. Это не подходит для:

  • DR (disaster recovery) — нужны long-term recovery points (1-30 дней назад).
  • Archival — нужны specific dates для compliance / audit.
  • Reproducibility — нужны specific states для ML training reproducibility.

Для этого — tags.


Tags: named, immutable, не подлежат expire

Tag — это именованная ссылка на specific snapshot, который не удаляется по retention.

-- Создать tag
CALL sys.create_tag(
  table => 'orders',
  tag => 'before-promotion-launch',
  snapshot_id => 12345
);

-- Или из текущего snapshot
CALL sys.create_tag(
  table => 'orders',
  tag => 'eod-2026-05-19'
);

-- List tags
SELECT * FROM orders$tags;

/*
tag_name                 | snapshot_id | schema_id | commit_time              | record_count
before-promotion-launch  | 12345       | 2         | 2026-05-19 09:00:00.000 | 86_400_000
eod-2026-05-19           | 14432       | 2         | 2026-05-19 23:59:30.000 | 92_100_000
*/

Snapshot 12345 теперь “frozen” — даже если retention policy 7 дней, snapshot 12345 не expire-нится. Все parquet files, на которые он указывает, тоже сохраняются.

Snapshots vs Tags: lifecycle
Snapshot 1auto, expiresСоздаётся автоматически каждым commit (Flink checkpoint). Подлежит retention/expire — старые удаляются по snapshot.num-retained.max / time-retained
Snapshot 2auto, expiresSame — automatic, expirable
Snapshot Nauto, expiresSame. После retention period — gone forever
Tag 'eod-may-19'manual, frozenNamed reference на specific snapshot. Не expire-ится по retention. Может иметь свою policy (tag.num-retained-max). Для DR, архивации, reproducibility
points to
Snapshot 12345preserved by tagЭтот snapshot имел бы expire по обычной retention, но tag preserves его — references parquet files не удаляются
Tag 'before-launch'manualДругой tag — на другой snapshot. Independent lifecycle. Можно создать любое количество tags на любые snapshots
points to
Snapshot 11200preserved by tagТакже frozen by tag. Если в future нужно rollback к этому состоянию — tag даёт guarantee что данные остались

Tag-and-forget pattern для DR:

-- Auto-create tag каждый день в полночь (через scheduled SQL)
CALL sys.create_tag(
  table => 'orders',
  tag => CONCAT('eod-', DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd'))
);

Можно настроить через tag.automatic-creation:

ALTER TABLE orders SET (
  'tag.automatic-creation' = 'daily',           -- auto-create daily tags
  'tag.creation-period' = 'daily',
  'tag.creation-delay' = '10 min',              -- 10 минут после midnight (на late events)
  'tag.num-retained-max' = '365',               -- keep 1 year of daily tags
  'tag.default-time-retained' = '365 d'         -- TTL для default tags
);

После этого Paimon сам создаёт tag-and-day automatic-ally. Reading historical day:

-- Read как было EOD 2026-04-30 (2 weeks ago)
SELECT * FROM orders /*+ OPTIONS('scan.tag-name'='eod-2026-04-30') */;

-- Time travel via timestamp
SELECT * FROM orders /*+ OPTIONS('scan.timestamp-millis'='1714521600000') */;

-- Via snapshot ID
SELECT * FROM orders /*+ OPTIONS('scan.snapshot-id'='12345') */;

Tags для disaster recovery

DR pattern: data corruption discovered, нужно restore до известного состояния.

Сценарий: 2026-05-20 утром обнаружено, что job вчера записал bad data (bug в transformation logic). Нужно rollback до состояния EOD 2026-05-18.

-- Step 1: проверить что tag exists
SELECT * FROM orders$tags WHERE tag_name = 'eod-2026-05-18';
-- snapshot_id = 12345

-- Step 2: rollback таблицы на этот snapshot
CALL sys.rollback_to(
  table => 'orders',
  snapshot_id => 12345
);

-- ИЛИ
CALL sys.rollback_to(
  table => 'orders',
  tag => 'eod-2026-05-18'
);

rollback_to создаёт новый snapshot, который реferences те же files что и snapshot 12345. LATEST pointer updates. Все snapshots > 12345 expired/cleaned.

Caution: rollback irreversible (если subsequent snapshots cleaned). Лучше: создать new tag перед rollback для backup.

-- Safer:
CALL sys.create_tag(
  table => 'orders',
  tag => 'pre-rollback-backup',
  snapshot_id => 14500  -- current latest
);

CALL sys.rollback_to(table => 'orders', tag => 'eod-2026-05-18');
-- Если что — обратно через rollback_to с pre-rollback-backup
WARNING

Rollback применяется immediately и breaks downstream consumers. Если у тебя streaming downstream consume Paimon table — он будет confused (last seen snapshot 14500, now LATEST=12346). Best practice: pause downstream consumers, rollback, restart with fresh state. Также — coordinate с team, document когда был rollback (для audit).


Branches: independent timelines

Branches — features Paimon (отсутствует в Iceberg/Hudi на момент 2026). Идея заимствована из Git: branch — это independent “линия development” данных.

-- Создать branch от текущего snapshot
CALL sys.create_branch(
  table => 'orders',
  branch => 'feature-new-pricing'
);

-- Или от specific snapshot/tag
CALL sys.create_branch(
  table => 'orders',
  branch => 'experiment-2026-Q2',
  tag => 'eod-2026-05-19'
);

После create branch:

  • Main branch (main) продолжает receive writes от prod Flink job.
  • Feature branch (feature-new-pricing) — independent, можно writing/reading независимо.
  • Storage: branch имеет свою snapshot/, manifest/ — но shares data files с main initially (copy-on-write).

Use cases:

1. A/B testing models / aggregations.

Создаём branch, на нём applies new pricing logic. Сравниваем results с main без rebuilding всей pipeline.

-- Write в branch
SET 'table.exec.sink.upsert-materialize' = 'NONE';

INSERT INTO orders /*+ OPTIONS('branch'='feature-new-pricing') */
SELECT id, name, new_pricing(amount) AS amount, ...
FROM source;

-- Сравнение
SELECT
  m.id,
  m.amount AS main_amount,
  b.amount AS branch_amount
FROM orders m
JOIN orders /*+ OPTIONS('branch'='feature-new-pricing') */ b
  ON m.id = b.id
WHERE m.amount != b.amount;

2. Schema migration testing.

Test backward-incompatible schema change на branch без breaking main.

3. Time-bounded analysis.

“Какие были orders 7 дней назад, ESL ли мы применили refund-logic, который сейчас в discussion?”. Create branch from tag, apply refund logic, query results.


Branch merge и delete

Если эксперимент successful, можно merge branch в main:

-- Merge нет direct command — обычно через INSERT OVERWRITE main from branch
INSERT OVERWRITE orders
SELECT * FROM orders /*+ OPTIONS('branch'='feature-new-pricing') */;

-- Cleanup
CALL sys.delete_branch(
  table => 'orders',
  branch => 'feature-new-pricing'
);

Если эксперимент unsuccessful — просто delete branch без merge:

CALL sys.delete_branch(
  table => 'orders',
  branch => 'experiment-failed'
);

Delete branch removes branch-specific manifests/snapshots, но не data files если они referenced главным branch-ем. Только orphaned files (только этим branch-ем referenced) cleaned.


Time travel queries

Time travel — query таблицы как она была в past. Несколько способов pinning:

-- 1. По snapshot ID
SELECT * FROM orders /*+ OPTIONS('scan.snapshot-id'='12345') */;

-- 2. По tag name
SELECT * FROM orders /*+ OPTIONS('scan.tag-name'='eod-2026-05-19') */;

-- 3. По timestamp (epoch milliseconds)
SELECT * FROM orders /*+ OPTIONS('scan.timestamp-millis'='1716067200000') */;

-- 4. По human-readable timestamp (Flink SQL)
SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2026-05-19 09:00:00';

-- 5. По branch
SELECT * FROM orders /*+ OPTIONS('branch'='feature-new-pricing') */;

При timestamp — Paimon finds snapshot который был active на этот timestamp (closest before-or-equal).

Use cases time travel:

  • Audit: “Что был state на 9:00, до bug fix?”
  • Compliance: “Show data на end of fiscal year”.
  • ML reproducibility: “Train new model на тех же features, что были N days ago”.
  • Debug: “Compare snapshot N vs N+1 to understand what changed”.

Snapshot diff и changelog

Paimon может generate changelog — описание changes между snapshots:

ALTER TABLE orders SET ('changelog-producer' = 'input');
-- 'input', 'full-compaction', 'lookup', 'none'

Changelog modes:

  • input: changes generated при write time (-U for old value, +U for new). Cheapest, но requires upstream Flink job знать deletes.
  • full-compaction: changes generated при compaction. Larger latency (после compaction visible), но accurate.
  • lookup: changes generated при write через lookup в existing data. Highest cost, но accurate even without upstream support.
  • none: changelog disabled. Streaming reads only get new INSERTs.

Streaming read с changelog:

-- Read как changelog (включая -U, +U records)
SELECT * FROM orders /*+ OPTIONS('streaming'='true', 'changelog-producer'='input') */;

Это позволяет downstream consume полную информацию о mutations (для materialized view maintenance, CDC propagation).


Snapshot inspection и debugging

Системные tables для inspection:

-- Все snapshots
SELECT * FROM orders$snapshots;

-- Все tags
SELECT * FROM orders$tags;

-- Все branches
SELECT * FROM orders$branches;

-- Files current snapshot
SELECT * FROM orders$files;
/*
partition | bucket | file_name           | level | min_key | max_key | record_count | size_bytes
date=...  | 0      | data-uuid-1.parquet | 0     | {id:1}  | {id:100}| 1000         | 50000
date=...  | 0      | data-uuid-2.parquet | 1     | {id:1}  | {id:500}| 5000         | 200000
*/

-- Schema versions
SELECT * FROM orders$schemas;

-- Manifests
SELECT * FROM orders$manifests;

-- Compactions
SELECT * FROM orders$audit_log;

Debugging slow query через EXPLAIN:

EXPLAIN SELECT * FROM orders WHERE id = 12345;
/*
== Optimized Physical Plan ==
TableSourceScan(table=[[paimon, db, orders]],
  fields=[id, name, ...],
  pushedFilters=[id=12345],
  bucketIds=[7])
  predicate: id = 12345
*/

Если bucketIds=[ALL] — bucket-key не используется для filter pushdown, query будет scan все buckets.


Production-перспектива: full strategy

Recommended retention strategy для production:

ALTER TABLE orders SET (
  -- Snapshot retention: keep last 24 hours + at least 50
  'snapshot.time-retained' = '24 h',
  'snapshot.num-retained.min' = '50',
  'snapshot.num-retained.max' = '5000',

  -- Daily tags: keep 90 days
  'tag.automatic-creation' = 'daily',
  'tag.creation-period' = 'daily',
  'tag.creation-delay' = '5 min',
  'tag.num-retained-max' = '90',
  'tag.default-time-retained' = '90 d',

  -- Weekly tags: keep 1 year (configure separately)
  -- Manual create каждую неделю или через scheduled SQL
);

Pattern: hot snapshots (24h, для real-time queries / quick rollback) + daily tags (90d, для analytics historical) + weekly/monthly tags (1y+, для compliance/DR).

Storage cost analysis:

  • 24h * 2880 snapshots/day = 2880 snapshots. Snapshot metadata mostly small (~KB). Data files shared между snapshots — single set.
  • 90 daily tags = 90 frozen snapshots. Each tag preserves data files от этой snapshot.
  • Если daily compaction merges most files в L2, tags reference relatively stable L2 files. Storage growth modest.

DR drill (recommended quarterly):

  1. Pick random old tag (например, eod-2026-02-15).
  2. CALL sys.rollback_to(table => 'test_orders', tag => 'eod-2026-02-15').
  3. Verify data consistent (counts match historical reports).
  4. Restore from pre-rollback-backup to current.

Этот drill catches retention misconfiguration (tag deleted раньше времени) или corruption (S3 object missing).


Проверка знанийKnowledge check
Production Paimon table: 50TB на S3, snapshot.num-retained.max=10000, snapshot.time-retained=30d, tag.automatic-creation=daily, tag.num-retained-max=365. После 12 месяцев работы storage cost вырос с 50TB до 800TB — растёт нелинейно. Что могло пойти не так и как diagnose?
ОтветAnswer
Гипотеза: tag retention слишком aggressive. 365 daily tags = 365 frozen snapshots каждый "fixes" set data files. Если data has high churn (много updates / deletes), files referenced каждым tag могут быть mostly different sets. Каждый tag retains ALL his data files. Diagnose: (1) SELECT tag_name, snapshot_id FROM orders$tags ORDER BY commit_time — посмотреть распределение. (2) Для каждого tag — SELECT sum(size_bytes) FROM orders$files /*+ OPTIONS('scan.tag-name'='...') */ — узнать unique size. (3) Если sum(tag sizes) >> current snapshot size — confirmation что tags hold много "dead" data. Reasons: (a) High update rate — каждый day overwrites ~10% records, files churning, каждый tag holds different 'point in time'. (b) Compaction не merges много (e.g., L2 redoing daily). (c) Auto-cleanup orphan files не работает (если tag references files, cleanup не removes). Fixes: (1) Reduce tag retention 365 -> 90 (90 daily tags + weekly tags для 1 year = меньше footprint, similar utility). (2) Schedule compaction более aggressive — bigger L2 files, более stable между tags. (3) Если updates not needed daily (could be eventual): write append-only с logical-delete (separate delete-marker), скруп scrap compaction режим. (4) Audit: для каждого tag посмотреть unique data — может пара "expensive" tags (during huge migration period). (5) Если cost critical и daily tags overkill — переходить на weekly с retention 52, или monthly.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём разница между snapshot и tag в Paimon?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5