Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 40 мин
Продвинутый
Apache HudiCOWMORCopy-on-WriteMerge-on-ReadLog FileSnapshot QueryRead-Optimized QueryIncremental QueryPartial Updates

COW vs MOR Deep-Dive

В предыдущем уроке мы увидели, что Hudi-таблица состоит из FileGroup → FileSlice, а FileSlice может содержать только base file (COW) или base + log files (MOR). Теперь разберём побайтово, как работает запись и чтение в каждом типе.

Выбор COW vs MOR — необратимое решение, закреплённое в hoodie.properties. Это не просто «быстрее пишем / быстрее читаем» — два типа имеют принципиально разные write paths, read paths, и наборы доступных запросов.

NOTE

В Hudi 1.0 появились partial updates для MOR-таблиц — column-level merge, который обновляет только изменённые колонки. Это расширяет разрыв между COW и MOR: MOR получает преимущество не только в скорости записи, но и в эффективности хранения partial changes.

Write Path: COW (Copy-on-Write)

COW — «копирование при записи». При любом изменении (INSERT, UPDATE, DELETE) Hudi полностью перезаписывает затронутые base files:

COW Write Path: Upsert

Входящий батч (upsert)

Входящий батч записей для upsert. Каждая запись имеет record key (primary key) и precombine field (для разрешения дубликатов). Writer должен определить, в какой FileGroup попадает каждая запись.
1. Index lookup

Index: record key → FileGroup

Индекс определяет, существует ли запись в таблице. Если да — возвращает File Group ID и partition path. Если нет — запись считается INSERT. Hudi поддерживает Bloom, Simple, HFile, Bucket индексы (подробнее — Урок 03).
2. Tag records
INSERT (новые)Записи, не найденные в индексе. Будут записаны в новый FileGroup или добавлены к существующему (если файл не достиг max size). Каждая новая запись получает File Group ID.
UPDATE (существующие)Записи, найденные в индексе. Hudi знает конкретный FileGroup для каждой записи. При COW — нужно прочитать весь base file, смержить обновления, и записать заново.
3. Merge + rewrite

Прочитать base → merge → записать новый base

COW merge: для каждого затронутого FileGroup — прочитать текущий base file (Parquet), найти записи по record key, заменить на обновлённые, записать новый base file. Старый base file остаётся (до clean). Даже если обновлена 1 запись из 1M — перезаписывается весь файл.
4. Commit

commit instant (новые base files)

Commit instant создаётся на timeline. Содержит: список новых base files, список заменённых base files, схему, статистику. Атомарный commit — либо все файлы записаны, либо rollback.

Стоимость COW upsert

Допустим, FileGroup содержит 1 млн записей (base file ~100 MB). При обновлении 10 записей из этого FileGroup:

  1. Прочитать весь base file (100 MB read)
  2. Найти 10 записей по record key
  3. Заменить их значениями из батча (precombine определяет «победителя»)
  4. Записать новый base file (100 MB write)

Итого: 200 MB I/O для обновления 10 записей. Это write amplification — цена COW.

COW Write Amplification
До upsertFileGroup fg-001 содержит один base file версии @t1 размером ~100 MB с 1 млн записей. Все записи актуальны.
upsert 10 записей
После upsertСтарый base @t1 помечен как replaced (будет удалён при clean). Новый base @t2 содержит все 1M записей с 10 обновлёнными. Размер примерно тот же — 100 MB. Суммарно: 200 MB I/O для 10 записей.
TIP

COW write amplification можно снизить через clustering — пересортировку данных так, чтобы часто обновляемые записи оказались в одних FileGroup. Тогда upsert затрагивает меньше FileGroup → меньше перезаписей. Подробнее — в Уроке 06.

Write Path: MOR (Merge-on-Read)

MOR — «слияние при чтении». При записи Hudi не трогает base files, а дописывает дельту в log files:

MOR Write Path: Upsert

Входящий батч (upsert)

Входящий батч для upsert — аналогично COW. Тот же процесс index lookup для определения FileGroup. Разница начинается после тегирования.
1. Index lookup + tag

Index: record key → FileGroup

Index lookup идентичен COW: определяет FileGroup для каждой записи. Разница: для UPDATE не нужно читать base file. Записи сериализуются в Avro и дописываются в log file.
2. Append to log

Append Data Block в log file

MOR append: для каждого затронутого FileGroup — создать новый log block (Data Block с Avro-записями) и дописать в log file. Если log file не существует — создать. Не трогает base file. Скорость записи ≈ скорость append I/O.
3. Deltacommit

