Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 16 мин
Продвинутый
Delta Lakedelta-rsTableProviderTime TravelACIDMetadata SkippingDeltaTableTransaction Log

Delta Lake и DataFusion: time travel, ACID и file-level metadata skipping

В уроке 01 мы рассмотрели lakehouse-архитектуру и роль DataFusion как query engine layer. Теперь погружаемся в практику: как подключить Delta-таблицу к DataFusion, использовать time travel и получить ускорение в 2x за счёт file-level metadata skipping.

Архитектура интеграции

DataFusion интегрируется с Delta Lake через библиотеку delta-rs — Rust-реализацию протокола Delta Lake. В Python доступен пакет deltalake, который использует delta-rs под капотом:

Стек интеграции Delta Lake + DataFusion
SQL / DataFrame APISQL-запросы пользователей
DataFusion
TableProvider (delta-rs)
_delta_log/
Parquet files

Ключевая абстракция — DeltaTable из delta-rs реализует TableProvider. DataFusion видит его как обычную таблицу: получает schema, запрашивает statistics, выполняет scan. Все детали transaction log скрыты за абстракцией.

Подключение Delta-таблицы

Python API

import datafusion
from deltalake import DeltaTable

ctx = datafusion.SessionContext()

# Открытие локальной Delta-таблицы
delta = DeltaTable("./data/events")

# Регистрация в DataFusion как TableProvider
ctx.register_table_provider("events", delta)

# Обычные SQL-запросы — DataFusion не знает, что это Delta
df = ctx.sql("SELECT * FROM events WHERE date = '2024-01-15'")
df.show()
NOTE

Метод register_table_provider() принимает любой объект, реализующий протокол TableProvider. Delta-таблица, Iceberg-таблица и обычный Parquet-файл регистрируются одинаково — DataFusion работает с абстракцией.

Python API: таблица на S3

from deltalake import DeltaTable

# S3 — credentials из переменных окружения или ~/.aws/credentials
delta = DeltaTable(
    "s3://my-bucket/warehouse/events",
    storage_options={
        "AWS_REGION": "eu-west-1",
        # Можно явно: "AWS_ACCESS_KEY_ID": "...", "AWS_SECRET_ACCESS_KEY": "..."
    }
)

ctx.register_table_provider("events", delta)

Rust API

use datafusion::prelude::*;
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Открытие Delta-таблицы (локально или S3)
    let delta = open_table("./data/events").await?;

    // Регистрация — DeltaTable реализует TableProvider
    ctx.register_table("events", Arc::new(delta))?;

    let df = ctx.sql("SELECT count(*) FROM events").await?;
    df.show().await?;
    Ok(())
}
TIP

Для продакшена всегда указывайте storage_options явно — не полагайтесь на environment variables, которые могут отсутствовать в контейнерах или lambda-функциях.

Time Travel

Delta Lake хранит историю всех изменений в transaction log. Каждый commit — это JSON-файл с номером версии. DataFusion может запрашивать данные на конкретную версию или timestamp.

Запрос по версии

from deltalake import DeltaTable

# Открытие конкретной версии таблицы
delta_v5 = DeltaTable("./data/events", version=5)
ctx.register_table_provider("events_v5", delta_v5)

# Данные как они были на момент commit #5
df = ctx.sql("SELECT count(*) FROM events_v5")

Запрос по timestamp

from deltalake import DeltaTable
from datetime import datetime

# Данные на конкретный момент времени
delta_jan = DeltaTable(
    "./data/events",
    storage_options={"AWS_REGION": "eu-west-1"}
)
delta_jan.load_as_of("2024-01-15T00:00:00Z")

ctx.register_table_provider("events_jan", delta_jan)

Сценарии использования time travel

Сценарии time travel
Аудит измененийСравнение метрик между версиями
Пример
ВосстановлениеОткат после ошибочной записи
Пример
ML reproducibilityВоспроизводимые эксперименты на фиксированных данных
Пример
WARNING

Time travel доступен только пока файлы данных не удалены операцией VACUUM. По умолчанию Delta Lake хранит историю 30 дней. Для ML-сценариев с длинным горизонтом увеличьте retention period.

