Learning Platform
Глоссарий Troubleshooting
Урок 15.02 · 25 мин
Продвинутый
PaimonLSM-treeCompactionSnapshotManifestBucketStorage layout

Paimon как LSM-tree lakehouse

Apache Paimon — это open table format, который под капотом использует LSM-tree organization (Log-Structured Merge tree). Если ты знаешь RocksDB internals (modul 5 этого курса), Paimon — это RocksDB на S3, упрощённо говоря. Это объясняет почему он быстрый для streaming writes (LSM optimized для append-heavy workloads) и почему его design отличается от Iceberg (flat snapshots).

Z-ordering и data skipping в lakehouse

В этом уроке: что физически лежит на S3, как организованы levels, как работает compaction, почему bucket — критический primitive для performance.


Storage layout: что на диске

Paimon table — это директория на S3/HDFS с определённой структурой:

s3://bucket/warehouse/db.db/orders/
  snapshot/
    snapshot-1                   <- metadata for snapshot 1
    snapshot-2
    snapshot-3
    LATEST                       <- pointer на последний snapshot
    EARLIEST                     <- pointer на oldest available
  manifest/
    manifest-list-uuid-0.avro   <- list of manifest files for snapshot
    manifest-list-uuid-1.avro
    manifest-uuid-0.avro         <- list of data files
    manifest-uuid-1.avro
  schema/
    schema-0                     <- table schema versions
    schema-1
  bucket-0/                      <- bucketed data
    level-0/
      data-uuid-0.parquet        <- L0 files (recent, small)
      data-uuid-1.parquet
    level-1/
      data-uuid-2.parquet        <- L1 files (compacted, medium)
    level-2/
      data-uuid-3.parquet        <- L2 files (compacted, large)
  bucket-1/
    level-0/...
  ...
  bucket-N/
    ...
  tag/
    tag-2026-05-19               <- named snapshots (DR, archival)
  index/
    bucket-0-index.avro          <- bucket index for partial updates

Сравни с Iceberg layout:

s3://bucket/warehouse/orders/
  metadata/
    v1.metadata.json             <- catalog metadata
    snap-12345.avro              <- snapshot manifest
  data/
    file-1.parquet               <- все data files flat, no levels
    file-2.parquet
    ...

Iceberg: flat data, organized только partition-ами (даты, regions). Paimon: levelled, как RocksDB.


Snapshot — atomic version

Snapshot — это immutable version таблицы. Каждый commit (= Flink checkpoint) создаёт новый snapshot.

// snapshot-12345 content (упрощённо)
{
  "version": 3,
  "id": 12345,
  "schemaId": 2,
  "baseManifestList": "manifest-list-abc-0.avro",
  "deltaManifestList": "manifest-list-abc-1.avro",
  "commitUser": "flink-job-X",
  "commitIdentifier": 5678,
  "commitKind": "APPEND",
  "timeMillis": 1716096000000,
  "totalRecordCount": 9876543210,
  "deltaRecordCount": 100000,
  "changelogRecordCount": 0
}

Поля:

  • baseManifestList: указатель на manifest-list с files до этого commit-а.
  • deltaManifestList: указатель на manifest-list с files добавленных/удалённых в этом commit-е.
  • commitKind: APPEND (только добавления), OVERWRITE (replace partition), COMPACT (compaction snapshot).
  • schemaId: версия schema, которая активна на этот snapshot.

Чтение snapshot N = читать baseManifestList(N-1) + deltaManifestList(N). Это incremental — не нужно re-scan всё.

LATEST файл — символическая ссылка (atomic update через rename): “сейчас актуальный snapshot — id=12345”. Reader open LATEST -> получает 12345 -> читает snapshot-12345.


Manifest — что внутри

Manifest list (manifest-list-*.avro) — список manifest files:

[
  {"path": "manifest-uuid-0.avro", "added_files": 5, "deleted_files": 0, "size_bytes": 5400},
  {"path": "manifest-uuid-1.avro", "added_files": 3, "deleted_files": 1, "size_bytes": 3200}
]

Каждый manifest (manifest-*.avro) — список data files (с metadata):

