Learning Platform
Глоссарий Troubleshooting
Урок 09.08 · 12 мин
Средний
Best PracticesPartitioningCompactionVACUUMSchema ManagementData Quality

Лучшие практики Lakehouse

Стратегия партиционирования

Правило 1 GB

Оптимальный размер партиции — ~1 GB сжатых данных (Parquet). Это обеспечивает баланс между параллелизмом чтения и overhead от мелких файлов.

Размер партиции → Влияние:
< 100 MB    → Over-partitioning: тысячи мелких файлов, overhead на metadata
100 MB-2 GB → Оптимально: хороший параллелизм, управляемый metadata
> 5 GB      → Under-partitioning: меньше параллелизма, сложнее compaction

Выбор partition key

# ХОРОШО: дата (medium cardinality, natural time dimension)
.partitionBy("event_date")

# ХОРОШО: регион + дата (если данные равномерно распределены)
.partitionBy("region", "event_date")

# ПЛОХО: user_id (high cardinality → миллионы партиций!)
.partitionBy("user_id")  # Миллионы мелких файлов

# ПЛОХО: status (low cardinality → 3-5 огромных партиций)
.partitionBy("status")  # "new", "paid", "shipped"
WARNING

Anti-pattern: over-partitioning по high-cardinality колонкам

Партиционирование по user_id или transaction_id создаёт миллионы партиций с файлами по несколько KB каждый. Это убивает производительность: файловая система (или object storage) не справляется с listing, metadata занимает больше места, чем данные, compaction работает бесконечно.

Правило: partition key cardinality < 10,000 значений.

Рекомендации по форматам

ФорматРекомендацияКомментарий
Delta LakepartitionBy("date") + Z-ORDERZ-ORDER компенсирует отсутствие partition evolution
IcebergPARTITIONED BY (days(ts))Hidden partitioning + partition evolution
HudipartitionPath = "datestr"Привязан к hoodie.datasource.write.partitionpath.field
PaimonPARTITIONED BY (dt) + bucketbucket-based internal partitioning
Проверка знанийKnowledge check
Почему партиционирование по user_id -- anti-pattern для Lakehouse-таблиц?
ОтветAnswer
user_id -- high cardinality колонка (миллионы уникальных значений). Партиционирование по ней создаёт миллионы мелких партиций с файлами по несколько KB. Проблемы: (1) object storage не справляется с listing миллионов директорий, (2) metadata занимает больше места чем данные, (3) compaction не может эффективно объединить файлы. Рекомендация: используйте partition key с cardinality < 10,000 (date, region, category).

Расписания Compaction

Compaction — критическая операция для всех lakehouse-форматов. Без неё мелкие файлы деградируют производительность чтения.

Delta Lake: OPTIMIZE

# Ежедневный OPTIMIZE (рекомендуется для большинства таблиц)
spark.sql("OPTIMIZE delta.`/data/silver/events`")

# OPTIMIZE + Z-ORDER для часто фильтруемых колонок
spark.sql("""
    OPTIMIZE delta.`/data/silver/events`
    ZORDER BY (event_date, category)
""")

Расписание: 1 раз в день для append-heavy таблиц, 1 раз в неделю для медленно растущих.

Apache Iceberg: rewrite_data_files

# Compaction
spark.sql("""
    CALL iceberg_catalog.system.rewrite_data_files(
        table => 'db.events',
        options => map('target-file-size-bytes', '1073741824')
    )
""")

# Sort order optimization
spark.sql("""
    CALL iceberg_catalog.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'sort',
        sort_order => 'event_date ASC NULLS LAST'
    )
""")

Apache Hudi: Compaction (MOR)

# Inline compaction (при каждой N-ой записи)
.option("hoodie.compact.inline", "true")
.option("hoodie.compact.inline.max.delta.commits", "5")

# Scheduled compaction
spark.sql("""
    CALL run_compaction(
        op => 'schedule',
        table => 'hudi_catalog.db.orders_mor'
    )
""")

Apache Paimon: Auto-Compaction

# Auto-compaction (по умолчанию включена)
spark.sql("""
    CREATE TABLE paimon.db.events (...)
    TBLPROPERTIES (
        'compaction.min.file-num' = '5',
        'compaction.max.file-num' = '50',
        'compaction.target-file-size' = '256mb'
    )
""")
ФорматТип compactionАвтоматическая?Рекомендация
Delta LakeOPTIMIZEНет (scheduled)1 раз/день для active tables
Icebergrewrite_data_filesНет (manual/scheduled)1 раз/день, target 1 GB
HudiCompaction (MOR)Опционально (inline)inline=5 commits для streaming
PaimonAuto-compactionДа (по умолчанию)Настройте thresholds

VACUUM и Retention Policies

Все форматы хранят старые версии файлов для time travel. Без очистки хранилище растёт бесконечно.

# Delta Lake: VACUUM (удаление файлов старше retention)
spark.sql("VACUUM delta.`/data/events` RETAIN 168 HOURS")  # 7 дней

# Iceberg: expire snapshots
spark.sql("""
    CALL iceberg_catalog.system.expire_snapshots(
        'db.events',
        TIMESTAMP '2024-01-08 00:00:00'
    )
""")

# Hudi: clean (автоматический при commit)
.option("hoodie.cleaner.commits.retained", "10")
.option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")

