Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 14 мин
Продвинутый
ObjectStoreS3GCSAzure BlobCredentialsMulti-formatMigrationDataFusion

Object Store и мультиформатные конфигурации

В предыдущих уроках мы подключали Delta Lake и Iceberg к DataFusion. Но под капотом обоих форматов работает один и тот же механизм доступа к хранилищу — trait ObjectStore. Этот урок объясняет, как DataFusion читает данные из S3/GCS/Azure, как конфигурировать credentials, и как выбрать между форматами для конкретного сценария.

ObjectStore trait

DataFusion не обращается к файловой системе или облачному хранилищу напрямую. Все I/O-операции проходят через абстракцию ObjectStore:

ObjectStore как единый интерфейс доступа к хранилищу
DataFusion (DataSourceExec)DataFusion запрашивает файлы через ObjectStore trait
ObjectStore trait
AmazonS3Amazon S3 и совместимые (MinIO, Ceph)
URL
GoogleCloudStorageGoogle Cloud Storage
URL
MicrosoftAzureAzure Blob Storage
URL
LocalFileSystemЛокальная файловая система
URL

Библиотека object_store (Apache Arrow проект) предоставляет реализации для всех облачных провайдеров. DataFusion использует её по умолчанию — не нужно подключать отдельные SDK.

NOTE

ObjectStore — это тот же самый crate, который используют delta-rs и iceberg-rust. Все три проекта (DataFusion, Delta, Iceberg) разделяют один и тот же I/O-слой.

Конфигурация S3

Python: автоматическая регистрация

DataFusion Python автоматически создаёт ObjectStore при регистрации таблицы с S3 URL:

import datafusion

ctx = datafusion.SessionContext()

# Автоматически создаёт AmazonS3 ObjectStore
ctx.register_parquet(
    "events",
    "s3://my-bucket/warehouse/events/",
    table_partition_cols=[("date", "string")]
)

# Credentials из переменных окружения:
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
# или из ~/.aws/credentials (AWS CLI profile)

Python: явная конфигурация

import datafusion
from datafusion import SessionContext

ctx = SessionContext()

# Регистрация с явными credentials
ctx.register_object_store(
    "s3://my-bucket",
    options={
        "aws_access_key_id": "AKIA...",
        "aws_secret_access_key": "...",
        "region": "eu-west-1",
    }
)

ctx.register_parquet("events", "s3://my-bucket/events/")

Rust: ручная регистрация

use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;

let ctx = SessionContext::new();

// Создание S3 ObjectStore с явными credentials
let s3 = AmazonS3Builder::new()
    .with_bucket_name("my-bucket")
    .with_region("eu-west-1")
    .with_access_key_id("AKIA...")
    .with_secret_access_key("...")
    .build()?;

// Регистрация в RuntimeEnv
let url = Url::parse("s3://my-bucket")?;
ctx.runtime_env()
    .register_object_store(&url, Arc::new(s3));

// Теперь можно читать s3://my-bucket/...
ctx.register_parquet("events", "s3://my-bucket/events/", Default::default())
    .await?;

S3-compatible storage (MinIO, Ceph)

# MinIO — S3-совместимый storage для локальной разработки
ctx.register_object_store(
    "s3://local-bucket",
    options={
        "aws_access_key_id": "minioadmin",
        "aws_secret_access_key": "minioadmin",
        "endpoint": "http://localhost:9000",
        "allow_http": "true",           # MinIO на HTTP
        "region": "us-east-1",          # обязательно, даже для MinIO
    }
)
WARNING

Для S3-compatible storage (MinIO, Ceph, Garage) обязательно укажите endpoint и allow_http (если endpoint на HTTP). Без endpoint DataFusion обращается к AWS S3. Без allow_http — отклоняет HTTP-подключения.

Конфигурация GCS

Python

ctx.register_object_store(
    "gs://my-bucket",
    options={
        "service_account_path": "/path/to/service-account.json",
        # или service_account_key: "{...json...}" для inline-ключа
    }
)

ctx.register_parquet("events", "gs://my-bucket/events/")

Rust

use object_store::gcp::GoogleCloudStorageBuilder;

let gcs = GoogleCloudStorageBuilder::new()
    .with_bucket_name("my-bucket")
    .with_service_account_path("/path/to/key.json")
    .build()?;

ctx.runtime_env()
    .register_object_store(&Url::parse("gs://my-bucket")?, Arc::new(gcs));
TIP

