Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 35 мин
Продвинутый
Apache IcebergSnapshotCommit ProtocolOCCTime TravelRollbackBranchTagWAPOptimistic Concurrency Control

Снимки, коммиты и time travel

В предыдущем уроке мы разобрали трёхуровневую иерархию метаданных. Центральное понятие в этой иерархии — snapshot (снимок): иммутабельная точка во времени, описывающая полное состояние таблицы. Каждая операция записи создаёт новый snapshot, а каталог атомарно переключает указатель на него.

В этом уроке — механика коммитов: как Iceberg обеспечивает ACID в распределённой среде без координатора, как работает optimistic concurrency control (OCC), какие операции создают snapshots, и как использовать time travel, branches и tags.

NOTE

Примеры кода — на Python с pyiceberg 0.11.1 (март 2026). Версия спецификации — V2.

Snapshot — иммутабельная точка во времени

Snapshot — это полное описание состояния таблицы в конкретный момент. Каждый snapshot содержит:

Анатомия Snapshot
snapshot-idУникальный 64-bit ID снимка. Генерируется как случайное число при создании. Используется для time travel: table.scan(snapshot_id=...).
parent-snapshot-idID предыдущего снимка. Образует цепочку (linked list) всей истории таблицы. У первого snapshot parent = null. Позволяет восстановить историю изменений.
sequence-numberМонотонно возрастающий номер. Используется для ordering delete files в merge-on-read: delete file с sequence-number=5 применяется к data files с sequence-number <= 5. Введён в V2.
timestamp-msEpoch milliseconds — время создания снимка. Используется для time travel по времени: table.scan(as_of_timestamp=...). Не обязательно монотонный — часы на разных writer-ах могут рассинхронизироваться.
manifest-listПуть к Avro-файлу manifest list. Содержит полный список manifest files, описывающих все data files и delete files этого снимка. Один snapshot = один manifest list.
summaryMap со статистикой операции: operation (append/overwrite/delete/replace), added-data-files, deleted-data-files, added-records, deleted-records, total-records, total-data-files. Позволяет оценить масштаб изменений без чтения manifest list.

Ключевое свойство: snapshots иммутабельны. После создания snapshot никогда не изменяется. Новая операция создаёт новый snapshot, ссылающийся на предыдущий через parent-snapshot-id.

Цепочка snapshots

Snapshot 1 (CREATE TABLE, empty)
 └─→ Snapshot 2 (INSERT 50K records)
 └─→ Snapshot 3 (INSERT 30K records)
 └─→ Snapshot 4 (DELETE WHERE region = 'EU')
 └─→ Snapshot 5 (ALTER TABLE ADD COLUMN) ← current

Каждый snapshot содержит полное состояние таблицы (через manifest list), а не дельту изменений. Manifest files переиспользуются между snapshots — если INSERT добавил 3 файла, новый manifest list содержит новый manifest (с 3 файлами) + ссылки на все существующие manifests из предыдущего snapshot.

Переиспользование manifest files между snapshots

Snapshot 3

Snapshot 3: состояние таблицы после второго INSERT. Manifest list содержит два manifest file — один от первого INSERT (M1) и один от второго (M2).
Manifest list 3Manifest list для snapshot 3. Содержит записи, указывающие на manifest M1 (из snapshot 2) и manifest M2 (новый, из snapshot 3). Manifest M1 не копируется — переиспользуется.

Snapshot 4

Snapshot 4: DELETE удалил часть данных. Manifest list содержит: M1 (из snapshot 2, без изменений), M2 (из snapshot 3, без изменений), и M3 — новый manifest с delete files.
Manifest list 4Manifest list для snapshot 4. Содержит M1, M2 (переиспользованы), и новый M3 с delete markers. Старые данные не переписываются — создаются delete файлы.
Общие manifest filesM1 и M2 физически существуют на storage в одном экземпляре, но ссылаются на них оба manifest list. Это экономит storage и ускоряет коммиты — не нужно копировать manifests.

Протокол коммита (OCC)

Iceberg использует optimistic concurrency control (OCC) для обеспечения ACID. Протокол коммита:

Протокол коммита Iceberg (Optimistic Concurrency Control)
  1. Прочитать текущий metadata file из каталога
Шаг 1: Writer читает текущий metadata file из каталога. Это 'base' — состояние, на котором базируются изменения. Writer запоминает версию metadata file.
  1. Записать data files, manifests, новый metadata file на storage
Шаг 2: Writer выполняет операцию: пишет data files на storage, создаёт новые manifest files, формирует новый manifest list, создаёт новый metadata file. Все файлы пишутся на storage, но каталог ещё не обновлён.
  1. Atomic CAS: if current == base, set current = new
Шаг 3: Atomic compare-and-swap в каталоге. Writer говорит: 'если текущий metadata file всё ещё X (тот, что я читал в шаге 1), замени его на Y (мой новый metadata file)'. Если X изменился — коммит отклоняется.
Успех +CAS успешен — каталог обновлён, новый snapshot видим всем reader-ам. Операция завершена. Файлы, записанные на storage, теперь часть таблицы.
Конфликт −CAS неуспешен — другой writer обновил метаданные между шагами 1 и 3. Writer должен перечитать текущий metadata file, проверить совместимость изменений (conflict resolution), и попробовать снова.

Реализация CAS в разных каталогах

КаталогМеханизм CAS
REST CatalogHTTP POST /v1/.../tables/{table} с base-table-metadata в body. Сервер проверяет, что base совпадает с текущим
Hive MetastoreALTER TABLE SET TBLPROPERTIES('metadata_location'=...) + optimistic locking через версию
AWS GlueUpdateTable API с VersionId — Glue возвращает ConcurrentModificationException при конфликте
JDBCUPDATE iceberg_tables SET metadata_location=? WHERE ... AND metadata_location=? — SQL-уровень CAS
Hadoop (legacy)Rename файла — v5.metadata.json.tmpv5.metadata.json. Atomic rename на HDFS, не atomic на S3!
WARNING

Hadoop каталог deprecated с Iceberg 1.6.0 и будет удалён. На object storage (S3, GCS) атомарный rename невозможен, что делает Hadoop каталог небезопасным для concurrent writes на S3. Используйте REST, Glue или JDBC каталоги.

Retry с conflict resolution

При конфликте writer не просто повторяет операцию — он проверяет совместимость изменений:

Writer A: INSERT INTO orders (region='EU') -- base: snapshot 5
Writer B: INSERT INTO orders (region='US') -- base: snapshot 5

Writer A коммитит → snapshot 6 (успех)
Writer B получает конфликт:
 1. Перечитывает metadata → видит snapshot 6
 2. Проверяет: мои изменения (добавление файлов в region='US')
 конфликтуют с изменениями A (добавление файлов в region='EU')?
 3. Нет конфликта → rebase на snapshot 6, повторный коммит → snapshot 7

Виды проверок при rebase:

Conflict resolution при OCC retry
Append + AppendДва INSERT в разные партиции — нет конфликта. Оба добавляют новые файлы, не трогая существующие. Rebase автоматический.
Append + DeleteINSERT + DELETE (если delete не затрагивает добавленные файлы) — нет конфликта. Rebase: новый snapshot включает и новые данные, и удаления.
Delete + DeleteДва DELETE на пересекающихся данных — потенциальный конфликт. Один writer мог удалить файлы, которые второй тоже хочет удалить. Требует validation: если оба удаляют одни и те же файлы — конфликт.
Overwrite + AnyOVERWRITE заменяет данные в определённом partition range. Если другой writer изменил данные в том же range — конфликт. OVERWRITE + OVERWRITE на пересекающихся partitions всегда конфликтует.
Schema change + WriteALTER TABLE изменил схему, пока writer записывал данные по старой схеме. Обычно не конфликт, если новые колонки nullable. Конфликт, если удалена колонка, на которую ссылается writer.
Compaction + WriteOPTIMIZE/compaction перезаписывает файлы (replace action). Concurrent write добавляет новые файлы. Нет конфликта — compaction не трогает новые файлы, writer не трогает compacted файлы.

Число retry настраивается свойством таблицы:

