Transforms и routes
Flink CDC pipeline framework (3.0+) даёт декларативный способ описать что делать с CDC-events без написания Java/Scala/SQL вне YAML. Два главных механизма — transforms (преобразование row-data: projection, filter, computed columns) и routes (mapping source-таблиц в sink-таблицы с возможным переименованием).
В этом уроке разберём оба, плюс schema evolution — как Flink CDC переносит изменения схемы из MySQL в lakehouse автоматически.
Transforms
Transform — это декларативная инструкция, как преобразовать CDC events перед записью в sink. Каждый transform применяется к конкретной source-таблице.
transform:
- source-table: shop.orders
projection: order_id, customer_id, amount, status, updated_at, \
amount * 1.2 AS amount_with_tax, \
UPPER(status) AS status_upper
filter: status NOT IN ('cancelled', 'draft') AND amount > 0
primary-keys: order_id
Что доступно:
- projection — список колонок и computed expressions. Поддерживает все Flink SQL scalar functions (UPPER, LOWER, CAST, DATE_FORMAT, REGEXP, и т.д.).
- filter — WHERE условие, фильтрует строки.
- primary-keys — переопределяет PK (на случай, если source PK не подходит для sink).
- partition-keys — задаёт partition keys для sink (если sink — partitioned table).
- table-options — sink-specific параметры (например, для Paimon — bucket count).
Computed columns могут использовать любые SQL-функции. Несколько примеров:
Routing и transforms в Debezium: Single Message Transformsprojection: |
user_id,
email,
REGEXP_EXTRACT(email, '@(.+)$', 1) AS email_domain,
DATE_FORMAT(created_at, 'yyyy-MM-dd') AS created_date,
CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END AS age_group
Часто переименование колонок и computed expressions нужны для одной цели — нормализовать данные на пути в lakehouse. Например, source-таблица MySQL имеет колонки в snake_case, downstream BigQuery предпочитает camelCase — transform с переименованиями делает это elegantly без кастомного кода.
Routes
Route — mapping source-таблиц в sink-таблицы. Без route Flink CDC создаёт sink-таблицу с тем же именем что и source. С route — переименовывает, объединяет, разделяет.
Simple rename
route:
- source-table: shop.orders
sink-table: lakehouse.cdc.orders_v2
- source-table: shop.products
sink-table: lakehouse.cdc.products_v2
Wildcard routing
Все таблицы базы shop в один sink-database:
route:
- source-table: shop.*
sink-table: lakehouse.cdc.\0
\0 — placeholder для имени таблицы из source pattern. Это переносит shop.orders в lakehouse.cdc.orders, shop.products в lakehouse.cdc.products, и т.д.
Multi-source to single sink (merge)
Слить несколько MySQL-таблиц в одну Paimon-таблицу:
route:
- source-table: shop_us.orders
sink-table: lakehouse.global_orders
description: "US orders"
- source-table: shop_eu.orders
sink-table: lakehouse.global_orders
description: "EU orders"
- source-table: shop_asia.orders
sink-table: lakehouse.global_orders
description: "Asia orders"
Все три source-таблицы попадают в одну sink. Полезно для multi-region setup: shards в разных MySQL -> unified lakehouse-таблица.
Multi-source-to-single-sink требует, чтобы все source-таблицы имели одинаковую схему. Если у shop_us.orders есть колонка country=‘US’, у shop_eu.orders колонка country=‘EU’ — нужны разные source-таблицы либо transform добавляющий country перед merge. Иначе schema mismatch.
Архитектура transform + route
MySQL: shop.orders, shop.products
MySQL source-таблицы. Flink CDC читает CDC events из binlog.Transform
Transform operator. Применяет projection, filter, computed expressions per source table. Filter может drop events; projection меняет row schema.Route
Route operator. Маппит source-table в sink-table. Один-в-один, wildcard, или multi-to-one.Schema evolution
Когда в MySQL делается ALTER TABLE — добавляется колонка, меняется тип, dropping колонки — Flink CDC должен это обработать. Без обработки sink-таблица становится несовместимой со source schema.
pipeline:
schema.change.behavior: evolve
Возможные значения:
evolve— автоматически применяет DDL changes в sink. Add column -> ALTER sink-table. Drop column -> ALTER sink-table. Rename — зависит от sink-коннектора.lenient— игнорирует DDL changes. Sink остаётся со старой схемой; новые колонки в source просто отбрасываются.exception— Flink CDC падает с ошибкой при первом DDL. Самый safe, но требует ручной интервенции.try_evolve— пробует evolve, при ошибке fall-back на lenient.
Что поддерживается на side sink
Не каждый sink-коннектор поддерживает все DDL операции. Грубо:
- Paimon, Iceberg — Add column, Drop column, Rename column (через alter), Change column type (узко).
- Doris/StarRocks — Add column, Drop column. Rename и change type — частично.
- JDBC (Postgres/MySQL) — Add column. Drop column и rename — зависит от настроек.
- Kafka — schema хранится в Schema Registry; evolve работает через compatibility-modes (BACKWARD/FORWARD).
schema.change.behavior: evolve атомично НЕ применяет DDL: между моментом, когда source видит ALTER, и моментом, когда sink применил соответствующий DDL — есть окно времени (миллисекунды-секунды). За это окно могут пройти events со старой схемой. Большинство коннекторов корректно обрабатывают это окно через buffering, но edge cases возможны. В production делайте schema changes в low-traffic время.
Полный pipeline пример
# customer-cdc-pipeline.yaml
source:
type: mysql
name: crm-mysql
hostname: crm-mysql.internal
port: 3306
username: flink_cdc
password: ${MYSQL_PASSWORD}
tables: crm.customers, crm.addresses, crm.orders
server-id: 5400-5404
scan.startup.mode: initial
scan.incremental.snapshot.enabled: true
transform:
- source-table: crm.customers
projection: |
customer_id,
LOWER(email) AS email,
first_name || ' ' || last_name AS full_name,
country_code,
created_at,
updated_at
filter: status = 'active'
primary-keys: customer_id
- source-table: crm.addresses
projection: address_id, customer_id, country_code, city, street, zip
filter: deleted_at IS NULL
primary-keys: address_id
- source-table: crm.orders
projection: order_id, customer_id, amount, status, created_at
filter: status NOT IN ('cancelled', 'draft')
primary-keys: order_id
partition-keys: order_date
sink:
type: paimon
catalog.warehouse: s3://lake/paimon/
catalog.metastore: hive
catalog.uri: thrift://hive-metastore:9083
route:
- source-table: crm.\*
sink-table: lakehouse.crm.\0
pipeline:
name: crm-to-lakehouse
parallelism: 8
schema.change.behavior: evolve
checkpoint-interval: 1min
state.backend: rocksdb
Что этот pipeline делает:
- Подключается к MySQL CRM, читает три таблицы параллельно (incremental snapshot).
- Применяет transforms — фильтрация и normalization для каждой таблицы независимо.
- Routes через wildcard в Paimon database
lakehouse.crm. - Schema evolution автоматическая.
- Checkpoint каждую минуту в RocksDB state backend.
Pre-transform и post-transform
Иногда нужно сделать transformation до route (для решения “куда роутить”), а иногда — после route (для финальной shape под sink-схему).
Flink CDC применяет в порядке: transform -> route. Если нужно после route — это уже отдельная фаза, не в YAML, а в Flink job downstream (через регулярный Table API/SQL).
Попробуй сам
- Напиши transform, который добавляет computed column
is_high_value = amount > 1000для таблицы orders и фильтрует только high-value заказы. - Сделай wildcard route, который переносит все таблицы из MySQL database
shopв Paimon databaselake_shop, добавляя префикс_v2к именам. - Подумай, как обработать ALTER TABLE с переименованием колонки: что произойдёт с
schema.change.behavior: evolveи Paimon sink?