# Paimon: snapshot expiration
spark.sql("""
    ALTER TABLE paimon.db.events
    SET TBLPROPERTIES ('snapshot.time-retained' = '7d')
""")
WARNING

Anti-pattern: никогда не vacuuming

Без VACUUM/expire_snapshots старые data files и metadata накапливаются бесконечно. На production-таблице с ежедневными ETL это может быть терабайты мёртвых данных через год. Настройте автоматическую очистку с retention >= времени самого длинного запроса (обычно 7 дней).

Schema Management

Evolution vs Enforcement

ПодходКогда использоватьПример
Schema enforcementBronze -> Silver (валидация)Отбросить записи с неверными типами
Schema evolutionДобавление новых полейНовое поле geo_region в событиях
Schema on readBronze (raw ingestion)JSON без валидации
# Schema enforcement на Silver слое
silver = (
    bronze
    .select(from_json(col("value"), strict_schema).alias("data"))
    .filter(col("data").isNotNull())  # Отбрасываем невалидные
    .select("data.*")
)

# Schema evolution при записи
new_data.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("/data/silver/events")

Рекомендации по слоям

  • Bronze: schema-on-read. Храните всё, ничего не валидируйте.
  • Silver: strict schema enforcement. Дропайте/каратинизируйте невалидные записи.
  • Gold: schema evolution запрещена. Любое изменение — через migration.

Data Quality Gates

Каждый слой медальонной архитектуры должен иметь quality gates — проверки, которые данные проходят перед записью:

Bronze Quality Gates

  • Данные записаны (completeness)
  • Timestamp источника сохранён
  • Metadata Kafka (offset, partition) сохранены

Silver Quality Gates

  • NOT NULL для обязательных полей
  • Типы соответствуют схеме
  • Дедупликация по бизнес-ключу
  • Referential integrity (FK существуют в dimension-таблицах)

Gold Quality Gates

  • Агрегаты сходятся с контрольными суммами Silver
  • Метрики в допустимых диапазонах (нет отрицательных сумм)
  • Полнота данных (все даты/регионы присутствуют)
# Пример quality gate на Silver
from pyspark.sql.functions import col, count, sum as _sum

silver = spark.read.format("delta").load("/data/silver/orders")

# Проверка: нет NULL в обязательных полях
null_check = silver.filter(
    col("order_id").isNull() |
    col("customer_id").isNull()
).count()

assert null_check == 0, f"Silver quality gate failed: {null_check} NULL records"

# Проверка: нет отрицательных сумм
negative_check = silver.filter(col("amount") < 0).count()
assert negative_check == 0, f"Silver quality gate failed: {negative_check} negative amounts"
Проверка знанийKnowledge check
Почему schema enforcement рекомендуется на Silver слое, а не на Bronze?
ОтветAnswer
Bronze слой хранит сырые данные из источника в формате 'as-is' (schema-on-read). Это обеспечивает: (1) полноту данных -- ничего не теряется из-за schema mismatch, (2) возможность re-processing с новой логикой, (3) аудируемость. Schema enforcement на Bronze отбрасывал бы записи с неожиданными полями или типами -- потеря данных. Silver слой -- правильное место для валидации: здесь мы осознанно отбрасываем/каратинизируем невалидные записи, зная что оригинал сохранён в Bronze.

Мониторинг здоровья Lakehouse

Ключевые метрики для наблюдения:

МетрикаПорогДействие
Количество мелких файлов (< 10 MB)> 1000 в партицииOPTIMIZE / compaction
Размер metadata> 10% от данныхexpire_snapshots / VACUUM
Time travel versions> 100VACUUM с коротким retention
Compaction lag (MOR)> 1 часУвеличить inline compaction
Quality gate failures> 0Расследование + алерт
# Проверка здоровья Delta-таблицы
detail = spark.sql("DESCRIBE DETAIL delta.`/data/silver/events`")
detail.select(
    "numFiles",
    "sizeInBytes",
    "numPartitions"
).show()

# Проверка: средний размер файла
avg_file_size = detail.select(
    (col("sizeInBytes") / col("numFiles")).alias("avg_file_bytes")
).first()["avg_file_bytes"]

if avg_file_size < 10 * 1024 * 1024:  # < 10 MB
    print("WARNING: Small files detected, run OPTIMIZE")

Чеклист для production Lakehouse

  • Партиционирование: ~1 GB на партицию, medium cardinality key
  • Compaction: настроено расписание (OPTIMIZE/rewrite/compact)
  • VACUUM/expire: retention >= 7 дней, автоматический запуск
  • Schema enforcement: strict на Silver, evolution на Bronze
  • Quality gates: NOT NULL, type checks, range checks на каждом слое
  • Мониторинг: алерты на мелкие файлы, metadata рост, gate failures
  • Backups: Silver и Gold таблицы имеют retention policy и disaster recovery plan

Что дальше?

Модуль Lakehouse Formats завершён. Вы изучили все 4 основных формата, научились выбирать подходящий и настраивать production-ready lakehouse. В следующем модуле мы перейдём к Apache Arrow — формату in-memory колоночного хранения, который лежит в основе Spark Connect и zero-copy transfer между процессами.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какой размер Parquet-файлов считается оптимальным для lakehouse-таблиц и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8