# При создании таблицы
table = catalog.create_table("db.orders",
 schema=schema,
 properties={
 "commit.retry.num-retries": "4", # максимум 4 retry
 "commit.retry.min-wait-ms": "100", # начальная задержка
 "commit.retry.max-wait-ms": "60000", # максимальная задержка
 "commit.retry.total-timeout-ms": "1800000", # общий timeout 30 мин
 }
)

Операции и типы snapshots

Каждый snapshot создаётся определённой операцией. Поле summary.operation в snapshot описывает тип:

ОперацияОписаниеЧто меняется
appendДобавление новых данных (INSERT)Новые data files добавляются
overwriteЗамена данных в партицииСтарые data files удалены + новые добавлены
replaceCompaction / OPTIMIZEData files заменены оптимизированными версиями, данные не изменились
deleteУдаление строкСоздание delete files (position или equality) или удаление целых data files
# Просмотр истории операций
for entry in table.history():
 snap = table.snapshot_by_id(entry.snapshot_id)
 if snap:
 op = snap.summary.get("operation", "unknown")
 added = snap.summary.get("added-records", "0")
 deleted = snap.summary.get("deleted-records", "0")
 print(f" v{snap.sequence_number}: {op} "
 f"+{added}/-{deleted} records "
 f"@ {snap.timestamp_ms}")

Sequence numbers (V2)

В V2 каждый snapshot получает sequence number — монотонно возрастающий номер. Sequence numbers решают проблему ordering при merge-on-read:

Snapshot 1 (seq=1): INSERT файлы A, B, C
Snapshot 2 (seq=2): DELETE WHERE id=42 (position delete file D)
Snapshot 3 (seq=3): INSERT файлы E, F

Delete file D (seq=2) применяется только к data files с seq ≤ 2 (A, B, C). Файлы E, F (seq=3) не затрагиваются delete file D, даже если они содержат id=42. Это гарантирует, что INSERT после DELETE не создаёт ложных удалений.

TIP

Sequence numbers — одна из ключевых причин использовать V2 (а не V1). Без них корректная работа position/equality delete files в присутствии concurrent writers невозможна.

Snapshot references: branches и tags

Iceberg поддерживает именованные ссылки на snapshots — аналог git refs:

Branches и Tags в Iceberg
Snapshot 1Первый snapshot — CREATE TABLE. Пустая таблица.
Snapshot 2INSERT 50K записей. Создано 5 data files.
Snapshot 3INSERT ещё 30K записей. Создано 3 data files.
Snapshot 4DELETE WHERE region='EU'. Создан position delete file. Текущий main snapshot.
main (branch)Основная ветка — мутабельная ссылка, всегда указывает на последний коммит. Каждый новый коммит перемещает main на новый snapshot. Аналог HEAD/main в git.
audit-q1 (tag)Тег — иммутабельная ссылка. Фиксирует snapshot 2 для аудита Q1 данных. Не перемещается при коммитах. Может иметь max-ref-age-ms для автоматического удаления.
staging (branch)Дополнительная ветка для WAP (Write-Audit-Publish). Данные пишутся в staging, проверяются, затем merge-ятся в main. Позволяет изолировать непроверенные данные.

Работа с branches и tags через pyiceberg

# Создание тега
table.manage_snapshots().create_tag(
 snapshot_id=table.current_snapshot().snapshot_id,
 tag_name="audit-2025-q1"
).commit()

# Создание ветки
table.manage_snapshots().create_branch(
 snapshot_id=table.current_snapshot().snapshot_id,
 branch_name="staging"
).commit()

# Чтение данных из тега
arrow_table = table.scan(branch="audit-2025-q1").to_arrow()

# Список refs
for ref_name, ref in table.metadata.refs.items():
 print(f" {ref_name}: snapshot={ref.snapshot_id}, type={ref.snapshot_ref_type}")

Write-Audit-Publish (WAP)

WAP — паттерн для изоляции непроверенных данных:

Write-Audit-Publish (WAP) flow
  1. Создать branch ‘staging’ от main
