Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 20 мин
Средний
Flink CDCTransformRouteSchema EvolutionPipelineYAML

Transforms и routes

Flink CDC pipeline framework (3.0+) даёт декларативный способ описать что делать с CDC-events без написания Java/Scala/SQL вне YAML. Два главных механизма — transforms (преобразование row-data: projection, filter, computed columns) и routes (mapping source-таблиц в sink-таблицы с возможным переименованием).

В этом уроке разберём оба, плюс schema evolution — как Flink CDC переносит изменения схемы из MySQL в lakehouse автоматически.


Transforms

Transform — это декларативная инструкция, как преобразовать CDC events перед записью в sink. Каждый transform применяется к конкретной source-таблице.

transform:
  - source-table: shop.orders
    projection: order_id, customer_id, amount, status, updated_at, \
                amount * 1.2 AS amount_with_tax, \
                UPPER(status) AS status_upper
    filter: status NOT IN ('cancelled', 'draft') AND amount > 0
    primary-keys: order_id

Что доступно:

  • projection — список колонок и computed expressions. Поддерживает все Flink SQL scalar functions (UPPER, LOWER, CAST, DATE_FORMAT, REGEXP, и т.д.).
  • filter — WHERE условие, фильтрует строки.
  • primary-keys — переопределяет PK (на случай, если source PK не подходит для sink).
  • partition-keys — задаёт partition keys для sink (если sink — partitioned table).
  • table-options — sink-specific параметры (например, для Paimon — bucket count).

Computed columns могут использовать любые SQL-функции. Несколько примеров:

Routing и transforms в Debezium: Single Message Transforms
projection: |
  user_id,
  email,
  REGEXP_EXTRACT(email, '@(.+)$', 1) AS email_domain,
  DATE_FORMAT(created_at, 'yyyy-MM-dd') AS created_date,
  CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END AS age_group
TIP

Часто переименование колонок и computed expressions нужны для одной цели — нормализовать данные на пути в lakehouse. Например, source-таблица MySQL имеет колонки в snake_case, downstream BigQuery предпочитает camelCase — transform с переименованиями делает это elegantly без кастомного кода.


Routes

Route — mapping source-таблиц в sink-таблицы. Без route Flink CDC создаёт sink-таблицу с тем же именем что и source. С route — переименовывает, объединяет, разделяет.

Simple rename

route:
  - source-table: shop.orders
    sink-table: lakehouse.cdc.orders_v2
  - source-table: shop.products
    sink-table: lakehouse.cdc.products_v2

Wildcard routing

Все таблицы базы shop в один sink-database:

route:
  - source-table: shop.*
    sink-table: lakehouse.cdc.\0

\0 — placeholder для имени таблицы из source pattern. Это переносит shop.orders в lakehouse.cdc.orders, shop.products в lakehouse.cdc.products, и т.д.

Multi-source to single sink (merge)

Слить несколько MySQL-таблиц в одну Paimon-таблицу:

route:
  - source-table: shop_us.orders
    sink-table: lakehouse.global_orders
    description: "US orders"
  - source-table: shop_eu.orders
    sink-table: lakehouse.global_orders
    description: "EU orders"
  - source-table: shop_asia.orders
    sink-table: lakehouse.global_orders
    description: "Asia orders"

Все три source-таблицы попадают в одну sink. Полезно для multi-region setup: shards в разных MySQL -> unified lakehouse-таблица.

WARNING

Multi-source-to-single-sink требует, чтобы все source-таблицы имели одинаковую схему. Если у shop_us.orders есть колонка country=‘US’, у shop_eu.orders колонка country=‘EU’ — нужны разные source-таблицы либо transform добавляющий country перед merge. Иначе schema mismatch.


Архитектура transform + route

Transform и Route в CDC pipeline

MySQL: shop.orders, shop.products

MySQL source-таблицы. Flink CDC читает CDC events из binlog.
raw CDC events

Transform

Transform operator. Применяет projection, filter, computed expressions per source table. Filter может drop events; projection меняет row schema.

Route

Route operator. Маппит source-table в sink-table. Один-в-один, wildcard, или multi-to-one.
Sink table 1Paimon table lakehouse.cdc.orders. Получает transformed events для shop.orders.
Sink table 2Paimon table lakehouse.cdc.products. Получает transformed events для shop.products.

Schema evolution

Когда в MySQL делается ALTER TABLE — добавляется колонка, меняется тип, dropping колонки — Flink CDC должен это обработать. Без обработки sink-таблица становится несовместимой со source schema.

pipeline:
  schema.change.behavior: evolve

Возможные значения:

  • evolve — автоматически применяет DDL changes в sink. Add column -> ALTER sink-table. Drop column -> ALTER sink-table. Rename — зависит от sink-коннектора.
  • lenient — игнорирует DDL changes. Sink остаётся со старой схемой; новые колонки в source просто отбрасываются.
  • exception — Flink CDC падает с ошибкой при первом DDL. Самый safe, но требует ручной интервенции.
  • try_evolve — пробует evolve, при ошибке fall-back на lenient.

