Prerequisites:
- module-7/04-multi-database-architecture
Multi-Database Configuration
In the previous lesson you learned about the multi-database CDC pipeline architecture and the patterns for combining data from PostgreSQL and MySQL. Now it is time to move to practical configuration: set up both connectors and create a unified PyFlink consumer.
This configuration is ready to use in your extended capstone project.
Lesson Goal
By the end of this lesson you will be able to:
- Configure the PostgreSQL connector with the Outbox Event Router SMT
- Configure the MySQL connector with its unique parameters (server.id, schema history topic)
- Deploy both connectors to Kafka Connect
- Create a PyFlink consumer that processes events from both sources
- Monitor a multi-database CDC pipeline
- Diagnose typical problems in multi-database deployments
Format: complete, copy-paste-ready configurations with explanations of the critical parameters.
MySQL Outbox Table Schema
Before configuring the MySQL connector, create an outbox table in the MySQL database (parallel to the PostgreSQL outbox table from the main capstone).
MySQL Outbox DDL
-- MySQL outbox table schema
-- Compare with the PostgreSQL version from the main capstone project
CREATE TABLE outbox (
id CHAR(36) PRIMARY KEY DEFAULT (UUID()), -- MySQL UUID() syntax
aggregatetype VARCHAR(255) NOT NULL, -- Routing key (e.g., "orders")
aggregateid VARCHAR(255) NOT NULL, -- Entity ID (e.g., order_id)
type VARCHAR(255) NOT NULL, -- Event type (e.g., "OrderCreated")
payload JSON NOT NULL, -- Event data (JSON vs PostgreSQL JSONB)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_created_at (created_at) -- Cleanup queries index
) ENGINE=InnoDB;
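To make the outbox contract concrete, here is a minimal Python sketch (an illustration, not part of the capstone code) of how an application might assemble a row for this table; build_outbox_row and its field choices are assumptions that simply mirror the DDL above. In production, the row must be written in the same transaction as the business-table change.

```python
import json
import uuid
from datetime import datetime, timezone

def build_outbox_row(aggregatetype: str, aggregateid: str,
                     event_type: str, payload: dict) -> dict:
    """Build one row matching the outbox DDL above (hypothetical helper).

    The application inserts this row in the SAME transaction as the
    business-table change; Debezium then picks it up from the binlog.
    """
    return {
        "id": str(uuid.uuid4()),          # app-side UUID instead of MySQL's UUID() default
        "aggregatetype": aggregatetype,   # routing key used by the Outbox Event Router
        "aggregateid": aggregateid,       # entity ID, becomes the Kafka message key
        "type": event_type,               # e.g. "OrderCreated"
        "payload": json.dumps(payload),   # stored in the JSON column
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

row = build_outbox_row("orders", "1001", "OrderCreated", {"amount": 100})
print(row["aggregatetype"], json.loads(row["payload"])["amount"])  # orders 100
```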
Key Differences from PostgreSQL
| Аспект | PostgreSQL | MySQL |
|---|---|---|
| UUID generation | gen_random_uuid() | UUID() |
| JSON type | JSONB (binary format) | JSON (text-based with validation) |
| REPLICA IDENTITY | ALTER TABLE outbox REPLICA IDENTITY FULL; | Not required (binlog ROW format captures full row data) |
| Default storage | N/A | ENGINE=InnoDB (required for transactional outbox writes) |
MySQL Binlog Configuration
MySQL binlog with ROW format automatically captures full row data, so no equivalent of PostgreSQL's REPLICA IDENTITY FULL is needed. Make sure binlog_format=ROW is set (see Module 8 - MySQL Configuration).
PostgreSQL Connector Configuration
PostgreSQL connector configuration for the multi-database deployment (with small modifications for unique topic naming).
Connector JSON Config
{
"name": "postgres-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Database connection
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "secure_password",
"database.dbname": "ecommerce",
// Logical name (used in topic naming)
"database.server.name": "postgres_prod",
// Table filtering - only the outbox table
"table.include.list": "public.outbox",
// PostgreSQL logical decoding
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_outbox_slot_pg",
// Operational parameters
"heartbeat.interval.ms": "10000",
"snapshot.mode": "when_needed",
// Outbox Event Router SMT
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.route.topic.replacement": "outbox.event.postgres.${routedByValue}",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",
// Serialization
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
Critical Parameters for Multi-Database
| Parameter | Value | Purpose |
|---|---|---|
| database.server.name | postgres_prod | Unique logical name (distinct from the MySQL connector) |
| slot.name | debezium_outbox_slot_pg | Replication slot name (unique per connector) |
| transforms.outbox.route.topic.replacement | outbox.event.postgres.${routedByValue} | Topic prefix includes postgres to keep the sources separate |
Topic naming example:
- Event with aggregatetype = "orders" → topic outbox.event.postgres.orders
- Event with aggregatetype = "customers" → topic outbox.event.postgres.customers
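The routing rule above can be sketched as a one-line Python function; outbox_route_topic is a hypothetical helper that simply mirrors what the EventRouter's route.topic.replacement pattern produces.

```python
def outbox_route_topic(source_prefix: str, aggregatetype: str) -> str:
    """Mirror the EventRouter pattern 'outbox.event.<prefix>.${routedByValue}'."""
    return f"outbox.event.{source_prefix}.{aggregatetype}"

print(outbox_route_topic("postgres", "orders"))     # outbox.event.postgres.orders
print(outbox_route_topic("postgres", "customers"))  # outbox.event.postgres.customers
```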
MySQL Connector Configuration
The MySQL connector requires additional parameters that the PostgreSQL connector does not have: database.server.id and a schema history topic.
Connector JSON Config
{
"name": "mysql-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// Database connection
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium_user",
"database.password": "secure_password",
// Logical name (unique per connector)
"database.server.name": "mysql_prod",
// MySQL-specific: Server ID (must be unique)
"database.server.id": "184054",
// Table filtering - only the outbox table
"table.include.list": "ecommerce.outbox",
// MySQL-specific: Schema history topic (CRITICAL - must be unique per connector)
"schema.history.internal.kafka.topic": "schema-changes.mysql-outbox",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
// Operational parameters
"heartbeat.interval.ms": "10000",
"snapshot.mode": "when_needed",
"snapshot.locking.mode": "minimal",
// Outbox Event Router SMT (identical to PostgreSQL)
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.route.topic.replacement": "outbox.event.mysql.${routedByValue}",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",
// Serialization
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
MySQL-Specific Parameters Explained
CRITICAL: Unique Schema History Topic
schema.history.internal.kafka.topic MUST be unique per MySQL connector. Sharing a schema history topic between connectors causes DDL pollution: a connector receives DDL changes from the other connector and fails with schema mismatch errors.
Rule: One connector = one schema history topic.
database.server.id Selection
database.server.id must be unique and must NOT conflict with the server_id of the MySQL server or of any other MySQL connector.
Recommended range: 184000-184999 for Debezium connectors (see Module 8 - Multi-Connector Deployment for the registry pattern).
Example assignment:
- MySQL Server: server_id = 1
- Connector 1: database.server.id = 184054
- Connector 2: database.server.id = 184055
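One way to keep server IDs conflict-free is a small registry helper; allocate_server_id below is a hypothetical sketch of the registry pattern, assuming the Debezium range recommended above.

```python
def allocate_server_id(taken: set[int], low: int = 184000, high: int = 184999) -> int:
    """Return the first free ID in the Debezium range, skipping IDs already
    used by other connectors or by MySQL servers themselves."""
    for candidate in range(low, high + 1):
        if candidate not in taken:
            return candidate
    raise RuntimeError("no free server IDs left in range")

taken = {1, 184054}  # MySQL server's server_id + the existing connector
print(allocate_server_id(taken, low=184054))  # 184055
```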
Topic naming example:
- Event with aggregatetype = "orders" → topic outbox.event.mysql.orders
- Event with aggregatetype = "customers" → topic outbox.event.mysql.customers
Knowledge check: which three MySQL connector parameters MUST be unique in a multi-database deployment, and why?
Deploying Both Connectors
Deploy both connectors via the Kafka Connect REST API.
Deployment Commands
# Deploy PostgreSQL connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-outbox-connector.json
# Deploy MySQL connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-outbox-connector.json
# Verify both connectors deployed
curl http://localhost:8083/connectors | jq
Expected output:
[
"postgres-outbox-connector",
"mysql-outbox-connector"
]
Verification Checklist
# Check PostgreSQL connector status
curl http://localhost:8083/connectors/postgres-outbox-connector/status | jq
# Expected: "state": "RUNNING"
# Check MySQL connector status
curl http://localhost:8083/connectors/mysql-outbox-connector/status | jq
# Expected: "state": "RUNNING"
# List Kafka topics (should see both postgres and mysql topics)
kafka-topics --bootstrap-server kafka:9092 --list | grep outbox.event
# Expected output:
# outbox.event.postgres.orders
# outbox.event.mysql.orders
# The MySQL schema history topic should exist as well:
kafka-topics --bootstrap-server kafka:9092 --list | grep schema-changes
# Expected output:
# schema-changes.mysql-outbox
Connector Health Check
Check connector status regularly:
# Watch connector status
# Watch connector status
watch -n 5 'curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq .connector.state'
Healthy state: "RUNNING" with 0 errors in tasks.
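The health rule ("RUNNING" with 0 errors in tasks) can be captured in a small helper; is_healthy is a hypothetical sketch that classifies the JSON returned by GET /connectors/<name>/status, so it can be reused in scripted checks.

```python
def is_healthy(status: dict) -> bool:
    """The connector AND every task must be in state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    return all(t.get("state") == "RUNNING" for t in status.get("tasks", []))

# Example payloads shaped like the Kafka Connect status endpoint response
healthy = {"connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "RUNNING"}]}
failed = {"connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "FAILED"}]}
print(is_healthy(healthy), is_healthy(failed))  # True False
```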
PyFlink Multi-Source Consumer
A unified PyFlink consumer that processes events from both the PostgreSQL and MySQL topics.
Complete PyFlink Job
from pyflink.table import EnvironmentSettings, TableEnvironment
# Initialize Flink Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Configure checkpointing (fault tolerance)
table_env.get_config().set("execution.checkpointing.interval", "60000") # 60s
table_env.get_config().set("parallelism.default", "2")
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")
# Define PostgreSQL source
postgres_source_ddl = """
CREATE TABLE orders_postgres_cdc (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
status STRING,
total_amount DECIMAL(10, 2),
created_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'outbox.event.postgres.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'pyflink-multi-db-processor',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset',
'debezium-json.schema-include' = 'false'
)
"""
table_env.execute_sql(postgres_source_ddl)
# Define MySQL source
mysql_source_ddl = """
CREATE TABLE orders_mysql_cdc (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
status STRING,
total_amount DECIMAL(10, 2),
created_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'outbox.event.mysql.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'pyflink-multi-db-processor',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset',
'debezium-json.schema-include' = 'false'
)
"""
table_env.execute_sql(mysql_source_ddl)
# Unified view combining both sources with source tracking
unified_view_sql = """
CREATE VIEW unified_orders AS
SELECT
order_id,
customer_id,
product_id,
status,
total_amount,
created_at,
'postgresql' AS source_database
FROM orders_postgres_cdc
UNION ALL
SELECT
order_id,
customer_id,
product_id,
status,
total_amount,
created_at,
'mysql' AS source_database
FROM orders_mysql_cdc
"""
table_env.execute_sql(unified_view_sql)
# Define transformation view with enrichment
transform_sql = """
CREATE VIEW enriched_orders AS
SELECT
order_id,
customer_id,
product_id,
status,
total_amount,
created_at,
source_database,
CASE
WHEN status = 'completed' THEN 'FULFILLED'
WHEN status = 'cancelled' THEN 'CANCELLED'
ELSE 'PENDING'
END AS fulfillment_status,
CASE source_database
WHEN 'postgresql' THEN 'PG-' || CAST(order_id AS STRING)
WHEN 'mysql' THEN 'MY-' || CAST(order_id AS STRING)
END AS composite_key,
CURRENT_TIMESTAMP AS processed_at
FROM unified_orders
"""
table_env.execute_sql(transform_sql)
# Define BigQuery sink (via Kafka topic)
sink_ddl = """
CREATE TABLE orders_bigquery_sink (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
status STRING,
total_amount DECIMAL(10, 2),
created_at TIMESTAMP(3),
source_database STRING,
fulfillment_status STRING,
composite_key STRING,
processed_at TIMESTAMP(3),
PRIMARY KEY (composite_key) NOT ENFORCED
) WITH (
-- upsert-kafka accepts the update/delete changes that the debezium-json
-- sources emit (the plain 'kafka' connector is append-only and would fail);
-- keying by composite_key keeps identical order_ids from the two databases
-- from overwriting each other
'connector' = 'upsert-kafka',
'topic' = 'bigquery.unified.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
)
"""
table_env.execute_sql(sink_ddl)
# Execute streaming query
print("Starting unified multi-database CDC processor...")
table_env.execute_sql("""
INSERT INTO orders_bigquery_sink
SELECT * FROM enriched_orders
""").wait()
Key Patterns Explained
| Pattern | Implementation | Purpose |
|---|---|---|
| UNION ALL | SELECT ... FROM orders_postgres_cdc UNION ALL SELECT ... FROM orders_mysql_cdc | Merge streams from both sources |
| Source tracking | 'postgresql' AS source_database | Identify origin database for audit and troubleshooting |
| Composite key | 'PG-' || CAST(order_id AS STRING) | Prevent key conflicts if same order_id exists in both databases |
| Debezium format | 'format' = 'debezium-json' | Automatic parsing of CDC envelope (before/after/op) |
| Duplicate handling | 'table.exec.source.cdc-events-duplicate' = 'true' | At-least-once semantics with CDC |
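The composite-key CASE expression has a direct Python analogue; composite_key below is an illustrative sketch, useful for producing the same keys in tests or downstream tooling.

```python
# Prefixes match the CASE expression in the enriched_orders view
PREFIXES = {"postgresql": "PG", "mysql": "MY"}

def composite_key(source_database: str, order_id: int) -> str:
    """Python equivalent of: CASE source_database WHEN ... END AS composite_key."""
    return f"{PREFIXES[source_database]}-{order_id}"

print(composite_key("postgresql", 1001))  # PG-1001
print(composite_key("mysql", 1001))       # MY-1001
```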
Why source_database Field?
The source_database column is critical for:
- Troubleshooting: quickly determine which connector produced an event
- Audit trail: track origin in data lineage
- Conditional processing: different logic for PostgreSQL vs MySQL events
- Monitoring: metrics per source database
Monitoring Multi-Database Connectors
A multi-database CDC pipeline requires monitoring both connector types, taking their operational differences into account.
Monitoring Architecture
Unified observability for PostgreSQL + MySQL connectors
Key differences in metrics:
- PostgreSQL: WAL lag is measured in bytes (pg_wal_lsn_diff)
- MySQL: binlog lag is measured in time (MilliSecondsBehindSource)
- Unified view: normalize both metrics to latency (ms) for comparison
- The source_database column in CDC events is critical for traceability
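A rough sketch of the normalization idea, with a loudly labeled assumption: converting WAL bytes into latency requires an estimated WAL consumption rate (est_bytes_per_sec below), which you must measure in your own environment; the default value here is invented purely for illustration.

```python
def wal_lag_to_ms(lag_bytes: float, est_bytes_per_sec: float) -> float:
    """Approximate PostgreSQL WAL lag (bytes) as latency in ms.

    ASSUMPTION: est_bytes_per_sec is a measured, sustained WAL consumption
    rate for your connector; do not trust the conversion without measuring it.
    """
    return lag_bytes / est_bytes_per_sec * 1000.0

def normalized_lag_ms(source: str, value: float,
                      est_bytes_per_sec: float = 5_000_000) -> float:
    if source == "postgresql":   # value arrives in bytes
        return wal_lag_to_ms(value, est_bytes_per_sec)
    return value                 # MySQL MilliSecondsBehindSource is already ms

print(normalized_lag_ms("postgresql", 10_000_000))  # 2000.0
print(normalized_lag_ms("mysql", 1500))             # 1500
```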
Key Metrics by Connector Type
PostgreSQL Connector Metrics
-- Direct PostgreSQL query for WAL lag monitoring
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_bytes,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes_numeric
FROM pg_replication_slots
WHERE slot_name = 'debezium_outbox_slot_pg';
Prometheus exporter config:
# postgres_exporter query
- query: |
SELECT
slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_outbox_slot_pg'
metrics:
- slot_name:
usage: LABEL
- lag_bytes:
usage: GAUGE
description: "PostgreSQL replication slot lag in bytes"
MySQL Connector Metrics
# PromQL query for MySQL replication lag
debezium_metrics_MilliSecondsBehindSource{
connector="mysql-outbox-connector"
}
# Throughput (events/second)
rate(debezium_metrics_TotalNumberOfEventsSeen{
connector="mysql-outbox-connector"
}[1m])
# Connector task status (1=RUNNING, 0=FAILED)
kafka_connect_connector_status{
connector="mysql-outbox-connector"
}
Unified Grafana Dashboard Panels
| Panel | Metric Source | Query | Alert Threshold |
|---|---|---|---|
| PostgreSQL WAL Lag | PostgreSQL exporter | pg_replication_slot_lag_bytes{slot_name="debezium_outbox_slot_pg"} | > 100 MB |
| MySQL Time Lag | JMX exporter | debezium_metrics_MilliSecondsBehindSource{connector="mysql-outbox-connector"} | > 10000 ms (10s) |
| Combined Events/Sec | JMX exporter | sum(rate(debezium_metrics_TotalNumberOfEventsSeen[1m])) | < 0.1 (no traffic) |
| Connector Status | Kafka Connect API | kafka_connect_connector_status | < 1 (failed) |
| PyFlink Checkpoint Duration | Flink metrics | flink_jobmanager_job_lastCheckpointDuration | > 60000 ms (1 min) |
Different Lag Metrics
PostgreSQL and MySQL use different lag metrics:
- PostgreSQL: WAL lag in bytes (volume of unprocessed WAL data)
- MySQL: time lag in milliseconds (time difference between the binlog event timestamp and processing time)
Do not compare them directly. Set separate alert thresholds for each type.
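The per-type thresholds can be encoded so that each source is only ever compared against its own unit; lag_alerts is a hypothetical sketch using the thresholds from the dashboard table above.

```python
# Per-source thresholds from the dashboard table; each in its OWN unit
THRESHOLDS = {
    "postgresql": 100 * 1024 * 1024,  # WAL lag alert: > 100 MB (bytes)
    "mysql": 10_000,                  # time lag alert: > 10 s (milliseconds)
}

def lag_alerts(metrics: dict[str, float]) -> list[str]:
    """Return the sources whose lag exceeds their own threshold.

    The two lag types are never compared to each other, only to their
    respective thresholds.
    """
    return [source for source, value in metrics.items()
            if value > THRESHOLDS[source]]

print(lag_alerts({"postgresql": 250 * 1024 * 1024, "mysql": 4_000}))  # ['postgresql']
```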
Verification Checklist
A step-by-step check of the multi-database CDC pipeline.
Infrastructure Verification
- Both connectors RUNNING

curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq '.connector.state'
# Expected: "RUNNING"
curl -s http://localhost:8083/connectors/mysql-outbox-connector/status | jq '.connector.state'
# Expected: "RUNNING"

- PostgreSQL replication slot active

SELECT slot_name, active FROM pg_replication_slots WHERE slot_name = 'debezium_outbox_slot_pg';
-- Expected: active = true

- MySQL binlog enabled

SHOW VARIABLES LIKE 'binlog_format';
-- Expected: ROW
Event Flow Verification
- PostgreSQL topic receives events

kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic outbox.event.postgres.orders \
  --from-beginning --max-messages 1

- MySQL topic receives events

kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic outbox.event.mysql.orders \
  --from-beginning --max-messages 1

- PyFlink consumer processes both streams

kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic bigquery.unified.orders \
  --from-beginning --max-messages 5 | jq '.source_database'
# Expected: Mix of "postgresql" and "mysql"

- source_database column correctly identifies origin

-- Insert a test event in PostgreSQL
INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
VALUES ('orders', '1001', 'OrderCreated', '{"amount": 100}');

-- Insert a test event in MySQL
INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
VALUES ('orders', '1002', 'OrderCreated', '{"amount": 200}');

# Verify in the unified topic
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic bigquery.unified.orders --from-beginning | jq '.source_database'
# Expected: "postgresql" for order_id 1001, "mysql" for order_id 1002
Common Issues and Troubleshooting
Typical problems in multi-database CDC deployments and how to fix them.
| Issue | Symptom | Diagnosis | Remediation |
|---|---|---|---|
| Duplicate server_id | MySQL connector fails with "Server ID already in use" | SHOW SLAVE HOSTS; shows the conflict | Choose a unique database.server.id (184000-184999 range) |
| Schema mismatch errors | MySQL connector fails after restart | Shared schema.history.internal.kafka.topic between connectors | Create a unique schema history topic per connector |
| Topic naming conflicts | Events disappear, only one source visible | Identical database.server.name values | Use distinct logical names: postgres_prod vs mysql_prod |
| PyFlink parsing errors | Flink job fails with "Cannot parse Debezium event" | Incorrect format specification | Ensure 'format' = 'debezium-json' in the DDL |
| PostgreSQL WAL bloat | Disk usage keeps growing | Replication slot is not consuming WAL because of a connector failure | Check connector status, restart the connector, drop inactive slots |
| MySQL binlog purged | Connector fails with "binlog file no longer available" | binlog_expire_logs_seconds is too short | Increase retention or re-snapshot the connector |
| No events in unified topic | PyFlink sink is empty | Consumer group offset tracking issues | Reset the consumer group: --reset-offsets --to-earliest |
| Mixed schema versions | Consumer crashes on schema incompatibility | PostgreSQL and MySQL outbox tables have different schemas | Align outbox table schemas across databases |
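The last row of the table, schema drift between the two outbox tables, can be caught early with a small comparison helper; schema_diff is a hypothetical sketch that compares column name/type maps you would extract from information_schema in each database.

```python
def schema_diff(pg_columns: dict[str, str], mysql_columns: dict[str, str]) -> list[str]:
    """Report outbox-schema drift between databases.

    Types are compared by a coarse logical name that you map database-specific
    types onto yourself (e.g. JSONB and JSON both become 'json').
    """
    problems = []
    for col in sorted(set(pg_columns) | set(mysql_columns)):
        if col not in pg_columns:
            problems.append(f"{col}: missing in PostgreSQL")
        elif col not in mysql_columns:
            problems.append(f"{col}: missing in MySQL")
        elif pg_columns[col] != mysql_columns[col]:
            problems.append(f"{col}: {pg_columns[col]} vs {mysql_columns[col]}")
    return problems

pg = {"id": "uuid", "aggregatetype": "text", "payload": "json"}
my = {"id": "uuid", "aggregatetype": "text", "payload": "json", "extra": "text"}
print(schema_diff(pg, my))  # ['extra: missing in PostgreSQL']
```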
Knowledge check: why does the PyFlink multi-source consumer add a source_database column and a composite_key column when applying UNION ALL to the PostgreSQL and MySQL streams?
Debugging Commands
# Check connector logs
curl -s http://localhost:8083/connectors/postgres-outbox-connector/status | jq '.tasks[0].trace'
# Verify topic partitions
kafka-topics --bootstrap-server kafka:9092 --describe --topic outbox.event.postgres.orders
# Check consumer group lag
kafka-consumer-groups --bootstrap-server kafka:9092 \
--group pyflink-multi-db-processor --describe
# PostgreSQL replication slot diagnostics
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
# MySQL binlog diagnostics
SHOW BINARY LOGS;
SHOW MASTER STATUS;
Isolation Testing
When troubleshooting:
- Stop one connector to isolate the source of the issue
- Check each connector independently
- Verify the consumer processes each topic separately before unified processing
- Check topic data with kafka-console-consumer for schema validation
Key Takeaways
- PostgreSQL and MySQL connectors coexist in one Kafka Connect cluster without conflicts when configured correctly
- MySQL requires unique parameters: database.server.id and schema.history.internal.kafka.topic must be unique per connector
- The topic naming convention is critical: use a database prefix (postgres/mysql) for source traceability
- The PyFlink UNION ALL pattern merges streams with a source_database tracking column
- Monitoring requires database-specific metrics: PostgreSQL WAL lag (bytes) vs MySQL time lag (milliseconds)
- Schema consistency across databases prevents consumer failures
- A shared schema history topic means data corruption for MySQL connectors
- The composite-key strategy prevents conflicts if the same IDs exist in both databases
- The verification checklist is mandatory for multi-database deployments
- Isolation testing simplifies troubleshooting: verify each connector independently before unified processing
What's Next?
In the next lesson, Multi-Database Checklist Extension, you will extend the capstone project checklist with multi-database requirements:
- Extended success criteria for dual-source CDC
- Testing procedures for cross-database scenarios
- Documentation templates for the multi-database architecture
- Production readiness requirements for multi-connector deployments
Use the configurations from this lesson as a copy-paste-ready starting point for your extended capstone project!