Шаг 1: Writer создаёт ветку 'staging' от текущего main. Staging — изолированная область для записи непроверенных данных.
  1. Записать данные в staging branch
Шаг 2: Writer записывает данные в staging branch. Данные видимы только при явном чтении из staging branch. Читатели main видят старые данные.
  1. Audit: проверить качество данных в staging
Шаг 3: Data quality проверки на staging. Запросы к staging branch: COUNT(*), NULL checks, range validation, дубликаты. Если проверки не пройдены — staging удаляется, данные не попадают в main.

4a. Publish: fast-forward main → staging snapshot

Проверки пройдены — fast-forward merge staging в main. Данные мгновенно видимы всем reader-ам. Staging branch удаляется.

4b. Reject: удалить staging branch

Проверки не пройдены — staging branch удаляется. Data files на storage остаются как мусор до следующего expire_snapshots. Main не затронут.
# WAP: Write-Audit-Publish
# 1. Создать staging branch
table.manage_snapshots().create_branch(
 snapshot_id=table.current_snapshot().snapshot_id,
 branch_name="staging"
).commit()

# 2. Записать данные в staging
# (в pyiceberg — через wap.branch или table properties)

# 3. Audit
staging_data = table.scan(branch="staging").to_arrow()
null_count = staging_data.column("amount").null_count
assert null_count == 0, f"Found {null_count} NULLs in amount"

# 4a. Publish (fast-forward main to staging)
staging_snapshot = table.metadata.refs["staging"].snapshot_id
table.manage_snapshots().fast_forward(
 branch_name="main",
 update_to=staging_snapshot
).commit()

# Cleanup
table.manage_snapshots().remove_branch("staging").commit()

Time travel

Iceberg поддерживает чтение данных в прошлых состояниях — time travel. Три способа:

1. По snapshot ID

# Найти snapshot ID из истории
history = table.history()
for entry in history:
 snap = table.snapshot_by_id(entry.snapshot_id)
 if snap:
 print(f" {snap.snapshot_id}: {snap.summary.get('operation')} "
 f"@ {snap.timestamp_ms}")

# Чтение конкретного snapshot
old_data = table.scan(snapshot_id=2894710293847102938).to_arrow()

2. По timestamp

# Чтение состояния на определённый момент
from datetime import datetime, timezone

# Состояние таблицы на 15 января 2025
point_in_time = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
old_data = table.scan(
 as_of_timestamp=int(point_in_time.timestamp() * 1000)
).to_arrow()
WARNING

Time travel по timestamp находит snapshot с timestamp-ms ≤ requested_timestamp. Если writer-ы имеют рассинхронизированные часы, snapshot с timestamp T1 может быть создан после snapshot с timestamp T2, где T1 < T2. Для точного time travel используйте snapshot_id.

3. По branch/tag

# Чтение из тега
audited_data = table.scan(branch="audit-2025-q1").to_arrow()

# Чтение из ветки
staging_data = table.scan(branch="staging").to_arrow()

SQL-синтаксис time travel (в Spark/Trino)

-- По snapshot ID
SELECT * FROM orders VERSION AS OF 2894710293847102938;

-- По timestamp
SELECT * FROM orders TIMESTAMP AS OF '2025-01-15 12:00:00';

-- По branch/tag
SELECT * FROM orders VERSION AS OF 'audit-2025-q1';

-- Incremental read (два snapshot-а)
SELECT * FROM orders
 BETWEEN SNAPSHOT 2894710293847102938
 AND SNAPSHOT 3497810934857103984;

Rollback

Rollback в Iceberg — это не удаление истории, а переключение current-snapshot-id на предыдущий snapshot:

Rollback — переключение current-snapshot-id
Snap 1Snapshot 1: исходное состояние. Становится текущим после rollback.
Snap 2Snapshot 2: данные добавлены. После rollback — всё ещё существует, но не является текущим.
Snap 3Snapshot 3: проблемные данные. Было current до rollback. После rollback — остаётся в metadata, но current-snapshot-id указывает на Snap 2.
До rollbackcurrent-snapshot-id указывает на Snapshot 3 с плохими данными. Все reader-ы видят состояние Snap 3.
rollback
После rollbackcurrent-snapshot-id переключён на Snapshot 2. Reader-ы теперь видят состояние Snap 2. Snap 3 сохраняется в metadata для audit trail.
# Rollback к предыдущему snapshot
bad_snap = table.current_snapshot().snapshot_id
prev_snap = table.current_snapshot().parent_snapshot_id