В GKE (Google Kubernetes Engine) используйте Workload Identity вместо service account ключей. DataFusion подхватит credentials автоматически через Application Default Credentials (ADC).

Конфигурация Azure Blob Storage

Python

ctx.register_object_store(
    "az://my-container",
    options={
        "account_name": "myaccount",
        "access_key": "...",
        # или "sas_token": "...",
        # или "client_id": "...", "client_secret": "...", "tenant_id": "..."
    }
)

ctx.register_parquet("events", "az://my-container/events/")

Rust

use object_store::azure::MicrosoftAzureBuilder;

let azure = MicrosoftAzureBuilder::new()
    .with_container_name("my-container")
    .with_account("myaccount")
    .with_access_key("...")
    .build()?;

ctx.runtime_env()
    .register_object_store(&Url::parse("az://my-container")?, Arc::new(azure));

Мультиформатная архитектура

В реальных системах данные хранятся в разных форматах. DataFusion позволяет запрашивать их в одном SQL-запросе:

Мультиформатная архитектура с DataFusion
SQL: SELECT … FROM delta_events JOIN iceberg_users JOIN csv_configОдин SQL-запрос с JOIN между разными форматами
DataFusion Query Engine
Delta Lake
Iceberg
Parquet
CSV
import datafusion
from deltalake import DeltaTable
from pyiceberg.catalog import load_catalog

ctx = datafusion.SessionContext()

# Delta Lake таблица на S3
delta = DeltaTable("s3://warehouse/events", storage_options={
    "AWS_REGION": "eu-west-1"
})
ctx.register_table_provider("events", delta)

# Iceberg таблица через каталог
catalog = load_catalog("prod", **{"type": "rest", "uri": "http://catalog:8181"})
iceberg_table = catalog.load_table("analytics.users")
ctx.register_table_provider("users", iceberg_table)

# Raw Parquet на S3
ctx.register_parquet("sessions", "s3://warehouse/sessions/")

# Локальный CSV
ctx.register_csv("config", "./data/config.csv")

# Один запрос — четыре формата, два облака
result = ctx.sql("""
    SELECT
        u.country,
        c.region_name,
        count(DISTINCT e.user_id) as active_users,
        count(DISTINCT s.session_id) as sessions
    FROM events e
    JOIN users u ON e.user_id = u.user_id
    JOIN sessions s ON e.session_id = s.session_id
    JOIN config c ON u.country = c.country_code
    WHERE e.event_ts >= TIMESTAMP '2024-03-01 00:00:00'
    GROUP BY u.country, c.region_name
    ORDER BY active_users DESC
""")
result.show()
NOTE

DataFusion оптимизирует мультиформатные запросы так же, как однородные: predicate pushdown работает для каждого TableProvider, join reordering учитывает statistics от каждого формата.

Выбор формата: дерево решений

Выбор между raw Parquet, Delta Lake и Iceberg зависит от требований к данным:

Дерево решений: выбор формата хранения
Нужны ACID-транзакции?Первый вопрос: нужны ли ACID-транзакции?
Нет → Raw ParquetНет ACID → raw Parquet
Когда
Да → Partition evolution?Да ACID → нужен partition evolution?
Нет → Delta LakeНет partition evolution → Delta Lake
Когда
Да → IcebergДа partition evolution → Iceberg
Когда

Подробная матрица выбора

КритерийRaw ParquetDelta LakeApache Iceberg
ACIDНетДаДа
Time travelНетДа (по version/timestamp)Да (по snapshot/timestamp)
Concurrent writesUnsafeSerializableSerializable
File pruningRow group stats (читаем footer)Transaction log stats (1 read)Manifest stats (read manifests)
Schema evolutionНет (только add columns в Parquet)Add/renameAdd/rename/drop с column IDs
Partition evolutionНетНет (перезапись)Да (in-place)
Catalog requirementНетНет (path-based)Да
DataFusion overheadМинимальныйЗагрузка transaction logЗагрузка metadata + manifests
Рекомендуемый объём<100 GB, static100 GB — 10 TB100 GB — 100+ TB

Миграция между форматами

Raw Parquet → Delta Lake

Самый частый путь миграции — добавить транзакционный слой к существующим Parquet-файлам:

import datafusion
from deltalake import write_deltalake, DeltaTable

ctx = datafusion.SessionContext()

# Читаем существующие Parquet-файлы через DataFusion
ctx.register_parquet("raw_events", "s3://old-bucket/events/")

# Трансформация (если нужно)
result = ctx.sql("""
    SELECT *, DATE_TRUNC('day', event_ts) as event_date
    FROM raw_events
""")

