Apache Hudi: глубокое погружение
Архитектура
Apache Hudi (Hadoop Upserts Deletes and Incrementals) — lakehouse-формат, разработанный Uber в 2016 году. Изначально создан для решения конкретной проблемы: эффективные upserts на масштабе Uber (миллионы поездок в день, каждая обновляется несколько раз).
Ключевые архитектурные элементы:
Hudi таблица:
/data/orders/
├── .hoodie/
│ ├── hoodie.properties ← конфигурация таблицы
│ └── timeline/
│ ├── 20240115100000.commit ← instant (timestamp)
│ ├── 20240115110000.deltacommit ← MOR delta
│ └── 20240115120000.commit ← commit
├── 2024/01/15/
│ ├── base-file-001.parquet ← base file
│ └── .log-001 ← MOR delta log
└── 2024/01/16/
└── base-file-002.parquet
Timeline
Timeline — центральное понятие Hudi. Это упорядоченная последовательность instants (моментов времени), каждый из которых представляет атомарную операцию. Timeline обеспечивает: порядок операций, ACID-гарантии и точку для incremental queries.
Типы instants: commit (COW write), deltacommit (MOR write), compaction (слияние delta logs в base files), clean (удаление старых файлов).
Настройка SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiPipeline") \
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.jars.packages",
"org.apache.hudi:hudi-spark4-bundle_2.13:1.0.1") \
.getOrCreate()
KryoSerializer обязателен
Hudi требует KryoSerializer вместо стандартного Java-сериализатора. Без этой настройки вы получите ошибки сериализации при upsert-операциях.
Ключевые возможности
Table Types: COW vs MOR
Hudi предлагает два типа таблиц с принципиально разными trade-offs:
Copy-on-Write (COW) — при каждом upsert перезаписывается весь файл данных:
df.write.format("hudi") \
.option("hoodie.table.type", "COPY_ON_WRITE") \
.option("hoodie.table.name", "orders_cow") \
.option("hoodie.datasource.write.recordkey.field", "order_id") \
.option("hoodie.datasource.write.precombine.field", "updated_at") \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("/data/hudi/orders_cow")
Merge-on-Read (MOR) — записывает delta в log-файлы, слияние происходит при чтении или compaction:
df.write.format("hudi") \
.option("hoodie.table.type", "MERGE_ON_READ") \
.option("hoodie.table.name", "orders_mor") \
.option("hoodie.datasource.write.recordkey.field", "order_id") \
.option("hoodie.datasource.write.precombine.field", "updated_at") \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("/data/hudi/orders_mor")
| Параметр | COW | MOR |
|---|---|---|
| Запись | Медленная (rewrite файла) | Быстрая (delta log) |
| Чтение | Быстрое (один файл) | Медленное (merge delta) |
| Latency записи | Высокая | Низкая |
| Latency чтения | Низкая | Высокая (до compaction) |
| Лучший сценарий | Read-heavy, batch updates | Write-heavy, streaming |
Record Key и Precombine
Каждая запись в Hudi-таблице идентифицируется двумя обязательными полями:
- recordkey — уникальный идентификатор записи (аналог primary key)
- precombine — поле для разрешения дубликатов (выбирается запись с большим значением)
# Пример: order_id = record key, updated_at = precombine
# Если приходят два обновления для order_id = "123":
# {order_id: "123", status: "shipped", updated_at: "10:00"}
# {order_id: "123", status: "delivered", updated_at: "11:00"}
# Hudi сохранит запись с updated_at = "11:00" (более новая)
Incremental Queries
Incremental queries — уникальная возможность Hudi. Вы можете запросить только изменённые записи с определённого момента времени:
# Incremental query: только записи, изменённые после timestamp
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime",
"20240115100000") \
.load("/data/hudi/orders_cow")
# Результат: только записи, добавленные/обновлённые после 10:00
Это критично для ETL-пайплайнов: вместо перечитывания всей таблицы на каждом запуске, вы обрабатываете только delta изменений.
# Паттерн: инкрементальный ETL
last_processed = get_last_checkpoint() # "20240115100000"
# Читаем только новые данные
new_data = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime",
last_processed) \
.load("/data/hudi/orders_cow")
# Обрабатываем
processed = transform(new_data)
processed.write.format("hudi").save("/data/hudi/orders_gold")
# Обновляем checkpoint
save_checkpoint(current_instant)
Compaction (MOR)
Для MOR-таблиц compaction — критическая операция. Она сливает delta logs с base files, улучшая read performance:
# Запуск compaction для MOR-таблицы
spark.sql("""
CALL run_compaction(
op => 'run',
table => 'hudi_catalog.db.orders_mor'
)
""")
Anti-pattern: COW для высокочастотных writes
Использование COPY_ON_WRITE для streaming или высокочастотных upserts — частая ошибка. Каждый upsert перезаписывает весь base file (может быть сотни MB). При 1000 upserts/минуту это означает 1000 полных перезаписей. Для write-heavy нагрузок используйте MERGE_ON_READ с регулярным compaction.
Anti-patterns
Частые ошибки при работе с Apache Hudi
-
COW для write-heavy нагрузок — перезапись целого файла при каждом upsert. Используйте MOR + compaction.
-
Не запускать compaction для MOR — без compaction delta logs растут, и чтение деградирует экспоненциально. Настройте scheduled compaction.
-
Неправильный precombine field — если precombine field не монотонно возрастает (например, random UUID), Hudi может сохранять старые записи вместо новых.
-
Забыть KryoSerializer — стандартный Java serializer вызывает ошибки. Всегда указывайте
spark.serializer = org.apache.spark.serializer.KryoSerializer.
Когда использовать Apache Hudi
Лучший выбор, когда:
- Нагрузка upsert-heavy (CDC, real-time обновления)
- Нужны incremental queries для ETL (обработка только дельты)
- Требуется выбор между COW и MOR для оптимизации под профиль нагрузки
- Работаете с AWS EMR (глубокая интеграция)
Не лучший выбор, когда:
- Нужна полная schema evolution (Hudi поддерживает только add columns)
- Хотите partition evolution без rewrite (рассмотрите Iceberg)
- Нужна vendor-нейтральность (рассмотрите Iceberg)
- Streaming-first с native changelog (рассмотрите Paimon)
Для углублённого изучения внутренней архитектуры Hudi (timeline, COW/MOR internals, compaction) см. курс Storage Formats Deep-Dive.
Что дальше?
В следующем уроке мы разберём Apache Paimon — самый молодой lakehouse-формат, разработанный Alibaba для streaming-first архитектуры. Ключевые фичи: LSM-tree хранилище и native changelog CDC.