table.manage_snapshots().rollback_to(
 snapshot_id=prev_snap
).commit()

print(f"Rolled back from {bad_snap} to {prev_snap}")
print(f"Current snapshot: {table.current_snapshot().snapshot_id}")

Rollback:

  • Мгновенный — только обновление указателя в metadata file + atomic CAS в каталоге
  • Без копирования данных — старые data files уже на storage
  • Обратимый — snapshot 3 не удалён, можно снова переключиться на него

Cherry-pick

Cherry-pick — применение изменений из конкретного snapshot к текущему состоянию:

# Cherry-pick: применить изменения из snapshot X к текущему состоянию
table.manage_snapshots().cherry_pick(
 snapshot_id=specific_snapshot_id
).commit()

Cherry-pick полезен для восстановления конкретной операции после rollback: откатили на snapshot 2, но хотим применить INSERT из snapshot 4 (пропустив проблемный DELETE из snapshot 3).

Snapshot lifecycle management

Snapshots накапливаются и занимают storage (metadata files + data files, на которые ссылаются только старые snapshots). Управление lifecycle:

expire_snapshots

Удаляет старые snapshots и связанные data files, которые больше не нужны:

# Удалить snapshots старше 7 дней
from datetime import datetime, timezone, timedelta

cutoff = datetime.now(timezone.utc) - timedelta(days=7)
cutoff_ms = int(cutoff.timestamp() * 1000)

# Через table properties (рекомендуется)
with table.update_properties() as props:
 props.set("history.expire.max-snapshot-age-ms", str(7 * 24 * 3600 * 1000))
 props.set("history.expire.min-snapshots-to-keep", "3")
DANGER

После expire_snapshots time travel к удалённым snapshots невозможен. Data files, на которые ссылались только удалённые snapshots, физически удаляются с storage. Перед expire убедитесь, что все необходимые теги созданы.

Retention policies через properties

table_properties = {
 # Snapshot retention
 "history.expire.max-snapshot-age-ms": str(7 * 24 * 3600 * 1000), # 7 дней
 "history.expire.min-snapshots-to-keep": "5", # минимум 5 снимков

 # Ref retention
 "refs.audit-2025-q1.max-ref-age-ms": str(90 * 24 * 3600 * 1000), # тег живёт 90 дней
}

Сравнение с Delta Lake

АспектDelta LakeApache Iceberg
Concurrency controlOCC через rename (HDFS) или conflict resolutionOCC через atomic CAS в каталоге
Time travelПо версии или timestampПо snapshot ID, timestamp, branch/tag
Branching нативной поддержкиBranches + tags (refs)
WAP нативной поддержкиВстроенный механизм через branches
RollbackRESTORE TABLE ... TO VERSION nmanage_snapshots().rollback_to()
Sequence numbers (версии файлов)Монотонные sequence numbers (V2)
Snapshot содержимоеДельта (add/remove actions)Полное состояние (manifest list)

Итоги

  1. Snapshot = иммутабельная точка во времени с полным описанием состояния таблицы
  2. OCC с retry: compare-and-swap в каталоге + conflict resolution при конфликтах
  3. Manifest переиспользование: новые snapshots ссылаются на существующие manifests, не копируя их
  4. Branches и tags: именованные refs для WAP, аудита, изоляции данных
  5. Time travel по трём осям: snapshot ID (точный), timestamp (приблизительный), branch/tag (именованный)
  6. Rollback мгновенный: переключение указателя, без копирования данных
  7. Sequence numbers (V2): правильный ordering delete files при concurrent writes
Iceberg time travel в Spark — практика

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Два writer'а одновременно выполняют append в Iceberg-таблицу. Оба читают current snapshot S5. Writer A коммитит S6. Writer B пытается коммитить свой snapshot. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6