Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 16 мин
Продвинутый
Apache Icebergiceberg-rustpyicebergCatalogHidden PartitioningPartition EvolutionSchema EvolutionTableProvider

Apache Iceberg и DataFusion: каталоги, hidden partitioning, schema evolution

Iceberg — второй по популярности открытый формат таблиц в экосистеме DataFusion. В отличие от Delta Lake, Iceberg проектировался с нуля для multi-engine support: одна и та же таблица читается DataFusion, Spark, Flink и Trino без адаптеров.

Архитектура метаданных Iceberg

Iceberg использует трёхуровневую иерархию метаданных — это сложнее Delta Lake, но масштабируется лучше при большом количестве файлов и партиций.

Трёхуровневая архитектура метаданных Iceberg
Catalog (REST, Hive, Glue, JDBC)Catalog хранит указатель на текущий metadata file для каждой таблицы
Metadata File (v1.metadata.json)
Manifest List (snap-001.avro)
Manifest File (manifest-001.avro)
Parquet файлы
Delete файлы

Почему три уровня?

  • Metadata file — атомарно заменяется при commit (как Delta JSON, но один файл на всю таблицу)
  • Manifest list — позволяет быстро определить, какие manifest-файлы содержат данные для конкретного snapshot
  • Manifest file — группирует data files по партициям, хранит per-file statistics для pruning

При наличии 100 000 файлов Delta Lake хранит stats в одном checkpoint (может быть сотни MB). Iceberg распределяет stats по manifest-файлам — каждый отвечает за ~1000 файлов, что позволяет читать только нужные manifests.

TIP

Правило: чем больше файлов в таблице, тем заметнее преимущество Iceberg перед Delta в скорости planning (построение списка файлов для чтения). Для таблиц <10 000 файлов разница минимальна.

Интеграция с DataFusion

Python API через pyiceberg

import datafusion
from pyiceberg.catalog import load_catalog

