MySQL CDC pipeline
MySQL — самый частый источник для CDC. В этом уроке настроим production-готовый MySQL CDC pipeline через Flink CDC: разберём server-id, snapshot стратегии, разные форматы binlog, gtid configuration, и YAML pipeline format Flink CDC 3.0+.
Подготовка MySQL
MySQL должен быть настроен для binlog replication. Без этого Flink CDC не сможет читать changes.
MySQL Binary Log: архитектура и форматы GTID Mode: глобальные идентификаторы транзакций# /etc/mysql/my.cnf
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
gtid_mode = ON
enforce_gtid_consistency = ON
Что это значит:
- server-id — уникальный ID MySQL instance. На master = 1, на каждом slave/Flink CDC consumer должен быть уникальный.
- log_bin = mysql-bin — включает binlog с prefix
mysql-bin. - binlog_format = ROW — пишет полные row changes (а не SQL statements). Обязательно для CDC. STATEMENT и MIXED не подходят.
- binlog_row_image = FULL — пишет полные before/after образы row. Без этого Flink CDC не сможет правильно сформировать
-U/+Uпары для updates. - expire_logs_days = 7 — сколько хранить binlog. На production обычно 7-30 дней; CDC consumer должен успеть прочитать.
- gtid_mode = ON — включает Global Transaction Identifiers. Это упрощает recovery и позволяет cross-server replication.
-- Создать пользователя для Flink CDC
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'strongpassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
Минимальный набор привилегий: SELECT для snapshot, REPLICATION SLAVE и REPLICATION CLIENT для binlog reader, RELOAD для flush logs.
DDL Flink CDC source
CREATE TABLE orders_cdc (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql.internal',
'port' = '3306',
'username' = 'flink_cdc',
'password' = '...',
'database-name' = 'shop',
'table-name' = 'orders',
'server-id' = '5400-5404',
'scan.startup.mode' = 'initial',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'connect.timeout' = '30s',
'heartbeat.interval' = '30s'
);
Разберём ключевые параметры.
server-id
server-id в Flink CDC — это range, не одно число (например, 5400-5404). Для каждого parallel snapshot reader Flink выделяет один ID из диапазона.
При parallelism=4 нужно 4 server-id (для snapshot phase) + 1 (для binlog reader) = 5. Range 5400-5404 даёт 5 IDs.
parallelism=4 -> нужен range размером >= 5 (4 snapshot + 1 binlog)
parallelism=8 -> range >= 9
parallelism=N -> range >= N+1
Никогда не делите server-id range между двумя Flink jobs или с реальным MySQL slave. Каждый ID должен быть уникален в кластере MySQL. Collision приводит к “A slave with the same server_uuid/server_id has already connected” — оба коннекта дисконнект’ятся. На больших кластерах с множеством Flink jobs координируйте диапазоны через документацию или central config (например, jobs 1-100 в range 5400-5499, jobs 101-200 в range 5500-5599).
scan.startup.mode
С какого момента начинать CDC чтение:
initial(default) — сначала snapshot всех существующих rows, потом switch на binlog. Полная история данных в downstream. Это что вам обычно нужно.latest-offset— пропускает snapshot, начинает читать только новые binlog events. Полезно, если вы строите incremental sync и initial state уже есть в downstream откуда-то ещё.earliest-offset— читает binlog с самого начала. Опасно: если binlog старый, можно пропустить уже rotated logs.specific-offset— с конкретной binlog position (file + position или gtid set).timestamp— с конкретного timestamp в binlog.
Чаще всего — initial для нового pipeline или latest-offset для resume после сторонней initial sync.
scan.incremental.snapshot.enabled
true — incremental snapshot algorithm (введён в Flink CDC 2.0). Параллельный, resumable, lockless.
false — legacy snapshot algorithm. Требует FLUSH TABLES WITH READ LOCK для consistent point-in-time view, что блокирует writes на доли секунды. Не параллелен.
Practically always используйте true. Legacy режим — только для edge-cases или для совместимости со старыми pipeline.
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'scan.incremental.snapshot.chunk.key-column' = 'order_id', -- какая колонка chunking
chunk.size — сколько rows в одном chunk. Меньше = больше параллельных тасков, но больше overhead на coordination. Дефолт 8096 хорош для большинства случаев.
Pipeline YAML format (Flink CDC 3.0+)
Flink CDC 3.0 ввёл декларативный YAML формат для CDC-pipelines. Никакого Java/Scala, никакого SQL DDL — целостный pipeline в одном файле.
# shop-cdc-pipeline.yaml
source:
type: mysql
name: shop-mysql-source
hostname: mysql.internal
port: 3306
username: flink_cdc
password: ${MYSQL_PASSWORD}
tables: shop.orders, shop.products, shop.customers
server-id: 5400-5404
server-time-zone: UTC
scan.startup.mode: initial
scan.incremental.snapshot.enabled: true
scan.incremental.snapshot.chunk.size: 8192
sink:
type: paimon
name: lakehouse-sink
catalog.warehouse: s3://lake/paimon/
catalog.metastore: hive
catalog.uri: thrift://hive-metastore:9083
route:
- source-table: shop.orders
sink-table: lakehouse.shop_orders
- source-table: shop.products
sink-table: lakehouse.shop_products
- source-table: shop.customers
sink-table: lakehouse.shop_customers
transform:
- source-table: shop.orders
projection: order_id, customer_id, amount, status, updated_at
filter: status NOT IN ('cancelled', 'draft')
pipeline:
name: shop-to-lakehouse
parallelism: 4
schema.change.behavior: evolve
Запуск:
bin/flink-cdc.sh shop-cdc-pipeline.yaml \
--flink-home /opt/flink \
--jar flink-cdc-pipeline-connector-paimon-3.0.0.jar
Что это даёт:
- Multi-table в одном pipeline — все три таблицы (orders, products, customers) синхронизируются одним job.
- Schema evolution —
schema.change.behavior: evolveозначает что DDL изменения в MySQL автоматически применяются в Paimon. - Routes и transforms — куда какая таблица идёт и какие преобразования применяются перед записью.
- Декларативность — DDL и preferences вынесены в YAML, можно version control как код.
Архитектура CDC-pipeline
MySQL primary
MySQL primary. binlog включён, gtid режим. Flink CDC подключается как MySQL slave.Snapshot tasks (N parallel)
Snapshot phase: N parallel subtasks читают chunks таблицы через SELECT с PK range. Каждый chunk emit'тится в downstream как +I events.Binlog reader (1 subtask)
Binlog phase: один subtask читает binlog позицию начиная с timestamp/gtid из snapshot. Emit insert/update/delete как +I/-U/+U/-D events.Phase switch
Switch point: после завершения всех snapshot chunks, source emit special event 'snapshot complete' и переходит на binlog phase. Downstream видит continuous changelog без разрывов.Flink downstream
Downstream Flink pipeline (transforms, route, sink в lakehouse). Видит unified changelog без понимания, snapshot это или binlog.GTID vs binlog file+position
Старые MySQL replication использовали (binlog_file_name, position) для идентификации позиции. Это работает, но fragile: при master switch (failover) file_name меняется, и slave не знает, какой position соответствует.
GTID (Global Transaction Identifier) — современный способ. Каждая транзакция получает уникальный GTID типа 3E11FA47-71CA-11E1-9E33-C80AA9429562:23. При failover GTID-set unchanged — slave продолжает с того же транзакционного состояния, независимо от файла.
Flink CDC поддерживает оба, но рекомендован GTID:
WITH (
'connector' = 'mysql-cdc',
...
-- При scan.startup.mode = 'specific-offset' с GTID:
'scan.startup.specific-offset.gtid-set' = '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-100'
);
Включить gtid в MySQL:
gtid_mode = ON
enforce_gtid_consistency = ON
При recovery Flink CDC сохраняет в state GTID-set, не file+position. Это makes failover MySQL transparent для CDC pipeline.
Production gotchas
Long-running transactions блокируют snapshot. Snapshot читает с consistent point — если в MySQL долгая транзакция (например, 30-минутный analytics-query), snapshot ждёт её завершения. На production OLTP это редко, но возможно. Мониторьте information_schema.innodb_trx на источнике.
Binlog retention. Если CDC job упал и не работал 8 дней, а binlog retention 7 дней — нужные events потеряны. Recovery невозможен из binlog, нужен full re-snapshot. Мониторьте Flink CDC lag и retention.
Schema changes mid-flight. Если в MySQL делается ALTER TABLE пока CDC активен — Flink CDC должен обработать DDL. С schema.change.behavior: evolve (Flink CDC 3.0+) применит DDL в sink. С lenient — игнорирует. С exception — упадёт. Выбирайте по требованиям.
Tabular character set. Если MySQL таблица в latin1, а Flink CDC ожидает utf8 — данные побьются. Убедитесь что binlog читается в правильной кодировке (server-time-zone, decoding.charset).
Попробуй сам
- Настрой Flink CDC на тестовый MySQL с двумя таблицами. Запусти, проверь, что snapshot и binlog работают. Что произойдёт, если выключить gtid_mode в середине?
- Запусти два Flink CDC job на один MySQL с overlapping server-id range. Что увидите в логах MySQL?
- Симулируй MySQL master failover. Как ведёт себя Flink CDC с file+position vs gtid?