ACID-транзакции

Delta Lake обеспечивает ACID через optimistic concurrency control на уровне transaction log.

Как это работает

  1. Writer читает текущую версию таблицы (номер последнего commit)
  2. Формирует набор изменений (добавление/удаление файлов)
  3. Пытается атомарно записать новый commit (JSON-файл с номером version+1)
  4. Если другой writer успел записать commit с тем же номером — retry с перечитыванием
Writer A: read version=5 → prepare changes → write version=6 ✓
Writer B: read version=5 → prepare changes → write version=6 ✗ (conflict)
Writer B: re-read version=6 → re-prepare → write version=7 ✓

Isolation levels

УровеньПоведение
Serializable (default)Полная сериализуемость — конфликтующие writers retry
WriteSerializableДопускает concurrent appends без конфликтов

Для аналитических ETL-пайплайнов, где несколько job-ов append в одну таблицу, WriteSerializable снижает количество retry.

Запись из DataFusion

import datafusion
from deltalake import write_deltalake

ctx = datafusion.SessionContext()
ctx.register_parquet("raw", "./data/raw_events/")

# Трансформация
result = ctx.sql("""
    SELECT event_id, user_id, event_type,
           CAST(ts AS TIMESTAMP) as event_ts
    FROM raw
    WHERE event_type IS NOT NULL
""")

# Запись в Delta-формат
write_deltalake(
    "./data/events_delta",
    result.to_arrow_table(),
    mode="append",  # или "overwrite"
    partition_by=["event_date"]
)
NOTE

write_deltalake() — функция из пакета deltalake, не из DataFusion. DataFusion отвечает за чтение и трансформацию, а delta-rs — за запись с соблюдением транзакционного протокола.

File-level metadata skipping

Главный источник ускорения при использовании Delta Lake — пропуск файлов, которые гарантированно не содержат нужных данных.

Механизм

При каждом commit Delta Lake записывает в transaction log статистики по каждому добавленному файлу:

{
  "add": {
    "path": "part-00042-...-c000.parquet",
    "size": 67108864,
    "stats": {
      "numRecords": 100000,
      "minValues": { "date": "2024-01-15", "amount": 0.5 },
      "maxValues": { "date": "2024-01-15", "amount": 9999.99 },
      "nullCount": { "date": 0, "amount": 12 }
    }
  }
}

Когда DataFusion выполняет WHERE date = '2024-03-01', Delta-провайдер проверяет min/max каждого файла в transaction log и возвращает только файлы, где min(date) <= '2024-03-01' AND max(date) >= '2024-03-01'.

File-level metadata skipping
WHERE date = ‘2024-03-01’Предикат из SQL WHERE
Delta Transaction Log
1 файл из 4 → DataFusion ScanТолько файлы, содержащие данные за март

Сравнение с raw Parquet

Без Delta Lake DataFusion тоже фильтрует Parquet-файлы — но через row group statistics внутри каждого файла. Разница:

МетодЧто читаемI/O
Raw ParquetFooter каждого файла (последние ~10 KB) → row group statsN файлов × footer read
Delta LakeОдин JSON/checkpoint из _delta_log1 файл (или 1 checkpoint Parquet)

Для таблицы с 1000 файлов: raw Parquet делает 1000 маленьких I/O-запросов к object store (каждый с латентностью ~50ms на S3). Delta делает 1 запрос к transaction log. На практике это даёт ускорение в ~2x для селективных запросов.

TIP

Максимальный эффект от metadata skipping достигается, когда данные отсортированы по колонке фильтрации. Если файлы содержат случайный диапазон дат, min/max будут широкими и пропуск файлов не сработает.

Оптимизация: OPTIMIZE и Z-ORDER

Delta Lake предоставляет команды для улучшения файловой раскладки:

from deltalake import DeltaTable

dt = DeltaTable("./data/events")

# Компактировка мелких файлов в крупные
dt.optimize.compact()

# Z-ORDER — интерливинг нескольких колонок для multi-column pruning
dt.optimize.z_order(columns=["date", "user_id"])

