Sinks: Paimon, Kafka, StarRocks, Doris
Flink CDC pipeline без sink — это бессмысленно. От выбора sink-коннектора зависит, какой формат данных получает downstream, насколько хорошо работает schema evolution, и какие гарантии delivery.
В этом уроке разберём четыре популярных sink: Paimon (lakehouse, наиболее частый для CDC), Kafka (для event-bus), StarRocks и Doris (МPP-аналитика). Для каждого — особенности конфигурации, schema evolution behavior, типичные use cases.
Paimon sink
Paimon — самый популярный sink для Flink CDC. Причины: первоклассная streaming-поддержка, embedded compaction, нативная схема evolution, low-latency upsert.
Paimon LSM internals: как работает upsert и compactionsink:
type: paimon
name: paimon-lakehouse
catalog.warehouse: s3://lake/paimon/
catalog.metastore: hive
catalog.uri: thrift://hive-metastore:9083
# Per-table options:
table.options:
bucket: '8'
changelog-producer: 'lookup'
merge-engine: 'deduplicate'
snapshot.time-retained: '7 d'
compaction.min.file-num: '5'
Что важно настроить:
- bucket — количество шардов внутри Paimon-таблицы по hash(PK). Влияет на параллелизм read и write. Дефолт 1 — слишком мало для большинства случаев. Рекомендуется bucket = parallelism пайплайна.
- merge-engine — что делать при коллизии PK:
deduplicate(default) — последний по timestamp выигрывает.partial-update— несколько streams обновляют разные колонки одного row.aggregation— агрегация при merge (см. урок про lakehouse).first-row— оставлять только первое значение.
- changelog-producer — как Paimon формирует CDC-changelog для downstream readers:
none— без changelog, просто snapshot reads.input— passthrough input changelog (быстро, но требует чтобы upstream уже даёт valid CDC).lookup— Paimon делает lookup в historical files и формирует proper changelog. Slower, но надёжно.full-compaction— формирует changelog при full compaction.
Schema evolution в Paimon
Paimon отлично поддерживает schema evolution:
- Add column — мгновенно. Старые snapshots остаются valid (отсутствующая колонка = NULL).
- Drop column — мгновенно (колонка маркируется removed; physically остаётся в Parquet файлах до compaction).
- Rename column — поддерживается через alter.
- Change column type — ограниченная поддержка (например, int -> long, string размер).
С schema.change.behavior: evolve Flink CDC применяет MySQL ALTER TABLE автоматически.
Если Paimon-таблица используется одновременно несколькими readers (Spark batch + Flink streaming), убедитесь, что snapshot.time-retained достаточно большой для batch reader. Snapshot, который вы пытаетесь прочитать через Spark, может быть expired Paimon-cleanup, что вернёт ошибку. Безопасно ставить 7-14 дней.
Kafka sink (CDC -> Kafka)
Иногда Flink CDC используется для формирования CDC stream, а downstream consumer’ы — это другие Flink/Spark jobs или сервисы, читающие Kafka. То есть pipeline превращается в “Debezium на стероидах”: MySQL -> Flink CDC -> Kafka.
sink:
type: kafka
name: cdc-events-kafka
properties.bootstrap.servers: kafka-1:9092,kafka-2:9092
topic: cdc.crm.\0 # \0 = source-table name
format: debezium-json # совместимо с Debezium consumer формат
partition.strategy: hash # партиционирование по PK
Параметры:
- topic — имя топика. С
\0placeholder — auto-формирование per source-table (cdc.crm.customers,cdc.crm.orders). - format — формат событий.
debezium-json— совместим с classic Debezium-consumers (тот же envelope сbefore/after/op).canal-json,maxwell-json— другие де-факто стандарты CDC. - partition.strategy —
hashпо PK гарантирует, что все updates одного row идут в одну партицию (важно для downstream ordering).
Use cases для Kafka sink:
- Migration from Debezium: legacy consumers ожидают debezium-json формат в Kafka — Flink CDC может drop-in replace Debezium с теми же downstream.
- Multi-consumer fanout: один CDC source, много downstream — Kafka буфер обязателен (см. урок 1 про architecture).
- Event-driven microservices: сервисы подписаны на Kafka, реагируют на изменения в БД через events.
StarRocks sink
StarRocks — open-source MPP-OLAP БД, форк (и продвижение) Doris с улучшениями. Используется для real-time analytics: дашборды, BI, ad-hoc queries с миллисекундной latency.
sink:
type: starrocks
name: starrocks-analytics
jdbc-url: jdbc:mysql://starrocks-fe:9030
load-url: starrocks-fe:8030
username: flink_writer
password: ${SR_PASSWORD}
database-name: analytics
table.options:
table-model: 'primary_key' # vs duplicate_key, aggregate_key, unique_key
enable-data-stream-load: 'true'
sink.label-prefix: 'flink-cdc-'
StarRocks table models:
- Duplicate Key — append-only, нет deduplication. Для logs, immutable events.
- Unique Key — по PK, last-version-wins. Похоже на Paimon deduplicate merge.
- Aggregate Key — pre-aggregation на write (sum, max, min, replace).
- Primary Key — поддерживает UPDATE/DELETE по PK. Использовать для CDC sink.
Для CDC всегда используйте Primary Key model. Другие либо не поддерживают updates, либо плохо работают с ними.
Schema evolution
StarRocks поддерживает Add column быстро. Drop column — труднее (требует “hidden” columns mechanism). Rename — частично. В general — schema.change.behavior: evolve работает для Add, но для destructive operations может потребоваться ручная интервенция.
Doris sink
Apache Doris — open-source MPP-OLAP БД, изначально разработана Baidu. Очень близка к StarRocks (был форком на ранних стадиях), сейчас оба развиваются параллельно. Сильное community в Китае.
sink:
type: doris
name: doris-analytics
fenodes: doris-fe:8030
jdbc-url: jdbc:mysql://doris-fe:9030
username: flink_writer
password: ${DORIS_PASSWORD}
database-name: analytics
table.options:
table-model: 'unique' # vs duplicate, aggregate
sink.label-prefix: 'flink-cdc-'
sink.enable-2pc: 'true' # exactly-once
Doris vs StarRocks:
| Feature | StarRocks | Doris |
|---|---|---|
| Lakehouse integration | первоклассная (Iceberg, Hudi, Paimon) | хорошая (Iceberg, Hudi) |
| Vectorization engine | C++ native | C++ native |
| Performance benchmarks | в среднем чуть выше | сравнимо |
| Community / governance | StarRocks Inc + Apache StarRocks (incubating) | Apache Doris (top-level) |
| Cloud-native | StarRocks Cloud | многочисленные облачные предложения |
Выбор между StarRocks и Doris — часто пресс-релизы и community preference, не технологические fundamentals. Оба отлично работают как CDC sink.
Сравнительная таблица
Multi-sink: один pipeline, несколько sinks
Иногда нужно одновременно писать в lakehouse (для долгосрочного хранения) и в OLAP (для real-time queries). Flink CDC pipeline framework поддерживает один sink, поэтому для multi-sink можно:
- Использовать обычный Flink job (не pipeline-YAML) с несколькими sink’ами через DataStream
addSinkили Table APIINSERT INTO ... INSERT INTO. - Сделать первый sink — Kafka, и поднять второй Flink job, читающий Kafka и пишущий в OLAP.
// Подход 1: multi-sink в одном DataStream job
DataStream<RowData> cdcStream = env.fromSource(mysqlCdcSource, ...);
cdcStream.sinkTo(paimonSink);
cdcStream.sinkTo(starrocksSink);
-- Подход 1, через SQL
INSERT INTO paimon.lake.orders SELECT * FROM mysql_orders_cdc;
INSERT INTO starrocks.analytics.orders SELECT * FROM mysql_orders_cdc;
-- (StatementSet для атомарности — но это два разных pipeline под капотом)
Multi-sink усложняет операции (отдельные failure modes, отдельные checkpoint impact). Для большинства случаев Kafka в середине проще.
Production gotchas по sink-у
Paimon: проверяйте snapshot.time-retained если есть batch readers. По умолчанию короткое retention может удалить snapshot, который spark-job ещё читает.
Kafka: размер CDC event может быть большим (full before/after row). Если в MySQL очень widе таблицы (200+ колонок, jsonb-поля) — Kafka message size может превысить max.message.bytes (1MB дефолт). Настройте message.max.bytes на брокере и max.request.size на producer.
StarRocks / Doris: load throughput имеет лимит на BE-ноду (по дефолту 100MB/s). На high-throughput pipelines (50K events/s × 1KB = 50MB/s) могут быть bottlenecks. Шардируйте по BE-нодам.
Schema evolution: даже sink с поддержкой schema evolution может иметь edge cases. Drop column в MySQL и одновременная запись новых rows может вызвать race condition. Делайте destructive DDL в low-traffic окно.
Попробуй сам
- Настрой Flink CDC pipeline MySQL -> Paimon с merge-engine=aggregation. Какие будут side-effects при PK collision?
- Подними Flink CDC pipeline MySQL -> Kafka в debezium-json формате. Подключи к этому Kafka topic ранее существовавший Debezium-consumer. Будет ли он работать?
- Сравни write throughput Paimon vs StarRocks vs Doris для одного CDC stream 10K events/s. Что повлияет на разницу?