Apache Iceberg и DataFusion: каталоги, hidden partitioning, schema evolution
Iceberg — второй по популярности открытый формат таблиц в экосистеме DataFusion. В отличие от Delta Lake, Iceberg проектировался с нуля для multi-engine support: одна и та же таблица читается DataFusion, Spark, Flink и Trino без адаптеров.
Архитектура метаданных Iceberg
Iceberg использует трёхуровневую иерархию метаданных — это сложнее Delta Lake, но масштабируется лучше при большом количестве файлов и партиций.
Почему три уровня?
- Metadata file — атомарно заменяется при commit (как Delta JSON, но один файл на всю таблицу)
- Manifest list — позволяет быстро определить, какие manifest-файлы содержат данные для конкретного snapshot
- Manifest file — группирует data files по партициям, хранит per-file statistics для pruning
При наличии 100 000 файлов Delta Lake хранит stats в одном checkpoint (может быть сотни MB). Iceberg распределяет stats по manifest-файлам — каждый отвечает за ~1000 файлов, что позволяет читать только нужные manifests.
Правило: чем больше файлов в таблице, тем заметнее преимущество Iceberg перед Delta в скорости planning (построение списка файлов для чтения). Для таблиц <10 000 файлов разница минимальна.
Интеграция с DataFusion
Python API через pyiceberg
import datafusion
from pyiceberg.catalog import load_catalog
# Подключение к каталогу (REST catalog, Hive Metastore, Glue, etc.)
catalog = load_catalog(
"my_catalog",
**{
"type": "rest",
"uri": "http://iceberg-rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
}
)
# Загрузка таблицы из каталога
table = catalog.load_table("analytics.events")
# Регистрация в DataFusion
ctx = datafusion.SessionContext()
ctx.register_table_provider("events", table)
# SQL-запросы — hidden partitioning работает автоматически
df = ctx.sql("""
SELECT event_type, count(*) as cnt
FROM events
WHERE event_ts >= '2024-03-01' AND event_ts < '2024-04-01'
GROUP BY event_type
""")
df.show()
В отличие от Delta Lake, где таблица открывается по пути к директории, Iceberg требует каталог. Каталог — это реестр таблиц, который хранит указатель на текущий metadata file. Без каталога Iceberg-таблица недоступна.
Типы каталогов
| Каталог | Описание | Когда использовать |
|---|---|---|
| REST Catalog | HTTP API (Tabular, Polaris, Gravitino) | Продакшен, multi-engine |
| Hive Metastore | JDBC-хранилище метаданных | Существующий Hadoop/Spark-стек |
| AWS Glue | AWS-managed catalog | AWS-инфраструктура |
| SQL Catalog | PostgreSQL/MySQL/SQLite | Простые сетапы, разработка |
| File Catalog | Metadata в filesystem | Локальная разработка, тесты |
Для локальной разработки с DataFusion проще всего использовать SQL или File catalog:
# Локальный SQLite-каталог для разработки
catalog = load_catalog(
"dev",
**{
"type": "sql",
"uri": "sqlite:///tmp/iceberg_catalog.db",
"warehouse": "file:///tmp/iceberg_warehouse",
}
)
Rust API через iceberg-rust
use datafusion::prelude::*;
use iceberg::Catalog;
use iceberg_catalog_rest::RestCatalog;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// Подключение к REST-каталогу
let catalog = RestCatalog::new(/* config */);
// Загрузка таблицы
let table = catalog
.load_table(&TableIdent::from_strs(["analytics", "events"])?)
.await?;
// Регистрация — IcebergTable реализует TableProvider
ctx.register_table("events", Arc::new(table))?;
let df = ctx.sql("SELECT count(*) FROM events").await?;
df.show().await?;
Ok(())
}
iceberg-rust находится в активной разработке (Apache инкубатор). API может меняться между минорными версиями. Проверяйте совместимость с вашей версией DataFusion в changelog iceberg-rust.
Hidden Partitioning
Главное архитектурное отличие Iceberg от Delta Lake — пользователю не нужно знать схему партиционирования для написания запросов.
Проблема с explicit partitioning
В Delta Lake (и Hive) партиционирование видно в запросах:
-- Delta Lake / Hive: нужно знать, что таблица партиционирована по date
-- и что партиция — это колонка в данных
SELECT * FROM events WHERE date = '2024-03-01'
-- Если партиционирование по month(event_ts), а вы пишете:
SELECT * FROM events WHERE event_ts = '2024-03-01T12:00:00'
-- ...партиция НЕ будет использована, будет full scan
Решение Iceberg: transform functions
Iceberg определяет партиционирование через transform functions, которые движок применяет автоматически:
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, StringType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import MonthTransform
schema = Schema(
NestedField(1, "event_id", LongType(), required=True),
NestedField(2, "event_ts", TimestampType(), required=True),
NestedField(3, "event_type", StringType(), required=True),
)
# Партиционирование: month(event_ts)
spec = PartitionSpec(
PartitionField(
source_id=2, # event_ts
field_id=1000,
transform=MonthTransform(),
name="event_month"
)
)
Теперь любой запрос с фильтром по event_ts автоматически использует партиции:
-- Iceberg: любой предикат на event_ts использует партиции автоматически
SELECT * FROM events
WHERE event_ts >= TIMESTAMP '2024-03-01 00:00:00'
AND event_ts < TIMESTAMP '2024-04-01 00:00:00'
-- Движок применяет month() к предикату → читает только партицию 2024-03
Доступные transforms
| Transform | Описание | Пример |
|---|---|---|
identity | Партиционирование по значению колонки | identity(country) |
year | По году из timestamp | year(event_ts) → 2024 |
month | По месяцу | month(event_ts) → 2024-03 |
day | По дню | day(event_ts) → 2024-03-15 |
hour | По часу | hour(event_ts) → 2024-03-15-14 |
bucket(N) | Hash bucketing | bucket(16, user_id) → 0-15 |
truncate(W) | Усечение строки/числа | truncate(3, city) → “Mos” |
Partition Evolution
Уникальная возможность Iceberg — изменение схемы партиционирования без перезаписи данных.
Сценарий
Таблица создана с month(event_ts). Через полгода нагрузка выросла, и нужно партиционирование по дням:
from pyiceberg.transforms import DayTransform
table = catalog.load_table("analytics.events")
# Эволюция партиционирования: month → day
with table.update_spec() as update:
update.remove_field("event_month") # удаляем month
update.add_field(
source_column_name="event_ts",
transform=DayTransform(),
name="event_day"
)
После этого:
- Старые файлы остаются в месячных партициях — не перезаписываются
- Новые файлы записываются в дневные партиции
- Запросы прозрачно читают обе схемы — Iceberg знает, какая схема действовала для каждого файла
Partition evolution — ключевое преимущество Iceberg для растущих систем. В Delta Lake изменение партиционирования требует полной перезаписи таблицы (OPTIMIZE + ZORDER).
Schema Evolution
Iceberg отслеживает колонки по уникальным ID, а не по именам. Это даёт безопасную schema evolution:
Поддерживаемые операции
table = catalog.load_table("analytics.events")
with table.update_schema() as update:
# Добавление новой колонки
update.add_column("session_id", StringType())
# Переименование колонки — файлы не перезаписываются
update.rename_column("event_type", "action_type")
# Удаление колонки (мягкое — данные остаются в файлах)
update.delete_column("legacy_field")
# Изменение типа (только расширяющие: int → long, float → double)
update.update_column("event_id", LongType())
Как это работает с DataFusion
DataFusion получает текущую schema от TableProvider (Iceberg). При чтении старых файлов, где отсутствует новая колонка session_id, Iceberg автоматически подставляет NULL:
-- session_id добавлен вчера. Старые файлы не содержат эту колонку.
SELECT event_id, session_id FROM events LIMIT 5;
-- Результат:
-- event_id | session_id
-- 1 | NULL ← старый файл, колонки нет → NULL
-- 2 | NULL
-- 3 | "abc-123" ← новый файл, колонка заполнена
Time Travel в Iceberg
Как и Delta Lake, Iceberg поддерживает запросы к историческим версиям:
# По snapshot ID
table = catalog.load_table("analytics.events")
scan = table.scan(snapshot_id=3051729675574597004)
arrow_table = scan.to_arrow()
# По timestamp
from datetime import datetime
scan = table.scan(
snapshot_id=table.snapshot_as_of_timestamp(
datetime(2024, 3, 15).timestamp() * 1000
).snapshot_id
)
Snapshots vs Versions
Iceberg использует snapshot ID (уникальный long), а Delta Lake — последовательный номер версии (0, 1, 2, …). Snapshot ID не последователен, что затрудняет навигацию вручную, но даёт гарантию уникальности в распределённых системах.
# Просмотр истории snapshots
for snapshot in table.metadata.snapshots:
print(f"ID: {snapshot.snapshot_id}, "
f"Timestamp: {snapshot.timestamp_ms}, "
f"Operation: {snapshot.summary.operation}")
Практический пример: аналитика с Iceberg + DataFusion
import datafusion
from pyiceberg.catalog import load_catalog
def setup_iceberg_analytics():
"""Настройка аналитического контекста с Iceberg."""
catalog = load_catalog(
"production",
**{
"type": "rest",
"uri": "http://iceberg-rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
}
)
ctx = datafusion.SessionContext()
# Регистрация нескольких таблиц из каталога
for table_name in ["events", "users", "sessions"]:
table = catalog.load_table(f"analytics.{table_name}")
ctx.register_table_provider(table_name, table)
return ctx
ctx = setup_iceberg_analytics()
# JOIN между Iceberg-таблицами — DataFusion оптимизирует как обычно
result = ctx.sql("""
SELECT
u.country,
count(DISTINCT e.user_id) as active_users,
count(*) as event_count
FROM events e
JOIN users u ON e.user_id = u.user_id
WHERE e.event_ts >= TIMESTAMP '2024-03-01 00:00:00'
AND e.event_ts < TIMESTAMP '2024-04-01 00:00:00'
GROUP BY u.country
ORDER BY active_users DESC
LIMIT 20
""")
result.show()
При JOIN между Iceberg-таблицами DataFusion применяет standard join optimization (hash join, predicate pushdown). Iceberg-провайдер делает file pruning до начала join — каждая таблица возвращает только нужные файлы.
Сравнение с Delta Lake: когда выбрать Iceberg
| Аспект | Выбрать Iceberg | Выбрать Delta Lake |
|---|---|---|
| Партиционирование | Нужна partition evolution или hidden partitioning | Фиксированная схема, простота |
| Multi-engine | Spark + Flink + Trino + DataFusion | Databricks + DataFusion |
| Schema evolution | Частые rename/drop колонок | Только add columns |
| Каталог | Уже есть REST/Glue/Hive catalog | Нет каталога, работаем по путям |
| Файловая раскладка | Очень много файлов (>100K) | Умеренное количество |
| Экосистема | Multi-vendor стратегия | Databricks-стек |
Ключевые выводы
- Iceberg использует трёхуровневую архитектуру метаданных (metadata → manifest list → manifests), масштабируясь лучше Delta при большом количестве файлов
- Каталог обязателен — без него таблица недоступна. REST catalog для продакшена, SQL/File — для разработки
- Hidden partitioning — пользователь пишет WHERE по исходной колонке, движок автоматически применяет partition transform
- Partition evolution — изменение схемы партиционирования без перезаписи данных, уникальное преимущество Iceberg
- Schema evolution по column ID — безопасные rename и drop без перезаписи файлов
- iceberg-rust интегрируется с DataFusion через
TableProvider, аналогично delta-rs
Для углублённого изучения внутренней архитектуры Iceberg (трёхуровневые метаданные, manifest files, partition evolution, schema evolution) см. курс Storage Formats Deep-Dive.