[
  {
    "kind": "ADD",
    "level": 0,
    "path": "bucket-3/level-0/data-uuid-X.parquet",
    "schema_id": 2,
    "min_key": {"id": 1000},
    "max_key": {"id": 2000},
    "row_count": 5000,
    "file_size_in_bytes": 250000,
    "min_seq": 12340,
    "max_seq": 12345
  },
  {
    "kind": "DELETE",
    "level": 0,
    "path": "bucket-3/level-0/data-uuid-Y.parquet",
    ...
  }
]

Поля важны для performance:

  • min_key / max_key: для range queries reader skips files where key range не пересекается с filter.
  • level: reader знает, какие files на каком уровне (для merge order).
  • min_seq / max_seq: sequence numbers — для conflict resolution и changelog reads.

Reading taible:

  1. Открыть snapshot — list of manifest lists.
  2. Открыть manifest lists — list of manifests.
  3. Открыть manifests — list of data files с min/max.
  4. Filter pushdown: skip files где max меньше query.min или min больше query.max.
  5. Прочитать только relevant parquet files.

Хорошо organized read — почти O(1) на planning (только metadata reads), затем parallel data reads.


LSM levels: L0, L1, L2

Это core of Paimon design. Каждый bucket имеет своё levelled organization:

Paimon LSM-tree organization внутри bucket
L0 (recent)много мелких filesКаждый Flink checkpoint -> 1 file в L0 для этого bucket-а. Размер ~100KB-10MB. Reading: union all L0 files
compaction trigger: L0 size > threshold
L1 (medium)средние files, sortedMerged из L0. Sorted по primary key. Файлы non-overlapping (key ranges не пересекаются). Размер ~100MB
compaction trigger: L1 size > threshold
L2 (large)большие files, sortedMerged из L1. Sorted, non-overlapping. Размер ~1GB. Optimized для long-range scans, BI queries

Compaction — это асинхронный background process, который merges files:

  • L0 -> L1: возьми все L0 files (overlapping ranges), merge sort их с потенциально overlapping L1 files в новые sorted L1 files. Удали старые.
  • L1 -> L2: то же между уровнями. Меньше частая операция (L1 размеры большие).

Это снимает мелкий-файл-problem Iceberg. Iceberg оставляет все files плоскими, нужен external compaction job. Paimon ддоит compaction сам, в-процессе работы Flink job-а (или separately).

-- Force compaction (вручную)
CALL sys.compact(
  table => 'orders',
  partitions => "date=2026-05-19"
);

-- Settings для auto-compaction
CREATE TABLE orders (
  id BIGINT,
  ...
) WITH (
  'num-levels' = '3',                          -- L0, L1, L2 (default)
  'compaction.max.file-num' = '50',           -- L0 trigger: 50 files
  'compaction.min.file-num' = '5',
  'target-file-size' = '128 MB',               -- target для L1+
  'changelog-producer' = 'input'               -- changelog generation mode
);

Read path

Когда query читает row by primary key:

SELECT * FROM orders WHERE id = 12345;

Шаги:

  1. Bucket lookup: hash(12345) mod num_buckets = bucket 7 (например).
  2. Read manifest для bucket 7. Получить list файлов на каждом уровне.
  3. L2 lookup: для каждого L2 file проверь min_key <= 12345 <= max_key. Если match — open file и binary-search по index.
  4. L1 lookup: то же самое. L1 имеет более recent данные — если найдено там, может override L2.
  5. L0 lookup: все L0 files (могут иметь любой range). Если найдено — override L1/L2.
  6. Merge: take latest sequence number version. Это final value.

LSM read = scan multiple files. Overhead — но Paimon optimizes:

  • Bloom filters в parquet (опционально). Per-file bloom filter: “is key X possibly in this file?”. 1KB memory per file. False positives, no false negatives — fast skip.
  • Index files (для some cases): explicit B+tree index, faster than scanning parquet metadata.
  • Bucket restricts search к 1 bucket из N — divides search space.

Range queries (WHERE id BETWEEN 1000 AND 2000):

  • Может затронуть несколько buckets (если hash distributes randomly) — все читаются parallel.
  • Внутри bucket: skip files где range не пересекается.

Full-table scan (SELECT count(*) FROM orders):

  • Все buckets parallel. Все files читать.
  • L2 files large, sequential parquet scan быстрый.

Bucket — critical primitive

