Learning Platform
Глоссарий Troubleshooting
Урок 15.02 · 22 мин
Средний
Flink CDCMySQLBinlogGTIDserver-idSnapshotPipeline YAML

MySQL CDC pipeline

MySQL — самый частый источник для CDC. В этом уроке настроим production-готовый MySQL CDC pipeline через Flink CDC: разберём server-id, snapshot стратегии, разные форматы binlog, gtid configuration, и YAML pipeline format Flink CDC 3.0+.


Подготовка MySQL

MySQL должен быть настроен для binlog replication. Без этого Flink CDC не сможет читать changes.

MySQL Binary Log: архитектура и форматы GTID Mode: глобальные идентификаторы транзакций
# /etc/mysql/my.cnf
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
gtid_mode = ON
enforce_gtid_consistency = ON

Что это значит:

  • server-id — уникальный ID MySQL instance. На master = 1, на каждом slave/Flink CDC consumer должен быть уникальный.
  • log_bin = mysql-bin — включает binlog с prefix mysql-bin.
  • binlog_format = ROW — пишет полные row changes (а не SQL statements). Обязательно для CDC. STATEMENT и MIXED не подходят.
  • binlog_row_image = FULL — пишет полные before/after образы row. Без этого Flink CDC не сможет правильно сформировать -U/+U пары для updates.
  • expire_logs_days = 7 — сколько хранить binlog. На production обычно 7-30 дней; CDC consumer должен успеть прочитать.
  • gtid_mode = ON — включает Global Transaction Identifiers. Это упрощает recovery и позволяет cross-server replication.
-- Создать пользователя для Flink CDC
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'strongpassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;

Минимальный набор привилегий: SELECT для snapshot, REPLICATION SLAVE и REPLICATION CLIENT для binlog reader, RELOAD для flush logs.


CREATE TABLE orders_cdc (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10, 2),
  status STRING,
  created_at TIMESTAMP(3),
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql.internal',
  'port' = '3306',
  'username' = 'flink_cdc',
  'password' = '...',
  'database-name' = 'shop',
  'table-name' = 'orders',
  'server-id' = '5400-5404',
  'scan.startup.mode' = 'initial',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.incremental.snapshot.chunk.size' = '8096',
  'connect.timeout' = '30s',
  'heartbeat.interval' = '30s'
);

Разберём ключевые параметры.


server-id

server-id в Flink CDC — это range, не одно число (например, 5400-5404). Для каждого parallel snapshot reader Flink выделяет один ID из диапазона.

При parallelism=4 нужно 4 server-id (для snapshot phase) + 1 (для binlog reader) = 5. Range 5400-5404 даёт 5 IDs.

parallelism=4 -> нужен range размером >= 5 (4 snapshot + 1 binlog)
parallelism=8 -> range >= 9
parallelism=N -> range >= N+1
WARNING

Никогда не делите server-id range между двумя Flink jobs или с реальным MySQL slave. Каждый ID должен быть уникален в кластере MySQL. Collision приводит к “A slave with the same server_uuid/server_id has already connected” — оба коннекта дисконнект’ятся. На больших кластерах с множеством Flink jobs координируйте диапазоны через документацию или central config (например, jobs 1-100 в range 5400-5499, jobs 101-200 в range 5500-5599).


scan.startup.mode

С какого момента начинать CDC чтение:

  • initial (default) — сначала snapshot всех существующих rows, потом switch на binlog. Полная история данных в downstream. Это что вам обычно нужно.
  • latest-offset — пропускает snapshot, начинает читать только новые binlog events. Полезно, если вы строите incremental sync и initial state уже есть в downstream откуда-то ещё.
  • earliest-offset — читает binlog с самого начала. Опасно: если binlog старый, можно пропустить уже rotated logs.
  • specific-offset — с конкретной binlog position (file + position или gtid set).
  • timestamp — с конкретного timestamp в binlog.

Чаще всего — initial для нового pipeline или latest-offset для resume после сторонней initial sync.


scan.incremental.snapshot.enabled

trueincremental snapshot algorithm (введён в Flink CDC 2.0). Параллельный, resumable, lockless.

false — legacy snapshot algorithm. Требует FLUSH TABLES WITH READ LOCK для consistent point-in-time view, что блокирует writes на доли секунды. Не параллелен.

Practically always используйте true. Legacy режим — только для edge-cases или для совместимости со старыми pipeline.

'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'scan.incremental.snapshot.chunk.key-column' = 'order_id',  -- какая колонка chunking

chunk.size — сколько rows в одном chunk. Меньше = больше параллельных тасков, но больше overhead на coordination. Дефолт 8096 хорош для большинства случаев.


Flink CDC 3.0 ввёл декларативный YAML формат для CDC-pipelines. Никакого Java/Scala, никакого SQL DDL — целостный pipeline в одном файле.

# shop-cdc-pipeline.yaml
source:
  type: mysql
  name: shop-mysql-source
  hostname: mysql.internal
  port: 3306
  username: flink_cdc
  password: ${MYSQL_PASSWORD}
  tables: shop.orders, shop.products, shop.customers
  server-id: 5400-5404
  server-time-zone: UTC
  scan.startup.mode: initial
  scan.incremental.snapshot.enabled: true
  scan.incremental.snapshot.chunk.size: 8192

sink:
  type: paimon
  name: lakehouse-sink
  catalog.warehouse: s3://lake/paimon/
  catalog.metastore: hive
  catalog.uri: thrift://hive-metastore:9083

