Лучшие практики Lakehouse
Стратегия партиционирования
Правило 1 GB
Оптимальный размер партиции — ~1 GB сжатых данных (Parquet). Это обеспечивает баланс между параллелизмом чтения и overhead от мелких файлов.
Размер партиции → Влияние:
< 100 MB → Over-partitioning: тысячи мелких файлов, overhead на metadata
100 MB-2 GB → Оптимально: хороший параллелизм, управляемый metadata
> 5 GB → Under-partitioning: меньше параллелизма, сложнее compaction
Выбор partition key
# ХОРОШО: дата (medium cardinality, natural time dimension)
.partitionBy("event_date")
# ХОРОШО: регион + дата (если данные равномерно распределены)
.partitionBy("region", "event_date")
# ПЛОХО: user_id (high cardinality → миллионы партиций!)
.partitionBy("user_id") # Миллионы мелких файлов
# ПЛОХО: status (low cardinality → 3-5 огромных партиций)
.partitionBy("status") # "new", "paid", "shipped"
Anti-pattern: over-partitioning по high-cardinality колонкам
Партиционирование по user_id или transaction_id создаёт миллионы партиций с файлами по несколько KB каждый. Это убивает производительность: файловая система (или object storage) не справляется с listing, metadata занимает больше места, чем данные, compaction работает бесконечно.
Правило: partition key cardinality < 10,000 значений.
Рекомендации по форматам
| Формат | Рекомендация | Комментарий |
|---|---|---|
| Delta Lake | partitionBy("date") + Z-ORDER | Z-ORDER компенсирует отсутствие partition evolution |
| Iceberg | PARTITIONED BY (days(ts)) | Hidden partitioning + partition evolution |
| Hudi | partitionPath = "datestr" | Привязан к hoodie.datasource.write.partitionpath.field |
| Paimon | PARTITIONED BY (dt) + bucket | bucket-based internal partitioning |
Расписания Compaction
Compaction — критическая операция для всех lakehouse-форматов. Без неё мелкие файлы деградируют производительность чтения.
Delta Lake: OPTIMIZE
# Ежедневный OPTIMIZE (рекомендуется для большинства таблиц)
spark.sql("OPTIMIZE delta.`/data/silver/events`")
# OPTIMIZE + Z-ORDER для часто фильтруемых колонок
spark.sql("""
OPTIMIZE delta.`/data/silver/events`
ZORDER BY (event_date, category)
""")
Расписание: 1 раз в день для append-heavy таблиц, 1 раз в неделю для медленно растущих.
Apache Iceberg: rewrite_data_files
# Compaction
spark.sql("""
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.events',
options => map('target-file-size-bytes', '1073741824')
)
""")
# Sort order optimization
spark.sql("""
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.events',
strategy => 'sort',
sort_order => 'event_date ASC NULLS LAST'
)
""")
Apache Hudi: Compaction (MOR)
# Inline compaction (при каждой N-ой записи)
.option("hoodie.compact.inline", "true")
.option("hoodie.compact.inline.max.delta.commits", "5")
# Scheduled compaction
spark.sql("""
CALL run_compaction(
op => 'schedule',
table => 'hudi_catalog.db.orders_mor'
)
""")
Apache Paimon: Auto-Compaction
# Auto-compaction (по умолчанию включена)
spark.sql("""
CREATE TABLE paimon.db.events (...)
TBLPROPERTIES (
'compaction.min.file-num' = '5',
'compaction.max.file-num' = '50',
'compaction.target-file-size' = '256mb'
)
""")
| Формат | Тип compaction | Автоматическая? | Рекомендация |
|---|---|---|---|
| Delta Lake | OPTIMIZE | Нет (scheduled) | 1 раз/день для active tables |
| Iceberg | rewrite_data_files | Нет (manual/scheduled) | 1 раз/день, target 1 GB |
| Hudi | Compaction (MOR) | Опционально (inline) | inline=5 commits для streaming |
| Paimon | Auto-compaction | Да (по умолчанию) | Настройте thresholds |
VACUUM и Retention Policies
Все форматы хранят старые версии файлов для time travel. Без очистки хранилище растёт бесконечно.
# Delta Lake: VACUUM (удаление файлов старше retention)
spark.sql("VACUUM delta.`/data/events` RETAIN 168 HOURS") # 7 дней
# Iceberg: expire snapshots
spark.sql("""
CALL iceberg_catalog.system.expire_snapshots(
'db.events',
TIMESTAMP '2024-01-08 00:00:00'
)
""")
# Hudi: clean (автоматический при commit)
.option("hoodie.cleaner.commits.retained", "10")
.option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
# Paimon: snapshot expiration
spark.sql("""
ALTER TABLE paimon.db.events
SET TBLPROPERTIES ('snapshot.time-retained' = '7d')
""")
Anti-pattern: никогда не vacuuming
Без VACUUM/expire_snapshots старые data files и metadata накапливаются бесконечно. На production-таблице с ежедневными ETL это может быть терабайты мёртвых данных через год. Настройте автоматическую очистку с retention >= времени самого длинного запроса (обычно 7 дней).
Schema Management
Evolution vs Enforcement
| Подход | Когда использовать | Пример |
|---|---|---|
| Schema enforcement | Bronze -> Silver (валидация) | Отбросить записи с неверными типами |
| Schema evolution | Добавление новых полей | Новое поле geo_region в событиях |
| Schema on read | Bronze (raw ingestion) | JSON без валидации |
# Schema enforcement на Silver слое
silver = (
bronze
.select(from_json(col("value"), strict_schema).alias("data"))
.filter(col("data").isNotNull()) # Отбрасываем невалидные
.select("data.*")
)
# Schema evolution при записи
new_data.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/data/silver/events")
Рекомендации по слоям
- Bronze: schema-on-read. Храните всё, ничего не валидируйте.
- Silver: strict schema enforcement. Дропайте/каратинизируйте невалидные записи.
- Gold: schema evolution запрещена. Любое изменение — через migration.
Data Quality Gates
Каждый слой медальонной архитектуры должен иметь quality gates — проверки, которые данные проходят перед записью:
Bronze Quality Gates
- Данные записаны (completeness)
- Timestamp источника сохранён
- Metadata Kafka (offset, partition) сохранены
Silver Quality Gates
- NOT NULL для обязательных полей
- Типы соответствуют схеме
- Дедупликация по бизнес-ключу
- Referential integrity (FK существуют в dimension-таблицах)
Gold Quality Gates
- Агрегаты сходятся с контрольными суммами Silver
- Метрики в допустимых диапазонах (нет отрицательных сумм)
- Полнота данных (все даты/регионы присутствуют)
# Пример quality gate на Silver
from pyspark.sql.functions import col, count, sum as _sum
silver = spark.read.format("delta").load("/data/silver/orders")
# Проверка: нет NULL в обязательных полях
null_check = silver.filter(
col("order_id").isNull() |
col("customer_id").isNull()
).count()
assert null_check == 0, f"Silver quality gate failed: {null_check} NULL records"
# Проверка: нет отрицательных сумм
negative_check = silver.filter(col("amount") < 0).count()
assert negative_check == 0, f"Silver quality gate failed: {negative_check} negative amounts"
Мониторинг здоровья Lakehouse
Ключевые метрики для наблюдения:
| Метрика | Порог | Действие |
|---|---|---|
| Количество мелких файлов (< 10 MB) | > 1000 в партиции | OPTIMIZE / compaction |
| Размер metadata | > 10% от данных | expire_snapshots / VACUUM |
| Time travel versions | > 100 | VACUUM с коротким retention |
| Compaction lag (MOR) | > 1 час | Увеличить inline compaction |
| Quality gate failures | > 0 | Расследование + алерт |
# Проверка здоровья Delta-таблицы
detail = spark.sql("DESCRIBE DETAIL delta.`/data/silver/events`")
detail.select(
"numFiles",
"sizeInBytes",
"numPartitions"
).show()
# Проверка: средний размер файла
avg_file_size = detail.select(
(col("sizeInBytes") / col("numFiles")).alias("avg_file_bytes")
).first()["avg_file_bytes"]
if avg_file_size < 10 * 1024 * 1024: # < 10 MB
print("WARNING: Small files detected, run OPTIMIZE")
Чеклист для production Lakehouse
- Партиционирование: ~1 GB на партицию, medium cardinality key
- Compaction: настроено расписание (OPTIMIZE/rewrite/compact)
- VACUUM/expire: retention >= 7 дней, автоматический запуск
- Schema enforcement: strict на Silver, evolution на Bronze
- Quality gates: NOT NULL, type checks, range checks на каждом слое
- Мониторинг: алерты на мелкие файлы, metadata рост, gate failures
- Backups: Silver и Gold таблицы имеют retention policy и disaster recovery plan
Что дальше?
Модуль Lakehouse Formats завершён. Вы изучили все 4 основных формата, научились выбирать подходящий и настраивать production-ready lakehouse. В следующем модуле мы перейдём к Apache Arrow — формату in-memory колоночного хранения, который лежит в основе Spark Connect и zero-copy transfer между процессами.