Paimon как LSM-tree lakehouse
Apache Paimon — это open table format, который под капотом использует LSM-tree organization (Log-Structured Merge tree). Если ты знаешь RocksDB internals (modul 5 этого курса), Paimon — это RocksDB на S3, упрощённо говоря. Это объясняет почему он быстрый для streaming writes (LSM optimized для append-heavy workloads) и почему его design отличается от Iceberg (flat snapshots).
Z-ordering и data skipping в lakehouseВ этом уроке: что физически лежит на S3, как организованы levels, как работает compaction, почему bucket — критический primitive для performance.
Storage layout: что на диске
Paimon table — это директория на S3/HDFS с определённой структурой:
s3://bucket/warehouse/db.db/orders/
snapshot/
snapshot-1 <- metadata for snapshot 1
snapshot-2
snapshot-3
LATEST <- pointer на последний snapshot
EARLIEST <- pointer на oldest available
manifest/
manifest-list-uuid-0.avro <- list of manifest files for snapshot
manifest-list-uuid-1.avro
manifest-uuid-0.avro <- list of data files
manifest-uuid-1.avro
schema/
schema-0 <- table schema versions
schema-1
bucket-0/ <- bucketed data
level-0/
data-uuid-0.parquet <- L0 files (recent, small)
data-uuid-1.parquet
level-1/
data-uuid-2.parquet <- L1 files (compacted, medium)
level-2/
data-uuid-3.parquet <- L2 files (compacted, large)
bucket-1/
level-0/...
...
bucket-N/
...
tag/
tag-2026-05-19 <- named snapshots (DR, archival)
index/
bucket-0-index.avro <- bucket index for partial updates
Сравни с Iceberg layout:
s3://bucket/warehouse/orders/
metadata/
v1.metadata.json <- catalog metadata
snap-12345.avro <- snapshot manifest
data/
file-1.parquet <- все data files flat, no levels
file-2.parquet
...
Iceberg: flat data, organized только partition-ами (даты, regions). Paimon: levelled, как RocksDB.
Snapshot — atomic version
Snapshot — это immutable version таблицы. Каждый commit (= Flink checkpoint) создаёт новый snapshot.
// snapshot-12345 content (упрощённо)
{
"version": 3,
"id": 12345,
"schemaId": 2,
"baseManifestList": "manifest-list-abc-0.avro",
"deltaManifestList": "manifest-list-abc-1.avro",
"commitUser": "flink-job-X",
"commitIdentifier": 5678,
"commitKind": "APPEND",
"timeMillis": 1716096000000,
"totalRecordCount": 9876543210,
"deltaRecordCount": 100000,
"changelogRecordCount": 0
}
Поля:
- baseManifestList: указатель на manifest-list с files до этого commit-а.
- deltaManifestList: указатель на manifest-list с files добавленных/удалённых в этом commit-е.
- commitKind: APPEND (только добавления), OVERWRITE (replace partition), COMPACT (compaction snapshot).
- schemaId: версия schema, которая активна на этот snapshot.
Чтение snapshot N = читать baseManifestList(N-1) + deltaManifestList(N). Это incremental — не нужно re-scan всё.
LATEST файл — символическая ссылка (atomic update через rename): “сейчас актуальный snapshot — id=12345”. Reader open LATEST -> получает 12345 -> читает snapshot-12345.
Manifest — что внутри
Manifest list (manifest-list-*.avro) — список manifest files:
[
{"path": "manifest-uuid-0.avro", "added_files": 5, "deleted_files": 0, "size_bytes": 5400},
{"path": "manifest-uuid-1.avro", "added_files": 3, "deleted_files": 1, "size_bytes": 3200}
]
Каждый manifest (manifest-*.avro) — список data files (с metadata):
[
{
"kind": "ADD",
"level": 0,
"path": "bucket-3/level-0/data-uuid-X.parquet",
"schema_id": 2,
"min_key": {"id": 1000},
"max_key": {"id": 2000},
"row_count": 5000,
"file_size_in_bytes": 250000,
"min_seq": 12340,
"max_seq": 12345
},
{
"kind": "DELETE",
"level": 0,
"path": "bucket-3/level-0/data-uuid-Y.parquet",
...
}
]
Поля важны для performance:
- min_key / max_key: для range queries reader skips files where key range не пересекается с filter.
- level: reader знает, какие files на каком уровне (для merge order).
- min_seq / max_seq: sequence numbers — для conflict resolution и changelog reads.
Reading taible:
- Открыть snapshot — list of manifest lists.
- Открыть manifest lists — list of manifests.
- Открыть manifests — list of data files с min/max.
- Filter pushdown: skip files где max меньше query.min или min больше query.max.
- Прочитать только relevant parquet files.
Хорошо organized read — почти O(1) на planning (только metadata reads), затем parallel data reads.
LSM levels: L0, L1, L2
Это core of Paimon design. Каждый bucket имеет своё levelled organization:
Compaction — это асинхронный background process, который merges files:
- L0 -> L1: возьми все L0 files (overlapping ranges), merge sort их с потенциально overlapping L1 files в новые sorted L1 files. Удали старые.
- L1 -> L2: то же между уровнями. Меньше частая операция (L1 размеры большие).
Это снимает мелкий-файл-problem Iceberg. Iceberg оставляет все files плоскими, нужен external compaction job. Paimon ддоит compaction сам, в-процессе работы Flink job-а (или separately).
-- Force compaction (вручную)
CALL sys.compact(
table => 'orders',
partitions => "date=2026-05-19"
);
-- Settings для auto-compaction
CREATE TABLE orders (
id BIGINT,
...
) WITH (
'num-levels' = '3', -- L0, L1, L2 (default)
'compaction.max.file-num' = '50', -- L0 trigger: 50 files
'compaction.min.file-num' = '5',
'target-file-size' = '128 MB', -- target для L1+
'changelog-producer' = 'input' -- changelog generation mode
);
Read path
Когда query читает row by primary key:
SELECT * FROM orders WHERE id = 12345;
Шаги:
- Bucket lookup:
hash(12345) mod num_buckets= bucket 7 (например). - Read manifest для bucket 7. Получить list файлов на каждом уровне.
- L2 lookup: для каждого L2 file проверь
min_key <= 12345 <= max_key. Если match — open file и binary-search по index. - L1 lookup: то же самое. L1 имеет более recent данные — если найдено там, может override L2.
- L0 lookup: все L0 files (могут иметь любой range). Если найдено — override L1/L2.
- Merge: take latest sequence number version. Это final value.
LSM read = scan multiple files. Overhead — но Paimon optimizes:
- Bloom filters в parquet (опционально). Per-file bloom filter: “is key X possibly in this file?”. 1KB memory per file. False positives, no false negatives — fast skip.
- Index files (для some cases): explicit B+tree index, faster than scanning parquet metadata.
- Bucket restricts search к 1 bucket из N — divides search space.
Range queries (WHERE id BETWEEN 1000 AND 2000):
- Может затронуть несколько buckets (если hash distributes randomly) — все читаются parallel.
- Внутри bucket: skip files где range не пересекается.
Full-table scan (SELECT count(*) FROM orders):
- Все buckets parallel. Все files читать.
- L2 files large, sequential parquet scan быстрый.
Bucket — critical primitive
Bucket — это горизонтальное partitioning внутри Paimon. Каждый bucket — independent LSM tree.
CREATE TABLE orders (
id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
date DATE,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (date)
WITH (
'bucket' = '32', -- 32 buckets per partition
'bucket-key' = 'id' -- hash by id
);
Bucket count — fixed (на изменение требуется rewrite). По умолчанию bucket = -1, что означает unaware (no bucketing, не рекомендуется для primary-key tables).
Trade-off bucket count:
- Слишком мало (например, 4): каждый bucket большой, compaction expensive, parallel read limited 4-way.
- Слишком много (например, 1000): много мелких files (50 events/sec * 30 sec checkpoint / 1000 buckets = 1.5 records per bucket на checkpoint), много metadata overhead.
- Sweet spot: ~ количество cores * 2-4 на reader, или ~ N где daily volume / N = ~1GB.
Bucket-key выбор:
- Primary key: distributes по PK. Все updates на one PK в один bucket — consistent versioning.
- Random: better load balancing, но нет PK locality.
- Composite: balance.
Flink writer parallelism оптимально = number of buckets per partition. Каждый writer-subtask handles 1+ buckets exclusively (no cross-subtask coordination).
Bucket count rewrite expensive. Если у тебя 32 buckets и оказалось мало (writes slow, files large), смена на 64 = full table rewrite (читать всё, writes в новую структуру). Лучше overprovision немного, чем underprovision. Для unknown volume: start 16, monitor average file size, scale.
Write path
Flink writes в Paimon table flow:
Key insights:
- MemTable (in-RAM) — буфер между record-by-record arrival и parquet write (batch).
- Flush on checkpoint — гарантия that pending data durable перед commit.
- GlobalCommitter — atomic snapshot creation. Без него — concurrent writers могли бы writes конфликтовать.
- Compactor async — не блокирует writes. Может быть dedicated job или inline в writer-job.
Concurrent writers
Paimon allows несколько writers одновременно (например, два Flink jobs пишут в одну таблицу). Conflict resolution:
- Каждый writer commits atomically (через GlobalCommitter своего job-а).
- Если два snapshot commits одновременно — последний wins (snapshot id sequential), но: предыдущий snapshot оставлен в storage, manifest references to его files валидны.
- В случае real conflict (например, same primary key written from two jobs) — последний по sequence number wins. Paimon uses sequence-based versioning.
CREATE TABLE orders (
id BIGINT,
...
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'sequence.field' = 'updated_at' -- explicit sequence field
);
sequence.field — это field, который определяет ordering. Если updated_at = 2026-05-19T10:00 в одном writer, и updated_at = 2026-05-19T09:50 в другом — первый wins, второй ignored (как stale). Без explicit field — Paimon использует commit sequence (later commit wins, что может быть semantically wrong если data из старого batch arrives late).
Comparison: read latency на больших таблицах
Сценарий: table с 1TB данных, 256 buckets, query SELECT count(*) WHERE region = 'EU'.
Iceberg (без good partitioning):
- Read manifest (small).
- Filter files via partition predicate (region = ‘EU’ — если partitioned by region).
- Если not partitioned — full scan: read all parquet files, filter at engine.
- 1TB / 1GB per file = 1000 files. Parallel read S3, total ~ 30-60 секунд.
Paimon (bucketed, partitioned by region):
- Read latest snapshot, manifest, only files for region=‘EU’ partition.
- 1TB / 4 regions = 250GB per region.
- 256 buckets * 4 levels * mean file size = ~1024 files in EU.
- L0 files small (~50MB), L1 ~200MB, L2 ~1GB. Mostly L2 — large, fast sequential read.
- Parallel read 1024 files (very parallelizable), 30-45 секунд.
Different patterns но similar для full-aggregation. Where Paimon выигрывает — point lookups (PK queries: 1 bucket, 1 file usually) и range scans on bucket-key (skip irrelevant buckets).
Production-перспектива: tuning
Top-tunable Paimon properties:
-- File size targets
'target-file-size' = '128MB' -- L1/L2 target
'num-sorted-run.compaction-trigger' = '5' -- L0 -> L1 trigger
'num-sorted-run.stop-trigger' = '20' -- backpressure trigger
-- Compaction parallelism
'write-buffer-size' = '256MB' -- per-subtask MemTable budget
'commit.callbacks' = '...' -- custom callback после commit
'snapshot.num-retained.min' = '10' -- min snapshots to retain
'snapshot.num-retained.max' = '1000' -- max
'snapshot.time-retained' = '1 h' -- TTL для snapshots
-- Read optimization
'lookup.cache-max-memory-size' = '256MB' -- для lookup join cache
'scan.snapshot-id' = 'latest' -- по умолчанию
-- Bucketing
'bucket' = '32'
'bucket-key' = 'id'
Monitor:
- paimon_compaction_io — bytes read/written by compactor. High — много pending L0.
- paimon_files_per_level — files на каждом уровне. L0 > 20 — compaction lagging.
- paimon_snapshot_size_bytes — общий size последнего snapshot.
Common pitfalls:
- Compaction can’t keep up: L0 grows unbounded -> query slow (scan many L0 files) -> backpressure. Solution: bigger compaction parallelism, dedicated compaction job.
- Tag/snapshot retention: not configuring retention -> S3 bucket grows infinitely.
- Bucket count wrong: most common — too few buckets, single writer bottleneck.