OPTIMIZE compact — объединяет мелкие файлы (результат частых append) в файлы оптимального размера (128-256 MB). Уменьшает количество файлов → меньше metadata для обработки.

Z-ORDER — переупорядочивает данные внутри файлов для улучшения metadata skipping по нескольким колонкам одновременно. Полезно, когда запросы фильтруют по разным колонкам (date + user_id + region).

Производительность: когда Delta Lake быстрее raw Parquet

Delta Lake не всегда быстрее — есть overhead на чтение transaction log. Выигрыш зависит от сценария:

СценарийDelta vs Raw Parquet
Селективный запрос (WHERE на 1% данных)~2x быстрее — metadata skipping
Full scan (SELECT *)~1x — overhead минимален
Первый запрос (cold start)~0.9x медленнее — загрузка transaction log
Много мелких файлов (>10000)~3-5x быстрее — checkpoint вместо 10000 footer reads
Concurrent reads/writesКорректнее — ACID vs potential corruption
WARNING

Не используйте Delta Lake для одноразовых аналитических запросов к маленьким датасетам (<100 файлов). Overhead transaction log не окупится. Delta Lake оправдан для: (1) больших таблиц с selective queries, (2) таблиц с concurrent writes, (3) сценариев с time travel.

Change Data Feed

Delta Lake может отслеживать, какие строки были добавлены, обновлены или удалены между версиями:

from deltalake import DeltaTable

dt = DeltaTable("./data/events")

# Получить изменения между версиями
changes = dt.load_cdf(
    starting_version=5,
    ending_version=10
)

# changes содержит колонки: _change_type (insert/update_preimage/update_postimage/delete),
# _commit_version, _commit_timestamp + все колонки таблицы

Это полезно для:

  • Incremental ETL — обработать только новые/изменённые записи, не перечитывая всю таблицу
  • CDC pipelines — отправлять изменения в downstream-системы
  • Аудит — кто, когда и что изменил

Практический пример: аналитический сервис

Соберём полный пример — аналитический сервис, читающий Delta-таблицу с time travel и metadata skipping:

import datafusion
from deltalake import DeltaTable

def create_analytics_context(table_path: str, version: int = None):
    """Создаёт DataFusion контекст с Delta-таблицей."""
    ctx = datafusion.SessionContext()

    # Открытие с опциональным time travel
    if version is not None:
        delta = DeltaTable(table_path, version=version)
    else:
        delta = DeltaTable(table_path)

    ctx.register_table_provider("events", delta)
    return ctx

# Текущие данные
ctx = create_analytics_context("s3://warehouse/events")
current = ctx.sql("""
    SELECT event_type, count(*) as cnt
    FROM events
    WHERE date >= '2024-03-01' AND date < '2024-04-01'
    GROUP BY event_type
    ORDER BY cnt DESC
""")

# Данные неделю назад (time travel)
ctx_old = create_analytics_context("s3://warehouse/events", version=42)
previous = ctx_old.sql("""
    SELECT event_type, count(*) as cnt
    FROM events
    WHERE date >= '2024-03-01' AND date < '2024-04-01'
    GROUP BY event_type
    ORDER BY cnt DESC
""")

Ключевые выводы

  1. delta-rs реализует TableProvider — подключается к DataFusion через register_table_provider()
  2. Time travel — доступ к любой версии через DeltaTable(path, version=N) или load_as_of(timestamp)
  3. ACID — optimistic concurrency на transaction log, serializable по умолчанию
  4. File-level metadata skipping — ~2x ускорение для селективных запросов за счёт min/max stats в transaction log
  5. OPTIMIZE + Z-ORDER — улучшение файловой раскладки для лучшего pruning
  6. Change Data Feed — отслеживание изменений между версиями для incremental ETL
TIP

Для углублённого изучения внутренней архитектуры Delta Lake (transaction log, checkpoints, ACID-гарантии, data skipping) см. курс Storage Formats Deep-Dive.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Как подключить Delta-таблицу к DataFusion в Python?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4