Compaction и vacuum: обслуживание данных
Зачем нужно обслуживание данных
Данные в озере данных (data lake) имеют тенденцию деградировать со временем:
- Накопление маленьких файлов — каждый streaming micro-batch, каждая инкрементальная загрузка создаёт новые файлы
- Устаревшие версии — в Delta Lake/Iceberg каждая операция (INSERT, UPDATE, DELETE) создаёт новую версию данных, но старые файлы остаются
- Фрагментация данных — после множества UPDATE/DELETE данные разбросаны по файлам, data skipping теряет эффективность
Без обслуживания таблица из 100 оптимальных файлов превращается в 100,000 маленьких за несколько месяцев.
Compaction (OPTIMIZE)
Compaction — это процесс объединения множества маленьких файлов в меньшее количество крупных. В Delta Lake это делается командой OPTIMIZE:
-- Компактизация таблицы
OPTIMIZE events
-- Компактизация конкретной партиции
OPTIMIZE events
WHERE event_date = '2024-02-01'
-- Компактизация с Z-ordering
OPTIMIZE events
ZORDER BY (user_id, event_type)
# Через DeltaTable API
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/data/events/")
delta_table.optimize().executeCompaction()
# С Z-ordering
delta_table.optimize().executeZOrderBy("user_id", "event_type")
Что происходит при OPTIMIZE
Целевой размер файла: по умолчанию spark.databricks.delta.optimize.maxFileSize = 1073741824 (1GB) в Databricks, или delta.targetFileSize = 134217728 (128MB) в open-source Delta Lake.
Bin-packing vs Z-ordering
OPTIMIZE может работать в двух режимах:
| Режим | Что делает | Когда использовать |
|---|---|---|
| Bin-packing (default) | Объединяет маленькие файлы, сохраняя порядок | Когда data skipping уже эффективен |
| Z-ordering | Объединяет + перераспределяет по Z-кривой | Когда нужен data skipping по нескольким столбцам |
Z-ordering при OPTIMIZE — единственный способ применить Z-ordering к существующим данным.
Vacuum: очистка устаревших файлов
После OPTIMIZE старые маленькие файлы не удаляются — они остаются для поддержки time travel (запросы к предыдущим версиям данных). VACUUM удаляет файлы, которые старше retention period:
-- Удалить файлы старше 7 дней (default)
VACUUM events
-- Удалить файлы старше 30 дней
VACUUM events RETAIN 720 HOURS
# Через DeltaTable API
delta_table = DeltaTable.forPath(spark, "/data/events/")
delta_table.vacuum(retentionHours=168) # 7 дней
Что удаляет VACUUM
3 файла удалены — старше 7 дней retention period
Конфигурация retention
| Параметр | Default | Описание |
|---|---|---|
delta.deletedFileRetentionDuration | interval 7 days | Минимальный retention для VACUUM |
delta.logRetentionDuration | interval 30 days | Retention для _delta_log |
spark.databricks.delta.retentionDurationCheck.enabled | true | Защита от слишком маленького retention |
# Установка retention для таблицы
spark.sql("""
ALTER TABLE delta.`/data/events/`
SET TBLPROPERTIES (
'delta.deletedFileRetentionDuration' = 'interval 30 days'
)
""")
VACUUM с retention < 7 дней опасен! Time travel позволяет запрашивать предыдущие версии таблицы. Если VACUUM удалит файлы, от которых зависит активный запрос или конкурентная запись, результат — ошибка FileNotFoundException или corrupted data. По умолчанию Spark блокирует VACUUM с retention < 7 дней.
Анти-паттерн: VACUUM с retention < 7 дней
# ПЛОХО: агрессивный vacuum
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(retentionHours=0) # удалить ВСЁ старое
# Последствия:
# 1. Time travel сломан -- нельзя запросить вчерашнюю версию
# 2. Конкурентные читатели могут получить FileNotFoundException
# 3. Streaming checkpoint может указывать на удалённые файлы
# ХОРОШО: разумный retention
delta_table.vacuum(retentionHours=168) # 7 дней (default)
# Для critical tables с длительными jobs:
delta_table.vacuum(retentionHours=720) # 30 дней
Планирование maintenance jobs
Compaction и vacuum должны выполняться регулярно как maintenance jobs:
# Ежедневный maintenance job
from datetime import datetime, timedelta
def daily_maintenance(table_path: str):
delta_table = DeltaTable.forPath(spark, table_path)
# 1. Компактизация файлов
delta_table.optimize().executeCompaction()
# 2. Очистка устаревших версий
delta_table.vacuum(retentionHours=168)
print(f"Maintenance complete for {table_path} at {datetime.now()}")
# Расписание: ежедневно в 03:00 (off-peak hours)
tables = ["/data/events/", "/data/orders/", "/data/users/"]
for table in tables:
daily_maintenance(table)
Рекомендации по расписанию
| Частота записи | OPTIMIZE | VACUUM |
|---|---|---|
| Streaming (continuous) | Каждые 1-4 часа | Ежедневно |
| Micro-batch (hourly) | Ежедневно | Еженедельно |
| Daily append | Еженедельно | Ежемесячно |
| Редкие обновления | По необходимости | Ежемесячно |
Подробнее о Delta Lake, Iceberg и Hudi — в модуле Lakehouse (Phase 66). Там мы разберём ACID-транзакции, time travel, schema evolution и production-ready конфигурации.
Для углублённого изучения maintenance-операций lakehouse-форматов см. курс Storage Formats Deep-Dive — compaction и vacuum в Delta Lake, и maintenance в Iceberg (expire snapshots, rewrite data files).
Что дальше?
В следующем уроке мы подробно разберём проблему маленьких файлов — почему она возникает, как диагностировать и какие инструменты Spark предоставляет для её решения помимо OPTIMIZE.