Object Store и мультиформатные конфигурации
В предыдущих уроках мы подключали Delta Lake и Iceberg к DataFusion. Но под капотом обоих форматов работает один и тот же механизм доступа к хранилищу — trait ObjectStore. Этот урок объясняет, как DataFusion читает данные из S3/GCS/Azure, как конфигурировать credentials, и как выбрать между форматами для конкретного сценария.
ObjectStore trait
DataFusion не обращается к файловой системе или облачному хранилищу напрямую. Все I/O-операции проходят через абстракцию ObjectStore:
Библиотека object_store (Apache Arrow проект) предоставляет реализации для всех облачных провайдеров. DataFusion использует её по умолчанию — не нужно подключать отдельные SDK.
ObjectStore — это тот же самый crate, который используют delta-rs и iceberg-rust. Все три проекта (DataFusion, Delta, Iceberg) разделяют один и тот же I/O-слой.
Конфигурация S3
Python: автоматическая регистрация
DataFusion Python автоматически создаёт ObjectStore при регистрации таблицы с S3 URL:
import datafusion
ctx = datafusion.SessionContext()
# Автоматически создаёт AmazonS3 ObjectStore
ctx.register_parquet(
"events",
"s3://my-bucket/warehouse/events/",
table_partition_cols=[("date", "string")]
)
# Credentials из переменных окружения:
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
# или из ~/.aws/credentials (AWS CLI profile)
Python: явная конфигурация
import datafusion
from datafusion import SessionContext
ctx = SessionContext()
# Регистрация с явными credentials
ctx.register_object_store(
"s3://my-bucket",
options={
"aws_access_key_id": "AKIA...",
"aws_secret_access_key": "...",
"region": "eu-west-1",
}
)
ctx.register_parquet("events", "s3://my-bucket/events/")
Rust: ручная регистрация
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;
let ctx = SessionContext::new();
// Создание S3 ObjectStore с явными credentials
let s3 = AmazonS3Builder::new()
.with_bucket_name("my-bucket")
.with_region("eu-west-1")
.with_access_key_id("AKIA...")
.with_secret_access_key("...")
.build()?;
// Регистрация в RuntimeEnv
let url = Url::parse("s3://my-bucket")?;
ctx.runtime_env()
.register_object_store(&url, Arc::new(s3));
// Теперь можно читать s3://my-bucket/...
ctx.register_parquet("events", "s3://my-bucket/events/", Default::default())
.await?;
S3-compatible storage (MinIO, Ceph)
# MinIO — S3-совместимый storage для локальной разработки
ctx.register_object_store(
"s3://local-bucket",
options={
"aws_access_key_id": "minioadmin",
"aws_secret_access_key": "minioadmin",
"endpoint": "http://localhost:9000",
"allow_http": "true", # MinIO на HTTP
"region": "us-east-1", # обязательно, даже для MinIO
}
)
Для S3-compatible storage (MinIO, Ceph, Garage) обязательно укажите endpoint и allow_http (если endpoint на HTTP). Без endpoint DataFusion обращается к AWS S3. Без allow_http — отклоняет HTTP-подключения.
Конфигурация GCS
Python
ctx.register_object_store(
"gs://my-bucket",
options={
"service_account_path": "/path/to/service-account.json",
# или service_account_key: "{...json...}" для inline-ключа
}
)
ctx.register_parquet("events", "gs://my-bucket/events/")
Rust
use object_store::gcp::GoogleCloudStorageBuilder;
let gcs = GoogleCloudStorageBuilder::new()
.with_bucket_name("my-bucket")
.with_service_account_path("/path/to/key.json")
.build()?;
ctx.runtime_env()
.register_object_store(&Url::parse("gs://my-bucket")?, Arc::new(gcs));
В GKE (Google Kubernetes Engine) используйте Workload Identity вместо service account ключей. DataFusion подхватит credentials автоматически через Application Default Credentials (ADC).
Конфигурация Azure Blob Storage
Python
ctx.register_object_store(
"az://my-container",
options={
"account_name": "myaccount",
"access_key": "...",
# или "sas_token": "...",
# или "client_id": "...", "client_secret": "...", "tenant_id": "..."
}
)
ctx.register_parquet("events", "az://my-container/events/")
Rust
use object_store::azure::MicrosoftAzureBuilder;
let azure = MicrosoftAzureBuilder::new()
.with_container_name("my-container")
.with_account("myaccount")
.with_access_key("...")
.build()?;
ctx.runtime_env()
.register_object_store(&Url::parse("az://my-container")?, Arc::new(azure));
Мультиформатная архитектура
В реальных системах данные хранятся в разных форматах. DataFusion позволяет запрашивать их в одном SQL-запросе:
import datafusion
from deltalake import DeltaTable
from pyiceberg.catalog import load_catalog
ctx = datafusion.SessionContext()
# Delta Lake таблица на S3
delta = DeltaTable("s3://warehouse/events", storage_options={
"AWS_REGION": "eu-west-1"
})
ctx.register_table_provider("events", delta)
# Iceberg таблица через каталог
catalog = load_catalog("prod", **{"type": "rest", "uri": "http://catalog:8181"})
iceberg_table = catalog.load_table("analytics.users")
ctx.register_table_provider("users", iceberg_table)
# Raw Parquet на S3
ctx.register_parquet("sessions", "s3://warehouse/sessions/")
# Локальный CSV
ctx.register_csv("config", "./data/config.csv")
# Один запрос — четыре формата, два облака
result = ctx.sql("""
SELECT
u.country,
c.region_name,
count(DISTINCT e.user_id) as active_users,
count(DISTINCT s.session_id) as sessions
FROM events e
JOIN users u ON e.user_id = u.user_id
JOIN sessions s ON e.session_id = s.session_id
JOIN config c ON u.country = c.country_code
WHERE e.event_ts >= TIMESTAMP '2024-03-01 00:00:00'
GROUP BY u.country, c.region_name
ORDER BY active_users DESC
""")
result.show()
DataFusion оптимизирует мультиформатные запросы так же, как однородные: predicate pushdown работает для каждого TableProvider, join reordering учитывает statistics от каждого формата.
Выбор формата: дерево решений
Выбор между raw Parquet, Delta Lake и Iceberg зависит от требований к данным:
Подробная матрица выбора
| Критерий | Raw Parquet | Delta Lake | Apache Iceberg |
|---|---|---|---|
| ACID | Нет | Да | Да |
| Time travel | Нет | Да (по version/timestamp) | Да (по snapshot/timestamp) |
| Concurrent writes | Unsafe | Serializable | Serializable |
| File pruning | Row group stats (читаем footer) | Transaction log stats (1 read) | Manifest stats (read manifests) |
| Schema evolution | Нет (только add columns в Parquet) | Add/rename | Add/rename/drop с column IDs |
| Partition evolution | Нет | Нет (перезапись) | Да (in-place) |
| Catalog requirement | Нет | Нет (path-based) | Да |
| DataFusion overhead | Минимальный | Загрузка transaction log | Загрузка metadata + manifests |
| Рекомендуемый объём | <100 GB, static | 100 GB — 10 TB | 100 GB — 100+ TB |
Миграция между форматами
Raw Parquet → Delta Lake
Самый частый путь миграции — добавить транзакционный слой к существующим Parquet-файлам:
import datafusion
from deltalake import write_deltalake, DeltaTable
ctx = datafusion.SessionContext()
# Читаем существующие Parquet-файлы через DataFusion
ctx.register_parquet("raw_events", "s3://old-bucket/events/")
# Трансформация (если нужно)
result = ctx.sql("""
SELECT *, DATE_TRUNC('day', event_ts) as event_date
FROM raw_events
""")
# Запись в Delta-формат
write_deltalake(
"s3://warehouse/events_delta",
result.to_arrow_table(),
partition_by=["event_date"],
mode="overwrite",
storage_options={"AWS_REGION": "eu-west-1"}
)
# Проверка
delta = DeltaTable("s3://warehouse/events_delta")
print(f"Version: {delta.version()}, Files: {len(delta.files())}")
Delta Lake → Iceberg
Миграция между форматами требует чтения данных из одного формата и записи в другой:
import datafusion
from deltalake import DeltaTable
from pyiceberg.catalog import load_catalog
ctx = datafusion.SessionContext()
# Читаем из Delta
delta = DeltaTable("s3://warehouse/events_delta")
ctx.register_table_provider("delta_events", delta)
# Читаем все данные через DataFusion
result = ctx.sql("SELECT * FROM delta_events")
arrow_table = result.to_arrow_table()
# Создаём Iceberg-таблицу и записываем
catalog = load_catalog("prod", **{"type": "rest", "uri": "http://catalog:8181"})
iceberg_table = catalog.create_table(
"analytics.events",
schema=arrow_table.schema,
partition_spec=PartitionSpec(
PartitionField(source_id=2, field_id=1000,
transform=DayTransform(), name="event_day")
)
)
iceberg_table.append(arrow_table)
Миграция больших таблиц (>1 TB) не должна делаться через to_arrow_table() — это загрузит всю таблицу в память. Используйте batch processing: читайте и записывайте по partition за раз.
Оптимизация I/O для object store
Параллелизм чтения
DataFusion читает файлы параллельно через target_partitions. Для object store это означает concurrent HTTP-запросы:
# Увеличение параллелизма для S3 (по умолчанию = num_cores)
ctx = datafusion.SessionContext()
ctx.set_config("datafusion.execution.target_partitions", "32")
Предотвращение проблемы мелких файлов
Object store имеет высокую латентность на каждый запрос (~50ms для S3). Тысячи мелких файлов (каждый <1 MB) создают bottleneck:
1000 файлов × 1 KB × 50ms/запрос ≈ 50 секунд только на метаданные
vs.
10 файлов × 100 MB × 50ms/запрос ≈ 0.5 секунды + transfer time
Оптимальный размер файла для object store: 128-256 MB.
Кеширование метаданных
Для часто запрашиваемых таблиц имеет смысл кешировать Parquet footer и statistics:
// Rust: кастомный ObjectStore с кешированием
use object_store::memory::InMemory;
use object_store::PrefixStore;
// В DataFusion нет встроенного кеша ObjectStore,
// но можно реализовать обёртку через ObjectStore trait
struct CachedObjectStore {
inner: Arc<dyn ObjectStore>,
cache: Arc<Cache<String, Bytes>>,
}
Для Delta Lake metadata caching работает автоматически — DeltaTable кеширует transaction log при загрузке. Для raw Parquet DataFusion кеширует footer после первого чтения в рамках одного SessionContext.
Кросс-ссылки с другими модулями
- DataSourceExec — физический оператор чтения данных, рассмотренный в модуле 02 (архитектура). ObjectStore вызывается именно из DataSourceExec
- Partitioning — стратегии параллелизма из модуля 11 (performance tuning).
target_partitionsиrepartition_file_scansработают одинаково для object store и локальных файлов - File layout — оптимизация размера файлов и сортировки из модуля 11
Ключевые выводы
- ObjectStore trait — единый интерфейс DataFusion для S3/GCS/Azure/local. Crate
object_storeиспользуется и DataFusion, и delta-rs, и iceberg-rust - Credentials конфигурируются через
register_object_store()(Python) илиAmazonS3Builder/GoogleCloudStorageBuilder(Rust). Для S3-compatible storage обязательныendpointиallow_http - Мультиформатные запросы — DataFusion JOIN между Delta, Iceberg, Parquet и CSV в одном SQL-запросе
- Выбор формата: raw Parquet для static данных, Delta Lake для ACID с простотой, Iceberg для partition evolution и multi-engine
- Оптимальный размер файла на object store: 128-256 MB. Мелкие файлы (<1 MB) создают I/O bottleneck
- Миграция между форматами — через DataFusion как ETL-движок: читаем из одного формата, записываем в другой