Delta Lake и DataFusion: time travel, ACID и file-level metadata skipping
В уроке 01 мы рассмотрели lakehouse-архитектуру и роль DataFusion как query engine layer. Теперь погружаемся в практику: как подключить Delta-таблицу к DataFusion, использовать time travel и получить ускорение в 2x за счёт file-level metadata skipping.
Архитектура интеграции
DataFusion интегрируется с Delta Lake через библиотеку delta-rs — Rust-реализацию протокола Delta Lake. В Python доступен пакет deltalake, который использует delta-rs под капотом:
Ключевая абстракция — DeltaTable из delta-rs реализует TableProvider. DataFusion видит его как обычную таблицу: получает schema, запрашивает statistics, выполняет scan. Все детали transaction log скрыты за абстракцией.
Подключение Delta-таблицы
Python API
import datafusion
from deltalake import DeltaTable
ctx = datafusion.SessionContext()
# Открытие локальной Delta-таблицы
delta = DeltaTable("./data/events")
# Регистрация в DataFusion как TableProvider
ctx.register_table_provider("events", delta)
# Обычные SQL-запросы — DataFusion не знает, что это Delta
df = ctx.sql("SELECT * FROM events WHERE date = '2024-01-15'")
df.show()
Метод register_table_provider() принимает любой объект, реализующий протокол TableProvider. Delta-таблица, Iceberg-таблица и обычный Parquet-файл регистрируются одинаково — DataFusion работает с абстракцией.
Python API: таблица на S3
from deltalake import DeltaTable
# S3 — credentials из переменных окружения или ~/.aws/credentials
delta = DeltaTable(
"s3://my-bucket/warehouse/events",
storage_options={
"AWS_REGION": "eu-west-1",
# Можно явно: "AWS_ACCESS_KEY_ID": "...", "AWS_SECRET_ACCESS_KEY": "..."
}
)
ctx.register_table_provider("events", delta)
Rust API
use datafusion::prelude::*;
use deltalake::open_table;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// Открытие Delta-таблицы (локально или S3)
let delta = open_table("./data/events").await?;
// Регистрация — DeltaTable реализует TableProvider
ctx.register_table("events", Arc::new(delta))?;
let df = ctx.sql("SELECT count(*) FROM events").await?;
df.show().await?;
Ok(())
}
Для продакшена всегда указывайте storage_options явно — не полагайтесь на environment variables, которые могут отсутствовать в контейнерах или lambda-функциях.
Time Travel
Delta Lake хранит историю всех изменений в transaction log. Каждый commit — это JSON-файл с номером версии. DataFusion может запрашивать данные на конкретную версию или timestamp.
Запрос по версии
from deltalake import DeltaTable
# Открытие конкретной версии таблицы
delta_v5 = DeltaTable("./data/events", version=5)
ctx.register_table_provider("events_v5", delta_v5)
# Данные как они были на момент commit #5
df = ctx.sql("SELECT count(*) FROM events_v5")
Запрос по timestamp
from deltalake import DeltaTable
from datetime import datetime
# Данные на конкретный момент времени
delta_jan = DeltaTable(
"./data/events",
storage_options={"AWS_REGION": "eu-west-1"}
)
delta_jan.load_as_of("2024-01-15T00:00:00Z")
ctx.register_table_provider("events_jan", delta_jan)
Сценарии использования time travel
Time travel доступен только пока файлы данных не удалены операцией VACUUM. По умолчанию Delta Lake хранит историю 30 дней. Для ML-сценариев с длинным горизонтом увеличьте retention period.
ACID-транзакции
Delta Lake обеспечивает ACID через optimistic concurrency control на уровне transaction log.
Как это работает
- Writer читает текущую версию таблицы (номер последнего commit)
- Формирует набор изменений (добавление/удаление файлов)
- Пытается атомарно записать новый commit (JSON-файл с номером version+1)
- Если другой writer успел записать commit с тем же номером — retry с перечитыванием
Writer A: read version=5 → prepare changes → write version=6 ✓
Writer B: read version=5 → prepare changes → write version=6 ✗ (conflict)
Writer B: re-read version=6 → re-prepare → write version=7 ✓
Isolation levels
| Уровень | Поведение |
|---|---|
| Serializable (default) | Полная сериализуемость — конфликтующие writers retry |
| WriteSerializable | Допускает concurrent appends без конфликтов |
Для аналитических ETL-пайплайнов, где несколько job-ов append в одну таблицу, WriteSerializable снижает количество retry.
Запись из DataFusion
import datafusion
from deltalake import write_deltalake
ctx = datafusion.SessionContext()
ctx.register_parquet("raw", "./data/raw_events/")
# Трансформация
result = ctx.sql("""
SELECT event_id, user_id, event_type,
CAST(ts AS TIMESTAMP) as event_ts
FROM raw
WHERE event_type IS NOT NULL
""")
# Запись в Delta-формат
write_deltalake(
"./data/events_delta",
result.to_arrow_table(),
mode="append", # или "overwrite"
partition_by=["event_date"]
)
write_deltalake() — функция из пакета deltalake, не из DataFusion. DataFusion отвечает за чтение и трансформацию, а delta-rs — за запись с соблюдением транзакционного протокола.
File-level metadata skipping
Главный источник ускорения при использовании Delta Lake — пропуск файлов, которые гарантированно не содержат нужных данных.
Механизм
При каждом commit Delta Lake записывает в transaction log статистики по каждому добавленному файлу:
{
"add": {
"path": "part-00042-...-c000.parquet",
"size": 67108864,
"stats": {
"numRecords": 100000,
"minValues": { "date": "2024-01-15", "amount": 0.5 },
"maxValues": { "date": "2024-01-15", "amount": 9999.99 },
"nullCount": { "date": 0, "amount": 12 }
}
}
}
Когда DataFusion выполняет WHERE date = '2024-03-01', Delta-провайдер проверяет min/max каждого файла в transaction log и возвращает только файлы, где min(date) <= '2024-03-01' AND max(date) >= '2024-03-01'.
Сравнение с raw Parquet
Без Delta Lake DataFusion тоже фильтрует Parquet-файлы — но через row group statistics внутри каждого файла. Разница:
| Метод | Что читаем | I/O |
|---|---|---|
| Raw Parquet | Footer каждого файла (последние ~10 KB) → row group stats | N файлов × footer read |
| Delta Lake | Один JSON/checkpoint из _delta_log | 1 файл (или 1 checkpoint Parquet) |
Для таблицы с 1000 файлов: raw Parquet делает 1000 маленьких I/O-запросов к object store (каждый с латентностью ~50ms на S3). Delta делает 1 запрос к transaction log. На практике это даёт ускорение в ~2x для селективных запросов.
Максимальный эффект от metadata skipping достигается, когда данные отсортированы по колонке фильтрации. Если файлы содержат случайный диапазон дат, min/max будут широкими и пропуск файлов не сработает.
Оптимизация: OPTIMIZE и Z-ORDER
Delta Lake предоставляет команды для улучшения файловой раскладки:
from deltalake import DeltaTable
dt = DeltaTable("./data/events")
# Компактировка мелких файлов в крупные
dt.optimize.compact()
# Z-ORDER — интерливинг нескольких колонок для multi-column pruning
dt.optimize.z_order(columns=["date", "user_id"])
OPTIMIZE compact — объединяет мелкие файлы (результат частых append) в файлы оптимального размера (128-256 MB). Уменьшает количество файлов → меньше metadata для обработки.
Z-ORDER — переупорядочивает данные внутри файлов для улучшения metadata skipping по нескольким колонкам одновременно. Полезно, когда запросы фильтруют по разным колонкам (date + user_id + region).
Производительность: когда Delta Lake быстрее raw Parquet
Delta Lake не всегда быстрее — есть overhead на чтение transaction log. Выигрыш зависит от сценария:
| Сценарий | Delta vs Raw Parquet |
|---|---|
| Селективный запрос (WHERE на 1% данных) | ~2x быстрее — metadata skipping |
| Full scan (SELECT *) | ~1x — overhead минимален |
| Первый запрос (cold start) | ~0.9x медленнее — загрузка transaction log |
| Много мелких файлов (>10000) | ~3-5x быстрее — checkpoint вместо 10000 footer reads |
| Concurrent reads/writes | Корректнее — ACID vs potential corruption |
Не используйте Delta Lake для одноразовых аналитических запросов к маленьким датасетам (<100 файлов). Overhead transaction log не окупится. Delta Lake оправдан для: (1) больших таблиц с selective queries, (2) таблиц с concurrent writes, (3) сценариев с time travel.
Change Data Feed
Delta Lake может отслеживать, какие строки были добавлены, обновлены или удалены между версиями:
from deltalake import DeltaTable
dt = DeltaTable("./data/events")
# Получить изменения между версиями
changes = dt.load_cdf(
starting_version=5,
ending_version=10
)
# changes содержит колонки: _change_type (insert/update_preimage/update_postimage/delete),
# _commit_version, _commit_timestamp + все колонки таблицы
Это полезно для:
- Incremental ETL — обработать только новые/изменённые записи, не перечитывая всю таблицу
- CDC pipelines — отправлять изменения в downstream-системы
- Аудит — кто, когда и что изменил
Практический пример: аналитический сервис
Соберём полный пример — аналитический сервис, читающий Delta-таблицу с time travel и metadata skipping:
import datafusion
from deltalake import DeltaTable
def create_analytics_context(table_path: str, version: int = None):
"""Создаёт DataFusion контекст с Delta-таблицей."""
ctx = datafusion.SessionContext()
# Открытие с опциональным time travel
if version is not None:
delta = DeltaTable(table_path, version=version)
else:
delta = DeltaTable(table_path)
ctx.register_table_provider("events", delta)
return ctx
# Текущие данные
ctx = create_analytics_context("s3://warehouse/events")
current = ctx.sql("""
SELECT event_type, count(*) as cnt
FROM events
WHERE date >= '2024-03-01' AND date < '2024-04-01'
GROUP BY event_type
ORDER BY cnt DESC
""")
# Данные неделю назад (time travel)
ctx_old = create_analytics_context("s3://warehouse/events", version=42)
previous = ctx_old.sql("""
SELECT event_type, count(*) as cnt
FROM events
WHERE date >= '2024-03-01' AND date < '2024-04-01'
GROUP BY event_type
ORDER BY cnt DESC
""")
Ключевые выводы
- delta-rs реализует
TableProvider— подключается к DataFusion черезregister_table_provider() - Time travel — доступ к любой версии через
DeltaTable(path, version=N)илиload_as_of(timestamp) - ACID — optimistic concurrency на transaction log, serializable по умолчанию
- File-level metadata skipping — ~2x ускорение для селективных запросов за счёт min/max stats в transaction log
- OPTIMIZE + Z-ORDER — улучшение файловой раскладки для лучшего pruning
- Change Data Feed — отслеживание изменений между версиями для incremental ETL
Для углублённого изучения внутренней архитектуры Delta Lake (transaction log, checkpoints, ACID-гарантии, data skipping) см. курс Storage Formats Deep-Dive.