Learning Platform
Глоссарий Troubleshooting
Урок 14.05 · 40 мин
Продвинутый
Apache HudiQueriesSnapshot QueryIncremental QueryCDCTime TravelRead OptimizedCheckpoint ETLhudi-rs

Запросы и инкрементальная обработка

Hudi поддерживает 5 типов запросов — больше, чем Delta Lake (3: snapshot, time travel, CDF) и Iceberg (4: snapshot, time travel, incremental, changelog). Это одно из ключевых преимуществ Hudi для построения инкрементальных pipeline: вместо полного пересканирования таблицы, downstream-системы читают только изменения с последнего checkpoint.

В Уроке 02 мы видели разницу между snapshot и read-optimized запросами на MOR-таблицах. Здесь мы разберём все 5 типов запросов детально, плюс паттерн checkpoint-based ETL — основу инкрементальных pipeline на Hudi.

NOTE

Библиотека hudi для Python (hudi-rs, v0.4.0) поддерживает snapshot, incremental и time-travel запросы нативно — без Spark/Flink. Это делает Python-based ETL возможным для read-path, хотя write-path по-прежнему требует Spark или Flink.

Обзор 5 типов запросов

5 типов запросов Hudi
Snapshot QueryВозвращает полный snapshot таблицы на момент последнего commit. Для COW — читает base files. Для MOR — мержит base + log files (snapshot view). Аналог обычного SELECT * в Delta Lake и Iceberg.
Incremental QueryВозвращает только записи, изменённые между двумя commit timestamps. Hudi сканирует timeline instants в диапазоне и читает только затронутые FileGroup. Ключ для ETL-pipeline.
CDC QueryВозвращает change events (before/after image) в формате CDC. Включает op-код: INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE. Требует hoodie.table.cdc.enabled=true.
Time Travel QuerySnapshot таблицы на конкретный момент прошлого. Hudi читает timeline до указанного instant и восстанавливает состояние. Глубина зависит от archived timeline (unlimited в Hudi 1.0 LSM).
Read Optimized QueryТолько для MOR-таблиц. Читает только base files (Parquet), игнорируя log files. Быстрее snapshot query, но данные могут быть стale — не включают uncommited deltacommits. Нет аналога в Delta/Iceberg.

Snapshot Query

Snapshot query — запрос по умолчанию: возвращает полный актуальный snapshot таблицы.

COW-таблица: прямое чтение Parquet

Snapshot Query: COW

Read timeline → последний commit

Reader получает список completed instants с timeline. Определяет последний commit — это target snapshot.

File listing из commit metadata

Из commit metadata получает список base files (Parquet), актуальных на этот commit. Partition pruning и column pruning применяются здесь.

Read Parquet → результат

Читает Parquet файлы напрямую. Нет merge step — COW-таблица всегда актуальна на уровне base files. Производительность идентична чтению обычного Parquet.

На COW-таблице snapshot query эквивалентен чтению обычного Parquet-датасета — никакого merge overhead. Это аналогично snapshot read в Delta Lake и Iceberg.

MOR-таблица: merge base + logs

Snapshot Query: MOR (merge)

FileSlice: base + log files

Reader получает FileSlice для каждого FileGroup: base file (последняя compaction) + список log files (deltacommits после compaction).

Merge: base + log₁ + log₂ + … + logₙ

Merge: reader загружает base file, затем последовательно применяет Data Blocks и Delete Blocks из log files. Порядок — по timestamp из instant. Merge — CPU-intensive операция.

Актуальный snapshot

Результат — актуальный snapshot с учётом всех deltacommits. Медленнее COW (из-за merge), но write быстрее (append log вместо rewrite).
TIP

Если MOR-таблица имеет слишком много log files (долго не запускалась compaction), snapshot query замедляется — каждый FileSlice требует merge N log файлов. Регулярная compaction критична для read performance MOR-таблиц.

Read Optimized Query (только MOR)

Read Optimized query — уникальная для Hudi возможность: читает только base files MOR-таблицы, пропуская log files:

Read Optimized vs Snapshot на MOR
Snapshot Query (MOR)Мержит base + все log files. Актуальные данные (включая все deltacommits). Медленнее из-за merge step. Используется когда нужна точность.
Read Optimized Query (MOR)Читает только base files (Parquet). Игнорирует log files. Данные актуальны на момент последней compaction. Быстрее — нет merge. Используется для dashboard'ов, где stale данные допустимы.
-- Spark SQL: выбор типа запроса
-- Snapshot (по умолчанию)
SELECT * FROM hudi_table

-- Read Optimized
SELECT * FROM hudi_table_ro
-- или через hint
SELECT * FROM hudi_table /*+ OPTIONS('hoodie.datasource.query.type'='read_optimized') */
NOTE

Delta Lake и Iceberg не имеют аналога Read Optimized query, потому что у них нет log files. Это побочный эффект MOR-архитектуры: разделение свежести и скорости чтения. COW-таблицы тоже не поддерживают Read Optimized — у них нет log files.

Incremental Query

Incremental query — ключевая возможность Hudi для построения ETL-pipeline. Вместо полного пересканирования таблицы, reader получает только записи, изменённые между двумя commit timestamps:

Incremental Query: как работает
TimelineActive timeline содержит все completed instants с метаданными: какие FileGroup были затронуты каждым commit'ом, сколько записей, какие партиции.
beginTime=T2
endTime=T4
РезультатHudi читает metadata из commit@T3 и commit@T4: список затронутых FileGroup. Затем читает только эти FileGroup (base + logs для MOR). Возвращает записи из T3 и T4.

Протокол инкрементального запроса

Incremental query работает через metadata в instant файлах: каждый completed commit содержит список затронутых файлов с их FileGroup ID. Reader:

  1. Читает timeline instants в диапазоне [beginTime, endTime]
  2. Извлекает список затронутых FileGroup из metadata
  3. Читает только эти FileGroup (без полного scan)
  4. Возвращает записи, записанные в этих commit’ах
-- Spark: incremental query
spark.read.format("hudi")
 .option("hoodie.datasource.query.type", "incremental")
 .option("hoodie.datasource.read.begin.instanttime", "20240115120000000")
 .option("hoodie.datasource.read.end.instanttime", "20240115180000000")
 .load("/path/to/hudi_table")
WARNING

Incremental query возвращает полные записи (все колонки), а не diff/changelog. Для получения before/after images используйте CDC query. Incremental query — это фильтр по времени записи, а не по содержимому изменений.

CDC Query

CDC query расширяет incremental query: вместо «что изменилось» возвращает как изменилось — с before/after images и op-кодами:

CDC Query: before/after images
Операция INSERTНовая запись. before_image = null, after_image = новая запись. Op-код: 'i' (insert).
Операция UPDATEОбновление существующей записи. before_image = старое состояние, after_image = новое состояние. Генерируется два события: UPDATE_BEFORE и UPDATE_AFTER.
Операция DELETEУдаление записи. before_image = удалённая запись, after_image = null. Op-код: 'd' (delete).

Включение CDC

CDC требует явного включения при создании таблицы:

# Включить CDC logging
hoodie.table.cdc.enabled=true
# Опционально: supplemental logging mode
hoodie.table.cdc.supplemental.logging.mode=data_before_after
NOTE

CDC в Hudi аналогичен Change Data Feed (CDF) в Delta Lake: оба требуют явного включения, оба генерируют before/after images. Разница: Delta CDF хранит CDC-данные в отдельной директории _change_data/, а Hudi встраивает CDC-информацию в метаданные instant’ов и log-файлов.

Сравнение CDC подходов трёх форматов

CDC: Hudi vs Delta Lake vs Iceberg
Hudi CDCВстроен в timeline metadata. Требует cdc.enabled=true. Before/after images из log files. Op-коды: i/u/d. Работает на COW и MOR.
Delta Lake CDFОтдельная директория _change_data/. Требует delta.enableChangeDataFeed=true. Op-коды: insert, update_preimage, update_postimage, delete. Файлы Parquet с дополнительными колонками.
Iceberg ChangelogChangelog через position delete files + data files между snapshot. Нет отдельного CDC storage — реконструируется из snapshot diff. IncrementalChangelogScan API.

Time Travel Query

Time travel позволяет прочитать snapshot таблицы на конкретный момент прошлого:

Time Travel: восстановление snapshot

Target: T3

Пользователь указывает target timestamp. Hudi ищет ближайший completed instant на timeline, не превышающий этот timestamp.