Bucket — это горизонтальное partitioning внутри Paimon. Каждый bucket — independent LSM tree.

CREATE TABLE orders (
  id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10,2),
  date DATE,
  PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (date)
WITH (
  'bucket' = '32',                  -- 32 buckets per partition
  'bucket-key' = 'id'               -- hash by id
);

Bucket count — fixed (на изменение требуется rewrite). По умолчанию bucket = -1, что означает unaware (no bucketing, не рекомендуется для primary-key tables).

Trade-off bucket count:

  • Слишком мало (например, 4): каждый bucket большой, compaction expensive, parallel read limited 4-way.
  • Слишком много (например, 1000): много мелких files (50 events/sec * 30 sec checkpoint / 1000 buckets = 1.5 records per bucket на checkpoint), много metadata overhead.
  • Sweet spot: ~ количество cores * 2-4 на reader, или ~ N где daily volume / N = ~1GB.

Bucket-key выбор:

  • Primary key: distributes по PK. Все updates на one PK в один bucket — consistent versioning.
  • Random: better load balancing, но нет PK locality.
  • Composite: balance.

Flink writer parallelism оптимально = number of buckets per partition. Каждый writer-subtask handles 1+ buckets exclusively (no cross-subtask coordination).

WARNING

Bucket count rewrite expensive. Если у тебя 32 buckets и оказалось мало (writes slow, files large), смена на 64 = full table rewrite (читать всё, writes в новую структуру). Лучше overprovision немного, чем underprovision. Для unknown volume: start 16, monitor average file size, scale.


Write path

Flink writes в Paimon table flow:

Paimon write path: от Flink record до S3
Flink streamFlink stream of records. PaimonSink принимает RowData в pipeline
ChannelComputerChannelComputer определяет bucket по bucket-key. Records same-bucket идут в одного writer-subtask (через keyBy)
WriteSubtaskКаждый writer-subtask handles несколько buckets. Внутри — MemTable (in-memory sorted buffer) для каждого bucket-а
MemTable[bucket]MemTable per bucket: BTreeMap (или skip-list). Sorted by primary key. Когда заполняется или checkpoint — flush на диск
flush на checkpoint
Parquet file L0Parquet файл записывается в bucket-N/level-0/. Включает sorted records, min/max key, bloom filter. Возвращает file metadata
Committable
GlobalCommitterGlobalCommitter (single instance) собирает все Committables от writers. После checkpoint complete — пишет новый snapshot + manifest list + manifest atomically. LATEST pointer updates
новый snapshot visible
Compactor (async)Background compaction job: detect L0 size threshold, merge в L1. Может быть на том же или отдельном Flink job

Key insights:

  • MemTable (in-RAM) — буфер между record-by-record arrival и parquet write (batch).
  • Flush on checkpoint — гарантия that pending data durable перед commit.
  • GlobalCommitter — atomic snapshot creation. Без него — concurrent writers могли бы writes конфликтовать.
  • Compactor async — не блокирует writes. Может быть dedicated job или inline в writer-job.

Concurrent writers

Paimon allows несколько writers одновременно (например, два Flink jobs пишут в одну таблицу). Conflict resolution:

  • Каждый writer commits atomically (через GlobalCommitter своего job-а).
  • Если два snapshot commits одновременно — последний wins (snapshot id sequential), но: предыдущий snapshot оставлен в storage, manifest references to его files валидны.
  • В случае real conflict (например, same primary key written from two jobs) — последний по sequence number wins. Paimon uses sequence-based versioning.
CREATE TABLE orders (
  id BIGINT,
  ...
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'sequence.field' = 'updated_at'   -- explicit sequence field
);

sequence.field — это field, который определяет ordering. Если updated_at = 2026-05-19T10:00 в одном writer, и updated_at = 2026-05-19T09:50 в другом — первый wins, второй ignored (как stale). Без explicit field — Paimon использует commit sequence (later commit wins, что может быть semantically wrong если data из старого batch arrives late).


Comparison: read latency на больших таблицах

Сценарий: table с 1TB данных, 256 buckets, query SELECT count(*) WHERE region = 'EU'.

