Lakehouse — Paimon, Iceberg, Hudi
Lakehouse-форматы — это попытка совместить гибкость data lake (дешёвое объектное хранилище, открытые форматы Parquet/ORC) с транзакционностью data warehouse (ACID, schema evolution, time travel). Flink — один из движков, который пишет в эти форматы; на read-стороне их умеют все: Spark, Trino, Snowflake, ClickHouse, DuckDB.
В 2026 году в Flink-экосистеме доминируют три формата: Apache Paimon (бывший Flink Table Store, ставший верховым projectом ASF), Apache Iceberg и Apache Hudi. Каждый сделан с своими дизайн-решениями, и выбор не очевиден. Этот урок — попытка дать осмысленный гайд.
Зачем lakehouse
Классический рисёрс: продакшен Kafka -> Flink -> Postgres (или ClickHouse). Работает для оперативного запроса; не работает для:
- Долгосрочного хранения (ClickHouse дорогой по storage, Postgres плохо tunable for analytics).
- Ad-hoc SQL аналитики на исторических данных от разных команд.
- Time-travel запросов (“какие у нас были балансы две недели назад”).
- Schema evolution без блокировок (нельзя
ALTER TABLEв продакшене Postgres на терабайтной таблице).
Lakehouse-формат решает эти проблемы. Данные лежат в S3/GCS/HDFS как Parquet-файлы, метаданные хранят историю всех версий (manifests, snapshots), движок выбирает корректный snapshot для запроса.
Apache Paimon
Paimon — относительно молодой формат (родился как Flink Table Store в 2022, стал Apache top-level в 2024). Дизайн-цель: streaming-first хранилище, нативно для Flink. Использует LSM-tree (Log-Structured Merge) для эффективных upserts с мелкими incremental commits.
Paimon как LSM-tree lakehouse — internalsСильные стороны:
- Streaming sink работает на любой частоте commits (от секунд до минут). Иные форматы (Iceberg) предпочитают minutes из-за overhead’а большого количества snapshots.
- Changelog producer — Paimon может генерировать CDC-changelog (для downstream Flink-стримов, читающих из Paimon как из source).
- Merge engines — настраиваемые: deduplicate (последний по primary key), partial-update (multiple stream sources обновляют разные колонки одного row), aggregation (агрегация при merge).
- Embedded compaction — Flink job может сам автоматически делать compaction (объединение мелких файлов).
Слабые стороны:
- Сравнительно молодой — экосистема меньше Iceberg (Snowflake не читает, ClickHouse поддерживает через bridge).
- Меньше документации, реже встречается в job-постингах.
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 's3://lake/paimon/'
);
USE CATALOG paimon;
CREATE TABLE user_metrics (
user_id STRING,
click_count BIGINT,
last_click_time TIMESTAMP_LTZ(3),
dt STRING,
PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '8',
'merge-engine' = 'partial-update',
'changelog-producer' = 'lookup',
'snapshot.time-retained' = '7 d'
);
Apache Iceberg
Iceberg — самый “vendor-neutral” формат. Спецификация открыта, реализаций множество: Java, Python, Rust. Поддерживается ВСЕМИ серьёзными движками: Snowflake, BigQuery, Redshift, Databricks, Trino, Presto, Spark, Flink, ClickHouse, DuckDB, Athena, EMR.
Apache Iceberg — глубокое погружениеСильные стороны:
- Универсальная читаемость — таблица, написанная Flink, мгновенно доступна для Snowflake, Spark, ClickHouse.
- Хорошая batch-производительность — оптимизирован для большого числа batch-запросов с эффективным partition pruning.
- Богатый ecosystem REST Catalog — Polaris (Snowflake), Lakekeeper, Tabular (теперь Apache, поглощён в Iceberg сообщество).
Слабые стороны:
- Минимальный commit interval ~1 минута (раньше было 5 минут, сейчас оптимизированы, но всё ещё хуже Paimon для streaming с sub-minute latency).
- Upsert/CDC слабее: Iceberg-spec V3 добавил row-level deletes и position deletes, но компакция этих deletes сложнее чем в Paimon.
- Changelog stream ограничен: нативный CDC source из Iceberg работает, но менее зрел.
CREATE CATALOG iceberg WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'https://lakekeeper.internal/api/iceberg',
'warehouse' = 's3://lake/iceberg/',
'token' = '...'
);
USE CATALOG iceberg;
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
order_date DATE
) PARTITIONED BY (order_date) WITH (
'write.format.default' = 'parquet',
'write.target-file-size-bytes' = '536870912',
'write.distribution-mode' = 'hash'
);
-- Append-only sink (Iceberg v2 spec)
INSERT INTO orders SELECT * FROM kafka_orders;
Apache Hudi
Hudi — старейший из трёх (2017), родился в Uber для CDC use-case. Был долго лидером для streaming-CDC в lakehouse. Сейчас (2026) популярность смещается в сторону Paimon (для Flink-стека) и Iceberg (для multi-engine), но Hudi остаётся силён там, где он исторически вырос: Spark + CDC.
Сильные стороны:
- CDC-first дизайн — Copy-on-Write (CoW) и Merge-on-Read (MoR) режимы. MoR оптимизирован для частых обновлений мелких порций.
- Bootstrap-fra-existing-Parquet — превратить старый Parquet-датасет в Hudi-таблицу без перезаписи.
- Богатые table services — clustering, archiving, indexing встроены.
Слабые стороны:
- Сложность конфигурации — много знобов, легко shot-in-foot. Steep learning curve.
- Slow adoption за пределами Spark — Flink интеграция есть, но менее зрелая, чем Iceberg или Paimon.
- Snowflake/BigQuery не читают Hudi нативно (требуется внешний integration).
CREATE TABLE hudi_orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
ts TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3://lake/hudi/orders',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'precombine.field' = 'ts',
'hive_sync.enabled' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://metastore:9083'
);
Сравнение
| Характеристика | Paimon | Iceberg | Hudi |
|---|---|---|---|
| Дизайн | LSM-tree, streaming-first | Snapshot manifest, batch-leaning | CoW / MoR, CDC-leaning |
| Min commit interval | секунды | 30s-1m | секунды (MoR) |
| Upsert | first-class, multiple merge engines | row/position deletes (V3) | first-class, ключевой use-case |
| CDC source | first-class (changelog producer) | new (limited) | first-class |
| Schema evolution | поддержка | поддержка | поддержка |
| Time travel | поддержка | поддержка (snapshots) | поддержка (commits) |
| Ecosystem | Flink, Spark, ClickHouse, Trino | All major engines | Spark first, Flink second |
| Compaction | embedded в Flink job | external job или engine | service + scheduled jobs |
| Сложность | средняя | низкая для batch | высокая |
| Зрелость для Flink | high | medium-high | medium |
Write to Paimon с Flink
Полный production-пример: Flink job читает из Kafka и пишет в Paimon с agregation merge engine.
-- Source
CREATE TABLE clicks (
user_id STRING,
url STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'kafka:9092',
'scan.startup.mode' = 'group-offsets',
'properties.group.id' = 'flink-paimon-writer',
'format' = 'json'
);
-- Paimon catalog
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 's3://lake/paimon/',
's3.endpoint' = 'https://s3.eu-central-1.amazonaws.com'
);
-- Paimon sink with aggregation merge engine
CREATE TABLE paimon.analytics.user_daily_clicks (
dt STRING,
user_id STRING,
click_count BIGINT,
unique_urls BIGINT,
PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '16',
'merge-engine' = 'aggregation',
'fields.click_count.aggregate-function' = 'sum',
'fields.unique_urls.aggregate-function' = 'sum',
'changelog-producer' = 'lookup',
'snapshot.time-retained' = '7 d',
'compaction.min.file-num' = '5',
'compaction.max.file-num' = '50'
);
-- Pipeline
INSERT INTO paimon.analytics.user_daily_clicks
SELECT
DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt,
user_id,
1 AS click_count,
1 AS unique_urls
FROM clicks;
Что здесь работает:
- bucket = 16 — Paimon шардирует данные по PK hash в 16 buckets. Каждый bucket — отдельный LSM tree файл.
- merge-engine = aggregation — при merge нескольких записей с одним PK Paimon применяет агрегацию (sum здесь).
- changelog-producer = lookup — Paimon генерирует CDC-changelog при merges, downstream Flink jobs могут читать его как source.
- snapshot.time-retained = 7 d — старые snapshots очищаются через 7 дней (для time travel).
Aggregation merge engine — мощная Paimon-фича. Вместо того чтобы делать GROUP BY в Flink (с unbounded state риском), вы делаете “1 click per insert” в Flink (stateless) и aggregation делается на read-side при merge файлов. State на Flink-стороне — нулевой, агрегация инкрементальная.
Iceberg sink example
INSERT INTO iceberg.warehouse.orders_partitioned
SELECT
order_id,
customer_id,
amount,
CAST(order_time AS DATE) AS order_date
FROM kafka_orders
WHERE order_time > '2026-01-01';
Iceberg делает commit каждые checkpoint-interval секунд. При checkpoint-interval=60s — один commit в минуту. Файлы накапливаются — нужна периодическая компакция (через Spark или Trino).
Как выбрать
Сводный decision tree:
- Streaming pipeline с CDC внутри Flink-стека? — Paimon. Низкая latency, нативная Flink-интеграция.
- Centralized lakehouse читаемый Spark/Trino/Snowflake? — Iceberg. Vendor-neutral, full ecosystem.
- Legacy Spark/Hudi проект, миграция не оправдана? — Hudi.
- Не уверены, в каком направлении проект пойдёт? — Iceberg (самый универсальный, наименьший lock-in).
Попробуй сам
- Возьми Paimon CDC pipeline (Kafka -> Flink -> Paimon). Какие настройки compaction нужны, чтобы файлы не накапливались?
- Сравни время записи 10M rows в Iceberg vs Paimon с одинаковым checkpoint interval = 30s. Что повлияет на разницу?
- Подумай: если ваша organization уже использует Snowflake для BI — какой формат лучше использовать для lakehouse-таблиц, к которым Snowflake должен иметь доступ?