Что поддерживается на side sink

Не каждый sink-коннектор поддерживает все DDL операции. Грубо:

  • Paimon, Iceberg — Add column, Drop column, Rename column (через alter), Change column type (узко).
  • Doris/StarRocks — Add column, Drop column. Rename и change type — частично.
  • JDBC (Postgres/MySQL) — Add column. Drop column и rename — зависит от настроек.
  • Kafka — schema хранится в Schema Registry; evolve работает через compatibility-modes (BACKWARD/FORWARD).
WARNING

schema.change.behavior: evolve атомично НЕ применяет DDL: между моментом, когда source видит ALTER, и моментом, когда sink применил соответствующий DDL — есть окно времени (миллисекунды-секунды). За это окно могут пройти events со старой схемой. Большинство коннекторов корректно обрабатывают это окно через buffering, но edge cases возможны. В production делайте schema changes в low-traffic время.


Полный pipeline пример

# customer-cdc-pipeline.yaml
source:
  type: mysql
  name: crm-mysql
  hostname: crm-mysql.internal
  port: 3306
  username: flink_cdc
  password: ${MYSQL_PASSWORD}
  tables: crm.customers, crm.addresses, crm.orders
  server-id: 5400-5404
  scan.startup.mode: initial
  scan.incremental.snapshot.enabled: true

transform:
  - source-table: crm.customers
    projection: |
      customer_id,
      LOWER(email) AS email,
      first_name || ' ' || last_name AS full_name,
      country_code,
      created_at,
      updated_at
    filter: status = 'active'
    primary-keys: customer_id

  - source-table: crm.addresses
    projection: address_id, customer_id, country_code, city, street, zip
    filter: deleted_at IS NULL
    primary-keys: address_id

  - source-table: crm.orders
    projection: order_id, customer_id, amount, status, created_at
    filter: status NOT IN ('cancelled', 'draft')
    primary-keys: order_id
    partition-keys: order_date

sink:
  type: paimon
  catalog.warehouse: s3://lake/paimon/
  catalog.metastore: hive
  catalog.uri: thrift://hive-metastore:9083

route:
  - source-table: crm.\*
    sink-table: lakehouse.crm.\0

pipeline:
  name: crm-to-lakehouse
  parallelism: 8
  schema.change.behavior: evolve
  checkpoint-interval: 1min
  state.backend: rocksdb

Что этот pipeline делает:

  1. Подключается к MySQL CRM, читает три таблицы параллельно (incremental snapshot).
  2. Применяет transforms — фильтрация и normalization для каждой таблицы независимо.
  3. Routes через wildcard в Paimon database lakehouse.crm.
  4. Schema evolution автоматическая.
  5. Checkpoint каждую минуту в RocksDB state backend.

Pre-transform и post-transform

Иногда нужно сделать transformation до route (для решения “куда роутить”), а иногда — после route (для финальной shape под sink-схему).

Flink CDC применяет в порядке: transform -> route. Если нужно после route — это уже отдельная фаза, не в YAML, а в Flink job downstream (через регулярный Table API/SQL).


Попробуй сам

  1. Напиши transform, который добавляет computed column is_high_value = amount > 1000 для таблицы orders и фильтрует только high-value заказы.
  2. Сделай wildcard route, который переносит все таблицы из MySQL database shop в Paimon database lake_shop, добавляя префикс _v2 к именам.
  3. Подумай, как обработать ALTER TABLE с переименованием колонки: что произойдёт с schema.change.behavior: evolve и Paimon sink?
Проверка знанийKnowledge check
Команда настраивает Flink CDC pipeline с тремя source таблицами (shop.orders, shop_eu.orders, shop_asia.orders) -> один Paimon sink (lakehouse.global_orders). Все три MySQL-таблицы имеют идентичную схему, но country отличается. После запуска в global_orders приходят orders, но определить из какого региона нельзя — нет country column. Что добавить в pipeline?
ОтветAnswer
Добавить computed column 'country' в transform для каждой source-таблицы с константным значением для соответствующего региона. Например: transform: - source-table: shop.orders projection: order_id, customer_id, amount, status, 'US' AS country - source-table: shop_eu.orders projection: order_id, customer_id, amount, status, 'EU' AS country - source-table: shop_asia.orders projection: order_id, customer_id, amount, status, 'ASIA' AS country Теперь все три source эмитят одинаковую schema (с country column), и Paimon sink lakehouse.global_orders может корректно объединить данные. country становится partition key или просто колонкой для downstream-аналитики. Альтернатива (хуже): использовать computed column на основе hostname source MySQL — но это fragile и зависит от инфраструктуры. Hardcoded строка в transform — самый явный и поддерживаемый способ.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что делает route в Flink CDC pipeline и какие сценарии поддерживает?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4