Time Travel, Checkpoints и VACUUM
В предыдущих уроках мы разобрали, как Delta Lake хранит историю изменений в transaction log. Каждый коммит — отдельный JSON-файл, remove action не удаляет Parquet-файл с диска. Это создаёт два важных следствия:
- Time travel — возможность прочитать таблицу в любом прошлом состоянии
- Необходимость VACUUM — физическая очистка файлов, которые больше не нужны
Этот урок — глубокий разбор обоих механизмов, плюс детали checkpoint-механики.
Time Travel: чтение прошлых версий
Delta Lake поддерживает два способа time travel:
Time Travel через delta-rs
from deltalake import DeltaTable
# Чтение текущей версии
dt = DeltaTable("./my_table")
print(f"Текущая версия: {dt.version()}")
df_current = dt.to_pandas()
print(f"Строк: {len(df_current)}")
# Time travel по версии
dt_v5 = DeltaTable("./my_table", version=5)
df_v5 = dt_v5.to_pandas()
print(f"Строк в версии 5: {len(df_v5)}")
# Time travel по timestamp (RFC 3339)
dt_ts = DeltaTable("./my_table", version="2025-01-15T10:00:00Z")
df_ts = dt_ts.to_pandas()
print(f"Строк на 2025-01-15: {len(df_ts)}")
# Сравнение версий
v_old = DeltaTable("./my_table", version=3)
v_new = DeltaTable("./my_table", version=10)
files_old = set(v_old.file_uris())
files_new = set(v_new.file_uris())
added = files_new - files_old
removed = files_old - files_new
print(f"Файлов добавлено: {len(added)}")
print(f"Файлов удалено: {len(removed)}")
Как работает time travel изнутри
Когда клиент запрашивает VERSION AS OF 5:
- Найти последний checkpoint с версией ≤ 5
- Загрузить checkpoint (например, v0.checkpoint.parquet)
- Replay JSON-коммитов v1, v2, v3, v4, v5
- Reconcile:
add−remove= активные файлы на версии 5 - Прочитать только эти Parquet-файлы
Time travel работает, пока Parquet-файлы физически существуют на диске. Если файл был удалён (remove action) и затем стёрт VACUUM — time travel к версии, которая ссылалась на этот файл, завершится ошибкой FileNotFoundException.
Ограничения time travel
- Retention period: по умолчанию
VACUUMудаляет файлы старше 7 дней. Time travel гарантирован только в пределах retention. - Log retention: JSON-коммиты тоже могут быть удалены. По умолчанию
delta.logRetentionDuration = 30 days. Без JSON-файлов log replay невозможен, даже если Parquet-файлы на месте. - Checkpoint coverage: если checkpoint не покрывает нужную версию и JSON удалены — time travel невозможен.
Checkpoint: детальная механика
Зачем checkpoints
Без checkpoints чтение таблицы с 100 000 версий требует replay 100 000 JSON-файлов. Каждый файл — HTTP-запрос к object storage (~50-100ms). Это 5 000–10 000 секунд только на метаданные.
Checkpoint — Parquet-файл с накопленным состоянием. Одно чтение вместо тысяч.
Trigger
По умолчанию checkpoint создаётся каждые 10 коммитов (delta.checkpointInterval = 10). Проверка:
if version % checkpointInterval == 0:
create_checkpoint(version)
Single-Part Checkpoint
Для таблиц с умеренным количеством файлов (до сотен тысяч):
_delta_log/
├── 00000000000000000010.checkpoint.parquet ← один файл
├── _last_checkpoint ← {"version": 10, "size": 42}
Parquet-файл содержит одну строку на каждый action в накопленном состоянии:
- Все активные
add(файлы, которые добавлены и не удалены) - Текущий
metaData - Текущий
protocol - Все
txnдействия
Multi-Part Checkpoint
Для очень больших таблиц (миллионы файлов) один Parquet-файл может быть слишком большим. Multi-part checkpoint разбивает его:
_delta_log/
├── 00000000000000000100.checkpoint.0000000001.0000000004.parquet
├── 00000000000000000100.checkpoint.0000000002.0000000004.parquet
├── 00000000000000000100.checkpoint.0000000003.0000000004.parquet
├── 00000000000000000100.checkpoint.0000000004.0000000004.parquet
├── _last_checkpoint ← {"version": 100, "size": 15000, "parts": 4}
Формат имени: {version}.checkpoint.{part_number}.{total_parts}.parquet. Все части читаются параллельно для ускорения.
V2 Checkpoints (Writer V7)
V2 Checkpoint = main file + sidecar files
V2 Checkpoints — улучшенный формат, введённый в Delta Lake 4.0 (Writer V7). Вместо одного большого Parquet-файла: основной файл (metaData + protocol) + sidecar files (file actions). Ускоряет парсинг для простых запросов.Алгоритм Log Replay: пошагово
Полный алгоритм восстановления состояния таблицы:
- Прочитать _last_checkpoint
- Checkpoint найден?
Загрузить checkpoint(N).parquet
Загрузить Parquet checkpoint. Получить базовое состояние: set активных файлов (add actions), текущие metaData, protocol, txn. Для multi-part — читать все parts параллельно.Пустое состояние, N=0
Нет checkpoint — начинаем с пустого состояния. Все JSON-файлы с версии 0 будут применены последовательно.- Listing: JSON-файлы с версией > N
- Replay: применить JSON actions по порядку
Состояние: active_files + metaData + protocol
Результат: текущее состояние таблицы. active_files = все файлы с add, у которых нет соответствующего remove. Текущая схема из последнего metaData. Протокол из последнего protocol.Reconcile: add vs remove
Ключевая операция — определить, какие файлы активны:
# Псевдокод log replay
active_files = {} # path → add_action
# Из checkpoint (если есть)
for action in checkpoint_actions:
if "add" in action:
active_files[action["add"]["path"]] = action["add"]
# Из JSON-коммитов после checkpoint
for json_file in sorted_json_files_after_checkpoint:
for action in read_ndjson(json_file):
if "add" in action:
active_files[action["add"]["path"]] = action["add"]
elif "remove" in action:
active_files.pop(action["remove"]["path"], None)
elif "metaData" in action:
current_metadata = action["metaData"]
elif "protocol" in action:
current_protocol = action["protocol"]
# active_files — текущее состояние таблицы
VACUUM: физическая очистка файлов
Проблема
Transaction log растёт: каждый UPDATE создаёт новые файлы (add) и помечает старые (remove). Старые Parquet-файлы остаются на диске для time travel. Без очистки — storage costs растут линейно.
Что делает VACUUM
VACUUM удаляет Parquet-файлы, которые:
- Не являются активными (нет соответствующего
addв текущем состоянии) - Старше retention period (по умолчанию 7 дней)
Storage освобождён. Time travel ограничен retention period.
Результат: storage освобождён. Time travel работает только в пределах retention period. Файлы, старше retention, удалены безвозвратно.VACUUM через delta-rs
from deltalake import DeltaTable
dt = DeltaTable("./my_table")
# Dry run: показать, какие файлы будут удалены
files_to_delete = dt.vacuum(dry_run=True)
print(f"Файлов к удалению: {len(files_to_delete)}")
for f in files_to_delete[:5]:
print(f" {f}")
# Выполнить VACUUM с retention 168 часов (7 дней, default)
deleted = dt.vacuum(retention_hours=168, dry_run=False)
print(f"Удалено: {len(deleted)} файлов")
# Агрессивный VACUUM: retention 0 часов
# ОПАСНО: удалит ВСЕ неактивные файлы, time travel станет невозможен
deleted = dt.vacuum(
retention_hours=0,
dry_run=False,
enforce_retention_duration=False # отключить safety check
)
Safety guarantees
Delta Lake защищает от случайного удаления:
-
Minimum retention: по умолчанию VACUUM с
retention < 168 hours(7 дней) выбрасывает ошибку. Нужно явно установитьenforce_retention_duration=False. -
Concurrent readers: VACUUM учитывает, что другие клиенты могут читать таблицу. Файлы удаляются, только если они гарантированно не нужны никому (старше retention).
-
Log retention: отдельная настройка
delta.logRetentionDuration(по умолчанию 30 дней) — сколько хранить JSON-коммиты. VACUUM не удаляет JSON-коммиты — это делает log cleanup.
# Настройки retention
dt = DeltaTable("./my_table")
config = dt.metadata().configuration
# Файловый retention (для VACUUM)
# delta.deletedFileRetentionDuration = "interval 7 days" (default)
print(f"File retention: {config.get('delta.deletedFileRetentionDuration', '7 days (default)')}")
# Log retention (для JSON cleanup)
# delta.logRetentionDuration = "interval 30 days" (default)
print(f"Log retention: {config.get('delta.logRetentionDuration', '30 days (default)')}")
# Checkpoint interval
# delta.checkpointInterval = 10 (default)
print(f"Checkpoint interval: {config.get('delta.checkpointInterval', '10 (default)')}")
В Delta Lake 4.1 для catalog-managed tables ручной VACUUM заблокирован — управление жизненным циклом данных переходит к каталогу. Это предотвращает конфликты между VACUUM и каталоговыми операциями (governance, access control, foreign key constraints).
Orphaned files
VACUUM также удаляет orphaned files — Parquet-файлы в директории таблицы, которые не упоминаются ни в одном action в transaction log:
- Файлы от прерванных записей (crash в середине операции)
- Файлы, созданные другими инструментами (случайно положили в директорию таблицы)
- Temporary файлы, которые не были очищены
# Обнаружение orphaned files
import os
from pathlib import Path
table_dir = Path("./my_table")
dt = DeltaTable(str(table_dir))
# Файлы, известные Delta Lake
known_files = set()
for uri in dt.file_uris():
known_files.add(Path(uri).name)
# Все Parquet-файлы на диске
all_parquet = set()
for f in table_dir.rglob("*.parquet"):
if "_delta_log" not in str(f):
all_parquet.add(f.name)
orphaned = all_parquet - known_files
print(f"Orphaned files: {len(orphaned)}")
Взаимосвязь: Time Travel ↔ Checkpoints ↔ VACUUM
Три механизма тесно связаны:
| Параметр | Default | Влияние |
|---|---|---|
delta.checkpointInterval | 10 | Частота checkpoints. Меньше → быстрее log replay, больше checkpoint-файлов |
delta.deletedFileRetentionDuration | 7 days | Retention для VACUUM. Дольше → больше time travel, больше storage |
delta.logRetentionDuration | 30 days | Retention для JSON-логов. Дольше → глубже history() и time travel |
delta.enableExpiredLogCleanup | true | Автоматическая очистка старых JSON-коммитов |
Типичные конфигурации:
# Analytics warehouse: длинный retention для аудита
# delta.deletedFileRetentionDuration = "interval 30 days"
# delta.logRetentionDuration = "interval 90 days"
# Streaming pipeline: короткий retention для экономии storage
# delta.deletedFileRetentionDuration = "interval 2 days"
# delta.logRetentionDuration = "interval 7 days"
# delta.checkpointInterval = 5
# Compliance: максимальный retention
# delta.deletedFileRetentionDuration = "interval 365 days"
# delta.logRetentionDuration = "interval 365 days"
Практический workflow: VACUUM + Time Travel
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
# 1. Создаём таблицу и записываем несколько версий
dt = DeltaTable("./demo_vacuum")
# Версия 0: initial data
# Версия 1: update (remove old + add new)
# Версия 2: delete some rows
# Версия 3: insert more rows
print(f"Текущая версия: {dt.version()}")
# 2. Time travel работает
for v in range(dt.version() + 1):
dt_v = DeltaTable("./demo_vacuum", version=v)
print(f" v{v}: {len(dt_v.to_pandas())} строк, {len(dt_v.file_uris())} файлов")
# 3. VACUUM dry run
files = dt.vacuum(dry_run=True, retention_hours=0)
print(f"\nVACUUM dry run: {len(files)} файлов к удалению")
# 4. VACUUM execute
deleted = dt.vacuum(
retention_hours=0,
dry_run=False,
enforce_retention_duration=False
)
print(f"Удалено: {len(deleted)} файлов")
# 5. Time travel ПОСЛЕ vacuum — ошибка для старых версий!
try:
dt_old = DeltaTable("./demo_vacuum", version=0)
df = dt_old.to_pandas()
except Exception as e:
print(f"\nTime travel v0 после VACUUM: {type(e).__name__}")
print(f" {e}")
Перед VACUUM всегда делайте dry run (dry_run=True), чтобы увидеть список файлов к удалению. Особенно если меняете retention — убедитесь, что не удаляете нужные для time travel файлы.
Итоги
| Механизм | Суть |
|---|---|
| Time Travel (version) | DeltaTable(path, version=N) — точная версия по номеру коммита |
| Time Travel (timestamp) | DeltaTable(path, version="2025-01-15T...") — последняя версия до timestamp |
| Checkpoint (single) | Один Parquet-файл с полным состоянием. Каждые 10 коммитов. |
| Checkpoint (multi-part) | Несколько Parquet-файлов для больших таблиц. Параллельное чтение. |
| V2 Checkpoint | Main file + sidecars. Быстрый доступ к schema. Version checksums. |
| Log Replay | _last_checkpoint → checkpoint.parquet → replay JSON → active files |
| VACUUM | Удаляет неактивные файлы старше retention period. Dry run обязателен. |
| Orphaned files | Файлы не в log — тоже удаляются VACUUM |
| Safety | Retention < 7 days требует явного override. Catalog-managed tables блокируют ручной VACUUM. |
В следующем уроке — оптимизация данных: data skipping по min/max статистикам, OPTIMIZE (compaction), Z-ORDER и Liquid Clustering.