# Подключение к каталогу (REST catalog, Hive Metastore, Glue, etc.)
catalog = load_catalog(
    "my_catalog",
    **{
        "type": "rest",
        "uri": "http://iceberg-rest:8181",
        "s3.endpoint": "http://minio:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)

# Загрузка таблицы из каталога
table = catalog.load_table("analytics.events")

# Регистрация в DataFusion
ctx = datafusion.SessionContext()
ctx.register_table_provider("events", table)

# SQL-запросы — hidden partitioning работает автоматически
df = ctx.sql("""
    SELECT event_type, count(*) as cnt
    FROM events
    WHERE event_ts >= '2024-03-01' AND event_ts < '2024-04-01'
    GROUP BY event_type
""")
df.show()
NOTE

В отличие от Delta Lake, где таблица открывается по пути к директории, Iceberg требует каталог. Каталог — это реестр таблиц, который хранит указатель на текущий metadata file. Без каталога Iceberg-таблица недоступна.

Типы каталогов

КаталогОписаниеКогда использовать
REST CatalogHTTP API (Tabular, Polaris, Gravitino)Продакшен, multi-engine
Hive MetastoreJDBC-хранилище метаданныхСуществующий Hadoop/Spark-стек
AWS GlueAWS-managed catalogAWS-инфраструктура
SQL CatalogPostgreSQL/MySQL/SQLiteПростые сетапы, разработка
File CatalogMetadata в filesystemЛокальная разработка, тесты

Для локальной разработки с DataFusion проще всего использовать SQL или File catalog:

# Локальный SQLite-каталог для разработки
catalog = load_catalog(
    "dev",
    **{
        "type": "sql",
        "uri": "sqlite:///tmp/iceberg_catalog.db",
        "warehouse": "file:///tmp/iceberg_warehouse",
    }
)

Rust API через iceberg-rust

use datafusion::prelude::*;
use iceberg::Catalog;
use iceberg_catalog_rest::RestCatalog;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Подключение к REST-каталогу
    let catalog = RestCatalog::new(/* config */);

    // Загрузка таблицы
    let table = catalog
        .load_table(&TableIdent::from_strs(["analytics", "events"])?)
        .await?;

    // Регистрация — IcebergTable реализует TableProvider
    ctx.register_table("events", Arc::new(table))?;

    let df = ctx.sql("SELECT count(*) FROM events").await?;
    df.show().await?;
    Ok(())
}
WARNING

iceberg-rust находится в активной разработке (Apache инкубатор). API может меняться между минорными версиями. Проверяйте совместимость с вашей версией DataFusion в changelog iceberg-rust.

Hidden Partitioning

Главное архитектурное отличие Iceberg от Delta Lake — пользователю не нужно знать схему партиционирования для написания запросов.

Проблема с explicit partitioning

В Delta Lake (и Hive) партиционирование видно в запросах:

-- Delta Lake / Hive: нужно знать, что таблица партиционирована по date
-- и что партиция — это колонка в данных
SELECT * FROM events WHERE date = '2024-03-01'

-- Если партиционирование по month(event_ts), а вы пишете:
SELECT * FROM events WHERE event_ts = '2024-03-01T12:00:00'
-- ...партиция НЕ будет использована, будет full scan

Решение Iceberg: transform functions

Iceberg определяет партиционирование через transform functions, которые движок применяет автоматически:

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, StringType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import MonthTransform

schema = Schema(
    NestedField(1, "event_id", LongType(), required=True),
    NestedField(2, "event_ts", TimestampType(), required=True),
    NestedField(3, "event_type", StringType(), required=True),
)

# Партиционирование: month(event_ts)
spec = PartitionSpec(
    PartitionField(
        source_id=2,  # event_ts
        field_id=1000,
        transform=MonthTransform(),
        name="event_month"
    )
)

Теперь любой запрос с фильтром по event_ts автоматически использует партиции:

-- Iceberg: любой предикат на event_ts использует партиции автоматически
SELECT * FROM events
WHERE event_ts >= TIMESTAMP '2024-03-01 00:00:00'
  AND event_ts < TIMESTAMP '2024-04-01 00:00:00'
-- Движок применяет month() к предикату → читает только партицию 2024-03
Hidden partitioning vs explicit partitioning
Explicit (Delta/Hive)Delta/Hive: пользователь должен знать и использовать partition columns
Запрос
Проблема
Hidden (Iceberg)Iceberg: движок автоматически применяет transform к предикату
Запрос
Результат

Доступные transforms

TransformОписаниеПример
identityПартиционирование по значению колонкиidentity(country)
yearПо году из timestampyear(event_ts) → 2024
monthПо месяцуmonth(event_ts) → 2024-03
dayПо днюday(event_ts) → 2024-03-15
hourПо часуhour(event_ts) → 2024-03-15-14
bucket(N)Hash bucketingbucket(16, user_id) → 0-15
truncate(W)Усечение строки/числаtruncate(3, city) → “Mos”

Partition Evolution

Уникальная возможность Iceberg — изменение схемы партиционирования без перезаписи данных.

Сценарий

Таблица создана с month(event_ts). Через полгода нагрузка выросла, и нужно партиционирование по дням:

from pyiceberg.transforms import DayTransform

table = catalog.load_table("analytics.events")

# Эволюция партиционирования: month → day
with table.update_spec() as update:
    update.remove_field("event_month")   # удаляем month
    update.add_field(
        source_column_name="event_ts",
        transform=DayTransform(),
        name="event_day"
    )

После этого:

  • Старые файлы остаются в месячных партициях — не перезаписываются
  • Новые файлы записываются в дневные партиции
  • Запросы прозрачно читают обе схемы — Iceberg знает, какая схема действовала для каждого файла
Partition evolution: month → day
Старые файлыФайлы, записанные до эволюции
Партиция
Новые файлыФайлы, записанные после эволюции
Партиция
Запрос: WHERE event_ts = '2024-03-15'
TIP

Partition evolution — ключевое преимущество Iceberg для растущих систем. В Delta Lake изменение партиционирования требует полной перезаписи таблицы (OPTIMIZE + ZORDER).

Schema Evolution

Iceberg отслеживает колонки по уникальным ID, а не по именам. Это даёт безопасную schema evolution:

Поддерживаемые операции

table = catalog.load_table("analytics.events")

with table.update_schema() as update:
    # Добавление новой колонки
    update.add_column("session_id", StringType())

    # Переименование колонки — файлы не перезаписываются
    update.rename_column("event_type", "action_type")

    # Удаление колонки (мягкое — данные остаются в файлах)
    update.delete_column("legacy_field")

    # Изменение типа (только расширяющие: int → long, float → double)
    update.update_column("event_id", LongType())

Как это работает с DataFusion

DataFusion получает текущую schema от TableProvider (Iceberg). При чтении старых файлов, где отсутствует новая колонка session_id, Iceberg автоматически подставляет NULL:

-- session_id добавлен вчера. Старые файлы не содержат эту колонку.
SELECT event_id, session_id FROM events LIMIT 5;

-- Результат:
-- event_id | session_id
-- 1        | NULL        ← старый файл, колонки нет → NULL
-- 2        | NULL
-- 3        | "abc-123"   ← новый файл, колонка заполнена

Time Travel в Iceberg

Как и Delta Lake, Iceberg поддерживает запросы к историческим версиям:

# По snapshot ID
table = catalog.load_table("analytics.events")
scan = table.scan(snapshot_id=3051729675574597004)
arrow_table = scan.to_arrow()

# По timestamp
from datetime import datetime
scan = table.scan(
    snapshot_id=table.snapshot_as_of_timestamp(
        datetime(2024, 3, 15).timestamp() * 1000
    ).snapshot_id
)

Snapshots vs Versions

Iceberg использует snapshot ID (уникальный long), а Delta Lake — последовательный номер версии (0, 1, 2, …). Snapshot ID не последователен, что затрудняет навигацию вручную, но даёт гарантию уникальности в распределённых системах.

# Просмотр истории snapshots
for snapshot in table.metadata.snapshots:
    print(f"ID: {snapshot.snapshot_id}, "
          f"Timestamp: {snapshot.timestamp_ms}, "
          f"Operation: {snapshot.summary.operation}")

Практический пример: аналитика с Iceberg + DataFusion

import datafusion
from pyiceberg.catalog import load_catalog

def setup_iceberg_analytics():
    """Настройка аналитического контекста с Iceberg."""
    catalog = load_catalog(
        "production",
        **{
            "type": "rest",
            "uri": "http://iceberg-rest:8181",
            "s3.endpoint": "http://minio:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        }
    )

    ctx = datafusion.SessionContext()

    # Регистрация нескольких таблиц из каталога
    for table_name in ["events", "users", "sessions"]:
        table = catalog.load_table(f"analytics.{table_name}")
        ctx.register_table_provider(table_name, table)

    return ctx

ctx = setup_iceberg_analytics()

# JOIN между Iceberg-таблицами — DataFusion оптимизирует как обычно
result = ctx.sql("""
    SELECT
        u.country,
        count(DISTINCT e.user_id) as active_users,
        count(*) as event_count
    FROM events e
    JOIN users u ON e.user_id = u.user_id
    WHERE e.event_ts >= TIMESTAMP '2024-03-01 00:00:00'
      AND e.event_ts < TIMESTAMP '2024-04-01 00:00:00'
    GROUP BY u.country
    ORDER BY active_users DESC
    LIMIT 20
""")
result.show()
NOTE

При JOIN между Iceberg-таблицами DataFusion применяет standard join optimization (hash join, predicate pushdown). Iceberg-провайдер делает file pruning до начала join — каждая таблица возвращает только нужные файлы.

Сравнение с Delta Lake: когда выбрать Iceberg

АспектВыбрать IcebergВыбрать Delta Lake
ПартиционированиеНужна partition evolution или hidden partitioningФиксированная схема, простота
Multi-engineSpark + Flink + Trino + DataFusionDatabricks + DataFusion
Schema evolutionЧастые rename/drop колонокТолько add columns
КаталогУже есть REST/Glue/Hive catalogНет каталога, работаем по путям
Файловая раскладкаОчень много файлов (>100K)Умеренное количество
ЭкосистемаMulti-vendor стратегияDatabricks-стек

Ключевые выводы

  1. Iceberg использует трёхуровневую архитектуру метаданных (metadata → manifest list → manifests), масштабируясь лучше Delta при большом количестве файлов
  2. Каталог обязателен — без него таблица недоступна. REST catalog для продакшена, SQL/File — для разработки
  3. Hidden partitioning — пользователь пишет WHERE по исходной колонке, движок автоматически применяет partition transform
  4. Partition evolution — изменение схемы партиционирования без перезаписи данных, уникальное преимущество Iceberg
  5. Schema evolution по column ID — безопасные rename и drop без перезаписи файлов
  6. iceberg-rust интегрируется с DataFusion через TableProvider, аналогично delta-rs
TIP

Для углублённого изучения внутренней архитектуры Iceberg (трёхуровневые метаданные, manifest files, partition evolution, schema evolution) см. курс Storage Formats Deep-Dive.

Iceberg: catalog и metadata layer Iceberg maintenance + ecosystem Lakehouse architecture: system design

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Чем архитектура метаданных Iceberg (metadata → manifest list → manifest files) отличается от Delta Lake (JSON commits + checkpoint)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4