deltacommit instant (новые log blocks)

Deltacommit instant (не commit!) создаётся на timeline. Содержит: список затронутых log files, количество записей, схему. Timeline различает commit и deltacommit — это позволяет reader'у выбрать стратегию чтения.

Стоимость MOR upsert

Тот же сценарий: 10 обновлённых записей из FileGroup с 1 млн строк:

  1. Сериализовать 10 записей в Avro (~5 KB)
  2. Дописать Data Block в log file (~5 KB write)

Итого: ~5 KB I/O вместо 200 MB. Write amplification отсутствует. Но есть read amplification — при чтении придётся мержить log с base.

MOR: Log Accumulation
t1: созданиеНачальный FileSlice: только base file. Таблица только что создана или прошла compaction — log файлов нет.
t2: upsert 10Первый deltacommit: log file с одним Data Block (10 записей, ~5 KB). Base file не тронут. Reader при snapshot query читает base + log.
t3: upsert 50Второй deltacommit: log file получает второй блок (50 записей, ~25 KB). Или создаётся log.2. Base по-прежнему нетронут.
t4: compactionCompaction: merge base + все log files → новый base. Log файлы помечаются для удаления (clean). Read path снова работает только с base. Цикл повторяется.

HoodieLogFile: внутренняя структура

Log file — не просто append-only файл. Это структурированный контейнер из блоков:

Структура HoodieLogFile
Log HeaderЗаголовок log-файла: версия формата (v2 в Hudi 1.0), File Group ID, base instant time. Записывается один раз при создании файла.
Data Block 1Первый блок данных, записанный при deltacommit t2. Содержит Avro-сериализованные записи. Заголовок блока: instant time, schema (Avro schema JSON), record count, encoding (AVRO), block type (DATA_BLOCK).
Delete BlockБлок удалений: содержит только record keys (не полные записи). При merge — записи с этими ключами исключаются из результата. Экономит I/O: не нужно хранить полную запись для удаления.
Data Block 2Второй блок данных от следующего deltacommit. Может содержать как INSERT, так и UPDATE записи — reader при merge определяет, что делать с каждой записью по record key.

Schema Evolution в Log Blocks

Каждый Data Block хранит свою Avro-схему в заголовке. Это поддерживает schema evolution внутри одного log file:

Block 1: schema v1 {order_id, amount, status}
Block 2: schema v2 {order_id, amount, status, priority} ← новая колонка

При merge reader проецирует записи из разных блоков в общую target schema, заполняя отсутствующие колонки default-значениями.

NOTE

Delta Lake хранит схему в каждом commit. Iceberg — в metadata file с column-id mapping. Hudi хранит схему в каждом log block — это granular подход, который позволяет разным блокам внутри одного log-файла иметь разные схемы. Цена — дублирование schema string (~2-5 KB на блок).

Merge Strategy: как MOR сливает данные при чтении

Когда reader выполняет snapshot query на MOR-таблице, он должен смержить base file с цепочкой log files:

MOR Merge Strategy

Snapshot Query (MOR)

Snapshot query на MOR-таблице. Reader получает список FileSlice (base + log files) от Timeline. Для каждого FileSlice выполняет merge.
  1. Читаем base file (record_key → record)
Шаг 1: Прочитать base file (Parquet). Строим hash map: record_key → record. Для 1M записей — ~500 MB в памяти (зависит от ширины записи). Это bottleneck MOR merge.
  1. Применяем log blocks (overwrite по record_key)
Шаг 2: Последовательно читаем все log blocks. Для каждого Data Block: если record_key уже в hash map — заменяем. Если нет — добавляем. Для Delete Block: удаляем из hash map. Порядок блоков = порядок instant time.
  1. Precombine (разрешаем дубликаты)
Шаг 3: Precombine — если несколько log blocks содержат одну и ту же запись, побеждает запись с максимальным precombine field. Payload class определяет стратегию: OverwriteWithLatest (по умолчанию), ExpressionAvro, Custom.

Результат: merged records

Итог: объединённый набор записей. Reader возвращает их клиенту. Merge overhead зависит от: размера base file (hash map), количества log blocks, количества записей в log. Компакция обнуляет этот overhead.
WARNING

MOR merge загружает весь base file в memory (hash map по record key). Для FileGroup с 10M записей это может потребовать гигабайты RAM. Контролируйте размер FileGroup через hoodie.parquet.max.file.size (по умолчанию 128 MB) и количество записей через hoodie.copyonwrite.record.size.estimate.