Iceberg (без good partitioning):

  • Read manifest (small).
  • Filter files via partition predicate (region = ‘EU’ — если partitioned by region).
  • Если not partitioned — full scan: read all parquet files, filter at engine.
  • 1TB / 1GB per file = 1000 files. Parallel read S3, total ~ 30-60 секунд.

Paimon (bucketed, partitioned by region):

  • Read latest snapshot, manifest, only files for region=‘EU’ partition.
  • 1TB / 4 regions = 250GB per region.
  • 256 buckets * 4 levels * mean file size = ~1024 files in EU.
  • L0 files small (~50MB), L1 ~200MB, L2 ~1GB. Mostly L2 — large, fast sequential read.
  • Parallel read 1024 files (very parallelizable), 30-45 секунд.

Different patterns но similar для full-aggregation. Where Paimon выигрывает — point lookups (PK queries: 1 bucket, 1 file usually) и range scans on bucket-key (skip irrelevant buckets).


Production-перспектива: tuning

Top-tunable Paimon properties:

-- File size targets
'target-file-size' = '128MB'              -- L1/L2 target
'num-sorted-run.compaction-trigger' = '5'  -- L0 -> L1 trigger
'num-sorted-run.stop-trigger' = '20'       -- backpressure trigger

-- Compaction parallelism
'write-buffer-size' = '256MB'              -- per-subtask MemTable budget
'commit.callbacks' = '...'                 -- custom callback после commit
'snapshot.num-retained.min' = '10'         -- min snapshots to retain
'snapshot.num-retained.max' = '1000'       -- max
'snapshot.time-retained' = '1 h'           -- TTL для snapshots

-- Read optimization
'lookup.cache-max-memory-size' = '256MB'  -- для lookup join cache
'scan.snapshot-id' = 'latest'              -- по умолчанию

-- Bucketing
'bucket' = '32'
'bucket-key' = 'id'

Monitor:

  • paimon_compaction_io — bytes read/written by compactor. High — много pending L0.
  • paimon_files_per_level — files на каждом уровне. L0 > 20 — compaction lagging.
  • paimon_snapshot_size_bytes — общий size последнего snapshot.

Common pitfalls:

  • Compaction can’t keep up: L0 grows unbounded -> query slow (scan many L0 files) -> backpressure. Solution: bigger compaction parallelism, dedicated compaction job.
  • Tag/snapshot retention: not configuring retention -> S3 bucket grows infinitely.
  • Bucket count wrong: most common — too few buckets, single writer bottleneck.

Проверка знанийKnowledge check
Production Paimon table: 100GB/day ingestion, bucket count = 8, parallelism = 32 writers, query latency на PK lookups 5-10 секунд (целью было меньше 100ms). Метрики: paimon_files_per_level показывает L0=200 files, L1=50, L2=15. Что не так и какие 3 fixes сделать?
ОтветAnswer
Диагноз: bucket count слишком маленький (8) при parallelism 32 — 4 writers пишут в каждый bucket, MemTable contention, plus небольшие L0 files plenty. L0=200 — compaction явно отстаёт (норма меньше 20). При PK lookup: scan 200 L0 files в bucket-е + 50 L1 + 15 L2 = 265 file opens на каждую query (S3 latency ~50ms per open) = 13s+. Это объясняет 5-10 секундную latency. Fixes: (1) Увеличить bucket count до 32 (= parallelism). Это требует REWRITE table — провалить новую таблицу через INSERT OVERWRITE с new bucket setting. Большие investments — но fundamental fix. (2) Boost compaction concurrent: 'num-sorted-run.compaction-trigger' = '3' (агрессивнее trigger, default 5), 'num-sorted-run.stop-trigger' = '10' (backpressure быстрее). Также можно поставить dedicated compaction job (separate Flink job, который только compaction делает), чтобы writes не конкурировали с compaction за CPU. (3) Кэширование: 'lookup.cache-max-memory-size' = '1024MB' если queries hot rows (recent). Cache держит L0 files в memory, lookup без S3 round-trip. После fixes: paimon_files_per_level L0 меньше 20, query latency 50-150ms (1-2 file opens в среднем). Bonus: проверь target-file-size = 128MB и убедись что L1/L2 не слишком маленькие — иначе compaction churn без real consolidation.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В Paimon LSM-tree organization, что происходит при checkpoint Flink writer-а?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5