Снимки, коммиты и time travel
В предыдущем уроке мы разобрали трёхуровневую иерархию метаданных. Центральное понятие в этой иерархии — snapshot (снимок): иммутабельная точка во времени, описывающая полное состояние таблицы. Каждая операция записи создаёт новый snapshot, а каталог атомарно переключает указатель на него.
В этом уроке — механика коммитов: как Iceberg обеспечивает ACID в распределённой среде без координатора, как работает optimistic concurrency control (OCC), какие операции создают snapshots, и как использовать time travel, branches и tags.
Примеры кода — на Python с pyiceberg 0.11.1 (март 2026). Версия спецификации — V2.
Snapshot — иммутабельная точка во времени
Snapshot — это полное описание состояния таблицы в конкретный момент. Каждый snapshot содержит:
Ключевое свойство: snapshots иммутабельны. После создания snapshot никогда не изменяется. Новая операция создаёт новый snapshot, ссылающийся на предыдущий через parent-snapshot-id.
Цепочка snapshots
Snapshot 1 (CREATE TABLE, empty)
└─→ Snapshot 2 (INSERT 50K records)
└─→ Snapshot 3 (INSERT 30K records)
└─→ Snapshot 4 (DELETE WHERE region = 'EU')
└─→ Snapshot 5 (ALTER TABLE ADD COLUMN) ← current
Каждый snapshot содержит полное состояние таблицы (через manifest list), а не дельту изменений. Manifest files переиспользуются между snapshots — если INSERT добавил 3 файла, новый manifest list содержит новый manifest (с 3 файлами) + ссылки на все существующие manifests из предыдущего snapshot.
Snapshot 3
Snapshot 3: состояние таблицы после второго INSERT. Manifest list содержит два manifest file — один от первого INSERT (M1) и один от второго (M2).Snapshot 4
Snapshot 4: DELETE удалил часть данных. Manifest list содержит: M1 (из snapshot 2, без изменений), M2 (из snapshot 3, без изменений), и M3 — новый manifest с delete files.Протокол коммита (OCC)
Iceberg использует optimistic concurrency control (OCC) для обеспечения ACID. Протокол коммита:
- Прочитать текущий metadata file из каталога
- Записать data files, manifests, новый metadata file на storage
- Atomic CAS: if current == base, set current = new
Реализация CAS в разных каталогах
| Каталог | Механизм CAS |
|---|---|
| REST Catalog | HTTP POST /v1/.../tables/{table} с base-table-metadata в body. Сервер проверяет, что base совпадает с текущим |
| Hive Metastore | ALTER TABLE SET TBLPROPERTIES('metadata_location'=...) + optimistic locking через версию |
| AWS Glue | UpdateTable API с VersionId — Glue возвращает ConcurrentModificationException при конфликте |
| JDBC | UPDATE iceberg_tables SET metadata_location=? WHERE ... AND metadata_location=? — SQL-уровень CAS |
| Hadoop (legacy) | Rename файла — v5.metadata.json.tmp → v5.metadata.json. Atomic rename на HDFS, не atomic на S3! |
Hadoop каталог deprecated с Iceberg 1.6.0 и будет удалён. На object storage (S3, GCS) атомарный rename невозможен, что делает Hadoop каталог небезопасным для concurrent writes на S3. Используйте REST, Glue или JDBC каталоги.
Retry с conflict resolution
При конфликте writer не просто повторяет операцию — он проверяет совместимость изменений:
Writer A: INSERT INTO orders (region='EU') -- base: snapshot 5
Writer B: INSERT INTO orders (region='US') -- base: snapshot 5
Writer A коммитит → snapshot 6 (успех)
Writer B получает конфликт:
1. Перечитывает metadata → видит snapshot 6
2. Проверяет: мои изменения (добавление файлов в region='US')
конфликтуют с изменениями A (добавление файлов в region='EU')?
3. Нет конфликта → rebase на snapshot 6, повторный коммит → snapshot 7
Виды проверок при rebase:
Число retry настраивается свойством таблицы:
# При создании таблицы
table = catalog.create_table("db.orders",
schema=schema,
properties={
"commit.retry.num-retries": "4", # максимум 4 retry
"commit.retry.min-wait-ms": "100", # начальная задержка
"commit.retry.max-wait-ms": "60000", # максимальная задержка
"commit.retry.total-timeout-ms": "1800000", # общий timeout 30 мин
}
)
Операции и типы snapshots
Каждый snapshot создаётся определённой операцией. Поле summary.operation в snapshot описывает тип:
| Операция | Описание | Что меняется |
|---|---|---|
append | Добавление новых данных (INSERT) | Новые data files добавляются |
overwrite | Замена данных в партиции | Старые data files удалены + новые добавлены |
replace | Compaction / OPTIMIZE | Data files заменены оптимизированными версиями, данные не изменились |
delete | Удаление строк | Создание delete files (position или equality) или удаление целых data files |
# Просмотр истории операций
for entry in table.history():
snap = table.snapshot_by_id(entry.snapshot_id)
if snap:
op = snap.summary.get("operation", "unknown")
added = snap.summary.get("added-records", "0")
deleted = snap.summary.get("deleted-records", "0")
print(f" v{snap.sequence_number}: {op} "
f"+{added}/-{deleted} records "
f"@ {snap.timestamp_ms}")
Sequence numbers (V2)
В V2 каждый snapshot получает sequence number — монотонно возрастающий номер. Sequence numbers решают проблему ordering при merge-on-read:
Snapshot 1 (seq=1): INSERT файлы A, B, C
Snapshot 2 (seq=2): DELETE WHERE id=42 (position delete file D)
Snapshot 3 (seq=3): INSERT файлы E, F
Delete file D (seq=2) применяется только к data files с seq ≤ 2 (A, B, C). Файлы E, F (seq=3) не затрагиваются delete file D, даже если они содержат id=42. Это гарантирует, что INSERT после DELETE не создаёт ложных удалений.
Sequence numbers — одна из ключевых причин использовать V2 (а не V1). Без них корректная работа position/equality delete files в присутствии concurrent writers невозможна.
Snapshot references: branches и tags
Iceberg поддерживает именованные ссылки на snapshots — аналог git refs:
Работа с branches и tags через pyiceberg
# Создание тега
table.manage_snapshots().create_tag(
snapshot_id=table.current_snapshot().snapshot_id,
tag_name="audit-2025-q1"
).commit()
# Создание ветки
table.manage_snapshots().create_branch(
snapshot_id=table.current_snapshot().snapshot_id,
branch_name="staging"
).commit()
# Чтение данных из тега
arrow_table = table.scan(branch="audit-2025-q1").to_arrow()
# Список refs
for ref_name, ref in table.metadata.refs.items():
print(f" {ref_name}: snapshot={ref.snapshot_id}, type={ref.snapshot_ref_type}")
Write-Audit-Publish (WAP)
WAP — паттерн для изоляции непроверенных данных:
- Создать branch ‘staging’ от main
- Записать данные в staging branch
- Audit: проверить качество данных в staging
4a. Publish: fast-forward main → staging snapshot
Проверки пройдены — fast-forward merge staging в main. Данные мгновенно видимы всем reader-ам. Staging branch удаляется.4b. Reject: удалить staging branch
Проверки не пройдены — staging branch удаляется. Data files на storage остаются как мусор до следующего expire_snapshots. Main не затронут.# WAP: Write-Audit-Publish
# 1. Создать staging branch
table.manage_snapshots().create_branch(
snapshot_id=table.current_snapshot().snapshot_id,
branch_name="staging"
).commit()
# 2. Записать данные в staging
# (в pyiceberg — через wap.branch или table properties)
# 3. Audit
staging_data = table.scan(branch="staging").to_arrow()
null_count = staging_data.column("amount").null_count
assert null_count == 0, f"Found {null_count} NULLs in amount"
# 4a. Publish (fast-forward main to staging)
staging_snapshot = table.metadata.refs["staging"].snapshot_id
table.manage_snapshots().fast_forward(
branch_name="main",
update_to=staging_snapshot
).commit()
# Cleanup
table.manage_snapshots().remove_branch("staging").commit()
Time travel
Iceberg поддерживает чтение данных в прошлых состояниях — time travel. Три способа:
1. По snapshot ID
# Найти snapshot ID из истории
history = table.history()
for entry in history:
snap = table.snapshot_by_id(entry.snapshot_id)
if snap:
print(f" {snap.snapshot_id}: {snap.summary.get('operation')} "
f"@ {snap.timestamp_ms}")
# Чтение конкретного snapshot
old_data = table.scan(snapshot_id=2894710293847102938).to_arrow()
2. По timestamp
# Чтение состояния на определённый момент
from datetime import datetime, timezone
# Состояние таблицы на 15 января 2025
point_in_time = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
old_data = table.scan(
as_of_timestamp=int(point_in_time.timestamp() * 1000)
).to_arrow()
Time travel по timestamp находит snapshot с timestamp-ms ≤ requested_timestamp. Если writer-ы имеют рассинхронизированные часы, snapshot с timestamp T1 может быть создан после snapshot с timestamp T2, где T1 < T2. Для точного time travel используйте snapshot_id.
3. По branch/tag
# Чтение из тега
audited_data = table.scan(branch="audit-2025-q1").to_arrow()
# Чтение из ветки
staging_data = table.scan(branch="staging").to_arrow()
SQL-синтаксис time travel (в Spark/Trino)
-- По snapshot ID
SELECT * FROM orders VERSION AS OF 2894710293847102938;
-- По timestamp
SELECT * FROM orders TIMESTAMP AS OF '2025-01-15 12:00:00';
-- По branch/tag
SELECT * FROM orders VERSION AS OF 'audit-2025-q1';
-- Incremental read (два snapshot-а)
SELECT * FROM orders
BETWEEN SNAPSHOT 2894710293847102938
AND SNAPSHOT 3497810934857103984;
Rollback
Rollback в Iceberg — это не удаление истории, а переключение current-snapshot-id на предыдущий snapshot:
# Rollback к предыдущему snapshot
bad_snap = table.current_snapshot().snapshot_id
prev_snap = table.current_snapshot().parent_snapshot_id
table.manage_snapshots().rollback_to(
snapshot_id=prev_snap
).commit()
print(f"Rolled back from {bad_snap} to {prev_snap}")
print(f"Current snapshot: {table.current_snapshot().snapshot_id}")
Rollback:
- Мгновенный — только обновление указателя в metadata file + atomic CAS в каталоге
- Без копирования данных — старые data files уже на storage
- Обратимый — snapshot 3 не удалён, можно снова переключиться на него
Cherry-pick
Cherry-pick — применение изменений из конкретного snapshot к текущему состоянию:
# Cherry-pick: применить изменения из snapshot X к текущему состоянию
table.manage_snapshots().cherry_pick(
snapshot_id=specific_snapshot_id
).commit()
Cherry-pick полезен для восстановления конкретной операции после rollback: откатили на snapshot 2, но хотим применить INSERT из snapshot 4 (пропустив проблемный DELETE из snapshot 3).
Snapshot lifecycle management
Snapshots накапливаются и занимают storage (metadata files + data files, на которые ссылаются только старые snapshots). Управление lifecycle:
expire_snapshots
Удаляет старые snapshots и связанные data files, которые больше не нужны:
# Удалить snapshots старше 7 дней
from datetime import datetime, timezone, timedelta
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
cutoff_ms = int(cutoff.timestamp() * 1000)
# Через table properties (рекомендуется)
with table.update_properties() as props:
props.set("history.expire.max-snapshot-age-ms", str(7 * 24 * 3600 * 1000))
props.set("history.expire.min-snapshots-to-keep", "3")
После expire_snapshots time travel к удалённым snapshots невозможен. Data files, на которые ссылались только удалённые snapshots, физически удаляются с storage. Перед expire убедитесь, что все необходимые теги созданы.
Retention policies через properties
table_properties = {
# Snapshot retention
"history.expire.max-snapshot-age-ms": str(7 * 24 * 3600 * 1000), # 7 дней
"history.expire.min-snapshots-to-keep": "5", # минимум 5 снимков
# Ref retention
"refs.audit-2025-q1.max-ref-age-ms": str(90 * 24 * 3600 * 1000), # тег живёт 90 дней
}
Сравнение с Delta Lake
| Аспект | Delta Lake | Apache Iceberg |
|---|---|---|
| Concurrency control | OCC через rename (HDFS) или conflict resolution | OCC через atomic CAS в каталоге |
| Time travel | По версии или timestamp | По snapshot ID, timestamp, branch/tag |
| Branching | нативной поддержки | Branches + tags (refs) |
| WAP | нативной поддержки | Встроенный механизм через branches |
| Rollback | RESTORE TABLE ... TO VERSION n | manage_snapshots().rollback_to() |
| Sequence numbers | (версии файлов) | Монотонные sequence numbers (V2) |
| Snapshot содержимое | Дельта (add/remove actions) | Полное состояние (manifest list) |
Итоги
- Snapshot = иммутабельная точка во времени с полным описанием состояния таблицы
- OCC с retry: compare-and-swap в каталоге + conflict resolution при конфликтах
- Manifest переиспользование: новые snapshots ссылаются на существующие manifests, не копируя их
- Branches и tags: именованные refs для WAP, аудита, изоляции данных
- Time travel по трём осям: snapshot ID (точный), timestamp (приблизительный), branch/tag (именованный)
- Rollback мгновенный: переключение указателя, без копирования данных
- Sequence numbers (V2): правильный ordering delete files при concurrent writes