Запросы и инкрементальная обработка
Hudi поддерживает 5 типов запросов — больше, чем Delta Lake (3: snapshot, time travel, CDF) и Iceberg (4: snapshot, time travel, incremental, changelog). Это одно из ключевых преимуществ Hudi для построения инкрементальных pipeline: вместо полного пересканирования таблицы, downstream-системы читают только изменения с последнего checkpoint.
В Уроке 02 мы видели разницу между snapshot и read-optimized запросами на MOR-таблицах. Здесь мы разберём все 5 типов запросов детально, плюс паттерн checkpoint-based ETL — основу инкрементальных pipeline на Hudi.
Библиотека hudi для Python (hudi-rs, v0.4.0) поддерживает snapshot, incremental и time-travel запросы нативно — без Spark/Flink. Это делает Python-based ETL возможным для read-path, хотя write-path по-прежнему требует Spark или Flink.
Обзор 5 типов запросов
Snapshot Query
Snapshot query — запрос по умолчанию: возвращает полный актуальный snapshot таблицы.
COW-таблица: прямое чтение Parquet
Read timeline → последний commit
Reader получает список completed instants с timeline. Определяет последний commit — это target snapshot.File listing из commit metadata
Из commit metadata получает список base files (Parquet), актуальных на этот commit. Partition pruning и column pruning применяются здесь.Read Parquet → результат
Читает Parquet файлы напрямую. Нет merge step — COW-таблица всегда актуальна на уровне base files. Производительность идентична чтению обычного Parquet.На COW-таблице snapshot query эквивалентен чтению обычного Parquet-датасета — никакого merge overhead. Это аналогично snapshot read в Delta Lake и Iceberg.
MOR-таблица: merge base + logs
FileSlice: base + log files
Reader получает FileSlice для каждого FileGroup: base file (последняя compaction) + список log files (deltacommits после compaction).Merge: base + log₁ + log₂ + … + logₙ
Merge: reader загружает base file, затем последовательно применяет Data Blocks и Delete Blocks из log files. Порядок — по timestamp из instant. Merge — CPU-intensive операция.Актуальный snapshot
Результат — актуальный snapshot с учётом всех deltacommits. Медленнее COW (из-за merge), но write быстрее (append log вместо rewrite).Если MOR-таблица имеет слишком много log files (долго не запускалась compaction), snapshot query замедляется — каждый FileSlice требует merge N log файлов. Регулярная compaction критична для read performance MOR-таблиц.
Read Optimized Query (только MOR)
Read Optimized query — уникальная для Hudi возможность: читает только base files MOR-таблицы, пропуская log files:
-- Spark SQL: выбор типа запроса
-- Snapshot (по умолчанию)
SELECT * FROM hudi_table
-- Read Optimized
SELECT * FROM hudi_table_ro
-- или через hint
SELECT * FROM hudi_table /*+ OPTIONS('hoodie.datasource.query.type'='read_optimized') */
Delta Lake и Iceberg не имеют аналога Read Optimized query, потому что у них нет log files. Это побочный эффект MOR-архитектуры: разделение свежести и скорости чтения. COW-таблицы тоже не поддерживают Read Optimized — у них нет log files.
Incremental Query
Incremental query — ключевая возможность Hudi для построения ETL-pipeline. Вместо полного пересканирования таблицы, reader получает только записи, изменённые между двумя commit timestamps:
Протокол инкрементального запроса
Incremental query работает через metadata в instant файлах: каждый completed commit содержит список затронутых файлов с их FileGroup ID. Reader:
- Читает timeline instants в диапазоне
[beginTime, endTime] - Извлекает список затронутых FileGroup из metadata
- Читает только эти FileGroup (без полного scan)
- Возвращает записи, записанные в этих commit’ах
-- Spark: incremental query
spark.read.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240115120000000")
.option("hoodie.datasource.read.end.instanttime", "20240115180000000")
.load("/path/to/hudi_table")
Incremental query возвращает полные записи (все колонки), а не diff/changelog. Для получения before/after images используйте CDC query. Incremental query — это фильтр по времени записи, а не по содержимому изменений.
CDC Query
CDC query расширяет incremental query: вместо «что изменилось» возвращает как изменилось — с before/after images и op-кодами:
Включение CDC
CDC требует явного включения при создании таблицы:
# Включить CDC logging
hoodie.table.cdc.enabled=true
# Опционально: supplemental logging mode
hoodie.table.cdc.supplemental.logging.mode=data_before_after
CDC в Hudi аналогичен Change Data Feed (CDF) в Delta Lake: оба требуют явного включения, оба генерируют before/after images. Разница: Delta CDF хранит CDC-данные в отдельной директории _change_data/, а Hudi встраивает CDC-информацию в метаданные instant’ов и log-файлов.
Сравнение CDC подходов трёх форматов
Time Travel Query
Time travel позволяет прочитать snapshot таблицы на конкретный момент прошлого:
Target: T3
Пользователь указывает target timestamp. Hudi ищет ближайший completed instant на timeline, не превышающий этот timestamp.Timeline replay до T3
Hudi восстанавливает состояние таблицы на момент T3: файлы из commit'ов T1, T2, T3. Commit'ы T4, T5 игнорируются. Для MOR — merge с log files до T3.Snapshot на момент T3
Результат — snapshot таблицы как она выглядела на момент T3. Все записи, удалённые после T3, восстановлены. Записи, добавленные после T3, отсутствуют.Глубина time travel
Глубина time travel определяется настройками archived timeline:
# До Hudi 1.0: archived timeline в plain Avro файлах
# Глубина ограничена hoodie.keep.min.commits (по умолчанию 20-30 instants)
# Старые instants архивируются и не всегда доступны для time travel
# Hudi 1.0+: LSM-style archived timeline
# Глубина практически неограниченна — LSM хранит compacted history
# Time travel на любой instant в истории таблицы
Hudi 1.0 с LSM-style archived timeline обеспечивает неограниченный time travel — в отличие от Delta Lake, где time travel ограничен VACUUM retention (по умолчанию 7 дней, если VACUUM запущен). Iceberg тоже поддерживает неограниченный time travel через snapshot expiration, но требует явного управления retention.
Checkpoint-Based ETL
Инкрементальные запросы Hudi — основа для checkpoint-based ETL: паттерна, где downstream-pipeline запоминает последний обработанный commit и при следующем запуске читает только новые изменения:
Bronze (Hudi)
Bronze-таблица (Hudi): принимает raw-данные из Kafka/S3/файлов. Каждый batch создаёт commit instant на timeline.ETL: transform + checkpoint
ETL-job читает Bronze инкрементально: от последнего checkpoint до текущего instant. Трансформирует данные и записывает в Silver. Сохраняет новый checkpoint.Silver (Hudi)
Silver-таблица (Hudi): содержит очищенные, обогащённые данные. Тоже Hudi — поддерживает собственные incremental queries для Gold-слоя.Преимущества checkpoint ETL перед full scan
Пример: multi-hop pipeline
# Этап 1: Kafka → Bronze (Hudi, MOR)
# Streaming writer каждые 30 секунд
bronze_checkpoint = "20240115120000000"
# Этап 2: Bronze → Silver (incremental ETL)
silver_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", bronze_checkpoint) \
.load("/data/bronze/orders")
# Трансформация
silver_transformed = silver_df \
.filter("status != 'cancelled'") \
.withColumn("amount_usd", col("amount") * col("exchange_rate"))
# Запись в Silver (Hudi, COW для быстрого чтения)
silver_transformed.write.format("hudi") \
.option("hoodie.table.name", "orders_silver") \
.mode("append") \
.save("/data/silver/orders")
# Обновить checkpoint
new_checkpoint = get_latest_commit("/data/bronze/orders")
save_checkpoint("bronze_to_silver", new_checkpoint)
hudi-rs: Python Read Path
Библиотека hudi (hudi-rs, v0.4.0) — нативный Rust-based reader для Hudi-таблиц. Поддерживает snapshot, incremental и time-travel запросы без Spark/Flink:
Python: hudi.HudiTable()
Python-код вызывает hudi.HudiTable(). Библиотека написана на Rust с PyO3-биндингами. Читает .hoodie/ для timeline metadata и hoodie.properties.Rust core: timeline + Parquet
Rust core парсит timeline instants, восстанавливает file listing. Поддерживает S3, GCS, Azure, local FS через object_store crate. Читает Parquet через arrow-rs.PyArrow Table / RecordBatch
Возвращает PyArrow Table или RecordBatches. Интегрируется с Pandas, Polars, DuckDB через Arrow IPC. Zero-copy где возможно.API примеры
import hudi
# Snapshot query
table = hudi.HudiTable("/path/to/hudi_table")
batches = table.read_snapshot() # List[RecordBatch]
# Time travel
batches = table.read_snapshot(
timestamp="2024-01-15T12:00:00"
)
# Incremental query
batches = table.read_incremental(
start_timestamp="2024-01-15T12:00:00",
end_timestamp="2024-01-15T18:00:00"
)
# Конвертация в pandas
import pyarrow as pa
table_arrow = pa.Table.from_batches(batches)
df = table_arrow.to_pandas()
hudi-rs v0.4.0 — только чтение. Нельзя создать таблицу, выполнить upsert или запустить compaction. Для write-операций используйте Spark с hudi-spark-bundle или Flink с hudi-flink-bundle. Это контрастирует с deltalake (Python, read + write) и pyiceberg (Python, read + write).
Дерево выбора типа запроса
Нужны все данные или только изменения?
Первый вопрос: нужны все данные или только изменения? Если все — snapshot/time-travel/read-optimized. Если изменения — incremental/CDC.Все данные
Нужны все данные. Следующий вопрос: на какой момент? Если текущий — snapshot или read-optimized. Если прошлый — time travel.Snapshot
Snapshot: актуальные данные, merge base+logs (MOR). Максимальная актуальность, средняя скорость.Read Optimized
Read Optimized (MOR only): быстрое чтение, но stale данные (на момент последней compaction). Для dashboards.Time Travel
Time Travel: snapshot на конкретный момент прошлого. Для аудита, дебага, compliance.Только изменения
Нужны только изменения. Следующий вопрос: нужны before/after images или просто изменённые записи?Incremental
Incremental: все записи, изменённые в диапазоне. Полные записи (все колонки). Для ETL-pipeline.CDC
CDC: before/after images с op-кодами. Для downstream CDC consumers (Debezium-compatible). Требует cdc.enabled.Практические рекомендации
Для ETL-pipeline: используйте incremental query + checkpoint. Это основное преимущество Hudi перед full-scan подходом.
Для real-time dashboards на MOR: используйте read-optimized query, если stale данные на 5-15 минут допустимы. Это в разы быстрее snapshot query, потому что нет merge step.
Для CDC-интеграции с downstream: включите hoodie.table.cdc.enabled=true при создании таблицы. Добавить позже возможно, но CDC-данные будут доступны только для новых commit’ов.
Для Python-based analytics: используйте hudi-rs (hudi library) для read-path. Он значительно быстрее PySpark для аналитических query на средних датасетах (до ~100GB).
Итоги
- 5 типов запросов: snapshot, incremental, CDC, time travel, read optimized — больше, чем у Delta Lake (3) и Iceberg (4).
- Incremental query — ключевая сила Hudi: O(ΔN) вместо O(N), основа checkpoint-based ETL.
- CDC query — before/after images с op-кодами. Аналог Delta CDF, но хранится в timeline metadata, а не в отдельной директории.
- Read Optimized — уникален для MOR: быстрый, но stale. Нет аналога в Delta/Iceberg.
- Time Travel — неограниченная глубина в Hudi 1.0 (LSM archived timeline), в отличие от Delta (ограничен VACUUM retention).
- hudi-rs (v0.4.0) — Rust-based Python reader. Snapshot, incremental, time travel без Spark. Только чтение.
В следующем уроке мы разберём Table Services — async-механизмы compaction, cleaning и clustering — и экосистему интеграций Hudi с движками и форматами.