Timeline replay до T3

Hudi восстанавливает состояние таблицы на момент T3: файлы из commit'ов T1, T2, T3. Commit'ы T4, T5 игнорируются. Для MOR — merge с log files до T3.

Snapshot на момент T3

Результат — snapshot таблицы как она выглядела на момент T3. Все записи, удалённые после T3, восстановлены. Записи, добавленные после T3, отсутствуют.

Глубина time travel

Глубина time travel определяется настройками archived timeline:

# До Hudi 1.0: archived timeline в plain Avro файлах
# Глубина ограничена hoodie.keep.min.commits (по умолчанию 20-30 instants)
# Старые instants архивируются и не всегда доступны для time travel

# Hudi 1.0+: LSM-style archived timeline
# Глубина практически неограниченна — LSM хранит compacted history
# Time travel на любой instant в истории таблицы
TIP

Hudi 1.0 с LSM-style archived timeline обеспечивает неограниченный time travel — в отличие от Delta Lake, где time travel ограничен VACUUM retention (по умолчанию 7 дней, если VACUUM запущен). Iceberg тоже поддерживает неограниченный time travel через snapshot expiration, но требует явного управления retention.

Checkpoint-Based ETL

Инкрементальные запросы Hudi — основа для checkpoint-based ETL: паттерна, где downstream-pipeline запоминает последний обработанный commit и при следующем запуске читает только новые изменения:

Checkpoint-Based ETL Pipeline

Bronze (Hudi)

Bronze-таблица (Hudi): принимает raw-данные из Kafka/S3/файлов. Каждый batch создаёт commit instant на timeline.
incremental query

ETL: transform + checkpoint

ETL-job читает Bronze инкрементально: от последнего checkpoint до текущего instant. Трансформирует данные и записывает в Silver. Сохраняет новый checkpoint.
upsert

Silver (Hudi)

Silver-таблица (Hudi): содержит очищенные, обогащённые данные. Тоже Hudi — поддерживает собственные incremental queries для Gold-слоя.
Checkpoint StorageCheckpoint — это timestamp последнего обработанного commit'а. Хранится вне таблицы: в файле, в базе данных, или в Flink state. При следующем запуске ETL использует его как beginTime для incremental query.

Преимущества checkpoint ETL перед full scan

Full Scan vs Checkpoint ETL
Full Scan ETLКаждый запуск ETL пересканирует всю таблицу. Для 1TB Bronze — это 1TB чтения каждый раз. Не масштабируется: время ETL растёт линейно с размером таблицы.
Checkpoint ETL (Hudi)Каждый запуск ETL читает только изменения с последнего checkpoint. Для 1TB Bronze с 10MB изменений — это 10MB чтения. Время ETL зависит от объёма изменений, не от размера таблицы.

Пример: multi-hop pipeline

# Этап 1: Kafka → Bronze (Hudi, MOR)
# Streaming writer каждые 30 секунд
bronze_checkpoint = "20240115120000000"

# Этап 2: Bronze → Silver (incremental ETL)
silver_df = spark.read.format("hudi") \
 .option("hoodie.datasource.query.type", "incremental") \
 .option("hoodie.datasource.read.begin.instanttime", bronze_checkpoint) \
 .load("/data/bronze/orders")

# Трансформация
silver_transformed = silver_df \
 .filter("status != 'cancelled'") \
 .withColumn("amount_usd", col("amount") * col("exchange_rate"))

# Запись в Silver (Hudi, COW для быстрого чтения)
silver_transformed.write.format("hudi") \
 .option("hoodie.table.name", "orders_silver") \
 .mode("append") \
 .save("/data/silver/orders")

# Обновить checkpoint
new_checkpoint = get_latest_commit("/data/bronze/orders")
save_checkpoint("bronze_to_silver", new_checkpoint)

hudi-rs: Python Read Path

Библиотека hudi (hudi-rs, v0.4.0) — нативный Rust-based reader для Hudi-таблиц. Поддерживает snapshot, incremental и time-travel запросы без Spark/Flink:

hudi-rs: архитектура чтения

Python: hudi.HudiTable()

Python-код вызывает hudi.HudiTable(). Библиотека написана на Rust с PyO3-биндингами. Читает .hoodie/ для timeline metadata и hoodie.properties.