Partial Updates (Hudi 1.0)

В Hudi 1.0 MOR получил partial updates — обновление только изменённых колонок:

Partial Updates: Column-Level Merge
Стандартный upsertОбычный upsert заменяет всю запись: log block содержит все колонки обновлённой записи (order_id, amount, status, priority, ...). Даже если изменилась только одна колонка — в log попадают все.
Partial update (1.0)Partial update записывает в log только изменённые колонки + record key. При merge reader обновляет только указанные поля в base-записи. Экономит I/O при обновлении 1-2 колонок в широких таблицах.

Partial updates используют PartialUpdateAvroPayload — payload class, который при merge не перезаписывает null-колонки из log. Это идеально для wide tables (100+ колонок), где обновляются 2-3 поля.

Три типа запросов на MOR-таблице

MOR-таблица поддерживает три типа запросов — это уникальная особенность Hudi, которой нет ни в Delta Lake, ни в Iceberg:

Типы запросов на MOR-таблице
Snapshot QueryСамый точный, но самый дорогой. Читает base file + все log files, выполняет merge. Результат — актуальное состояние на последний completed deltacommit. Эквивалент SELECT * FROM table.
Read-Optimized QueryСамый быстрый, но не актуальный. Читает ТОЛЬКО base files — игнорирует log files. Видит данные на момент последней compaction. Для COW: идентичен snapshot. Для MOR: данные могут отставать.
Incremental QueryЧитает записи, изменённые между двумя instant'ами. Используется для CDC-паттернов: ETL-пайплайн читает только новые/обновлённые записи с последнего checkpoint. Возвращает полные записи (не дельты).

Когда какой использовать

СценарийТип запросаПочему
Дашборд аналитикаRead-OptimizedДопустимо отставание на 1-2 часа, важна скорость
Финансовый отчётSnapshotНужна точность до последней записи
ETL-пайплайнIncrementalОбрабатываем только новые записи
Data science explorationRead-OptimizedМассовое сканирование, merge на каждом файле — overkill
Real-time dashboardSnapshot + частая compactionЧастая compaction снижает merge overhead
TIP

Read-optimized query на MOR — идентичен чтению COW-таблицы: только base files, без merge. Если ваш основной use case — аналитические запросы с допустимым отставанием, MOR + read-optimized + фоновая compaction даёт вам быструю запись и быстрое чтение. Платите за compaction отдельно.

COW vs MOR: полное сравнение

COW vs MOR: Write и Read Path

Copy-on-Write (COW)

COW: Простая модель. Запись дорогая (перезапись base), чтение дешёвое (только base). Нет log файлов. Один тип запроса (snapshot = read-optimized). Не нужна compaction. Подходит для batch workloads с нечастыми обновлениями.
Write PathRead base → merge → write new base. Медленная запись. Write amplification = размер base file. При 128 MB base и обновлении 1 записи — 256 MB I/O (read + write).
Read PathТолько base files. Нет merge overhead. Parquet push-down (column pruning, predicate push-down, statistics) работает на полную мощность.

Merge-on-Read (MOR)

MOR: Сложная модель. Запись дешёвая (append log), чтение дорогое (merge). Три типа запросов. Нужна compaction (async). Подходит для streaming/high-frequency upserts.
Write PathAppend Data Block в log file. Быстрая запись. Write amplification ≈ 0 (только размер дельты). Тип instant: deltacommit (не commit!).
Read Path (snapshot)Base + merge с log files. Read amplification зависит от количества log blocks после последней compaction. Hash map merge в памяти. Column pruning на base, но не на log blocks (они Avro).

Сравнительная таблица

ПараметрCOWMOR
Write latencyВысокая (перезапись base)Низкая (append log)
Read latency (snapshot)Низкая (только base)Высокая (base + merge logs)
Write amplificationВысокаяНизкая
Read amplificationЗависит от log size
Instant typecommitdeltacommit
Типы запросов1 (snapshot = read-optimized)3 (snapshot, read-optimized, incremental)
Compaction нужна? (критически)
Partial updates (1.0)
Мелкие файлы (base перезаписывается) (log файлы, до compaction)
Подходит дляBatch, нечастые upsertsStreaming, частые upserts

Compaction: мост между MOR и COW

Compaction — это процесс превращения MOR FileSlice в COW-подобный: merge log files в base file. После compaction FileSlice содержит только base — как в COW:

Compaction Process
До compactionMOR FileSlice: base file (snapshot на t1) + 5 log files (deltacommits t2-t6). Snapshot query должен смержить все 5 log файлов с base. Read-optimized query видит только данные на t1.
compaction
После compactionНовый base file (snapshot на t6) содержит все данные: оригинальные записи из base @t1 + все upserts из log файлов. Log файлы будут удалены при следующем clean. Read path теперь работает только с base.

Стратегии compaction

Стратегии Compaction
Inline CompactionCompaction выполняется в том же процессе, что и запись. После N deltacommits writer запускает compaction перед следующей записью. Простая настройка, но блокирует write path на время compaction.
Async CompactionCompaction выполняется отдельным процессом/job. Writer только создаёт compaction.requested instant. Отдельный Spark/Flink job подхватывает requested → inflight → completed. Не блокирует запись — recommended.
# Стратегия: inline или async
hoodie.compact.inline=false
# Количество deltacommits до scheduled compaction
hoodie.compact.inline.max.delta.commits=5
# Compaction strategy
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
WARNING

Без compaction log файлы неограниченно растут. Snapshot query на MOR с 1000 log blocks станет невыносимо медленным — merge каждого FileGroup потребует чтения всех 1000 блоков. Всегда настраивайте compaction для MOR-таблиц. Рекомендация: async compaction каждые 5-10 deltacommits.

Когда выбирать COW vs MOR

Decision Tree: COW vs MOR

Частота обновлений?

Первый вопрос: как часто обновляются данные? Если обновления редкие (batch ETL раз в час или реже) — COW. Если частые (streaming каждые 5 минут) — MOR.

Редкие (batch) → COW

Batch обновления (раз в час+): COW оптимален. Перезапись base при каждом batch — приемлемая цена за простоту. Нет compaction overhead. Читатели всегда видят актуальные данные без merge.

Частые → Далее…

Частые обновления: зависит от read-требований. Если допустимо отставание — MOR + read-optimized. Если нужна real-time точность — MOR + частая compaction.

Допустимо отставание?

Второй вопрос: допустимо ли отставание чтения? Read-optimized query на MOR видит данные на момент последней compaction. Если отставание на 10-30 минут допустимо — MOR идеален.

→ MOR

Отставание допустимо: MOR — лучший выбор. Быстрая запись + read-optimized для аналитики + compaction в фоне. Streaming ingest + batch read — классический MOR pattern.

→ MOR + compaction

Real-time точность: MOR с частой compaction (каждые 5 deltacommits). Или рассмотрите COW с microbatch (если batch не слишком частый). Snapshot query на MOR с малым количеством log blocks — приемлемый overhead.

Антипаттерны

DANGER

Не используйте COW для streaming ingest с частотой < 5 минут. Каждый batch будет полностью перезаписывать base files — write amplification на порядок выше, чем MOR append. При 100 FileGroup × 128 MB base = 12.8 GB перезаписи на каждый microbatch.

DANGER

Не используйте MOR без compaction в production. Read amplification растёт линейно с количеством log blocks. После 100 deltacommits без compaction snapshot query может быть в 10-50x медленнее, чем после compaction.

Итоги

  • COW: полная перезапись base при каждом upsert. Дорогая запись, дешёвое чтение. Тип instant — commit
  • MOR: append в log файлы. Дешёвая запись, дорогое чтение (merge). Тип instant — deltacommit
  • Log file — структурированный контейнер из блоков (Data, Delete, Rollback) с Avro-сериализацией и per-block schema
  • Merge strategy: hash map по record key, последовательное применение log blocks, precombine для разрешения дубликатов
  • Три типа запросов на MOR: snapshot (точный, дорогой), read-optimized (быстрый, отстаёт), incremental (CDC)
  • Partial updates (Hudi 1.0): column-level merge для MOR — обновляем только изменённые колонки
  • Compaction — обязательна для MOR. Async compaction каждые 5-10 deltacommits — recommended strategy
  • Выбор: batch с нечастыми обновлениями → COW; streaming с частыми upserts → MOR

В следующем уроке мы разберём индексную подсистему Hudi — как именно происходит index lookup, который определяет маршрутизацию записей по FileGroup.

Hudi в Spark — production

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. COW-таблица содержит FileGroup с base file размером 128 MB (1M записей). Поступает upsert-батч с 50 обновлёнными записями для этого FileGroup. Какой объём I/O потребуется?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6