В уроке про Debezium Server + Pub/Sub мы увидели, что Kafka — не обязательная часть CDC-стека. Существует целый класс sink-адаптеров, которые превращают Debezium Server в самодостаточный CDC-runtime.
В этом уроке мы пойдём дальше и рассмотрим один из самых интересных кейсов: прямой стрим CDC-событий в Apache Iceberg через комьюнити-проект memiiso/debezium-server-iceberg. Получаем Streaming-Lakehouse без Kafka, без Spark, без Flink — буквально один JVM-процесс, который читает WAL и пишет Parquet-файлы в S3/GCS/Azure поверх Iceberg-каталога.
Зачем это вообще нужно
Классический путь “CDC в Lakehouse” выглядит так:
Postgres/MySQL
|
v
Debezium Connector (плагин Kafka Connect)
|
v
Apache Kafka (хранение событий, durable log)
|
v
Iceberg Sink Connector (или Flink/Spark consumer)
|
v
Apache Iceberg на S3/GCS + Catalog (Glue/Polaris/Lakekeeper)
Минимально это четыре управляемых сервиса: Kafka Connect workers, Kafka brokers, ZooKeeper или KRaft-controllers, отдельный sink-runtime. На каждом — ребалансировки, апгрейды, метрики, RBAC, capacity planning.
Для small-medium пайплайнов (десятки таблиц, единицы тысяч событий в секунду) такая инфраструктура — overkill. Альтернатива: выбросить Kafka и stream-processor целиком.
Postgres/MySQL
|
v
Debezium Server (один JVM-процесс)
| in-process: snapshot reader + streaming reader + Iceberg writer
v
Apache Iceberg на S3/GCS + REST Catalog
Когда это уместно
Streaming-Lakehouse без Kafka — это не замена Kafka везде. Это конкретный паттерн для случаев, когда:
- единственный потребитель CDC — Lakehouse (нет fan-out на 10 разных consumers);
- объём — единицы-десятки тысяч событий/с (не сотни тысяч);
- допустима single-writer модель и нет требований к sub-second SLA;
- команда хочет минимизировать операционную поверхность.
Debezium Server: краткое напоминание
Debezium Server — это standalone Quarkus-приложение, которое запускает Debezium Engine в одном процессе и публикует события в один настраиваемый sink. Никакого Kafka Connect, никаких worker-групп, никакого rebalancing protocol.
Поддерживаемые sinks (на момент Debezium 3.x):
| Sink | Назначение |
|---|---|
kafka | классический выход в Kafka (без Connect) |
pubsub | Google Cloud Pub/Sub |
kinesis | AWS Kinesis Data Streams |
pulsar | Apache Pulsar |
eventhubs | Azure Event Hubs |
nats-streaming / nats-jetstream | NATS |
rabbitmq | RabbitMQ |
http | произвольный webhook |
redis | Redis Streams |
iceberg | Apache Iceberg (через расширение memiiso/debezium-server-iceberg) |
Iceberg-sink не входит в основной дистрибутив Debezium Server. Это отдельный проект, собранный поверх Debezium Engine как drop-in расширение.
memiiso/debezium-server-iceberg
memiiso/debezium-server-iceberg — это community-проект, который реализует IcebergChangeConsumer поверх Debezium Engine. Архитектурно он ведёт себя как обычный Debezium Server, но вместо отправки событий в брокер пишет их напрямую в Iceberg-таблицы через Iceberg Java API.
Ключевая идея: Debezium читает WAL и формирует change-events с full schema (envelope с before/after/op/source). Iceberg Consumer берёт батч событий, дедуплицирует по PK в рамках батча, конвертирует в Iceberg-rows и атомарно коммитит снапшот таблицы.
Почему именно memiiso, а не нативный Debezium
Нативный Iceberg-sink в самом upstream-проекте Debezium на 2026 находится в активной разработке, но именно memiiso/debezium-server-iceberg — это production-ready решение, которое уже несколько лет используется в проде у разных команд. Сборка — это отдельный JAR-distribution с теми же application.properties, что и обычный Debezium Server.
Что умеет
- Автосоздание таблиц — при первом батче Iceberg-таблица создаётся автоматически из Debezium-schema (требуется
debezium.format.value.schemas.enable=true). - Два режима записи —
append(полный лог изменений) иupsert(latest-state по PK через equality deletes). - Schema expansion — добавление новых колонок без миграции (когда исходная таблица расширяется).
- Multiple catalog backends — REST (Polaris/Lakekeeper), Glue, Hive, Hadoop, Nessie.
- Multiple object stores — S3, GCS, Azure Blob/ADLS, локальная FS.
Архитектура
В одном JVM-процессе уживаются три компонента:
+--------------------------- Debezium Server (JVM) ---------------------------+
| |
| +------------------+ +------------------+ +------------------+ |
| | Source Engine | | Change Buffer | | Iceberg Writer | |
| | (PG WAL reader) | --> | (in-memory | --> | (Parquet writer | |
| | snapshot+stream | | batch queue) | | + commit) | |
| +------------------+ +------------------+ +------------------+ |
| | |
+--------------------------------------------------------------|---------------+
|
v
+------------+-----------+
| Iceberg REST Catalog |
| (Polaris/Lakekeeper) |
+------------+-----------+
|
v
+------------+-----------+
| Object Store (S3/GCS) |
| data + metadata files |
+------------------------+
Что происходит на каждом шаге:
- Source Engine — Debezium PG/MySQL коннектор. Снимает initial snapshot, потом читает WAL/binlog и эмитит change-events.
- Change Buffer — события батчуются по
debezium.sink.batch.batch-size-waitиmax.batch.size. Внутри батча работает дедупликация по PK (оставляем только последнюю версию записи). - Iceberg Writer — формирует Parquet data files и (в upsert-режиме) equality-delete files. После записи всех файлов делает
Transaction.commitTransaction()через каталог. Коммит атомарный — либо весь батч появляется в новом снапшоте, либо ничего.
Iceberg snapshot = checkpoint
Каждый успешный коммит порождает новый Iceberg-снапшот. Это даёт побочный эффект: снапшот сам по себе является чекпоинтом пайплайна. При рестарте Debezium Server читает свой offset-storage (file/Redis), а Iceberg-таблица уже находится в консистентном состоянии после последнего коммита. Если процесс упал между записью data-files и коммитом — несоммитченные файлы остаются “orphan” и удаляются обычным expire_snapshots/remove_orphan_files.
Режимы записи: append vs upsert
Append mode
debezium.sink.iceberg.upsert=false
Каждое CDC-событие становится отдельной строкой в Iceberg-таблице. Колонки envelope (__op, __ts_ms, __source_ts_ms, __deleted, etc.) сохраняются как есть.
Что получаем:
| id | name | __op | __ts_ms | __deleted |
|---|---|---|---|---|
| 1 | Alice | c | 1714500000000 | false |
| 1 | Alice Z. | u | 1714500120000 | false |
| 1 | Alice Z. | d | 1714500300000 | true |
| 2 | Bob | c | 1714500400000 | false |
Это полный change-log — отлично подходит для аудита, time-travel-аналитики, восстановления состояния таблицы на любой момент через WHERE __ts_ms <= X + window-функции.
Upsert mode
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
Iceberg-таблица содержит только последнее состояние каждой записи по PK. Реализовано через Iceberg V2 equality deletes — это и есть merge-on-read семантика.
Как это работает на write-side:
- Внутри батча — дедуп по PK, оставляем последнюю версию.
- Для каждой записи с
op != 'd'пишем positional/data row. - Для каждой обновлённой/удалённой записи дополнительно пишем equality-delete row (по PK).
- Коммитим оба набора файлов в одном Iceberg-снапшоте.
Что получаем:
| id | name | __op | __deleted |
|---|---|---|---|
| 1 | Alice Z. | d | true |
| 2 | Bob | c | false |
При чтении движок (Trino/Spark/DuckDB) применяет equality-deletes к data-files и отдаёт latest-state. Если выставить upsert-keep-deletes=false, удалённые строки физически выпадают из таблицы.
Требования upsert mode
- В исходной таблице должен быть PRIMARY KEY. Без PK consumer молча падает в append-режим.
- Производительность чтения деградирует при большом количестве delete-files. Нужно периодически запускать
rewrite_data_files/rewrite_position_delete_filesпроцедуры компактирования. - Upsert использует Iceberg format version 2. Format version 1 не поддерживает row-level deletes.
Минимальный application.properties
# ============================================================================
# Debezium Source: PostgreSQL
# ============================================================================
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/data/offsets.dat
debezium.source.offset.flush.interval.ms=5000
debezium.source.database.hostname=postgres.internal
debezium.source.database.port=5432
debezium.source.database.user=debezium
debezium.source.database.password=${PG_PASSWORD}
debezium.source.database.dbname=production
debezium.source.topic.prefix=lakehouse
debezium.source.plugin.name=pgoutput
debezium.source.publication.name=debezium_publication
debezium.source.slot.name=debezium_slot
debezium.source.table.include.list=public.orders,public.customers
# CRITICAL: schemas нужны для автосоздания Iceberg-таблиц
debezium.format.value=json
debezium.format.key=json
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
# ============================================================================
# Sink: Apache Iceberg
# ============================================================================
debezium.sink.type=iceberg
# Поведение записи
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.allow-field-addition=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.format-version=2
# Каталог: REST (Polaris/Lakekeeper)
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.rest.RESTCatalog
debezium.sink.iceberg.uri=https://polaris.internal/api/catalog
debezium.sink.iceberg.warehouse=s3://lakehouse-prod/warehouse
debezium.sink.iceberg.credential=${POLARIS_CLIENT_ID}:${POLARIS_CLIENT_SECRET}
# S3 / object store
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=https://s3.us-east-1.amazonaws.com
debezium.sink.iceberg.s3.path-style-access=false
# Батчинг и тюнинг
debezium.sink.batch.batch-size-wait=NoBatchSizeWait
debezium.source.max.batch.size=2048
debezium.source.max.queue.size=8192
debezium.source.poll.interval.ms=10000
# Heartbeat — чтобы WAL не пух на тихих таблицах
debezium.source.heartbeat.interval.ms=10000
Что важно подсветить:
schemas.enable=true— без этого consumer не сможет автоматически создать Iceberg-таблицы при первом батче.format-version=2— обязательно для upsert-режима (нужны equality deletes).allow-field-addition=true— schema expansion. Когда в Postgres делаютALTER TABLE ADD COLUMN, Iceberg-таблица автоматически расширяется. Несовместимые изменения типов (например,int→string) не поддерживаются — это придётся делать вручную через миграцию таблицы.batch-size-wait=NoBatchSizeWait— коммитим как только пришёл батч событий. Альтернатива —MaxBatchSizeWaitс явнымmax-batch-size/max-wait-msдля группировки в более крупные снапшоты (меньше small-files, выше latency).
Type mapping: Postgres → Iceberg
Маппинг типов делается через Debezium-schema (semantic types) и стандартный Iceberg type system.
| PostgreSQL | Debezium semantic | Iceberg |
|---|---|---|
int2, int4 | INT32 | int |
int8, bigint | INT64 | long |
numeric(p,s), decimal | Decimal | decimal(p,s) |
real, float4 | FLOAT32 | float |
double precision | FLOAT64 | double |
text, varchar, char | STRING | string |
boolean | BOOLEAN | boolean |
bytea | BYTES | binary |
uuid | io.debezium.data.Uuid | string (или uuid) |
date | io.debezium.time.Date | date |
time | MicroTime | time |
timestamp | MicroTimestamp | timestamp (no-tz) |
timestamptz | ZonedTimestamp | timestamptz |
json, jsonb | STRING (JSON-encoded) | string |
array | ARRAY | list<...> |
Temporal types — известное место граблей
Старые версии debezium-server-iceberg мапили timestamp в long (миллисекунды от epoch). С недавних релизов используется правильный Iceberg timestamp тип на основе Debezium semantic types. Перед миграцией проверьте: для существующих таблиц поменять тип колонки нельзя — нужна пересборка таблицы.
Partition spec
По умолчанию Iceberg-таблица создаётся без партиционирования — все файлы складываются в одну директорию. Для production-таблиц нужно явно задать partition spec через init-скрипт или через шаблонные параметры:
debezium.sink.iceberg.table-default.partition-by=__source_ts_ms_day
или вручную через каталог:
ALTER TABLE lakehouse.public.orders
ADD PARTITION FIELD days(__source_ts_ms);
Iceberg поддерживает hidden partitioning — запросы по __source_ts_ms автоматически используют partition pruning без необходимости явного WHERE day = ....
Catalog integration
REST Catalog (Polaris / Lakekeeper)
REST catalog — рекомендованный способ на 2026. Это open Iceberg REST API, реализованный множеством каталогов: Apache Polaris (бывший Snowflake Open Catalog), Lakekeeper, Nessie (REST mode), Tabular (acquired by Databricks).
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.rest.RESTCatalog
debezium.sink.iceberg.uri=https://polaris.internal/api/catalog
debezium.sink.iceberg.warehouse=lakehouse-prod
debezium.sink.iceberg.credential=client-id:client-secret
debezium.sink.iceberg.token=${OAUTH_TOKEN}
Плюсы: vendor-neutral, multi-engine (Spark, Trino, Flink, DuckDB читают одну и ту же таблицу), централизованный RBAC.
AWS Glue
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
debezium.sink.iceberg.warehouse=s3://lakehouse-prod/warehouse
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
Подходит для AWS-only стека. Glue catalog хорошо интегрируется с Athena/EMR/Redshift Spectrum.
Hadoop / Hive
Для on-prem или legacy-сред. Hadoop catalog хранит метаданные прямо на FS (без отдельного service), Hive — в Hive Metastore.
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
debezium.sink.iceberg.warehouse=s3://lakehouse-prod/warehouse
Сравнение подходов: куда поставить CDC → Iceberg
| Аспект | Debezium Server + Iceberg sink | Debezium + Kafka + Iceberg sink connector | ClickPipes / Conduit / Fivetran |
|---|---|---|---|
| Компоненты | 1 JVM + catalog + storage | Kafka cluster + Connect + sink connector | Managed SaaS |
| Throughput | до десятков тысяч events/s | сотни тысяч events/s, горизонтально | зависит от тарифа |
| HA | single-writer (active-passive) | HA через Connect cluster | managed |
| Replay | нет встроенного (offset-only) | да (Kafka log retention) | depends |
| Fan-out | один sink (только Iceberg) | многосайнк (Kafka topic → N consumers) | обычно один sink |
| Latency | секунды (на batch commit) | sub-second в Kafka, секунды до Iceberg | минуты (batch SaaS) |
| Стоимость TCO | низкая | высокая | высокая (per-row pricing) |
| Когда выбрать | small-medium pipelines, Lakehouse-only consumer, минимум инфры | многие consumers, replay, высокий throughput | быстрый старт без инженеров |
Conduit и альтернативы
Conduit — Go-based runtime для CDC/connectors с поддержкой Iceberg. Архитектурно похож на Debezium Server, но без JVM-overhead. ClickPipes, Fivetran HVR — managed-варианты. Для open-source self-hosted сетапа debezium-server-iceberg — самый зрелый вариант на 2026.
Ограничения и подводные камни
1. Single-writer по таблице
В Iceberg несколько одновременных писателей в одну таблицу приводят к конфликтам коммитов (HTTP 409 от каталога) и retry-storm. debezium-server-iceberg запускается как один процесс — это и плюс (нет coordination), и ограничение.
HA-стратегия: active-passive с Redis offset-storage. Резервная реплика стоит холодной, при падении primary она поднимается, читает offset из Redis и продолжает. Даунтайм — секунды-минуты, но не нулевой.
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
debezium.source.offset.storage.redis.address=redis-master:6379
2. Нет retroactive replay
Kafka даёт log retention — можно прокрутить топик с любого offset. В Iceberg-pipeline такой возможности нет: если sink упал между snapshot и streaming-фазой и offset потерялся, единственный путь — re-snapshot всей таблицы (или конкретных таблиц) через signal.data.collections.
3. Concurrency внутри процесса
IcebergChangeConsumer пишет в Iceberg синхронно из одного потока (на батч). Параллелить запись по таблицам можно через несколько процессов с непересекающимися table.include.list, но тогда теряется атомарность кросс-табличных транзакций.
4. Small files problem
Каждый батч → новый снапшот → новые файлы. На low-traffic таблицах это создаёт большое количество мелких файлов, что убивает производительность чтения. Обязательно настроить:
debezium.sink.batch.batch-size-wait=MaxBatchSizeWaitс разумнымmax-wait-ms(например, 60s) — батчуем подольше, коммитим реже.- Регулярный compaction через
CALL system.rewrite_data_files(table => '...')(Spark) или внешний джоб (Iceberg Maintenance Service / Lakekeeper auto-compact). expire_snapshotsдля очистки старых метаданных.
5. Schema evolution
Поддерживается только расширение (add column, widen type). Несовместимые изменения (drop column, narrow type, rename) делаются вручную через каталог. Если Postgres-таблица меняется несовместимо, consumer бросит ошибку и остановится.
Production tips
Чек-лист перед запуском в прод
- Offset storage — file storage только для dev. В прод — Redis с TTL=0 и persistence (AOF/RDB).
- Heartbeat —
heartbeat.interval.ms=10000обязательно, иначе WAL переполнится на тихих таблицах. - Batch sizing — баланс между latency и количеством small-files. Стартовая точка:
MaxBatchSizeWait,max-batch-size=10000,max-wait-ms=60000. - Format version 2 — без этого нет upsert.
- Compaction job — отдельный Spark/Trino-джоб раз в час:
rewrite_data_files,rewrite_position_delete_files,expire_snapshots. - Monitoring — экспортируйте JMX метрики Debezium (
MilliSecondsBehindSource,NumberOfCommittedTransactions,QueueTotalCapacity) в Prometheus. - Schema registry на стороне источника — не используйте
format.value=jsonбез schemas.enable, иначе автосоздание таблиц не сработает. - PG replication slot lag alert — алерт на
pg_replication_slots.confirmed_flush_lsnотстающий отpg_current_wal_lsnбольше чем на 1 GB. - Disaster recovery plan — что делаем если offset потерялся:
signal.data.collections=public.*для re-snapshot, готовый runbook. - Test schema evolution — прогоните
ALTER TABLE ADD COLUMNна staging перед прод-релизом, убедитесь что Iceberg-таблица расширилась корректно.
Exactly-once: что реально гарантируется
В строгом смысле — нет exactly-once. Гарантируется at-least-once + idempotent commit:
- Debezium читает WAL и коммитит offset после успешного Iceberg-commit.
- Если процесс падает между Iceberg-commit и offset-flush, при рестарте те же события приедут второй раз.
- В upsert mode дублирующий батч идемпотентен: equality-deletes по PK перепишут те же строки тем же значением. Snapshot будет отличаться (появится лишний снапшот), но данные — нет.
- В append mode дубли реальны — будет физическое удвоение строк. Дедуп придётся делать на стороне читателя через
ROW_NUMBER() OVER (PARTITION BY pk ORDER BY __source_ts_ms DESC, __ts_ms DESC).
Append mode и дубли
Если выбираете append mode для аудита, всегда оставляйте __source_ts_ms и __lsn в схеме. Это единственный способ дедуплицировать на read-time после рестартов.
KnowledgeCheck
Что мы узнали
- Debezium Server — это standalone Quarkus-runtime для Debezium Engine с набором sink-адаптеров: Pub/Sub, Kinesis, Pulsar, EventHubs, NATS, RabbitMQ, HTTP, Redis, Iceberg.
- memiiso/debezium-server-iceberg — community-расширение, реализующее Iceberg sink. Стрим Postgres/MySQL → Iceberg в одном JVM, без Kafka и без Spark.
- Append vs upsert — append даёт полный change-log (хорошо для аудита), upsert — latest-state по PK через V2 equality deletes (хорошо для replica таблиц).
- Catalog-agnostic — REST (Polaris, Lakekeeper, Nessie), Glue, Hive, Hadoop. Object store — S3, GCS, Azure, FS.
- Schema evolution — поддерживается только расширение (add column, widen type), несовместимые изменения требуют ручной миграции.
- Limitations — single-writer (active-passive HA через Redis offset), нет replay (offset-only recovery), small-files problem требует регулярного compaction.
- Production checklist — Redis offset, heartbeat, batch tuning, V2 format, compaction job, JMX-метрики, replication slot lag alerts.
Что дальше
В следующем уроке мы посмотрим итоги модуля 7 и перейдём к модулю 8 — там будет рассмотрена эксплуатация production CDC: alerting на репликационный lag, recovery procedures и сравнение с альтернативными CDC-стеками (OLake, Conduit, ClickPipes).