Level: Advanced · Duration: 45 minutes · Tags: Configuration, PostgreSQL, MySQL, PyFlink, Kafka Connect

Prerequisites:

  • module-7/04-multi-database-architecture

Multi-Database Configuration

In the previous lesson you learned about the architecture of a multi-database CDC pipeline and the patterns for combining data from PostgreSQL and MySQL. Now it is time to move on to practical configuration: set up both connectors and build a unified PyFlink consumer.

This configuration is ready to use in your extended capstone project.


Lesson Objectives

By the end of this lesson you will be able to:

  • Configure the PostgreSQL connector with the Outbox Event Router SMT
  • Configure the MySQL connector with its unique parameters (server.id, schema history topic)
  • Deploy both connectors to Kafka Connect
  • Build a PyFlink consumer that processes events from both sources
  • Monitor a multi-database CDC pipeline
  • Diagnose common problems in multi-database deployments

Format: complete, copy-paste-ready configurations with explanations of the critical parameters.


MySQL Outbox Table Schema

Before configuring the MySQL connector, create the outbox table in the MySQL database (in parallel with the PostgreSQL outbox table from the main capstone).

MySQL Outbox DDL

-- MySQL outbox table schema
-- Compare with the PostgreSQL version from the main capstone project
CREATE TABLE outbox (
    id CHAR(36) PRIMARY KEY DEFAULT (UUID()),  -- MySQL UUID() syntax
    aggregatetype VARCHAR(255) NOT NULL,       -- Routing key (e.g., "orders")
    aggregateid VARCHAR(255) NOT NULL,         -- Entity ID (e.g., order_id)
    type VARCHAR(255) NOT NULL,                -- Event type (e.g., "OrderCreated")
    payload JSON NOT NULL,                     -- Event data (JSON vs PostgreSQL JSONB)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_created_at (created_at)          -- Cleanup queries index
) ENGINE=InnoDB;

Key Differences from PostgreSQL

| Aspect | PostgreSQL | MySQL |
|---|---|---|
| UUID generation | gen_random_uuid() | UUID() |
| JSON type | JSONB (binary format) | JSON (text-based with validation) |
| REPLICA IDENTITY | ALTER TABLE outbox REPLICA IDENTITY FULL; | Not required (binlog ROW format captures full row data) |
| Default storage engine | N/A | ENGINE=InnoDB (mandatory for binlog) |
Note

MySQL Binlog Configuration

MySQL binlog with ROW format automatically captures full row data, so no equivalent of PostgreSQL's REPLICA IDENTITY FULL is needed. Make sure binlog_format=ROW is set (see Module 8 - MySQL Configuration).
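The DDL above only stores events; what makes the outbox pattern safe is writing the business row and the outbox row in one transaction. Below is a minimal sketch of that write path, assuming an in-memory SQLite database as a stand-in for MySQL (SQLite has no JSON column type or UUID() default, so both are simulated in Python):

```python
import json
import sqlite3
import uuid

# In-memory SQLite stand-in for the MySQL database (schema simplified)
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, total REAL);
    CREATE TABLE outbox (
        id TEXT PRIMARY KEY,
        aggregatetype TEXT NOT NULL,
        aggregateid TEXT NOT NULL,
        type TEXT NOT NULL,
        payload TEXT NOT NULL
    );
""")

def create_order(conn, order_id, total):
    """Write the business row and the outbox event in ONE transaction,
    so Debezium sees either both or neither."""
    with conn:  # commits on success, rolls back on exception
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox VALUES (?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), "orders", str(order_id), "OrderCreated",
             json.dumps({"order_id": order_id, "total": total})),
        )

create_order(conn, 1001, 100.0)
```

The single transaction is the whole point: a crash between the two INSERTs can never leave an order without its event, or an event without its order.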


PostgreSQL Connector Configuration

Configuration of the PostgreSQL connector for a multi-database deployment, with small modifications to keep topic naming unique. The // comments below are explanatory only; strict JSON does not allow them, so remove them before submitting the config to the Kafka Connect REST API.

Connector JSON Config

{
  "name": "postgres-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

    // Database connection
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password",
    "database.dbname": "ecommerce",

    // Logical name (used in topic naming)
    "database.server.name": "postgres_prod",

    // Table filtering - only the outbox table
    "table.include.list": "public.outbox",

    // PostgreSQL logical decoding
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "slot.name": "debezium_outbox_slot_pg",

    // Operational parameters
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "when_needed",

    // Outbox Event Router SMT
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "outbox.event.postgres.${routedByValue}",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",

    // Serialization
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
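Strict JSON does not allow // comments, so the annotated config above cannot be POSTed to Kafka Connect as-is. A minimal sketch that strips whole-line comments before deployment (it assumes comments never share a line with data, as in this lesson):

```python
import json

def strip_comment_lines(annotated: str) -> str:
    """Remove whole-line // comments from an annotated connector config.
    Assumes comments never share a line with JSON data, as in this lesson."""
    kept = [line for line in annotated.splitlines()
            if not line.lstrip().startswith("//")]
    return "\n".join(kept)

# Shortened sample of the annotated config from this lesson
annotated = """
{
  "name": "postgres-outbox-connector",
  "config": {
    // Database connection
    "database.hostname": "postgres",
    "database.port": "5432"
  }
}
"""
clean = strip_comment_lines(annotated)
config = json.loads(clean)  # parses only after the comments are gone
```

In practice you would pass the cleaned string to curl or requests; here we only verify it parses.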

Critical Parameters for Multi-Database

| Parameter | Value | Purpose |
|---|---|---|
| database.server.name | postgres_prod | Unique logical name (distinct from the MySQL connector) |
| slot.name | debezium_outbox_slot_pg | Replication slot name (unique per connector) |
| transforms.outbox.route.topic.replacement | outbox.event.postgres.${routedByValue} | Topic prefix includes postgres to separate the sources |

Topic naming example:

  • An event with aggregatetype = "orders" → topic outbox.event.postgres.orders
  • An event with aggregatetype = "customers" → topic outbox.event.postgres.customers
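The router's expansion of ${routedByValue} can be mimicked in a few lines to predict topic names ahead of deployment. This is our own helper for illustration, not Debezium code:

```python
def route_topic(replacement: str, routed_by_value: str) -> str:
    """Mimic the Outbox Event Router's topic name expansion:
    ${routedByValue} is replaced by the event's aggregatetype."""
    return replacement.replace("${routedByValue}", routed_by_value)

# The PostgreSQL connector's route.topic.replacement from this lesson
template = "outbox.event.postgres.${routedByValue}"
```

For example, route_topic(template, "orders") yields outbox.event.postgres.orders, matching the bullet list above.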

MySQL Connector Configuration

The MySQL connector requires additional parameters that the PostgreSQL connector does not have: database.server.id and a schema history topic.

Connector JSON Config

{
  "name": "mysql-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    // Database connection
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "secure_password",

    // Logical name (unique per connector)
    "database.server.name": "mysql_prod",

    // MySQL-specific: Server ID (must be unique)
    "database.server.id": "184054",

    // Table filtering - only the outbox table
    "table.include.list": "ecommerce.outbox",

    // MySQL-specific: Schema history topic (CRITICAL - must be unique per connector)
    "schema.history.internal.kafka.topic": "schema-changes.mysql-outbox",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",

    // Operational parameters
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "when_needed",
    "snapshot.locking.mode": "minimal",

    // Outbox Event Router SMT (identical to PostgreSQL)
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "outbox.event.mysql.${routedByValue}",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",

    // Serialization
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

MySQL-Specific Parameters Explained

Warning

CRITICAL: Unique Schema History Topic

schema.history.internal.kafka.topic MUST be unique per MySQL connector. Sharing a schema history topic between connectors leads to DDL pollution: a connector receives DDL changes from another connector and fails with schema mismatch errors.

Rule: One connector = one schema history topic.

Note

database.server.id Selection

database.server.id must be unique and must NOT conflict with the server_id of the MySQL server itself or of other MySQL connectors.

Recommended range: 184000-184999 for Debezium connectors (see Module 8 - Multi-Connector Deployment for the registry pattern).

Example assignment:

  • MySQL Server: server_id = 1
  • Connector 1: database.server.id = 184054
  • Connector 2: database.server.id = 184055
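These uniqueness rules are easy to verify before deployment. The following is a hypothetical pre-flight check (not part of Debezium or Kafka Connect) that scans a list of connector configs for duplicated values of the three must-be-unique parameters:

```python
def find_duplicate_params(configs: list[dict]) -> dict[str, list]:
    """Return {parameter: [duplicated values]} for the parameters that
    must be unique across connectors in a multi-database deployment."""
    unique_params = ("database.server.name", "database.server.id",
                     "schema.history.internal.kafka.topic")
    duplicates = {}
    for param in unique_params:
        seen, dups = set(), []
        for cfg in configs:
            value = cfg.get("config", {}).get(param)
            if value is None:
                continue  # e.g. PostgreSQL connectors have no server.id
            if value in seen and value not in dups:
                dups.append(value)
            seen.add(value)
        if dups:
            duplicates[param] = dups
    return duplicates

pg = {"config": {"database.server.name": "postgres_prod"}}
my1 = {"config": {"database.server.name": "mysql_prod",
                  "database.server.id": "184054",
                  "schema.history.internal.kafka.topic": "schema-changes.mysql-outbox"}}
my2 = {"config": {"database.server.name": "mysql_prod_2",
                  "database.server.id": "184054",  # conflict!
                  "schema.history.internal.kafka.topic": "schema-changes.mysql-other"}}
```

Running the check on [pg, my1, my2] flags the duplicated database.server.id before MySQL rejects one of the readers at runtime.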

Topic naming example:

  • An event with aggregatetype = "orders" → topic outbox.event.mysql.orders
  • An event with aggregatetype = "customers" → topic outbox.event.mysql.customers
Knowledge check
Which three MySQL connector parameters MUST be unique in a multi-database deployment, and why?
Answer
(1) database.server.name: used as a prefix in Kafka topic names; duplication causes topic collisions and mixed-up events. (2) database.server.id: MySQL treats each connector as a replica server; a duplicate ID causes one of the readers to be disconnected. (3) schema.history.internal.kafka.topic: DDL from one connector pollutes the history of the other, causing schema mismatches on restart.

Deploying Both Connectors

Deploy both connectors via the Kafka Connect REST API.

Deployment Commands

# Deploy PostgreSQL connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-outbox-connector.json

# Deploy MySQL connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-outbox-connector.json

# Verify both connectors deployed
curl http://localhost:8083/connectors | jq

Expected output:

[
  "postgres-outbox-connector",
  "mysql-outbox-connector"
]

Verification Checklist

# Check PostgreSQL connector status
curl http://localhost:8083/connectors/postgres-outbox-connector/status | jq

# Expected: "state": "RUNNING"

# Check MySQL connector status
curl http://localhost:8083/connectors/mysql-outbox-connector/status | jq

# Expected: "state": "RUNNING"

# List Kafka topics (should see both postgres and mysql topics)
kafka-topics --bootstrap-server kafka:9092 --list | grep outbox.event

# Expected output:
# outbox.event.postgres.orders
# outbox.event.mysql.orders
# schema-changes.mysql-outbox
Tip

Connector Health Check

Check connector status regularly:

# Watch connector status
watch -n 5 'curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq .connector.state'

Healthy state: "RUNNING" with 0 errors in tasks.


PyFlink Multi-Source Consumer

A unified PyFlink consumer that processes events from both the PostgreSQL and the MySQL topics.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Initialize Flink Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Configure checkpointing (fault tolerance)
table_env.get_config().set("execution.checkpointing.interval", "60000")  # 60s
table_env.get_config().set("parallelism.default", "2")
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")

# Define PostgreSQL source
postgres_source_ddl = """
    CREATE TABLE orders_postgres_cdc (
        order_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        status STRING,
        total_amount DECIMAL(10, 2),
        created_at TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'outbox.event.postgres.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'pyflink-multi-db-processor',
        'format' = 'debezium-json',
        'scan.startup.mode' = 'earliest-offset',
        'debezium-json.schema-include' = 'false'
    )
"""
table_env.execute_sql(postgres_source_ddl)

# Define MySQL source
mysql_source_ddl = """
    CREATE TABLE orders_mysql_cdc (
        order_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        status STRING,
        total_amount DECIMAL(10, 2),
        created_at TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'outbox.event.mysql.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'pyflink-multi-db-processor',
        'format' = 'debezium-json',
        'scan.startup.mode' = 'earliest-offset',
        'debezium-json.schema-include' = 'false'
    )
"""
table_env.execute_sql(mysql_source_ddl)

# Unified view combining both sources with source tracking
unified_view_sql = """
    CREATE VIEW unified_orders AS
    SELECT
        order_id,
        customer_id,
        product_id,
        status,
        total_amount,
        created_at,
        'postgresql' AS source_database
    FROM orders_postgres_cdc
    UNION ALL
    SELECT
        order_id,
        customer_id,
        product_id,
        status,
        total_amount,
        created_at,
        'mysql' AS source_database
    FROM orders_mysql_cdc
"""
table_env.execute_sql(unified_view_sql)

# Define transformation view with enrichment
transform_sql = """
    CREATE VIEW enriched_orders AS
    SELECT
        order_id,
        customer_id,
        product_id,
        status,
        total_amount,
        created_at,
        source_database,
        CASE
            WHEN status = 'completed' THEN 'FULFILLED'
            WHEN status = 'cancelled' THEN 'CANCELLED'
            ELSE 'PENDING'
        END AS fulfillment_status,
        CASE source_database
            WHEN 'postgresql' THEN 'PG-' || CAST(order_id AS STRING)
            WHEN 'mysql' THEN 'MY-' || CAST(order_id AS STRING)
        END AS composite_key,
        CURRENT_TIMESTAMP AS processed_at
    FROM unified_orders
"""
table_env.execute_sql(transform_sql)

# Define BigQuery sink (via Kafka topic)
sink_ddl = """
    CREATE TABLE orders_bigquery_sink (
        order_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        status STRING,
        total_amount DECIMAL(10, 2),
        created_at TIMESTAMP(3),
        source_database STRING,
        fulfillment_status STRING,
        composite_key STRING,
        processed_at TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'bigquery.unified.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
"""
table_env.execute_sql(sink_ddl)

# Execute streaming query
print("Starting unified multi-database CDC processor...")
table_env.execute_sql("""
    INSERT INTO orders_bigquery_sink
    SELECT * FROM enriched_orders
""").wait()

Key Patterns Explained

| Pattern | Implementation | Purpose |
|---|---|---|
| UNION ALL | SELECT ... FROM orders_postgres_cdc UNION ALL SELECT ... FROM orders_mysql_cdc | Merge streams from both sources |
| Source tracking | 'postgresql' AS source_database | Identify the origin database for audit and troubleshooting |
| Composite key | 'PG-' \|\| CAST(order_id AS STRING) | Prevent key conflicts if the same order_id exists in both databases |
| Debezium format | 'format' = 'debezium-json' | Automatic parsing of the CDC envelope (before/after/op) |
| Duplicate handling | 'table.exec.source.cdc-events-duplicate' = 'true' | At-least-once semantics with CDC |
Note

Why source_database Field?

The source_database column is critical for:

  1. Troubleshooting: quickly determine which connector produced an event
  2. Audit trail: track the origin in data lineage
  3. Conditional processing: different logic for PostgreSQL vs MySQL events
  4. Monitoring: metrics per source database
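The composite-key CASE expression from the enriched_orders view can be restated as a plain function, which makes the convention easy to unit-test. A sketch (our own helper, mirroring the SQL above):

```python
# Per-source prefixes matching the CASE expression in enriched_orders
PREFIXES = {"postgresql": "PG", "mysql": "MY"}

def composite_key(source_database: str, order_id: int) -> str:
    """Mirror the CASE expression in the enriched_orders view:
    prefix the order id with a per-source tag so that order_id=5 from
    PostgreSQL and order_id=5 from MySQL stay distinct downstream."""
    return f"{PREFIXES[source_database]}-{order_id}"
```

Keyed this way, a downstream MERGE in BigQuery can never collapse two different orders that happen to share a numeric id.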

Monitoring Multi-Database Connectors

A multi-database CDC pipeline requires monitoring both connector types, taking their operational differences into account.

Monitoring Architecture

[Diagram: Monitoring Multi-Database CDC, unified observability for PostgreSQL + MySQL connectors. PostgreSQL metrics (WAL lag in bytes, slot status) and MySQL metrics (binlog lag in time, binlog position) are exported via JMX to Prometheus (unified scraping) and visualized in a Grafana dashboard (multi-database view).]

Key differences in metrics:

  • PostgreSQL: WAL lag is measured in bytes (pg_wal_lsn_diff)
  • MySQL: binlog lag is measured in time (MilliSecondsBehindSource)
  • Unified view: normalize both metrics to latency (ms) for comparison
  • The source_database column in CDC events is critical for traceability

Key Metrics by Connector Type

PostgreSQL Connector Metrics

-- Direct PostgreSQL query for WAL lag monitoring
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_bytes,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes_numeric
FROM pg_replication_slots
WHERE slot_name = 'debezium_outbox_slot_pg';

Prometheus exporter config:

# postgres_exporter query
- query: |
    SELECT
      slot_name,
      pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
    FROM pg_replication_slots
    WHERE slot_name = 'debezium_outbox_slot_pg'
  metrics:
    - slot_name:
        usage: LABEL
    - lag_bytes:
        usage: GAUGE
        description: "PostgreSQL replication slot lag in bytes"

MySQL Connector Metrics

# PromQL query for MySQL replication lag
debezium_metrics_MilliSecondsBehindSource{
    connector="mysql-outbox-connector"
}

# Throughput (events/second)
rate(debezium_metrics_TotalNumberOfEventsSeen{
    connector="mysql-outbox-connector"
}[1m])

# Connector task status (1=RUNNING, 0=FAILED)
kafka_connect_connector_status{
    connector="mysql-outbox-connector"
}

Unified Grafana Dashboard Panels

| Panel | Metric Source | Query | Alert Threshold |
|---|---|---|---|
| PostgreSQL WAL Lag | PostgreSQL exporter | pg_replication_slot_lag_bytes{slot_name="debezium_outbox_slot_pg"} | > 100 MB |
| MySQL Time Lag | JMX exporter | debezium_metrics_MilliSecondsBehindSource{connector="mysql-outbox-connector"} | > 10000 ms (10 s) |
| Combined Events/Sec | JMX exporter | sum(rate(debezium_metrics_TotalNumberOfEventsSeen[1m])) | < 0.1 (no traffic) |
| Connector Status | Kafka Connect API | kafka_connect_connector_status | < 1 (failed) |
| PyFlink Checkpoint Duration | Flink metrics | flink_jobmanager_job_lastCheckpointDuration | > 60000 ms (1 min) |
Warning

Different Lag Metrics

PostgreSQL and MySQL use different lag metrics:

  • PostgreSQL: WAL lag in bytes (volume of unprocessed WAL data)
  • MySQL: time lag in milliseconds (time difference between the binlog event timestamp and processing time)

Do not compare them directly. Set separate alert thresholds for each metric type.
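If you still want a single latency axis on a unified dashboard, one rough heuristic is to divide the PostgreSQL byte lag by the observed WAL generation rate to estimate a time lag. This is an approximation of our own, not a standard formula, and it does not replace the per-metric thresholds above:

```python
def normalize_lag_ms(pg_lag_bytes: float, wal_bytes_per_sec: float,
                     mysql_lag_ms: float) -> dict:
    """Approximate both lags in milliseconds for side-by-side display.
    PostgreSQL: bytes of unconsumed WAL divided by the WAL generation
    rate gives roughly the seconds of lag. This is only an estimate;
    alert thresholds should still be set per metric type."""
    pg_lag_ms = (pg_lag_bytes / wal_bytes_per_sec) * 1000.0
    return {"postgresql_lag_ms": pg_lag_ms, "mysql_lag_ms": mysql_lag_ms}

# 10 MB of retained WAL at ~1 MB/s of WAL generation ~= 10 s behind
result = normalize_lag_ms(pg_lag_bytes=10_000_000,
                          wal_bytes_per_sec=1_000_000,
                          mysql_lag_ms=250.0)
```

The WAL generation rate itself fluctuates, so treat the normalized value as a trend indicator rather than an alerting signal.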


Verification Checklist

A step-by-step verification of the multi-database CDC pipeline.

Infrastructure Verification

  • Both connectors RUNNING

    curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq '.connector.state'
    # Expected: "RUNNING"
    
    curl -s http://localhost:8083/connectors/mysql-outbox-connector/status | jq '.connector.state'
    # Expected: "RUNNING"
  • PostgreSQL replication slot active

    SELECT slot_name, active FROM pg_replication_slots WHERE slot_name = 'debezium_outbox_slot_pg';
    -- Expected: active = true
  • MySQL binlog enabled

    SHOW VARIABLES LIKE 'binlog_format';
    -- Expected: ROW

Event Flow Verification

  • PostgreSQL topic receives events

    kafka-console-consumer --bootstrap-server kafka:9092 \
      --topic outbox.event.postgres.orders \
      --from-beginning --max-messages 1
  • MySQL topic receives events

    kafka-console-consumer --bootstrap-server kafka:9092 \
      --topic outbox.event.mysql.orders \
      --from-beginning --max-messages 1
  • PyFlink consumer processes both streams

    kafka-console-consumer --bootstrap-server kafka:9092 \
      --topic bigquery.unified.orders \
      --from-beginning --max-messages 5 | jq '.source_database'
    
    # Expected: Mix of "postgresql" and "mysql"
  • source_database column correctly identifies origin

    -- Insert a test event in PostgreSQL
    INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
    VALUES ('orders', '1001', 'OrderCreated', '{"amount": 100}');

    -- Insert a test event in MySQL
    INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
    VALUES ('orders', '1002', 'OrderCreated', '{"amount": 200}');
    
    # Verify in unified topic
    kafka-console-consumer --bootstrap-server kafka:9092 \
      --topic bigquery.unified.orders --from-beginning | jq '.source_database'
    
    # Expected: "postgresql" for order_id 1001, "mysql" for order_id 1002
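The same source_database check can also be run offline against messages dumped from the unified topic. A small sketch (the sample messages below are made up for illustration):

```python
import json
from collections import Counter

def count_sources(ndjson_lines: list[str]) -> Counter:
    """Count events per source_database in messages dumped with
    kafka-console-consumer (one JSON object per line)."""
    return Counter(json.loads(line)["source_database"]
                   for line in ndjson_lines if line.strip())

# Hypothetical dump of bigquery.unified.orders, one message per line
dumped = [
    '{"order_id": 1001, "source_database": "postgresql"}',
    '{"order_id": 1002, "source_database": "mysql"}',
    '{"order_id": 1003, "source_database": "postgresql"}',
]
counts = count_sources(dumped)
# A healthy unified pipeline shows both sources present
```

If one of the two keys is missing from the counts, only one connector is feeding the unified topic: check the other connector's status first.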

Common Issues and Troubleshooting

Common problems in multi-database CDC deployments and their solutions.

| Issue | Symptom | Diagnosis | Remediation |
|---|---|---|---|
| Duplicate server_id | MySQL connector fails with "Server ID already in use" | SHOW SLAVE HOSTS; shows the conflict | Pick a unique database.server.id (184000-184999 range) |
| Schema mismatch errors | MySQL connector fails after restart | Shared schema.history.internal.kafka.topic between connectors | Create a unique schema history topic per connector |
| Topic naming conflicts | Events disappear, only one source is visible | Identical database.server.name values | Use distinct logical names: postgres_prod vs mysql_prod |
| PyFlink parsing errors | Flink job fails with "Cannot parse Debezium event" | Incorrect format specification | Ensure 'format' = 'debezium-json' in the DDL |
| PostgreSQL WAL bloat | Disk usage keeps growing | The replication slot is not consuming WAL because the connector failed | Check connector status, restart the connector, drop inactive slots |
| MySQL binlog purged | Connector fails with "binlog file no longer available" | binlog_expire_logs_seconds is too short | Increase retention or re-snapshot the connector |
| No events in unified topic | PyFlink sink is empty | Consumer group offset tracking issues | Reset the consumer group: --reset-offsets --to-earliest |
| Mixed schema versions | Consumer crashes on schema incompatibility | PostgreSQL and MySQL outbox tables have different schemas | Align the outbox table schemas across databases |
Knowledge check
Why does the PyFlink multi-source consumer add the source_database and composite_key columns when combining the PostgreSQL and MySQL streams with UNION ALL?
Answer
source_database (postgresql/mysql) provides traceability: during troubleshooting you can quickly determine the origin database of each event. composite_key (PG-{id}/MY-{id}) prevents primary-key conflicts: order_id=5 in PostgreSQL and order_id=5 in MySQL are different records, but a MERGE into BigQuery keyed on order_id alone would overwrite one with the other. The composite key guarantees uniqueness in the unified stream.

Debugging Commands

# Check connector logs
curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq '.tasks[0].trace'

# Verify topic partitions
kafka-topics --bootstrap-server kafka:9092 --describe --topic outbox.event.postgres.orders

# Check consumer group lag
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --group pyflink-multi-db-processor --describe

-- PostgreSQL replication slot diagnostics (run in psql, not the shell)
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- MySQL binlog diagnostics (run in the mysql client)
SHOW BINARY LOGS;
SHOW MASTER STATUS;
Tip

Isolation Testing

When troubleshooting:

  1. Stop one connector to isolate the source of the issue
  2. Check each connector independently
  3. Verify the consumer handles each topic separately before unified processing
  4. Check topic data with kafka-console-consumer for schema validation

Key Takeaways

  1. PostgreSQL and MySQL connectors coexist in a single Kafka Connect cluster without conflicts when configured correctly
  2. MySQL requires unique parameters: database.server.id and schema.history.internal.kafka.topic must be unique per connector
  3. The topic naming convention is critical: use a database prefix (postgres/mysql) for source traceability
  4. The PyFlink UNION ALL pattern merges the streams with a source_database tracking column
  5. Monitoring requires database-specific metrics: PostgreSQL WAL lag (bytes) vs MySQL time lag (milliseconds)
  6. Schema consistency across databases prevents consumer failures
  7. A shared schema history topic means data corruption for MySQL connectors
  8. The composite-key strategy prevents conflicts if the same IDs exist in both databases
  9. A verification checklist is mandatory for multi-database deployments
  10. Isolation testing simplifies troubleshooting: verify each connector independently before unified processing

What's Next?

In the next lesson, Multi-Database Checklist Extension, you will extend the capstone project checklist with multi-database requirements:

  • Extended success criteria for dual-source CDC
  • Testing procedures for cross-database scenarios
  • Documentation templates for multi-database architecture
  • Production readiness requirements for multi-connector deployments

Use the configurations from this lesson as a copy-paste-ready starting point for your extended capstone project!