# Запись в Delta-формат
write_deltalake(
    "s3://warehouse/events_delta",
    result.to_arrow_table(),
    partition_by=["event_date"],
    mode="overwrite",
    storage_options={"AWS_REGION": "eu-west-1"}
)

# Проверка
delta = DeltaTable("s3://warehouse/events_delta")
print(f"Version: {delta.version()}, Files: {len(delta.files())}")

Delta Lake → Iceberg

Миграция между форматами требует чтения данных из одного формата и записи в другой:

import datafusion
from deltalake import DeltaTable
from pyiceberg.catalog import load_catalog

ctx = datafusion.SessionContext()

# Читаем из Delta
delta = DeltaTable("s3://warehouse/events_delta")
ctx.register_table_provider("delta_events", delta)

# Читаем все данные через DataFusion
result = ctx.sql("SELECT * FROM delta_events")
arrow_table = result.to_arrow_table()

# Создаём Iceberg-таблицу и записываем
catalog = load_catalog("prod", **{"type": "rest", "uri": "http://catalog:8181"})
iceberg_table = catalog.create_table(
    "analytics.events",
    schema=arrow_table.schema,
    partition_spec=PartitionSpec(
        PartitionField(source_id=2, field_id=1000,
                       transform=DayTransform(), name="event_day")
    )
)
iceberg_table.append(arrow_table)
WARNING

Миграция больших таблиц (>1 TB) не должна делаться через to_arrow_table() — это загрузит всю таблицу в память. Используйте batch processing: читайте и записывайте по partition за раз.

Оптимизация I/O для object store

Параллелизм чтения

DataFusion читает файлы параллельно через target_partitions. Для object store это означает concurrent HTTP-запросы:

# Увеличение параллелизма для S3 (по умолчанию = num_cores)
ctx = datafusion.SessionContext()
ctx.set_config("datafusion.execution.target_partitions", "32")

Предотвращение проблемы мелких файлов

Object store имеет высокую латентность на каждый запрос (~50ms для S3). Тысячи мелких файлов (каждый <1 MB) создают bottleneck:

1000 файлов × 1 KB × 50ms/запрос ≈ 50 секунд только на метаданные
  vs.
10 файлов × 100 MB × 50ms/запрос ≈ 0.5 секунды + transfer time

Оптимальный размер файла для object store: 128-256 MB.

Кеширование метаданных

Для часто запрашиваемых таблиц имеет смысл кешировать Parquet footer и statistics:

// Rust: кастомный ObjectStore с кешированием
use object_store::memory::InMemory;
use object_store::PrefixStore;

// В DataFusion нет встроенного кеша ObjectStore,
// но можно реализовать обёртку через ObjectStore trait
struct CachedObjectStore {
    inner: Arc<dyn ObjectStore>,
    cache: Arc<Cache<String, Bytes>>,
}
TIP

Для Delta Lake metadata caching работает автоматически — DeltaTable кеширует transaction log при загрузке. Для raw Parquet DataFusion кеширует footer после первого чтения в рамках одного SessionContext.

Кросс-ссылки с другими модулями

  • DataSourceExec — физический оператор чтения данных, рассмотренный в модуле 02 (архитектура). ObjectStore вызывается именно из DataSourceExec
  • Partitioning — стратегии параллелизма из модуля 11 (performance tuning). target_partitions и repartition_file_scans работают одинаково для object store и локальных файлов
  • File layout — оптимизация размера файлов и сортировки из модуля 11

Ключевые выводы

  1. ObjectStore trait — единый интерфейс DataFusion для S3/GCS/Azure/local. Crate object_store используется и DataFusion, и delta-rs, и iceberg-rust
  2. Credentials конфигурируются через register_object_store() (Python) или AmazonS3Builder/GoogleCloudStorageBuilder (Rust). Для S3-compatible storage обязательны endpoint и allow_http
  3. Мультиформатные запросы — DataFusion JOIN между Delta, Iceberg, Parquet и CSV в одном SQL-запросе
  4. Выбор формата: raw Parquet для static данных, Delta Lake для ACID с простотой, Iceberg для partition evolution и multi-engine
  5. Оптимальный размер файла на object store: 128-256 MB. Мелкие файлы (<1 MB) создают I/O bottleneck
  6. Миграция между форматами — через DataFusion как ETL-движок: читаем из одного формата, записываем в другой

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. DataFusion, delta-rs и iceberg-rust используют один и тот же crate для доступа к S3/GCS/Azure. Какой?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4