route:
  - source-table: shop.orders
    sink-table: lakehouse.shop_orders
  - source-table: shop.products
    sink-table: lakehouse.shop_products
  - source-table: shop.customers
    sink-table: lakehouse.shop_customers

transform:
  - source-table: shop.orders
    projection: order_id, customer_id, amount, status, updated_at
    filter: status NOT IN ('cancelled', 'draft')

pipeline:
  name: shop-to-lakehouse
  parallelism: 4
  schema.change.behavior: evolve

Запуск:

bin/flink-cdc.sh shop-cdc-pipeline.yaml \
  --flink-home /opt/flink \
  --jar flink-cdc-pipeline-connector-paimon-3.0.0.jar

Что это даёт:

  • Multi-table в одном pipeline — все три таблицы (orders, products, customers) синхронизируются одним job.
  • Schema evolutionschema.change.behavior: evolve означает что DDL изменения в MySQL автоматически применяются в Paimon.
  • Routes и transforms — куда какая таблица идёт и какие преобразования применяются перед записью.
  • Декларативность — DDL и preferences вынесены в YAML, можно version control как код.

Архитектура CDC-pipeline

MySQL CDC pipeline architecture

MySQL primary

MySQL primary. binlog включён, gtid режим. Flink CDC подключается как MySQL slave.
binlog + initial state

Snapshot tasks (N parallel)

Snapshot phase: N parallel subtasks читают chunks таблицы через SELECT с PK range. Каждый chunk emit'тится в downstream как +I events.

Binlog reader (1 subtask)

Binlog phase: один subtask читает binlog позицию начиная с timestamp/gtid из snapshot. Emit insert/update/delete как +I/-U/+U/-D events.

Phase switch

Switch point: после завершения всех snapshot chunks, source emit special event 'snapshot complete' и переходит на binlog phase. Downstream видит continuous changelog без разрывов.
changelog stream

Flink downstream

Downstream Flink pipeline (transforms, route, sink в lakehouse). Видит unified changelog без понимания, snapshot это или binlog.

GTID vs binlog file+position

Старые MySQL replication использовали (binlog_file_name, position) для идентификации позиции. Это работает, но fragile: при master switch (failover) file_name меняется, и slave не знает, какой position соответствует.

GTID (Global Transaction Identifier) — современный способ. Каждая транзакция получает уникальный GTID типа 3E11FA47-71CA-11E1-9E33-C80AA9429562:23. При failover GTID-set unchanged — slave продолжает с того же транзакционного состояния, независимо от файла.

Flink CDC поддерживает оба, но рекомендован GTID:

WITH (
  'connector' = 'mysql-cdc',
  ...
  -- При scan.startup.mode = 'specific-offset' с GTID:
  'scan.startup.specific-offset.gtid-set' = '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-100'
);

Включить gtid в MySQL:

gtid_mode = ON
enforce_gtid_consistency = ON

При recovery Flink CDC сохраняет в state GTID-set, не file+position. Это makes failover MySQL transparent для CDC pipeline.


Production gotchas

Long-running transactions блокируют snapshot. Snapshot читает с consistent point — если в MySQL долгая транзакция (например, 30-минутный analytics-query), snapshot ждёт её завершения. На production OLTP это редко, но возможно. Мониторьте information_schema.innodb_trx на источнике.

Binlog retention. Если CDC job упал и не работал 8 дней, а binlog retention 7 дней — нужные events потеряны. Recovery невозможен из binlog, нужен full re-snapshot. Мониторьте Flink CDC lag и retention.

Schema changes mid-flight. Если в MySQL делается ALTER TABLE пока CDC активен — Flink CDC должен обработать DDL. С schema.change.behavior: evolve (Flink CDC 3.0+) применит DDL в sink. С lenient — игнорирует. С exception — упадёт. Выбирайте по требованиям.

Tabular character set. Если MySQL таблица в latin1, а Flink CDC ожидает utf8 — данные побьются. Убедитесь что binlog читается в правильной кодировке (server-time-zone, decoding.charset).


Попробуй сам

  1. Настрой Flink CDC на тестовый MySQL с двумя таблицами. Запусти, проверь, что snapshot и binlog работают. Что произойдёт, если выключить gtid_mode в середине?
  2. Запусти два Flink CDC job на один MySQL с overlapping server-id range. Что увидите в логах MySQL?
  3. Симулируй MySQL master failover. Как ведёт себя Flink CDC с file+position vs gtid?
Проверка знанийKnowledge check
Команда задеплоила Flink CDC pipeline для MySQL с server-id='1234' (одно число), parallelism=4. После старта job логи MySQL показывают: "A slave with the same server_uuid/server_id has already connected". В чём проблема?
ОтветAnswer
Проблема в том, что server-id указан как одно число '1234', а не как range. Flink CDC при parallelism=4 пытается создать 4 параллельных snapshot reader + 1 binlog reader, каждый из которых должен иметь уникальный server-id. Все они пытаются использовать server-id=1234, что вызывает collision в MySQL: MySQL фиксирует первый подключившийся subtask с 1234, остальные дисконнектятся с ошибкой. Решение: указать server-id как range, например '1234-1238' (5 IDs для 4 параллельных + 1 binlog). Flink CDC автоматически распределит уникальные ID из диапазона. Альтернативно — использовать parallelism=1 (одна subtask использует одно ID), но тогда теряется параллельный snapshot.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда установила в Flink CDC source 'server-id'='1234' (одно число), parallelism=4. После запуска MySQL логи показывают: 'A slave with the same server_uuid/server_id has already connected'. Что не так?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4