Rust core: timeline + Parquet

Rust core парсит timeline instants, восстанавливает file listing. Поддерживает S3, GCS, Azure, local FS через object_store crate. Читает Parquet через arrow-rs.

PyArrow Table / RecordBatch

Возвращает PyArrow Table или RecordBatches. Интегрируется с Pandas, Polars, DuckDB через Arrow IPC. Zero-copy где возможно.

API примеры

import hudi

# Snapshot query
table = hudi.HudiTable("/path/to/hudi_table")
batches = table.read_snapshot() # List[RecordBatch]

# Time travel
batches = table.read_snapshot(
 timestamp="2024-01-15T12:00:00"
)

# Incremental query
batches = table.read_incremental(
 start_timestamp="2024-01-15T12:00:00",
 end_timestamp="2024-01-15T18:00:00"
)

# Конвертация в pandas
import pyarrow as pa
table_arrow = pa.Table.from_batches(batches)
df = table_arrow.to_pandas()
WARNING

hudi-rs v0.4.0 — только чтение. Нельзя создать таблицу, выполнить upsert или запустить compaction. Для write-операций используйте Spark с hudi-spark-bundle или Flink с hudi-flink-bundle. Это контрастирует с deltalake (Python, read + write) и pyiceberg (Python, read + write).

Дерево выбора типа запроса

Выбор типа запроса Hudi

Нужны все данные или только изменения?

Первый вопрос: нужны все данные или только изменения? Если все — snapshot/time-travel/read-optimized. Если изменения — incremental/CDC.

Все данные

Нужны все данные. Следующий вопрос: на какой момент? Если текущий — snapshot или read-optimized. Если прошлый — time travel.

Snapshot

Snapshot: актуальные данные, merge base+logs (MOR). Максимальная актуальность, средняя скорость.

Read Optimized

Read Optimized (MOR only): быстрое чтение, но stale данные (на момент последней compaction). Для dashboards.

Time Travel

Time Travel: snapshot на конкретный момент прошлого. Для аудита, дебага, compliance.

Только изменения

Нужны только изменения. Следующий вопрос: нужны before/after images или просто изменённые записи?

Incremental

Incremental: все записи, изменённые в диапазоне. Полные записи (все колонки). Для ETL-pipeline.

CDC

CDC: before/after images с op-кодами. Для downstream CDC consumers (Debezium-compatible). Требует cdc.enabled.

Практические рекомендации

TIP

Для ETL-pipeline: используйте incremental query + checkpoint. Это основное преимущество Hudi перед full-scan подходом.

Для real-time dashboards на MOR: используйте read-optimized query, если stale данные на 5-15 минут допустимы. Это в разы быстрее snapshot query, потому что нет merge step.

Для CDC-интеграции с downstream: включите hoodie.table.cdc.enabled=true при создании таблицы. Добавить позже возможно, но CDC-данные будут доступны только для новых commit’ов.

Для Python-based analytics: используйте hudi-rs (hudi library) для read-path. Он значительно быстрее PySpark для аналитических query на средних датасетах (до ~100GB).

Итоги

  • 5 типов запросов: snapshot, incremental, CDC, time travel, read optimized — больше, чем у Delta Lake (3) и Iceberg (4).
  • Incremental query — ключевая сила Hudi: O(ΔN) вместо O(N), основа checkpoint-based ETL.
  • CDC query — before/after images с op-кодами. Аналог Delta CDF, но хранится в timeline metadata, а не в отдельной директории.
  • Read Optimized — уникален для MOR: быстрый, но stale. Нет аналога в Delta/Iceberg.
  • Time Travel — неограниченная глубина в Hudi 1.0 (LSM archived timeline), в отличие от Delta (ограничен VACUUM retention).
  • hudi-rs (v0.4.0) — Rust-based Python reader. Snapshot, incremental, time travel без Spark. Только чтение.

В следующем уроке мы разберём Table Services — async-механизмы compaction, cleaning и clustering — и экосистему интеграций Hudi с движками и форматами.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. MOR-таблица: последняя compaction была 2 часа назад, с тех пор 50 deltacommit'ов. Dashboard-запрос допускает stale данные до 5 минут. Какой тип запроса оптимален?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6